maven 工程添加庫
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
gradle 工程添加庫
compile 'org.apache.rocketmq:rocketmq-client:4.4.0'
注意:
客戶端版本要和服務端版本一致,否則會發生一些奇怪的問題:
我遇到過版本不一致時,消息消費無法被確認的情況,也就是說客戶端已經消費了,也提交成功了,但是服務端沒有同步到!
注意: 要先到控制臺創建 Topic 隊列名稱
/**
 * Example that sends 100 messages synchronously to the "TopicTest" topic
 * and prints the result of every send.
 */
public class SyncProducer {
    /**
     * Starts a producer, sends the messages one by one and shuts the producer down.
     *
     * @param args unused command-line arguments
     * @throws Exception if starting the producer or sending a message fails
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the name server address; here it points at the local machine.
        producer.setNamesrvAddr("127.0.0.1:9876");
        // Start the instance.
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Shut down in a finally block so the producer is also shut down
            // when one of the sends above throws.
            producer.shutdown();
        }
    }
}
/**
 * Example that sends 100 messages asynchronously to the "Jodie_topic_1023" topic
 * and reports the outcome of each send from its callback.
 */
public class AsyncProducer {
    /**
     * Starts a producer, submits the asynchronous sends, waits up to five seconds
     * for the callbacks and shuts the producer down.
     *
     * @param args unused command-line arguments
     * @throws Exception if starting the producer fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Set the name server address; here it points at the local machine.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        try {
            producer.setRetryTimesWhenSendAsyncFailed(0);
            int messageCount = 100;
            final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("Jodie_topic_1023",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }

                        @Override
                        public void onException(Throwable e) {
                            countDownLatch.countDown();
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    // The message could not be built or submitted, so presumably no
                    // callback will run for it; count it here so the wait below does
                    // not always run into its timeout.
                    countDownLatch.countDown();
                    e.printStackTrace();
                }
            }
            // await returns false when the timeout elapsed before every send was accounted for.
            if (!countDownLatch.await(5, TimeUnit.SECONDS)) {
                System.out.printf("Timed out waiting for send callbacks, %d still pending%n",
                    countDownLatch.getCount());
            }
        } finally {
            // Shut down in a finally block so the producer is also shut down
            // when the wait above is interrupted.
            producer.shutdown();
        }
    }
}
/**
 * Example that sends 100 messages to the "TopicTest" topic in one-way mode,
 * i.e. without looking at any send result.
 */
public class OnewayProducer {
    /**
     * Starts a producer, issues the one-way sends, sleeps five seconds and
     * shuts the producer down.
     *
     * @param args unused command-line arguments
     * @throws Exception if starting the producer or sending fails, or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Instantiate with a producer group name.
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("localhost:9876");
        // Launch the instance.
        producer.start();
        try {
            for (int i = 0; i < 100; i++) {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                // Call send message to deliver message to one of brokers.
                producer.sendOneway(msg);
            }
            // Wait for sending to complete.
            Thread.sleep(5000);
        } finally {
            // Shut down in a finally block so the producer is also shut down
            // when a send throws or the sleep is interrupted.
            producer.shutdown();
        }
    }
}
ClientConfig 配置初始化
/**
 * Builds the client configuration used by this example: name server address,
 * instance name, callback thread count, heartbeat interval and consumer-offset
 * persistence interval.
 *
 * @return the populated {@code ClientConfig}
 */
private static ClientConfig initClientConfig() {
    ClientConfig clientConfig = new ClientConfig();
    // Client IP: the original note says some machines cannot detect the client's
    // own IP address, in which case the application has to set it explicitly.
    // No client IP is set here.
    // Name server address list; multiple addresses are separated by semicolons.
    clientConfig.setNamesrvAddr("127.0.0.1:9876");
    // Client instance name. Per the original note, the producers and consumers
    // created by one client share a single internal instance (network
    // connections, thread resources, ...); the note lists DEFAULT as the default.
    clientConfig.setInstanceName("DEFAULT");
    // Number of asynchronous callback threads in the communication layer
    // (the original note lists 4 as the default).
    clientConfig.setClientCallbackExecutorThreads(10);
    // The name server polling interval (milliseconds; the original note lists
    // 30000 as the default) is not overridden here.
    // Interval between heartbeats sent to the broker, in milliseconds.
    clientConfig.setHeartbeatBrokerInterval(30000);
    // Interval for persisting the consumer's consumption progress, in milliseconds.
    clientConfig.setPersistConsumerOffsetInterval(5000);
    // Return the configured instance; the original returned the type name
    // ClientConfig, which does not compile.
    return clientConfig;
}
Producer 初始化
/**
 * Creates a producer, applies the configuration from {@code initClientConfig()}
 * plus the producer-specific settings below, and starts it. Any exception is
 * logged and not rethrown.
 */
private static void initProducer() {
    // NOTE(review): the started producer is held only in this local variable,
    // while sendMessage reads a member named mProducer - confirm how the two
    // are meant to be connected.
    try {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.resetClientConfig(initClientConfig());

        // Number of queues created when a topic missing on the server is
        // auto-created on send (the original note lists 4 as the default).
        producer.setDefaultTopicQueueNums(4);

        // Send timeout in milliseconds.
        producer.setSendMsgTimeout(10000);

        // Body size in bytes above which the message is compressed; per the
        // original note the consumer decompresses it automatically.
        producer.setCompressMsgBodyOverHowmuch(4096);

        // Whether to retry when a send returns a result whose status is not SEND_OK.
        producer.setRetryAnotherBrokerWhenNotStoreOK(false);

        producer.setProducerGroup(DEFAULT_GROUP);

        // The retry count for failed asynchronous sends is not overridden here.
        producer.start();
    } catch (Exception e) {
        LOG.error("init producer error:", e);
    }
}
發送消息
/**
 * Sends one message synchronously through {@code mProducer} and reports
 * whether it was accepted.
 *
 * @param queue value passed as the first argument of the {@code Message} constructor
 * @param body  message payload; encoded with {@code RemotingHelper.DEFAULT_CHARSET}
 * @param tags  message tag; replaced by {@code StringUtils.getEmpty()} when
 *              {@code StringUtils.isEmpty(tags)} is true
 * @return {@code true} only when the send status is {@code SEND_OK};
 *         {@code false} for any other status or when an exception occurs
 */
public boolean sendMessage(String queue, String body, String tags) {
    try {
        if (StringUtils.isEmpty(tags)) {
            tags = StringUtils.getEmpty();
        }
        Message msg = new Message(queue, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
        // Call send message to deliver message to one of brokers.
        SendResult sendResult = mProducer.send(msg);
        if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
            return true;
        }
        LOG.warn("send queue error : " + FastJsonHelper.jsonEncode(sendResult));
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers further up can still
            // observe that this thread was interrupted.
            Thread.currentThread().interrupt();
        }
        LOG.error("send queue error:", e);
    }
    return false;
}
更多建議: