Commit 010d8799 authored by xiaoyu's avatar xiaoyu
Browse files

mq优化

parent a4ed4566
......@@ -16,7 +16,7 @@
package com.jeequan.jeepay.mch.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import com.jeequan.jeepay.mch.mq.service.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifySysConfig extends ActiveMQTopic{
@Autowired private SysConfigService sysConfigService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
public MqTopic4ModifySysConfig(){
super(CS.MQ.TOPIC_MODIFY_SYS_CONFIG);
......@@ -46,10 +46,7 @@ public class MqTopic4ModifySysConfig extends ActiveMQTopic{
/** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer")
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
mqReceiveServiceImpl.initDbConfig(msg);
}
......
......@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifyMchApp extends MqModifyMchAppService {
public class RabbitMqDirect4ModifyMchApp extends MqModifyMchAppService {
@Autowired private RabbitTemplate rabbitTemplate;
......@@ -47,7 +47,7 @@ public class RabbitMqTopic4ModifyMchApp extends MqModifyMchAppService {
JSONObject jsonObject = JsonKit.newJson("mchNo", mchNo);
jsonObject.put("appId", appId);
rabbitTemplate.convertAndSend(CS.TOPIC_EXCHANGE, CS.MQ.TOPIC_MODIFY_ISV_INFO, jsonObject.toString());
rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_MCH_APP, jsonObject.toString());
}
}
......@@ -16,8 +16,11 @@
package com.jeequan.jeepay.mch.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import com.jeequan.jeepay.mch.mq.service.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
......@@ -33,17 +36,14 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifySysConfig{
public class RabbitMqDirect4ModifySysConfig {
@Autowired private SysConfigService sysConfigService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 更新系统配置项的消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE)
@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))})
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
mqReceiveServiceImpl.initDbConfig(msg);
}
......
......@@ -31,6 +31,7 @@ import com.jeequan.jeepay.pay.ctrl.ApiController;
import com.jeequan.jeepay.pay.exception.ChannelException;
import com.jeequan.jeepay.pay.model.IsvConfigContext;
import com.jeequan.jeepay.pay.model.MchAppConfigContext;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.queue.service.MqServiceImpl;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRQ;
......@@ -63,7 +64,9 @@ public abstract class AbstractPayOrderController extends ApiController {
@Autowired private ConfigContextService configContextService;
@Autowired private PayMchNotifyService payMchNotifyService;
@Autowired private SysConfigService sysConfigService;
@Autowired private MqServiceImpl mqService;
@Autowired private MqServiceImpl mqServiceImpl;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 统一下单 (新建订单模式) **/
protected ApiRes unifiedOrder(String wayCode, UnifiedOrderRQ bizRQ){
......@@ -323,7 +326,7 @@ public abstract class AbstractPayOrderController extends ApiController {
//判断是否需要轮询查单
if(channelRetMsg.isNeedQuery()){
mqService.sendChannelOrderQuery(MqServiceImpl.buildMsg(payOrderId, 1), 5 * 1000);
mqServiceImpl.sendChannelOrderQuery(mqReceiveServiceImpl.buildMsg(payOrderId, 1), 5 * 1000);
}
}
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq;
import cn.hutool.http.HttpException;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.entity.MchNotifyRecord;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.queue.MqQueue4ChannelOrderQuery;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import com.jeequan.jeepay.service.impl.MchNotifyRecordService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import com.jeequan.jeepay.service.impl.SysConfigService;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 处理公共接收消息方法
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Service
public class MqReceiveServiceImpl {
@Autowired private SysConfigService sysConfigService;
@Autowired private ConfigContextService configContextService;
@Autowired private PayOrderService payOrderService;
@Autowired private MchNotifyRecordService mchNotifyRecordService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
@Autowired private MqQueue4ChannelOrderQuery mqQueue4ChannelOrderQuery;
public void modifyMchInfo(String mchNo) {
log.info("接收 [商户配置信息] 的消息, msg={}", mchNo);
configContextService.initMchInfoConfigContext(mchNo);
}
public void modifyMchApp(String mchNoAndAppId) {
log.info("接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId);
JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId);
configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId"));
}
public void modifyIsvInfo(String isvNo) {
log.info("重置ISV信息, msg={}", isvNo);
configContextService.initIsvConfigContext(isvNo);
}
public Integer payOrderMchNotify(String msg) {
log.info("接收商户通知MQ, msg={}", msg);
Long notifyId = Long.parseLong(msg);
MchNotifyRecord record = mchNotifyRecordService.getById(notifyId);
if(record == null || record.getState() != MchNotifyRecord.STATE_ING){
log.info("查询通知记录不存在或状态不是通知中");
return null;
}
if( record.getNotifyCount() >= record.getNotifyCountLimit() ){
log.info("已达到最大发送次数");
return null;
}
//1. (发送结果最多6次)
Integer currentCount = record.getNotifyCount() + 1;
String notifyUrl = record.getNotifyUrl();
String res = "";
try {
res = HttpUtil.createPost(notifyUrl).timeout(20000).execute().body();
} catch (HttpException e) {
log.error("http error", e);
}
if(currentCount == 1){ //第一次通知: 更新为已通知
payOrderService.updateNotifySent(record.getOrderId());
}
//通知成功
if("SUCCESS".equalsIgnoreCase(res)){
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_SUCCESS, res);
}
//通知次数 >= 最大通知次数时, 更新响应结果为异常, 不在继续延迟发送消息
if( currentCount >= record.getNotifyCountLimit() ){
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_FAIL, res);
}
// 继续发送MQ 延迟发送
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_ING, res);
return currentCount;
}
public void channelOrderQuery(String msg) {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) {
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
//返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//最多查询6次
if(currentCount <= 6){
mqQueue4ChannelOrderQuery.send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
}
}else{ //其他状态, 不需要再次轮询。
}
return;
}
public void initDbConfig(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
}
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
}
......@@ -16,12 +16,8 @@
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import com.jeequan.jeepay.pay.mq.queue.service.MqServiceImpl;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
......@@ -52,8 +48,7 @@ import javax.jms.TextMessage;
public class MqQueue4ChannelOrderQuery extends MqChannelOrderQueryService {
@Autowired private JmsTemplate jmsTemplate;
@Autowired private PayOrderService payOrderService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
@Bean("activeChannelOrderQuery")
public Queue mqQueue4ChannelOrderQuery(){
......@@ -84,45 +79,10 @@ public class MqQueue4ChannelOrderQuery extends MqChannelOrderQueryService {
}
/** 接收 更新系统配置项的消息 **/
/** 接收 查单消息 **/
@JmsListener(destination = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receive(String msg) {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) {
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
//返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//最多查询6次
if(currentCount <= 6){
send(MqServiceImpl.buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
}
}else{ //其他状态, 不需要再次轮询。
}
return;
mqReceiveServiceImpl.channelOrderQuery(msg);
}
......
......@@ -15,14 +15,10 @@
*/
package com.jeequan.jeepay.pay.mq.queue;
import cn.hutool.http.HttpException;
import cn.hutool.http.HttpUtil;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.MchNotifyRecord;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.config.MqThreadExecutor;
import com.jeequan.jeepay.pay.mq.queue.service.MqPayOrderMchNotifyService;
import com.jeequan.jeepay.service.impl.MchNotifyRecordService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
......@@ -61,8 +57,7 @@ public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
@Qualifier("activePayOrderMchNotifyInner")
private Queue mqQueue4PayOrderMchNotifyInner;
@Autowired private JmsTemplate jmsTemplate;
@Autowired private MchNotifyRecordService mchNotifyRecordService;
@Autowired private PayOrderService payOrderService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
public MqQueue4PayOrderMchNotify(){
super();
......@@ -86,56 +81,12 @@ public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
});
}
/** 接收 更新系统配置项的消息 **/
/** 接收 支付订单商户回调消息 **/
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@JmsListener(destination = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY)
public void receive(String msg) {
log.info("接收商户通知MQ, msg={}", msg);
Long notifyId = Long.parseLong(msg);
MchNotifyRecord record = mchNotifyRecordService.getById(notifyId);
if(record == null || record.getState() != MchNotifyRecord.STATE_ING){
log.info("查询通知记录不存在或状态不是通知中");
return ;
}
if( record.getNotifyCount() >= record.getNotifyCountLimit() ){
log.info("已达到最大发送次数");
return ;
}
//1. (发送结果最多6次)
Integer currentCount = record.getNotifyCount() + 1;
String notifyUrl = record.getNotifyUrl();
String res = "";
try {
res = HttpUtil.createPost(notifyUrl).timeout(20000).execute().body();
} catch (HttpException e) {
log.error("http error", e);
}
if(currentCount == 1){ //第一次通知: 更新为已通知
payOrderService.updateNotifySent(record.getOrderId());
}
//通知成功
if("SUCCESS".equalsIgnoreCase(res)){
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_SUCCESS, res);
return ;
}
//通知次数 >= 最大通知次数时, 更新响应结果为异常, 不在继续延迟发送消息
if( currentCount >= record.getNotifyCountLimit() ){
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_FAIL, res);
return ;
}
// 继续发送MQ 延迟发送
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_ING, res);
Integer currentCount = mqReceiveServiceImpl.payOrderMchNotify(msg);
if (currentCount == null) return;
// 通知延时次数
// 1 2 3 4 5 6
// 0 30 60 90 120 150
......
......@@ -16,11 +16,8 @@
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
......@@ -41,11 +38,11 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqQueue4ChannelOrderQuery extends MqChannelOrderQueryService {
public class RabbitMqDelayed4ChannelOrderQuery extends MqChannelOrderQueryService {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private PayOrderService payOrderService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
......@@ -67,45 +64,10 @@ public class RabbitMqQueue4ChannelOrderQuery extends MqChannelOrderQueryService
}
/** 接收 更新系统配置项的消息 **/
/** 接收 查单消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receive(String msg) {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) {
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
//返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//最多查询6次
if(currentCount <= 6){
send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
}
}else{ //其他状态, 不需要再次轮询。
}
return;
mqReceiveServiceImpl.channelOrderQuery(msg);
}
......
......@@ -16,12 +16,8 @@
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.queue.service.MqPayOrderMchNotifyService;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
......@@ -42,15 +38,10 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
public class RabbitMqDelayed4PayOrderMchNotify extends MqPayOrderMchNotifyService {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private PayOrderService payOrderService;
@Autowired private ChannelOrderReissueService channelOrderReissueService;
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 发送MQ消息 **/
@Override
......@@ -68,45 +59,14 @@ public class RabbitMqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService
}
/** 接收 更新系统配置项的消息 **/
/** 接收 支付订单商户回调消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY)
public void receive(String msg) {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) {
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
//返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//最多查询6次
if(currentCount <= 6){
send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口
}
}else{ //其他状态, 不需要再次轮询。
}
return;
Integer currentCount = mqReceiveServiceImpl.payOrderMchNotify(msg);
// 通知延时次数
// 1 2 3 4 5 6
// 0 30 60 90 120 150
send(msg, currentCount * 30 * 1000);
}
......
......@@ -51,7 +51,4 @@ public class MqServiceImpl {
mqPayOrderMchNotifyService.send(msg, delay);
}
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
}
......@@ -16,7 +16,7 @@
package com.jeequan.jeepay.pay.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
......@@ -35,14 +35,12 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyIsvInfo{
@Autowired private ConfigContextService configContextService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_ISV_INFO, containerFactory = "jmsListenerContainer")
public void receive(String isvNo) {
log.info("重置ISV信息, msg={}", isvNo);
configContextService.initIsvConfigContext(isvNo);
mqReceiveServiceImpl.modifyIsvInfo(isvNo);
}
......
......@@ -15,9 +15,8 @@
*/
package com.jeequan.jeepay.pay.mq.topic;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
......@@ -36,7 +35,7 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyMchInfo{
@Autowired private ConfigContextService configContextService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 [商户配置信息] 的消息
* 已知推送节点:
......@@ -45,9 +44,7 @@ public class MqTopic4ModifyMchInfo{
* **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_INFO, containerFactory = "jmsListenerContainer")
public void receive(String mchNo) {
log.info("接收 [商户配置信息] 的消息, msg={}", mchNo);
configContextService.initMchInfoConfigContext(mchNo);
mqReceiveServiceImpl.modifyMchInfo(mchNo);
}
/** 接收 [商户应用支付参数配置信息] 的消息
......@@ -57,10 +54,7 @@ public class MqTopic4ModifyMchInfo{
* **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_APP, containerFactory = "jmsListenerContainer")
public void receiveMchApp(String mchNoAndAppId) {
log.info("接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId);
JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId);
configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId"));
mqReceiveServiceImpl.modifyMchApp(mchNoAndAppId);
}
......
......@@ -16,7 +16,7 @@
package com.jeequan.jeepay.pay.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
......@@ -35,15 +35,12 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifySysConfig{
@Autowired private SysConfigService sysConfigService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer")
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
mqReceiveServiceImpl.initDbConfig(msg);
}
}
......@@ -16,13 +16,11 @@
package com.jeequan.jeepay.pay.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
......@@ -35,16 +33,14 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifyIsvInfo {
public class RabbitMqDirect4ModifyIsvInfo {
@Autowired private ConfigContextService configContextService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 更新系统配置项的消息 **/
/** 接收 更新服务商信息的消息 **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_ISV_INFO)
public void receive(String isvNo) {
log.info("重置ISV信息, msg={}", isvNo);
configContextService.initIsvConfigContext(isvNo);
mqReceiveServiceImpl.modifyIsvInfo(isvNo);
}
......
......@@ -15,14 +15,12 @@
*/
package com.jeequan.jeepay.pay.mq.topic;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
......@@ -35,9 +33,9 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifyMchInfo {
public class RabbitMqDirect4ModifyMchInfo {
@Autowired private ConfigContextService configContextService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 [商户配置信息] 的消息
* 已知推送节点:
......@@ -46,9 +44,7 @@ public class RabbitMqTopic4ModifyMchInfo {
* **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_INFO)
public void receive(String mchNo) {
log.info("接收 [商户配置信息] 的消息, msg={}", mchNo);
configContextService.initMchInfoConfigContext(mchNo);
mqReceiveServiceImpl.modifyMchInfo(mchNo);
}
/** 接收 [商户应用支付参数配置信息] 的消息
......@@ -58,10 +54,7 @@ public class RabbitMqTopic4ModifyMchInfo {
* **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_APP)
public void receiveMchApp(String mchNoAndAppId) {
log.info("接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId);
JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId);
configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId"));
mqReceiveServiceImpl.modifyMchApp(mchNoAndAppId);
}
......
......@@ -16,12 +16,14 @@
package com.jeequan.jeepay.pay.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
......@@ -34,17 +36,14 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqTopic4ModifySysConfig {
public class RabbitMqDirect4ModifySysConfig {
@Autowired private SysConfigService sysConfigService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 接收 更新系统配置项的消息 **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_SYS_CONFIG)
@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))})
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
mqReceiveServiceImpl.initDbConfig(msg);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment