Commit 81c94049 authored by xiaoyu's avatar xiaoyu
Browse files

增加RabbitMQ兼容

parent 7d78af1a
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* mq消息中转
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Service
public class MqServiceImpl {
@Autowired private MqModifyMchAppService mqModifyMchAppService;
public void sendModifyMchApp(String mchNo, String appId){
mqModifyMchAppService.send(mchNo, appId);
}
}
......@@ -18,9 +18,11 @@ package com.jeequan.jeepay.mch.mq.topic;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.utils.JsonKit;
import com.jeequan.jeepay.mch.mq.service.MqModifyMchAppService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
......@@ -33,20 +35,17 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
public class MqTopic4ModifyMchApp extends ActiveMQTopic{
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyMchApp extends MqModifyMchAppService {
@Autowired private JmsTemplate jmsTemplate;
public MqTopic4ModifyMchApp(){
super(CS.MQ.TOPIC_MODIFY_MCH_APP);
}
/** 推送消息到各个节点 **/
public void push(String mchNo, String appId) {
@Override
public void send(String mchNo, String appId) {
JSONObject jsonObject = JsonKit.newJson("mchNo", mchNo);
jsonObject.put("appId", appId);
this.jmsTemplate.convertAndSend(this, jsonObject.toString());
this.jmsTemplate.convertAndSend(new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_APP), jsonObject.toString());
}
}
......@@ -20,6 +20,7 @@ import com.jeequan.jeepay.service.impl.SysConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
......@@ -33,6 +34,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifySysConfig extends ActiveMQTopic{
@Autowired private SysConfigService sysConfigService;
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.topic;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.utils.JsonKit;
import com.jeequan.jeepay.mch.mq.service.MqModifyMchAppService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/**
* RabbitMq
* 商户应用修改推送
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifyMchApp extends MqModifyMchAppService {
@Autowired private RabbitTemplate rabbitTemplate;
/** 推送消息到各个节点 **/
@Override
public void send(String mchNo, String appId) {
JSONObject jsonObject = JsonKit.newJson("mchNo", mchNo);
jsonObject.put("appId", appId);
rabbitTemplate.convertAndSend(CS.TOPIC_EXCHANGE, CS.MQ.TOPIC_MODIFY_ISV_INFO, jsonObject.toString());
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* RabbitMq
* 系统信息修改推送
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifySysConfig{
@Autowired private SysConfigService sysConfigService;
/** 接收 更新系统配置项的消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE)
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
}
}
......@@ -51,8 +51,8 @@ spring:
database: 2 #1库:运营平台 #2库:商户系统 #3库:支付网关
password:
#activeMQ配置
activemq:
broker-url: tcp://localhost:61616 #连接地址
# activemq:
# broker-url: tcp://localhost:61616 #连接地址
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
......
......@@ -73,6 +73,12 @@
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 添加对rabbitMQ的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--wx_pay https://github.com/wechat-group/WxJava -->
<dependency>
<groupId>com.github.binarywang</groupId>
......
......@@ -31,7 +31,7 @@ import com.jeequan.jeepay.pay.ctrl.ApiController;
import com.jeequan.jeepay.pay.exception.ChannelException;
import com.jeequan.jeepay.pay.model.IsvConfigContext;
import com.jeequan.jeepay.pay.model.MchAppConfigContext;
import com.jeequan.jeepay.pay.mq.queue.MqQueue4ChannelOrderQuery;
import com.jeequan.jeepay.pay.mq.queue.service.MqServiceImpl;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRQ;
import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRS;
......@@ -63,7 +63,7 @@ public abstract class AbstractPayOrderController extends ApiController {
@Autowired private ConfigContextService configContextService;
@Autowired private PayMchNotifyService payMchNotifyService;
@Autowired private SysConfigService sysConfigService;
@Autowired private MqQueue4ChannelOrderQuery mqChannelOrderQueryQueue;
@Autowired private MqServiceImpl mqService;
/** 统一下单 (新建订单模式) **/
protected ApiRes unifiedOrder(String wayCode, UnifiedOrderRQ bizRQ){
......@@ -323,7 +323,7 @@ public abstract class AbstractPayOrderController extends ApiController {
//判断是否需要轮询查单
if(channelRetMsg.isNeedQuery()){
mqChannelOrderQueryQueue.send(MqQueue4ChannelOrderQuery.buildMsg(payOrderId, 1), 5 * 1000);
mqService.sendChannelOrderQuery(MqServiceImpl.buildMsg(payOrderId, 1), 5 * 1000);
}
}
......
......@@ -17,6 +17,8 @@ package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import com.jeequan.jeepay.pay.mq.queue.service.MqServiceImpl;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.service.impl.PayOrderService;
......@@ -24,10 +26,15 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import javax.jms.TextMessage;
/*
......@@ -41,29 +48,33 @@ import javax.jms.TextMessage;
*/
@Slf4j
@Component
public class MqQueue4ChannelOrderQuery extends ActiveMQQueue{
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqQueue4ChannelOrderQuery extends MqChannelOrderQueryService {
@Autowired private JmsTemplate jmsTemplate;
@Autowired private PayOrderService payOrderService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
@Bean("channelOrderQuery")
public Queue mqQueue4ChannelOrderQuery(){
return new ActiveMQQueue(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY);
}
/** 构造函数 */
public MqQueue4ChannelOrderQuery(){
super(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY);
}
@Lazy
@Autowired
@Qualifier("channelOrderQuery")
private Queue mqQueue4ChannelOrderQuery;
/** 发送MQ消息 **/
@Override
public void send(String msg) {
this.jmsTemplate.convertAndSend(this, msg);
this.jmsTemplate.convertAndSend(mqQueue4ChannelOrderQuery, msg);
}
/** 发送MQ消息 **/
@Override
public void send(String msg, long delay) {
jmsTemplate.send(this, session -> {
jmsTemplate.send(mqQueue4ChannelOrderQuery, session -> {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
......@@ -101,7 +112,7 @@ public class MqQueue4ChannelOrderQuery extends ActiveMQQueue{
//最多查询6次
if(currentCount <= 6){
send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
send(MqServiceImpl.buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
......
......@@ -20,6 +20,7 @@ import cn.hutool.http.HttpUtil;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.MchNotifyRecord;
import com.jeequan.jeepay.pay.mq.config.MqThreadExecutor;
import com.jeequan.jeepay.pay.mq.queue.service.MqPayOrderMchNotifyService;
import com.jeequan.jeepay.service.impl.MchNotifyRecordService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
......@@ -29,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.Async;
......@@ -46,16 +48,17 @@ import javax.jms.TextMessage;
*/
@Slf4j
@Component
public class MqQueue4PayOrderMchNotify {
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
@Bean("mqQueue4PayOrderMchNotifyInner")
@Bean("payOrderMchNotifyInner")
public Queue mqQueue4PayOrderMchNotifyInner(){
return new ActiveMQQueue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY);
}
@Lazy
@Autowired
@Qualifier("mqQueue4PayOrderMchNotifyInner")
@Qualifier("payOrderMchNotifyInner")
private Queue mqQueue4PayOrderMchNotifyInner;
@Autowired private JmsTemplate jmsTemplate;
@Autowired private MchNotifyRecordService mchNotifyRecordService;
......@@ -66,11 +69,13 @@ public class MqQueue4PayOrderMchNotify {
}
/** 发送MQ消息 **/
@Override
public void send(String msg) {
this.jmsTemplate.convertAndSend(mqQueue4PayOrderMchNotifyInner, msg);
}
/** 发送MQ消息 **/
@Override
public void send(String msg, long delay) {
jmsTemplate.send(mqQueue4PayOrderMchNotifyInner, session -> {
TextMessage tm = session.createTextMessage(msg);
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* RabbitMQ
* 上游渠道订单轮询查单
* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。
*
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqQueue4ChannelOrderQuery extends MqChannelOrderQueryService {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private PayOrderService payOrderService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
/** 发送MQ消息 **/
@Override
public void send(String msg) {
rabbitTemplate.convertAndSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg);
}
/** 发送MQ消息 **/
@Override
public void send(String msg, long delay) {
rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg, a ->{
a.getMessageProperties().setDelay(Math.toIntExact(delay));
return a;
});
}
/** 接收 更新系统配置项的消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receive(String msg) {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) {
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
//返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//最多查询6次
if(currentCount <= 6){
send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
}
}else{ //其他状态, 不需要再次轮询。
}
return;
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import com.jeequan.jeepay.pay.mq.queue.service.MqPayOrderMchNotifyService;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* RabbitMQ
* 上游渠道订单轮询查单
* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。
*
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private PayOrderService payOrderService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
/** 发送MQ消息 **/
@Override
public void send(String msg) {
rabbitTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg);
}
/** 发送MQ消息 **/
@Override
public void send(String msg, long delay) {
rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg, a ->{
a.getMessageProperties().setDelay(Math.toIntExact(delay));
return a;
});
}
/** 接收 更新系统配置项的消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY)
public void receive(String msg) {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) {
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
//返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//最多查询6次
if(currentCount <= 6){
send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
}
}else{ //其他状态, 不需要再次轮询。
}
return;
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue.service;
import com.jeequan.jeepay.core.constants.CS;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMq
* 延迟消息队列绑定交换机
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Profile(CS.MQTYPE.RABBIT_MQ)
@Configuration
public class DelayedRabbitMqConfig {
@Bean("channelOrderQuery")
public Queue channelOrderQuery() {
return new Queue(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY,true);
}
@Bean("payOrderMchNotify")
public Queue payOrderMchNotify() {
return new Queue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY,true);
}
//Topic交换机 起名:testDirectExchange
@Bean
CustomExchange customExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(CS.DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_CHANNEL_ORDER_QUERY
@Bean
Binding bindingChannelOrderQuery(@Qualifier("channelOrderQuery") Queue channelOrderQuery) {
return BindingBuilder.bind(channelOrderQuery).to(customExchange()).with(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY).noargs();
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_PAYORDER_MCH_NOTIFY
@Bean
Binding bindingPayOrderNotify(@Qualifier("payOrderMchNotify") Queue payOrderMchNotify) {
return BindingBuilder.bind(payOrderMchNotify).to(customExchange()).with(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY).noargs();
}
}
package com.jeequan.jeepay.pay.mq.queue.service;
/**
* RabbitMq
* 通道订单查询
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
public abstract class MqChannelOrderQueryService {
public abstract void send(String msg);
public abstract void send(String msg, long delay);
}
package com.jeequan.jeepay.pay.mq.queue.service;
/**
* RabbitMq
* 商户订单回调
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
public abstract class MqPayOrderMchNotifyService {
public abstract void send(String msg);
public abstract void send(String msg, long delay);
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* mq消息推送
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Service
public class MqServiceImpl {
@Autowired private MqChannelOrderQueryService mqChannelOrderQueryService;
@Autowired private MqPayOrderMchNotifyService mqPayOrderMchNotifyService;
/** 通道订单查询推送 **/
public void sendChannelOrderQuery(String msg){
mqChannelOrderQueryService.send(msg);
}
public void sendChannelOrderQuery(String msg, long delay){
mqChannelOrderQueryService.send(msg, delay);
}
/** 商户订单回调 **/
public void PayOrderMchNotify(String msg){
mqPayOrderMchNotifyService.send(msg);
}
public void PayOrderMchNotify(String msg, long delay){
mqPayOrderMchNotifyService.send(msg, delay);
}
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
}
......@@ -15,7 +15,9 @@
*/
package com.jeequan.jeepay.pay.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.stereotype.Component;
......@@ -30,6 +32,7 @@ import javax.jms.ConnectionFactory;
* @date 2021/6/8 17:31
*/
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class JMSConfig {
/** 新增jmsListenerContainer, 用于接收topic类型的消息 **/
......
......@@ -19,6 +19,7 @@ import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
......@@ -31,6 +32,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyIsvInfo{
@Autowired private ConfigContextService configContextService;
......
......@@ -20,6 +20,7 @@ import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
......@@ -32,6 +33,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyMchInfo{
@Autowired private ConfigContextService configContextService;
......
......@@ -19,6 +19,7 @@ import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
......@@ -31,6 +32,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifySysConfig{
@Autowired private SysConfigService sysConfigService;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment