Commit 8a2c815d authored by 大森林's avatar 大森林 Committed by Gitee
Browse files

!8 优化阿里云RocketMQ配置属性,修复Message无法消费到的问题

Merge pull request !8 from 皮生/dev
parents 29416263 4161f0f5
...@@ -81,7 +81,6 @@ spring: ...@@ -81,7 +81,6 @@ spring:
# namesrvAddr: 127.0.0.1:9876 # namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey # accessKey: accessKey
# secretKey: secretKey # secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP # producerId: JEEPAY-GROUP
#日志配置参数。 #日志配置参数。
......
...@@ -91,7 +91,8 @@ spring: ...@@ -91,7 +91,8 @@ spring:
# namesrvAddr: 127.0.0.1:9876 # namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey # accessKey: accessKey
# secretKey: secretKey # secretKey: secretKey
# consumerId: JEEPAY-GROUP # consumerId: JEEPAY-GROUP-MGR
# broadcastConsumerId: JEEPAY-GROUP-MGR-BROADCAST # 广播模式消费者ID
# producerId: JEEPAY-GROUP # producerId: JEEPAY-GROUP
#日志配置参数。 #日志配置参数。
......
...@@ -91,7 +91,8 @@ spring: ...@@ -91,7 +91,8 @@ spring:
# namesrvAddr: 127.0.0.1:9876 # namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey # accessKey: accessKey
# secretKey: secretKey # secretKey: secretKey
# consumerId: JEEPAY-GROUP # consumerId: JEEPAY-GROUP-MCH
# broadcastConsumerId: JEEPAY-GROUP-MCH-BROADCAST # 广播模式消费者ID
# producerId: JEEPAY-GROUP # producerId: JEEPAY-GROUP
#日志配置参数。 #日志配置参数。
......
...@@ -91,9 +91,11 @@ spring: ...@@ -91,9 +91,11 @@ spring:
# namesrvAddr: 127.0.0.1:9876 # namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey # accessKey: accessKey
# secretKey: secretKey # secretKey: secretKey
# consumerId: JEEPAY-GROUP # consumerId: JEEPAY-GROUP-PAY
# broadcastConsumerId: JEEPAY-GROUP-PAY-BROADCAST # 广播模式消费者ID
# producerId: JEEPAY-GROUP # producerId: JEEPAY-GROUP
#日志配置参数。 #日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。 # 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。 # 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。
......
...@@ -24,6 +24,8 @@ public class AliYunRocketMQFactory { ...@@ -24,6 +24,8 @@ public class AliYunRocketMQFactory {
private String secretKey; private String secretKey;
@Value("${aliyun-rocketmq.consumerId}") @Value("${aliyun-rocketmq.consumerId}")
private String consumerId; private String consumerId;
@Value("${aliyun-rocketmq.broadcastConsumerId}")
private String broadcastConsumerId;
@Value("${aliyun-rocketmq.producerId}") @Value("${aliyun-rocketmq.producerId}")
private String producerId; private String producerId;
...@@ -33,7 +35,7 @@ public class AliYunRocketMQFactory { ...@@ -33,7 +35,7 @@ public class AliYunRocketMQFactory {
properties.put(PropertyKeyConst.ProducerId, producerId); properties.put(PropertyKeyConst.ProducerId, producerId);
properties.put(PropertyKeyConst.AccessKey, accessKey); properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey); properties.put(PropertyKeyConst.SecretKey, secretKey);
// 判断是否为空(生产环境走k8s集群公共配置,不获取本地配置文件的值) // 判断是否为空(生产环境走k8s集群环境变量自动注入,不获取本地配置文件的值)
if (StringUtils.isNotEmpty(namesrvAddr)) { if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
} }
...@@ -46,7 +48,7 @@ public class AliYunRocketMQFactory { ...@@ -46,7 +48,7 @@ public class AliYunRocketMQFactory {
properties.put(PropertyKeyConst.ConsumerId, consumerId); properties.put(PropertyKeyConst.ConsumerId, consumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey); properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey); properties.put(PropertyKeyConst.SecretKey, secretKey);
// 判断是否为空(生产环境走k8s集群公共配置,不获取本地配置文件的值) // 判断是否为空(生产环境走k8s集群环境变量自动注入,不获取本地配置文件的值)
if (StringUtils.isNotEmpty(namesrvAddr)) { if (StringUtils.isNotEmpty(namesrvAddr)) {
properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr); properties.put(PropertyKeyConst.NAMESRV_ADDR, namesrvAddr);
} }
...@@ -56,7 +58,7 @@ public class AliYunRocketMQFactory { ...@@ -56,7 +58,7 @@ public class AliYunRocketMQFactory {
@Bean(name = "broadcastConsumerClient") @Bean(name = "broadcastConsumerClient")
public Consumer broadcastConsumerClient() { public Consumer broadcastConsumerClient() {
Properties properties = new Properties(); Properties properties = new Properties();
properties.put(PropertyKeyConst.ConsumerId, consumerId); properties.put(PropertyKeyConst.ConsumerId, broadcastConsumerId);
properties.put(PropertyKeyConst.AccessKey, accessKey); properties.put(PropertyKeyConst.AccessKey, accessKey);
properties.put(PropertyKeyConst.SecretKey, secretKey); properties.put(PropertyKeyConst.SecretKey, secretKey);
// 广播订阅方式设置 // 广播订阅方式设置
......
...@@ -39,29 +39,10 @@ import java.util.TreeMap; ...@@ -39,29 +39,10 @@ import java.util.TreeMap;
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ) @ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ALIYUN_ROCKET_MQ)
public class AliYunRocketMQSender implements IMQSender { public class AliYunRocketMQSender implements IMQSender {
private static final List<Integer> DELAY_TIME_LEVEL = new ArrayList<>(); /** 最大延迟24小时 */
private static final int MAX_DELAY_TIME = 60 * 60 * 24;
static { /** 最小延迟1秒 */
// 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h private static final int MIN_DELAY_TIME = 1;
DELAY_TIME_LEVEL.add(1);
DELAY_TIME_LEVEL.add(5);
DELAY_TIME_LEVEL.add(10);
DELAY_TIME_LEVEL.add(30);
DELAY_TIME_LEVEL.add(60 * 1);
DELAY_TIME_LEVEL.add(60 * 2);
DELAY_TIME_LEVEL.add(60 * 3);
DELAY_TIME_LEVEL.add(60 * 4);
DELAY_TIME_LEVEL.add(60 * 5);
DELAY_TIME_LEVEL.add(60 * 6);
DELAY_TIME_LEVEL.add(60 * 7);
DELAY_TIME_LEVEL.add(60 * 8);
DELAY_TIME_LEVEL.add(60 * 9);
DELAY_TIME_LEVEL.add(60 * 10);
DELAY_TIME_LEVEL.add(60 * 20);
DELAY_TIME_LEVEL.add(60 * 30);
DELAY_TIME_LEVEL.add(60 * 60 * 1);
DELAY_TIME_LEVEL.add(60 * 60 * 2);
}
@Autowired @Autowired
private AliYunRocketMQFactory aliYunRocketMQFactory; private AliYunRocketMQFactory aliYunRocketMQFactory;
...@@ -77,7 +58,7 @@ public class AliYunRocketMQSender implements IMQSender { ...@@ -77,7 +58,7 @@ public class AliYunRocketMQSender implements IMQSender {
public void send(AbstractMQ mqModel, int delaySeconds) { public void send(AbstractMQ mqModel, int delaySeconds) {
Message message = new Message(mqModel.getMQName(), AliYunRocketMQFactory.defaultTag, mqModel.toMessage().getBytes()); Message message = new Message(mqModel.getMQName(), AliYunRocketMQFactory.defaultTag, mqModel.toMessage().getBytes());
if (delaySeconds > 0) { if (delaySeconds > 0) {
long delayTime = System.currentTimeMillis() + getNearDelayLevel(delaySeconds) * 1000; long delayTime = System.currentTimeMillis() + delayTimeCorrector(delaySeconds) * 1000;
// 设置消息需要被投递的时间。 // 设置消息需要被投递的时间。
message.setStartDeliverTime(delayTime); message.setStartDeliverTime(delayTime);
} }
...@@ -94,17 +75,16 @@ public class AliYunRocketMQSender implements IMQSender { ...@@ -94,17 +75,16 @@ public class AliYunRocketMQSender implements IMQSender {
} }
/** /**
* 获取最接近的节点值 * 检查延迟时间的有效性并返回校正后的延迟时间
**/ **/
private int getNearDelayLevel(int delay) { private int delayTimeCorrector(int delay) {
// 如果包含则直接返回 if (delay < MIN_DELAY_TIME) {
if (DELAY_TIME_LEVEL.contains(delay)) { return MIN_DELAY_TIME;
return DELAY_TIME_LEVEL.indexOf(delay) + 1; }
if (delay > MAX_DELAY_TIME) {
return MAX_DELAY_TIME;
} }
//两个时间的绝对值 - 位置 return delay;
TreeMap<Integer, Integer> resultMap = new TreeMap<>();
DELAY_TIME_LEVEL.stream().forEach(time -> resultMap.put(Math.abs(delay - time), DELAY_TIME_LEVEL.indexOf(time) + 1));
return resultMap.firstEntry().getValue();
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment