Commit 6930f5f6 authored by terrfly's avatar terrfly
Browse files

升级rocketMQ版本, 添加新版MQ处理方式;

parent c201d5f0
......@@ -51,23 +51,23 @@ spring:
timeout: 1000
password:
#activeMQ配置
# #activeMQ配置 ( 注意: activeMQ配置项需在spring的下级 )
activemq:
broker-url: tcp://localhost:61616 #连接地址
#
# #rabbitmq配置 ( 注意: rabbitmq配置项需在spring的下级 )
# rabbitmq:
# addresses: 127.0.0.1:5672
# username: guest
# password: guest
# dynamic: true
# virtual-host: /
# rabbitmq配置
# rabbitmq:
# addresses: 127.0.0.1:5672
# username: guest
# password: guest
# dynamic: true
# virtual-host: /
#rocketmq配置
# rocketmq:
# name-server: 127.0.0.1:9876
# producer:
# group: rocket-group
## rocketmq配置 ( 注意:rocketmq配置项请放置到根目录, 不是spring的二级配置! )
#rocketmq:
# name-server: 127.0.0.1:9876
# producer:
# group: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
......@@ -101,5 +101,5 @@ isys:
access-key-secret: SECRET_SECRET_SECRET #AccessKeySecret
mq:
vender: rabbitMQ # activeMQ rabbitMQ rocketMQ
vender: activeMQ # activeMQ rabbitMQ rocketMQ
......@@ -35,7 +35,7 @@
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<!-- 添加对rabbitMQ的支持 -->
......@@ -45,6 +45,13 @@
<scope>provided</scope>
</dependency>
<!-- 添加对rocketMQ的支持 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
......
......@@ -17,13 +17,13 @@ package com.jeequan.jeepay.components.mq.constant;
/**
* 定义MQ消息类型:
* QUEUE - 点对点 (只有1个消费者可消费)
* TOPIC - 订阅模式 (所有接收者都可接收到)
* QUEUE - 点对点 (只有1个消费者可消费。 ActiveMQ的queue模式
* BROADCAST - 订阅模式 (所有接收者都可接收到。 ActiveMQ的topic模式, RabbitMQ的fanout类型的交换机, RocketMQ的广播模式 )
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/23 16:49
*/
public enum MQSendTypeEnum {
QUEUE, TOPIC
QUEUE, BROADCAST
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
*
* 定义MQ消息格式
* 业务场景: [ 清除商户登录信息 ]
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 15:25
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CleanMchLoginAuthCacheMQ extends AbstractMQ {
/** 【!重要配置项!】 定义MQ名称 **/
public static final String MQ_NAME = "QUEUE_CLEAN_MCH_LOGIN_AUTH_CACHE";
/** 内置msg 消息体定义 **/
private MsgPayload payload;
/** 【!重要配置项!】 定义Msg消息载体 **/
@Data
@AllArgsConstructor
public static class MsgPayload {
/** 用户ID集合 **/
private List<Long> userIdList;
}
@Override
public String getMQName() {
return MQ_NAME;
}
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static CleanMchLoginAuthCacheMQ build(List<Long> userIdList){
return new CleanMchLoginAuthCacheMQ(new MsgPayload(userIdList));
}
/** 解析MQ消息, 一般用于接收MQ消息时 **/
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
/** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}
......@@ -19,6 +19,7 @@ import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
......@@ -47,8 +48,8 @@ public class PayOrderMchNotifyMQ extends AbstractMQ {
@AllArgsConstructor
public static class MsgPayload {
/** 支付订单号 **/
private String payOrderId;
/** 通知单号 **/
private Long notifyId;
}
......@@ -60,7 +61,7 @@ public class PayOrderMchNotifyMQ extends AbstractMQ {
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 Topic - 订阅模式
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
......@@ -69,8 +70,8 @@ public class PayOrderMchNotifyMQ extends AbstractMQ {
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static PayOrderMchNotifyMQ build(String payOrderId){
return new PayOrderMchNotifyMQ(new MsgPayload(payOrderId));
public static PayOrderMchNotifyMQ build(Long notifyId){
return new PayOrderMchNotifyMQ(new MsgPayload(notifyId));
}
/** 解析MQ消息, 一般用于接收MQ消息时 **/
......
......@@ -50,6 +50,9 @@ public class PayOrderReissueMQ extends AbstractMQ {
/** 支付订单号 **/
private String payOrderId;
/** 通知次数 **/
private Integer count;
}
@Override
......@@ -60,7 +63,7 @@ public class PayOrderReissueMQ extends AbstractMQ {
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 Topic - 订阅模式
return MQSendTypeEnum.QUEUE; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
......@@ -69,8 +72,8 @@ public class PayOrderReissueMQ extends AbstractMQ {
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static PayOrderReissueMQ build(String payOrderId){
return new PayOrderReissueMQ(new MsgPayload(payOrderId));
public static PayOrderReissueMQ build(String payOrderId, Integer count){
return new PayOrderReissueMQ(new MsgPayload(payOrderId, count));
}
/** 解析MQ消息, 一般用于接收MQ消息时 **/
......
......@@ -37,7 +37,7 @@ import lombok.NoArgsConstructor;
public class ResetAppConfigMQ extends AbstractMQ {
/** 【!重要配置项!】 定义MQ名称 **/
public static final String MQ_NAME = "TOPIC_RESET_APP_CONFIG";
public static final String MQ_NAME = "BROADCAST_RESET_APP_CONFIG";
/** 内置msg 消息体定义 **/
private MsgPayload payload;
......@@ -46,6 +46,9 @@ public class ResetAppConfigMQ extends AbstractMQ {
@Data
@AllArgsConstructor
public static class MsgPayload {
private String groupKey;
}
@Override
......@@ -56,7 +59,7 @@ public class ResetAppConfigMQ extends AbstractMQ {
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.TOPIC; // QUEUE - 点对点 、 Topic - 订阅模式
return MQSendTypeEnum.BROADCAST; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
......@@ -65,8 +68,8 @@ public class ResetAppConfigMQ extends AbstractMQ {
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static ResetAppConfigMQ build(){
return new ResetAppConfigMQ(new MsgPayload());
public static ResetAppConfigMQ build(String groupKey){
return new ResetAppConfigMQ(new MsgPayload(groupKey));
}
/** 解析MQ消息, 一般用于接收MQ消息时 **/
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.model;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
*
* 定义MQ消息格式
* 业务场景: [ 更新服务商/商户/商户应用配置信息 ]
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 15:25
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ResetIsvMchAppInfoConfigMQ extends AbstractMQ {
/** 【!重要配置项!】 定义MQ名称 **/
public static final String MQ_NAME = "BROADCAST_RESET_ISV_MCH_APP_INFO_CONFIG";
/** 内置msg 消息体定义 **/
private MsgPayload payload;
/** 【!重要配置项!】 定义Msg消息载体 **/
@Data
@AllArgsConstructor
public static class MsgPayload {
public enum RESET_TYPE{
ISV_INFO, MCH_INFO, MCH_APP
}
/** 重置类型 **/
private Enum resetType;
/** isvNo **/
private String isvNo;
/** mchNo **/
private String mchNo;
/** appId **/
private String appId;
}
@Override
public String getMQName() {
return MQ_NAME;
}
/** 【!重要配置项!】 **/
@Override
public MQSendTypeEnum getMQType(){
return MQSendTypeEnum.BROADCAST; // QUEUE - 点对点 、 BROADCAST - 广播模式
}
@Override
public String toMessage() {
return JSONObject.toJSONString(payload);
}
/** 【!重要配置项!】 构造MQModel , 一般用于发送MQ时 **/
public static ResetIsvMchAppInfoConfigMQ build(Enum resetType, String isvNo, String mchNo, String appId){
return new ResetIsvMchAppInfoConfigMQ(new MsgPayload(resetType, isvNo, mchNo, appId));
}
/** 解析MQ消息, 一般用于接收MQ消息时 **/
public static MsgPayload parse(String msg){
return JSON.parseObject(msg, MsgPayload.class);
}
/** 定义 IMQReceiver 接口: 项目实现该接口则可接收到对应的业务消息 **/
public interface IMQReceiver{
void receive(MsgPayload payload);
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.CleanMchLoginAuthCacheMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* activeMQ 消息接收器:仅在vender=activeMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 清除商户登录信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
@ConditionalOnBean(CleanMchLoginAuthCacheMQ.IMQReceiver.class)
public class CleanMchLoginAuthCacheActiveMQReceiver implements IMQMsgReceiver {
@Autowired
private CleanMchLoginAuthCacheMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@JmsListener(destination = CleanMchLoginAuthCacheMQ.MQ_NAME)
public void receiveMsg(String msg){
mqReceiver.receive(CleanMchLoginAuthCacheMQ.parse(msg));
}
}
......@@ -27,7 +27,7 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* activeMQ 消息接收器:仅在vender=avtiveMQ时 && 项目实现IMQReceiver接口时 进行实例化
* activeMQ 消息接收器:仅在vender=activeMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 支付订单商户通知
*
* @author terrfly
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.PayOrderReissueMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* activeMQ 消息接收器:仅在vender=activeMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 支付订单补单(一般用于没有回调的接口,比如微信的条码支付)
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
@ConditionalOnBean(PayOrderReissueMQ.IMQReceiver.class)
public class PayOrderReissueActiveMQReceiver implements IMQMsgReceiver {
@Autowired
private PayOrderReissueMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@JmsListener(destination = PayOrderReissueMQ.MQ_NAME)
public void receiveMsg(String msg){
mqReceiver.receive(PayOrderReissueMQ.parse(msg));
}
}
......@@ -26,7 +26,7 @@ import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* activeMQ消息接收器:仅在vender=avtiveMQ时 && 项目实现IMQReceiver接口时 进行实例化
* activeMQ消息接收器:仅在vender=activeMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 更新系统配置参数
*
* @author terrfly
......@@ -41,7 +41,7 @@ public class ResetAppConfigActiveMQReceiver implements IMQMsgReceiver {
@Autowired
private ResetAppConfigMQ.IMQReceiver mqReceiver;
/** 接收 【 topic 】 类型的消息 **/
/** 接收 【 MQSendTypeEnum.BROADCAST 广播类型的消息 **/
@JmsListener(destination = ResetAppConfigMQ.MQ_NAME, containerFactory = ActiveMQConfig.TOPIC_LISTENER_CONTAINER)
public void receiveMsg(String msg){
mqReceiver.receive(ResetAppConfigMQ.parse(msg));
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.activemq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.ResetIsvMchAppInfoConfigMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import com.jeequan.jeepay.components.mq.vender.activemq.ActiveMQConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* activeMQ消息接收器:仅在vender=activeMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 更新服务商/商户/商户应用配置信息;
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ACTIVE_MQ)
@ConditionalOnBean(ResetIsvMchAppInfoConfigMQ.IMQReceiver.class)
public class ResetIsvMchAppInfoActiveMQReceiver implements IMQMsgReceiver {
@Autowired
private ResetIsvMchAppInfoConfigMQ.IMQReceiver mqReceiver;
/** 接收 【 MQSendTypeEnum.BROADCAST 】 广播类型的消息 **/
@JmsListener(destination = ResetIsvMchAppInfoConfigMQ.MQ_NAME, containerFactory = ActiveMQConfig.TOPIC_LISTENER_CONTAINER)
public void receiveMsg(String msg){
mqReceiver.receive(ResetIsvMchAppInfoConfigMQ.parse(msg));
}
}
......@@ -76,8 +76,8 @@ public class RabbitMQConfig {
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(amq.getMQName(),
BeanDefinitionBuilder.rootBeanDefinition(Queue.class).addConstructorArgValue(amq.getMQName()).getBeanDefinition());
// topic模式
if(amq.getMQType() == MQSendTypeEnum.TOPIC){
// 广播模式
if(amq.getMQType() == MQSendTypeEnum.BROADCAST){
// 动态注册交换机, 交换机名称/bean名称 = FANOUT_EXCHANGE_NAME_PREFIX + amq.getMQName()
rabbitMQBeanProcessor.beanDefinitionRegistry.registerBeanDefinition(FANOUT_EXCHANGE_NAME_PREFIX +amq.getMQName(),
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.CleanMchLoginAuthCacheMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* rabbitMQ消息接收器:仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 清除商户登录信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(CleanMchLoginAuthCacheMQ.IMQReceiver.class)
public class CleanMchLoginAuthCacheRabbitMQReceiver implements IMQMsgReceiver {
@Autowired
private CleanMchLoginAuthCacheMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@RabbitListener(queues = CleanMchLoginAuthCacheMQ.MQ_NAME)
public void receiveMsg(String msg){
mqReceiver.receive(CleanMchLoginAuthCacheMQ.parse(msg));
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.executor.MqThreadExecutor;
import com.jeequan.jeepay.components.mq.model.PayOrderReissueMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* rabbitMQ消息接收器:仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 支付订单补单(一般用于没有回调的接口,比如微信的条码支付)
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(PayOrderReissueMQ.IMQReceiver.class)
public class PayOrderReissueRabbitMQReceiver implements IMQMsgReceiver {
@Autowired
private PayOrderReissueMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@RabbitListener(queues = PayOrderReissueMQ.MQ_NAME)
public void receiveMsg(String msg){
mqReceiver.receive(PayOrderReissueMQ.parse(msg));
}
}
......@@ -45,7 +45,7 @@ public class ResetAppConfigRabbitMQReceiver implements IMQMsgReceiver {
@Autowired
private ResetAppConfigMQ.IMQReceiver mqReceiver;
/** 接收 【 topic 】 类型的消息
/** 接收 【 MQSendTypeEnum.BROADCAST 广播类型的消息
*
* 注意:
* RabbitMQ的广播模式(fanout)交换机 --》全部的Queue
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rabbitmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.ResetIsvMchAppInfoConfigMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import com.jeequan.jeepay.components.mq.vender.rabbitmq.RabbitMQConfig;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* rabbitMQ消息接收器:仅在vender=rabbitMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 更新服务商/商户/商户应用配置信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.RABBIT_MQ)
@ConditionalOnBean(ResetIsvMchAppInfoConfigMQ.IMQReceiver.class)
public class ResetIsvMchAppInfoRabbitMQReceiver implements IMQMsgReceiver {
@Autowired
private ResetIsvMchAppInfoConfigMQ.IMQReceiver mqReceiver;
/** 接收 【 MQSendTypeEnum.BROADCAST 】 广播类型的消息
*
* 注意:
* RabbitMQ的广播模式(fanout)交换机 --》全部的Queue
* 如果queue包含多个消费者, 【例如,manager和payment的监听器是名称相同的queue下的消费者(Consumers) 】, 两个消费者是工作模式且存在竞争关系, 导致只能一个来消费。
* 解决:
* 每个topic的QUEUE都声明一个FANOUT交换机, 消费者声明一个系统产生的【随机队列】绑定到这个交换机上,然后往交换机发消息,只要绑定到这个交换机上都能收到消息。
* 参考: https://bbs.csdn.net/topics/392509262?list=70088931
*
* **/
@RabbitListener(
bindings = {@QueueBinding(value = @Queue(), // 注意这里不要定义队列名称,系统会随机产生
exchange = @Exchange(name = RabbitMQConfig.FANOUT_EXCHANGE_NAME_PREFIX + ResetIsvMchAppInfoConfigMQ.MQ_NAME,
type = ExchangeTypes.FANOUT ))} )
public void receiveMsg(String msg){
mqReceiver.receive(ResetIsvMchAppInfoConfigMQ.parse(msg));
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rocketmq;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.AbstractMQ;
import com.jeequan.jeepay.components.mq.vender.IMQSender;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
/**
* rocketMQ 消息发送器的实现
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/26 11:52
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ROCKET_MQ)
public class RocketMQSender implements IMQSender {
private static final List<Integer> DELAY_TIME_LEVEL = new ArrayList<>();
static{
// 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
DELAY_TIME_LEVEL.add(1);
DELAY_TIME_LEVEL.add(5);
DELAY_TIME_LEVEL.add(10);
DELAY_TIME_LEVEL.add(30);
DELAY_TIME_LEVEL.add(60 * 1);
DELAY_TIME_LEVEL.add(60 * 2);
DELAY_TIME_LEVEL.add(60 * 3);
DELAY_TIME_LEVEL.add(60 * 4);
DELAY_TIME_LEVEL.add(60 * 5);
DELAY_TIME_LEVEL.add(60 * 6);
DELAY_TIME_LEVEL.add(60 * 7);
DELAY_TIME_LEVEL.add(60 * 8);
DELAY_TIME_LEVEL.add(60 * 9);
DELAY_TIME_LEVEL.add(60 * 10);
DELAY_TIME_LEVEL.add(60 * 20);
DELAY_TIME_LEVEL.add(60 * 30);
DELAY_TIME_LEVEL.add(60 * 60 * 1);
DELAY_TIME_LEVEL.add(60 * 60 * 2);
}
@Autowired private RocketMQTemplate rocketMQTemplate;
@Override
public void send(AbstractMQ mqModel) {
rocketMQTemplate.convertAndSend(mqModel.getMQName(), mqModel.toMessage());
}
@Override
public void send(AbstractMQ mqModel, int delay) {
// RocketMQ不支持自定义延迟时间, 需要根据传入的参数进行最近的匹配。
rocketMQTemplate.syncSend(mqModel.getMQName(), MessageBuilder.withPayload(mqModel.toMessage()).build(),300000, getNearDelayLevel(delay));
}
/** 获取最接近的节点值 **/
private int getNearDelayLevel(int delay){
// 如果包含则直接返回
if(DELAY_TIME_LEVEL.contains(delay)){
return DELAY_TIME_LEVEL.indexOf(delay) + 1;
}
//两个时间的绝对值 - 位置
TreeMap<Integer, Integer> resultMap = new TreeMap<>();
DELAY_TIME_LEVEL.stream().forEach(time -> resultMap.put(Math.abs(delay - time), DELAY_TIME_LEVEL.indexOf(time) + 1));
return resultMap.firstEntry().getValue();
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.components.mq.vender.rocketmq.receive;
import com.jeequan.jeepay.components.mq.constant.MQVenderCS;
import com.jeequan.jeepay.components.mq.model.CleanMchLoginAuthCacheMQ;
import com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* rocketMQ消息接收器:仅在vender=rocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 清除商户登录信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/7/22 17:06
*/
@Component
@ConditionalOnProperty(name = MQVenderCS.YML_VENDER_KEY, havingValue = MQVenderCS.ROCKET_MQ)
@ConditionalOnBean(CleanMchLoginAuthCacheMQ.IMQReceiver.class)
@RocketMQMessageListener(topic = CleanMchLoginAuthCacheMQ.MQ_NAME, consumerGroup = CleanMchLoginAuthCacheMQ.MQ_NAME)
public class CleanMchLoginAuthCacheRocketMQReceiver implements IMQMsgReceiver, RocketMQListener<String> {
@Autowired
private CleanMchLoginAuthCacheMQ.IMQReceiver mqReceiver;
/** 接收 【 queue 】 类型的消息 **/
public void receiveMsg(String msg){
mqReceiver.receive(CleanMchLoginAuthCacheMQ.parse(msg));
}
@Override
public void onMessage(String message) {
this.receiveMsg(message);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment