Skip to content

Commit d0bb885

Browse files
committed
SpringBoot 2.0集成Rocketmq测试实时、延迟、条件过滤队列
1 parent 7474fc0 commit d0bb885

16 files changed

+647
-28
lines changed

SpringBoot-Rocketmq/.gitignore

+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
.gradle
2+
/build/
3+
!gradle/wrapper/gradle-wrapper.jar
4+
5+
### STS ###
6+
.apt_generated
7+
.classpath
8+
.factorypath
9+
.project
10+
.settings
11+
.springBeans
12+
.sts4-cache
13+
14+
### IntelliJ IDEA ###
15+
.idea
16+
*.iws
17+
*.iml
18+
*.ipr
19+
/out/
20+
21+
### NetBeans ###
22+
/nbproject/private/
23+
/nbbuild/
24+
/dist/
25+
/nbdist/
26+
/.nb-gradle/
27+
28+
### VS Code ###
29+
.vscode/

SpringBoot-Rocketmq/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
### SpringBoot 集成 RocketMq
2+
3+

SpringBoot-Rocketmq/build.gradle

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
dependencies {
2+
compile project(':SpringBoot-Utils')
3+
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
4+
compile 'com.alibaba:fastjson:1.2.47'
5+
testCompile('org.springframework.boot:spring-boot-starter-test:2.0.4.RELEASE')
6+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.dashuai.learning.rocketmq;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class RocketmqApplication {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(RocketmqApplication.class, args);
11+
}
12+
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package com.dashuai.learning.rocketmq.config;
2+
3+
import com.dashuai.learning.rocketmq.mq.Producer;
4+
import com.dashuai.learning.rocketmq.mq.delay.DelayProducer;
5+
import com.dashuai.learning.rocketmq.mq.filter.FilterProducer;
6+
import org.apache.rocketmq.client.exception.MQClientException;
7+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
8+
import org.springframework.beans.factory.annotation.Value;
9+
import org.springframework.context.annotation.Bean;
10+
import org.springframework.context.annotation.Configuration;
11+
12+
/**
13+
* Created in 2019.04.03
14+
*
15+
* @author Liaozihong
16+
*/
17+
@Configuration
18+
public class RocketmqConfiguration {
19+
20+
@Value("${apache.rocketmq.namesrvAddr}")
21+
private String namesrvAddr;
22+
23+
/**
24+
* Mq producer default mq producer.
25+
*
26+
* @return the default mq producer
27+
*/
28+
@Bean
29+
public DefaultMQProducer mqProducer() {
30+
//生产者的组名
31+
DefaultMQProducer producer = new DefaultMQProducer("testGroup1");
32+
//指定NameServer地址,多个地址以 ; 隔开
33+
producer.setNamesrvAddr(namesrvAddr);
34+
producer.setVipChannelEnabled(false);
35+
try {
36+
producer.start();
37+
System.out.println("-------->:producer启动了");
38+
} catch (MQClientException e) {
39+
e.printStackTrace();
40+
}
41+
return producer;
42+
}
43+
44+
/**
45+
* Producer producer.
46+
*
47+
* @return the producer
48+
*/
49+
@Bean
50+
public Producer producer() {
51+
return new Producer(mqProducer());
52+
}
53+
54+
@Bean
55+
public DelayProducer delayProducer() {
56+
return new DelayProducer(mqProducer());
57+
}
58+
59+
@Bean
60+
public FilterProducer filterProducer() {
61+
return new FilterProducer(mqProducer());
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.dashuai.learning.rocketmq.controller;
2+
3+
import com.dashuai.learning.rocketmq.mq.Producer;
4+
import com.dashuai.learning.rocketmq.mq.delay.DelayProducer;
5+
import com.dashuai.learning.rocketmq.mq.filter.FilterProducer;
6+
import com.dashuai.learning.utils.result.ApiResult;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.web.bind.annotation.RequestMapping;
9+
import org.springframework.web.bind.annotation.RequestParam;
10+
import org.springframework.web.bind.annotation.RestController;
11+
12+
/**
13+
* Created in 2019.04.03
14+
*
15+
* @author Liaozihong
16+
*/
17+
@RestController
18+
public class TestController {
19+
/**
20+
* The Producer.
21+
*/
22+
@Autowired
23+
Producer producer;
24+
25+
@Autowired
26+
DelayProducer delayProducer;
27+
28+
@Autowired
29+
FilterProducer filterProducer;
30+
31+
/**
32+
* Test send api result.
33+
*
34+
* @param msg the msg
35+
* @return the api result
36+
*/
37+
@RequestMapping(value = "/send")
38+
public ApiResult testSend(@RequestParam String msg) {
39+
String result = producer.sendMsg("PushTopic", "push", msg);
40+
return ApiResult.prepare().success(result);
41+
}
42+
43+
@RequestMapping(value = "/sendDelay")
44+
public ApiResult testSendDelay(String msg) {
45+
String result = delayProducer.sendDelayMsg(msg);
46+
return ApiResult.prepare().success(result);
47+
}
48+
49+
@RequestMapping(value = "/sendFilter")
50+
public ApiResult testSendFilter(String msg) {
51+
String result = filterProducer.sendConditionMsg(msg);
52+
return ApiResult.prepare().success(result);
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package com.dashuai.learning.rocketmq.mq;
2+
3+
4+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6+
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7+
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
8+
import org.apache.rocketmq.common.message.MessageExt;
9+
import org.springframework.beans.factory.annotation.Value;
10+
import org.springframework.boot.CommandLineRunner;
11+
import org.springframework.stereotype.Component;
12+
13+
import java.nio.charset.StandardCharsets;
14+
import java.util.Date;
15+
16+
@Component
17+
public class Consumer implements CommandLineRunner {
18+
/**
19+
* 消费者
20+
*/
21+
@Value("${apache.rocketmq.consumer.pushConsumer}")
22+
private String pushConsumer;
23+
24+
/**
25+
* NameServer 地址
26+
*/
27+
@Value("${apache.rocketmq.namesrvAddr}")
28+
private String namesrvAddr;
29+
30+
31+
/**
32+
* 初始化RocketMq的监听信息,渠道信息
33+
*/
34+
public void messageListener() {
35+
36+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(pushConsumer);
37+
38+
consumer.setNamesrvAddr(namesrvAddr);
39+
try {
40+
41+
// 订阅PushTopic下Tag为push的消息,都订阅消息
42+
consumer.subscribe("PushTopic", "push");
43+
44+
// 程序第一次启动从消息队列头获取数据
45+
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
46+
//可以修改每次消费消息的数量,默认设置是每次消费一条
47+
consumer.setConsumeMessageBatchMaxSize(1);
48+
49+
//在此监听中消费信息,并返回消费的状态信息
50+
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
51+
52+
// 会把不同的消息分别放置到不同的队列中
53+
for (MessageExt message : msgs) {
54+
System.out.println("Receive message[msgId=" + message.getMsgId() + "] , 发送时间:"
55+
+ (new Date(message.getStoreTimestamp())) + "消息体:"
56+
+ new String(message.getBody(), StandardCharsets.UTF_8) + ",以消费!");
57+
}
58+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
59+
});
60+
61+
consumer.start();
62+
63+
} catch (Exception e) {
64+
e.printStackTrace();
65+
}
66+
}
67+
68+
@Override
69+
public void run(String... args) {
70+
this.messageListener();
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package com.dashuai.learning.rocketmq.mq;
2+
3+
4+
import org.apache.rocketmq.client.exception.MQBrokerException;
5+
import org.apache.rocketmq.client.exception.MQClientException;
6+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
7+
import org.apache.rocketmq.client.producer.SendResult;
8+
import org.apache.rocketmq.common.message.Message;
9+
import org.apache.rocketmq.remoting.common.RemotingHelper;
10+
import org.apache.rocketmq.remoting.exception.RemotingException;
11+
import org.springframework.util.StopWatch;
12+
13+
import java.io.UnsupportedEncodingException;
14+
import java.util.Optional;
15+
16+
/**
17+
* Created in 2019.04.03
18+
*
19+
* @author Liaozihong
20+
*/
21+
public class Producer {
22+
private DefaultMQProducer mqProducer;
23+
24+
/**
25+
* Instantiates a new Producer.
26+
*
27+
* @param mqProducer the mq producer
28+
*/
29+
public Producer(DefaultMQProducer mqProducer) {
30+
this.mqProducer = mqProducer;
31+
}
32+
33+
/**
34+
* Send msg string.
35+
*
36+
* @param topic the topic
37+
* @param tags the tags
38+
* @param body the body
39+
* @return the string
40+
*/
41+
public String sendMsg(String topic, String tags, String body) {
42+
Message message = null;
43+
try {
44+
message = new Message(topic, tags, body.getBytes(RemotingHelper.DEFAULT_CHARSET));
45+
} catch (UnsupportedEncodingException e) {
46+
e.printStackTrace();
47+
}
48+
StopWatch stop = new StopWatch();
49+
stop.start();
50+
SendResult result = null;
51+
try {
52+
result = mqProducer.send(message);
53+
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
54+
e.printStackTrace();
55+
}
56+
System.out.println("发送响应:MsgId:" + Optional.ofNullable(result).map(SendResult::getMsgId).orElse(null) +
57+
",发送状态:" + Optional.ofNullable(result).map(SendResult::getSendStatus).orElse(null));
58+
stop.stop();
59+
return "{\"MsgId\":\"" + Optional.ofNullable(result).map(SendResult::getMsgId).orElse(null) + "\"}";
60+
}
61+
62+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.dashuai.learning.rocketmq.mq.delay;
2+
3+
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
4+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
5+
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
6+
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
7+
import org.apache.rocketmq.client.exception.MQClientException;
8+
import org.apache.rocketmq.common.message.MessageExt;
9+
import org.springframework.beans.factory.annotation.Value;
10+
import org.springframework.boot.CommandLineRunner;
11+
import org.springframework.stereotype.Component;
12+
13+
import java.nio.charset.StandardCharsets;
14+
import java.util.Date;
15+
import java.util.List;
16+
17+
/**
18+
* Created in 2019.04.04
19+
*
20+
* @author Liaozihong
21+
*/
22+
@Component
23+
public class DelayConsumer implements CommandLineRunner {
24+
/**
25+
* NameServer 地址
26+
*/
27+
@Value("${apache.rocketmq.namesrvAddr}")
28+
private String namesrvAddr;
29+
30+
/**
31+
* Delay mq consumer.
32+
*
33+
* @throws MQClientException the mq client exception
34+
*/
35+
public void delayMqConsumer() throws MQClientException {
36+
// Instantiate message consumer
37+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
38+
39+
consumer.setNamesrvAddr(namesrvAddr);
40+
// Subscribe topics
41+
consumer.subscribe("TestTopic", "*");
42+
// Register message listener
43+
consumer.registerMessageListener(new MessageListenerConcurrently() {
44+
@Override
45+
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
46+
for (MessageExt message : messages) {
47+
System.out.println("Receive message[msgId=" + message.getMsgId() + "] , 发送时间:"
48+
+ (new Date(message.getStoreTimestamp())) + "消息体:"
49+
+ new String(message.getBody(), StandardCharsets.UTF_8) + ",以消费!");
50+
}
51+
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
52+
}
53+
});
54+
consumer.start();
55+
}
56+
57+
@Override
58+
public void run(String... args) throws Exception {
59+
delayMqConsumer();
60+
}
61+
}

0 commit comments

Comments
 (0)