博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMq使用过程的那些小事
阅读量:6296 次
发布时间:2019-06-22

本文共 4168 字,大约阅读时间需要 13 分钟。

在使用rocketmq之前使用了rabbitmq,会出现丢消息的情况,进而果断放弃,继续投入到大Java的怀抱,不过也遇到了一些问题,这里总结一下:

  1. 使用过程中新加节点需要手动创建topic

  2. 消费者处理不合理,不能实现负载均衡

针对第二点:我之前一直使用的是pull方式,按顺序来消费,一旦程序重启则从头一个一个消费,显然这种效率很低,

而且因为一个代码问题,如果我在offsize = 0的情况获取不了数据,则min offsize不增长,这种情况导致如果数据隔天了则不能消费,这时候消费者相当于在空跑。

List
msgList = new ArrayList
(); try { Set
mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); }

使用这种方法可以稳定的pull出数据,但是这种情况效率很低。

使用多线程方式:

private Queue
> messageQueue = new LinkedBlockingQueue
>(); DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(groupName); consumer.setNamesrvAddr(url); scheduleService = new MQPullConsumerScheduleService(groupName); scheduleService.setMessageModel(MessageModel.CLUSTERING); scheduleService.setDefaultMQPullConsumer(consumer); List
msgList = new ArrayList
(); try { Set
mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); } List
msgList = new ArrayList
(); try { Set
mqs = consumer.fetchSubscribeMessageQueues(queueName); for (MessageQueue mq : mqs) { System.out.printf("Consume from the queue: " + mq + "%n"); try { PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); if (pullResult != null) { putMessageQueueOffset(mq, pullResult.getNextBeginOffset()); switch (pullResult.getPullStatus()) { case FOUND: if (pullResult.getMsgFoundList() != null && pullResult.getMsgFoundList().size() > 0) { for (MessageExt messageExt : pullResult.getMsgFoundList()) { msgList.add(new MessageVo(messageExt.getMsgId(), messageExt.getBody())); } } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; default: break; } } } catch (Exception e) { e.printStackTrace(); }

通过回调的方式来定时调用。这种方式是多线程来实现的。

使用这种方式不能部署两个程序,因为会导致groupName冲突

转载地址:http://ztvta.baihongyu.com/

你可能感兴趣的文章
如何清理mac系统垃圾
查看>>
企业中最佳虚拟机软件应用程序—Parallels Deskto
查看>>
Nginx配置文件详细说明
查看>>
怎么用Navicat Premium图标编辑器创建表
查看>>
Spring配置文件(2)配置方式
查看>>
MariaDB/Mysql 批量插入 批量更新
查看>>
ItelliJ IDEA开发工具使用—创建一个web项目
查看>>
solr-4.10.4部署到tomcat6
查看>>
切片键(Shard Keys)
查看>>
淘宝API-类目
查看>>
virtualbox 笔记
查看>>
Git 常用命令
查看>>
驰骋工作流引擎三种项目集成开发模式
查看>>
SUSE11修改主机名方法
查看>>
jdk6.0 + Tomcat6.0的简单jsp,Servlet,javabean的调试
查看>>
Android:apk签名
查看>>
2(2).选择排序_冒泡(双向循环链表)
查看>>
MySQL 索引 BST树、B树、B+树、B*树
查看>>
微信支付
查看>>
CodeBlocks中的OpenGL
查看>>