rocketmq遇到的坑
时间: 2020-06-03来源:OSCHINA
前景提要
记录下搭建rocketmq过程中遇到的坑:(集群机子代号这里列为:mq-a,mq-b,mq-a-s, mq-b-s)
rocketmq搭建的是 双主双从 模式。三台机器,机器 a 、b 分别安装 双主 ,机器c 安装 双从 。 启动三个 nameserver,双主broker-master,双从broker-slave
服务器 rocketmq 是4.7.0版本
1. 第一个坑 - 项目rokcetmq 版本和服务器rocketmq版本没对上 :rocketmq 双主双从 搭建完,从 github 上下载的 rocketmq管理后台,本地跑起来,能成功连上rocketmq。然后自己写了的一个demo,发下报错,报错信息如下:

后来发现是 demo项目于中 pom的 rocketmq 依赖是 4.3.0, 和服务器4.7.0 对不上,然后我项目改成了 4.7.0的版本依赖。然后就ok了

附上 provider生产者的 demo代码 和 consumer消费者 的 demo 代码: ========>>>> provider 生产者代码: public class TestProvider { public static void main(String[] args) { try { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("p1"); // Specify name server addresses. producer.setNamesrvAddr("43.230.143.17:9876;58.82.250.253:9876;58.82.208.238:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 10; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message( "testTopic" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); Thread.sleep(1000); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } catch (Exception e) { e.printStackTrace(); } } } ========>>>> consumer 消费者代码: public class TestConsumer { public static void main(String[] args) { try { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("c1"); // Specify name server addresses. consumer.setNamesrvAddr("58.82.250.253:9876;43.230.143.17:9876;58.82.208.238:9876"); // Subscribe one more more topics to consume. consumer.subscribe("testTopic", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener((MessageListenerConcurrently)(msgs, context) -> { // System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs.toString()); System.out.println(" Receive New Messages: " + Arrays.toString(msgs.toArray())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } catch (Exception e) { e.printStackTrace(); } } }

科技资讯:

科技学院:

科技百科:

科技书籍:

网站大全:

软件大全:

热门排行