RocketMQ 常規(guī)消息

2023-07-12 17:23 更新

maven 工程添加庫

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

gradle 工程添加庫

compile 'org.apache.rocketmq:rocketmq-client:4.4.0'

注意:

客戶端版本要和服務(wù)端版本的一致,或者會發(fā)生一些奇怪的問題:

我遇到過版本不一致會發(fā)生,消息無法確認(rèn)消息消費(fèi),也就是說 客戶端已經(jīng)消費(fèi)了,也提交成功了,但是服務(wù)端沒有同步到!

注意: 要到控制臺創(chuàng)建 Topic 隊(duì)列名稱

  • 同步發(fā)送消息(Send Messages Synchronously)
public class SyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new
            DefaultMQProducer("please_rename_unique_group_name");
        // 設(shè)置 name server 服務(wù)地址, 這里是是設(shè)置本機(jī)
        producer.setNamesrvAddr("127.0.0.1:9876");
        // 啟動實(shí)例
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}
  • 異常發(fā)送消息(Send Messages Asynchronously)
public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 設(shè)置 name server 服務(wù)地址, 這里是是設(shè)置本機(jī)
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);

        int messageCount = 100;
        final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }

                    @Override
                    public void onException(Throwable e) {
                        countDownLatch.countDown();
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.shutdown();
    }
}
  • Send Messages in One-way Mode
public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 100; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            producer.sendOneway(msg);
        }
        //Wait for sending to complete
        Thread.sleep(5000);        
        producer.shutdown();
    }
}

ClientConfig 配置初始化

private static ClientConfig initClientConfig() {

   ClientConfig mClientConfig = new ClientConfig();

  // 客戶端本機(jī) IP 地址,某些機(jī)器會發(fā)生無法識別客戶端IP地址情況,需要應(yīng)用在代碼中強(qiáng)制指定
  // Name Server 地址列表,多個 NameServer 地址用分號 隔開
  mClientConfig.setNamesrvAddr("127.0.0.1:9876");
  // 客戶端實(shí)例名稱,客戶端創(chuàng)建的多個 Producer、 Consumer 實(shí)際是共用一個內(nèi)部實(shí)例(這個實(shí)例包含
  // 網(wǎng)絡(luò)連接、線程資源等),默認(rèn)值:DEFAULT
  mClientConfig.setInstanceName("DEFAULT");
  // 通信層異步回調(diào)線程數(shù),默認(rèn)值4
  mClientConfig.setClientCallbackExecutorThreads(10);
  // 輪詢 Name Server 間隔時(shí)間,單位毫秒,默認(rèn):30000
  // mClientConfig.setPollNameServerInterval(30000);
  // 向 Broker 發(fā)送心跳間隔時(shí)間,單位毫秒,默認(rèn):30000
  mClientConfig.setHeartbeatBrokerInterval(30000);
  // 持久化 Consumer 消費(fèi)進(jìn)度間隔時(shí)間,單位毫秒,默認(rèn):5000
  mClientConfig.setPersistConsumerOffsetInterval(5000);

  return ClientConfig;
}

Producer 初始化

private static void initProducer() {
  try {
    DefaultMQProducer mProducer = new DefaultMQProducer();

    ClientConfig config = initClientConfig();
    mProducer.resetClientConfig(config);
    // 在發(fā)送消息時(shí),自動創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù) 默認(rèn)值 4
    mProducer.setDefaultTopicQueueNums(4);
    // 發(fā)送消息超時(shí)時(shí)間,單位毫秒 : 默認(rèn)值 10000
    mProducer.setSendMsgTimeout(10000);
    // 消息Body超過多大開始壓縮(Consumer收到消息會自動解壓縮),單位字節(jié) 默認(rèn)值 4096
    mProducer.setCompressMsgBodyOverHowmuch(4096);
    // 如果發(fā)送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發(fā)送 默認(rèn)值 FALSE
    mProducer.setRetryAnotherBrokerWhenNotStoreOK(false);

    mProducer.setProducerGroup(DEFAULT_GROUP);
//			mProducer.setRetryTimesWhenSendAsyncFailed(3);
    mProducer.start();
  } catch (Exception e) {
    LOG.error("init producer error:", e);
  }
}

發(fā)送消息

public boolean sendMessage(String queue, String body, String tags) {
  try {
    if(StringUtils.isEmpty(tags))
    {
      tags = StringUtils.getEmpty();
    }
    Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));

    // Call send message to deliver message to one of brokers.
    SendResult sendResult = mProducer.send(msg);
    if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
      return true;
    }
    LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
  } catch (Exception e) {
    LOG.error("send queue error:", e);
  }
  return false;
}


以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號