Commit 0ba6b3bd authored by xiaoyu's avatar xiaoyu
Browse files

MQ整体优化

parent 010d8799
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.topic;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.utils.JsonKit;
import com.jeequan.jeepay.mch.mq.service.MqModifyMchAppService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/*
* 更改商户应用信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:10
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyMchApp extends MqModifyMchAppService {
@Autowired private JmsTemplate jmsTemplate;
/** 推送消息到各个节点 **/
@Override
public void send(String mchNo, String appId) {
JSONObject jsonObject = JsonKit.newJson("mchNo", mchNo);
jsonObject.put("appId", appId);
this.jmsTemplate.convertAndSend(new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_APP), jsonObject.toString());
}
}
...@@ -23,6 +23,7 @@ import com.jeequan.jeepay.core.entity.MchPayPassage; ...@@ -23,6 +23,7 @@ import com.jeequan.jeepay.core.entity.MchPayPassage;
import com.jeequan.jeepay.core.entity.PayOrder; import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.core.exception.BizException; import com.jeequan.jeepay.core.exception.BizException;
import com.jeequan.jeepay.core.model.ApiRes; import com.jeequan.jeepay.core.model.ApiRes;
import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.core.utils.SeqKit; import com.jeequan.jeepay.core.utils.SeqKit;
import com.jeequan.jeepay.core.utils.SpringBeansUtil; import com.jeequan.jeepay.core.utils.SpringBeansUtil;
import com.jeequan.jeepay.core.utils.StringKit; import com.jeequan.jeepay.core.utils.StringKit;
...@@ -31,8 +32,7 @@ import com.jeequan.jeepay.pay.ctrl.ApiController; ...@@ -31,8 +32,7 @@ import com.jeequan.jeepay.pay.ctrl.ApiController;
import com.jeequan.jeepay.pay.exception.ChannelException; import com.jeequan.jeepay.pay.exception.ChannelException;
import com.jeequan.jeepay.pay.model.IsvConfigContext; import com.jeequan.jeepay.pay.model.IsvConfigContext;
import com.jeequan.jeepay.pay.model.MchAppConfigContext; import com.jeequan.jeepay.pay.model.MchAppConfigContext;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import com.jeequan.jeepay.pay.mq.queue.service.MqServiceImpl;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg; import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRQ; import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRQ;
import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRS; import com.jeequan.jeepay.pay.rqrs.payorder.UnifiedOrderRS;
...@@ -64,8 +64,8 @@ public abstract class AbstractPayOrderController extends ApiController { ...@@ -64,8 +64,8 @@ public abstract class AbstractPayOrderController extends ApiController {
@Autowired private ConfigContextService configContextService; @Autowired private ConfigContextService configContextService;
@Autowired private PayMchNotifyService payMchNotifyService; @Autowired private PayMchNotifyService payMchNotifyService;
@Autowired private SysConfigService sysConfigService; @Autowired private SysConfigService sysConfigService;
@Autowired private MqServiceImpl mqServiceImpl; @Autowired private MqCommonService mqCommonService;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon receiveCommon;
/** 统一下单 (新建订单模式) **/ /** 统一下单 (新建订单模式) **/
...@@ -326,7 +326,7 @@ public abstract class AbstractPayOrderController extends ApiController { ...@@ -326,7 +326,7 @@ public abstract class AbstractPayOrderController extends ApiController {
//判断是否需要轮询查单 //判断是否需要轮询查单
if(channelRetMsg.isNeedQuery()){ if(channelRetMsg.isNeedQuery()){
mqServiceImpl.sendChannelOrderQuery(mqReceiveServiceImpl.buildMsg(payOrderId, 1), 5 * 1000); mqCommonService.send(receiveCommon.buildMsg(payOrderId, 1), 5 * 1000, CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY);
} }
} }
......
...@@ -13,12 +13,12 @@ ...@@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.queue; package com.jeequan.jeepay.pay.mq;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.pay.mq.config.MqThreadExecutor; import com.jeequan.jeepay.pay.mq.config.MqThreadExecutor;
import com.jeequan.jeepay.pay.mq.queue.service.MqPayOrderMchNotifyService; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage; import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
...@@ -36,16 +36,34 @@ import javax.jms.Queue; ...@@ -36,16 +36,34 @@ import javax.jms.Queue;
import javax.jms.TextMessage; import javax.jms.TextMessage;
/* /*
* 商户订单回调MQ通知 * 上游渠道订单轮询查单
* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。
*
* *
* @author terrfly * @author terrfly
* @site https://www.jeepay.vip * @site https://www.jeepay.vip
* @date 2021/6/8 17:34 * @date 2021/6/8 17:30
*/ */
@Slf4j @Slf4j
@Component @Component
@Profile(CS.MQTYPE.ACTIVE_MQ) @Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService { public class ActiveMqMessage extends MqCommonService {
@Autowired private JmsTemplate jmsTemplate;
@Lazy
@Autowired
private MqReceiveCommon mqReceiveCommon;
@Bean("activeChannelOrderQuery")
public Queue mqQueue4ChannelOrderQuery(){
return new ActiveMQQueue(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY);
}
@Lazy
@Autowired
@Qualifier("activeChannelOrderQuery")
private Queue mqQueue4ChannelOrderQuery;
@Bean("activePayOrderMchNotifyInner") @Bean("activePayOrderMchNotifyInner")
public Queue mqQueue4PayOrderMchNotifyInner(){ public Queue mqQueue4PayOrderMchNotifyInner(){
...@@ -56,22 +74,59 @@ public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService { ...@@ -56,22 +74,59 @@ public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
@Autowired @Autowired
@Qualifier("activePayOrderMchNotifyInner") @Qualifier("activePayOrderMchNotifyInner")
private Queue mqQueue4PayOrderMchNotifyInner; private Queue mqQueue4PayOrderMchNotifyInner;
@Autowired private JmsTemplate jmsTemplate;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
public MqQueue4PayOrderMchNotify(){ /**
super(); * 发送消息
* @param msg
* @param sendType
*/
@Override
public void send(String msg, String sendType) {
if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) {
channelOrderQuery(msg);
}else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) {
payOrderMchNotify(msg);
}
} }
/** 发送MQ消息 **/ /**
* 发送延迟消息
* @param msg
* @param delay
* @param sendType
*/
@Override @Override
public void send(String msg) { public void send(String msg, long delay, String sendType) {
if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) {
channelOrderQueryFixed(msg, delay);
}else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) {
payOrderMchNotifyFixed(msg, delay);
}
}
/** 发送订单查询消息 **/
public void channelOrderQuery(String msg) {
this.jmsTemplate.convertAndSend(mqQueue4ChannelOrderQuery, msg);
}
/** 发送订单查询延迟消息 **/
public void channelOrderQueryFixed(String msg, long delay) {
jmsTemplate.send(mqQueue4ChannelOrderQuery, session -> {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
});
}
/** 发送订单回调消息 **/
public void payOrderMchNotify(String msg) {
this.jmsTemplate.convertAndSend(mqQueue4PayOrderMchNotifyInner, msg); this.jmsTemplate.convertAndSend(mqQueue4PayOrderMchNotifyInner, msg);
} }
/** 发送MQ消息 **/ /** 发送订单回调延迟消息 **/
@Override public void payOrderMchNotifyFixed(String msg, long delay) {
public void send(String msg, long delay) {
jmsTemplate.send(mqQueue4PayOrderMchNotifyInner, session -> { jmsTemplate.send(mqQueue4PayOrderMchNotifyInner, session -> {
TextMessage tm = session.createTextMessage(msg); TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay); tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
...@@ -81,17 +136,20 @@ public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService { ...@@ -81,17 +136,20 @@ public class MqQueue4PayOrderMchNotify extends MqPayOrderMchNotifyService {
}); });
} }
/** 接收 查单消息 **/
@Lazy
@JmsListener(destination = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receiveChannelOrderQuery(String msg) {
mqReceiveCommon.channelOrderQuery(msg);
}
/** 接收 支付订单商户回调消息 **/ /** 接收 支付订单商户回调消息 **/
@Lazy
@Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY) @Async(MqThreadExecutor.EXECUTOR_PAYORDER_MCH_NOTIFY)
@JmsListener(destination = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY) @JmsListener(destination = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY)
public void receive(String msg) { public void receivePayOrderMchNotify(String msg) {
Integer currentCount = mqReceiveServiceImpl.payOrderMchNotify(msg); mqReceiveCommon.payOrderMchNotify(msg);
if (currentCount == null) return;
// 通知延时次数
// 1 2 3 4 5 6
// 0 30 60 90 120 150
send(msg, currentCount * 30 * 1000);
} }
} }
...@@ -13,15 +13,16 @@ ...@@ -13,15 +13,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.queue; package com.jeequan.jeepay.pay.mq;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.pay.mq.queue.service.MqPayOrderMchNotifyService; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -38,36 +39,67 @@ import org.springframework.stereotype.Component; ...@@ -38,36 +39,67 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@Component @Component
@Profile(CS.MQTYPE.RABBIT_MQ) @Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqDelayed4PayOrderMchNotify extends MqPayOrderMchNotifyService { public class RabbitMqMessage extends MqCommonService {
@Autowired private RabbitTemplate rabbitTemplate; @Autowired private RabbitTemplate rabbitTemplate;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
/** 发送MQ消息 **/ @Lazy
@Autowired
private MqReceiveCommon mqReceiveCommon;
@Override @Override
public void send(String msg) { public void send(String msg, String sendType) {
rabbitTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg); if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) {
channelOrderQuery(msg);
}else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) {
payOrderMchNotify(msg);
}
} }
/** 发送MQ消息 **/
@Override @Override
public void send(String msg, long delay) { public void send(String msg, long delay, String sendType) {
if (sendType.equals(CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY)) {
channelOrderQueryFixed(msg, delay);
}else if (sendType.equals(CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY)) {
payOrderMchNotifyFixed(msg, delay);
}
}
/** 发送订单查询消息 **/
public void channelOrderQuery(String msg) {
rabbitTemplate.convertAndSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg);
}
/** 发送订单查询延迟消息 **/
public void channelOrderQueryFixed(String msg, long delay) {
rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg, a ->{
a.getMessageProperties().setDelay(Math.toIntExact(delay));
return a;
});
}
/** 发送订单回调消息 **/
public void payOrderMchNotify(String msg) {
rabbitTemplate.convertAndSend(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg);
}
/** 发送订单回调延迟消息 **/
public void payOrderMchNotifyFixed(String msg, long delay) {
rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg, a ->{ rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY, msg, a ->{
a.getMessageProperties().setDelay(Math.toIntExact(delay)); a.getMessageProperties().setDelay(Math.toIntExact(delay));
return a; return a;
}); });
} }
/** 接收 查单消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receiveChannelOrderQuery(String msg) {
mqReceiveCommon.channelOrderQuery(msg);
}
/** 接收 支付订单商户回调消息 **/ /** 接收 支付订单商户回调消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY) @RabbitListener(queues = CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY)
public void receive(String msg) { public void receivePayOrderMchNotify(String msg) {
Integer currentCount = mqReceiveServiceImpl.payOrderMchNotify(msg); mqReceiveCommon.payOrderMchNotify(msg);
// 通知延时次数
// 1 2 3 4 5 6
// 0 30 60 90 120 150
send(msg, currentCount * 30 * 1000);
} }
} }
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.activemq.topic;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
...@@ -35,12 +35,12 @@ import org.springframework.stereotype.Component; ...@@ -35,12 +35,12 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ) @Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyIsvInfo{ public class MqTopic4ModifyIsvInfo{
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon mqReceiveCommon;
/** 接收 更新系统配置项的消息 **/ /** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_ISV_INFO, containerFactory = "jmsListenerContainer") @JmsListener(destination = CS.MQ.TOPIC_MODIFY_ISV_INFO, containerFactory = "jmsListenerContainer")
public void receive(String isvNo) { public void receive(String isvNo) {
mqReceiveServiceImpl.modifyIsvInfo(isvNo); mqReceiveCommon.modifyIsvInfo(isvNo);
} }
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.activemq.topic;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
...@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component; ...@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ) @Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifyMchInfo{ public class MqTopic4ModifyMchInfo{
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon mqReceiveCommon;
/** 接收 [商户配置信息] 的消息 /** 接收 [商户配置信息] 的消息
* 已知推送节点: * 已知推送节点:
...@@ -44,7 +44,7 @@ public class MqTopic4ModifyMchInfo{ ...@@ -44,7 +44,7 @@ public class MqTopic4ModifyMchInfo{
* **/ * **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_INFO, containerFactory = "jmsListenerContainer") @JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_INFO, containerFactory = "jmsListenerContainer")
public void receive(String mchNo) { public void receive(String mchNo) {
mqReceiveServiceImpl.modifyMchInfo(mchNo); mqReceiveCommon.modifyMchInfo(mchNo);
} }
/** 接收 [商户应用支付参数配置信息] 的消息 /** 接收 [商户应用支付参数配置信息] 的消息
...@@ -54,7 +54,7 @@ public class MqTopic4ModifyMchInfo{ ...@@ -54,7 +54,7 @@ public class MqTopic4ModifyMchInfo{
* **/ * **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_APP, containerFactory = "jmsListenerContainer") @JmsListener(destination = CS.MQ.TOPIC_MODIFY_MCH_APP, containerFactory = "jmsListenerContainer")
public void receiveMchApp(String mchNoAndAppId) { public void receiveMchApp(String mchNoAndAppId) {
mqReceiveServiceImpl.modifyMchApp(mchNoAndAppId); mqReceiveCommon.modifyMchApp(mchNoAndAppId);
} }
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.activemq.topic;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile; import org.springframework.context.annotation.Profile;
...@@ -35,12 +35,12 @@ import org.springframework.stereotype.Component; ...@@ -35,12 +35,12 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.ACTIVE_MQ) @Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopic4ModifySysConfig{ public class MqTopic4ModifySysConfig{
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon mqReceiveCommon;
/** 接收 更新系统配置项的消息 **/ /** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer") @JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer")
public void receive(String msg) { public void receive(String msg) {
mqReceiveServiceImpl.initDbConfig(msg); mqReceiveCommon.initDbConfig(msg);
} }
} }
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.config;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.queue.service; package com.jeequan.jeepay.pay.mq.config;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import org.springframework.amqp.core.*; import org.springframework.amqp.core.*;
...@@ -32,9 +32,9 @@ import java.util.Map; ...@@ -32,9 +32,9 @@ import java.util.Map;
* @site https://www.jeepay.vip * @site https://www.jeepay.vip
* @date 2021/6/25 17:10 * @date 2021/6/25 17:10
*/ */
@Profile(CS.MQTYPE.RABBIT_MQ)
@Configuration @Configuration
public class DelayedRabbitMqConfig { @Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqConfig {
@Bean("channelOrderQuery") @Bean("channelOrderQuery")
public Queue channelOrderQuery() { public Queue channelOrderQuery() {
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.Queue;
import javax.jms.TextMessage;
/*
* 上游渠道订单轮询查单
* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。
*
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:30
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqQueue4ChannelOrderQuery extends MqChannelOrderQueryService {
@Autowired private JmsTemplate jmsTemplate;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
@Bean("activeChannelOrderQuery")
public Queue mqQueue4ChannelOrderQuery(){
return new ActiveMQQueue(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY);
}
@Lazy
@Autowired
@Qualifier("activeChannelOrderQuery")
private Queue mqQueue4ChannelOrderQuery;
/** 发送MQ消息 **/
@Override
public void send(String msg) {
this.jmsTemplate.convertAndSend(mqQueue4ChannelOrderQuery, msg);
}
/** 发送MQ消息 **/
@Override
public void send(String msg, long delay) {
jmsTemplate.send(mqQueue4ChannelOrderQuery, session -> {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
});
}
/** 接收 查单消息 **/
@JmsListener(destination = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receive(String msg) {
mqReceiveServiceImpl.channelOrderQuery(msg);
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl;
import com.jeequan.jeepay.pay.mq.queue.service.MqChannelOrderQueryService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* RabbitMQ
* 上游渠道订单轮询查单
* 如:微信的条码支付,没有回调接口, 需要轮询查单完成交易结果通知。
*
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqDelayed4ChannelOrderQuery extends MqChannelOrderQueryService {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl;
public static final String buildMsg(String payOrderId, int count){
return payOrderId + "," + count;
}
/** 发送MQ消息 **/
@Override
public void send(String msg) {
rabbitTemplate.convertAndSend(CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg);
}
/** 发送MQ消息 **/
@Override
public void send(String msg, long delay) {
rabbitTemplate.convertAndSend(CS.DELAYED_EXCHANGE, CS.MQ.QUEUE_CHANNEL_ORDER_QUERY, msg, a ->{
a.getMessageProperties().setDelay(Math.toIntExact(delay));
return a;
});
}
/** 接收 查单消息 **/
@RabbitListener(queues = CS.MQ.QUEUE_CHANNEL_ORDER_QUERY)
public void receive(String msg) {
mqReceiveServiceImpl.channelOrderQuery(msg);
}
}
package com.jeequan.jeepay.pay.mq.queue.service;
/**
* RabbitMq
* 通道订单查询
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
public abstract class MqChannelOrderQueryService {
public abstract void send(String msg);
public abstract void send(String msg, long delay);
}
package com.jeequan.jeepay.pay.mq.queue.service;
/**
* RabbitMq
* 商户订单回调
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
public abstract class MqPayOrderMchNotifyService {
public abstract void send(String msg);
public abstract void send(String msg, long delay);
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.pay.mq.queue.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* mq消息推送
*
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Service
public class MqServiceImpl {
@Autowired private MqChannelOrderQueryService mqChannelOrderQueryService;
@Autowired private MqPayOrderMchNotifyService mqPayOrderMchNotifyService;
/** 通道订单查询推送 **/
public void sendChannelOrderQuery(String msg){
mqChannelOrderQueryService.send(msg);
}
public void sendChannelOrderQuery(String msg, long delay){
mqChannelOrderQueryService.send(msg, delay);
}
/** 商户订单回调 **/
public void PayOrderMchNotify(String msg){
mqPayOrderMchNotifyService.send(msg);
}
public void PayOrderMchNotify(String msg, long delay){
mqPayOrderMchNotifyService.send(msg, delay);
}
}
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.rabbitmq;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -35,12 +35,12 @@ import org.springframework.stereotype.Component; ...@@ -35,12 +35,12 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.RABBIT_MQ) @Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqDirect4ModifyIsvInfo { public class RabbitMqDirect4ModifyIsvInfo {
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon mqReceiveCommon;
/** 接收 更新服务商信息的消息 **/ /** 接收 更新服务商信息的消息 **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_ISV_INFO) @RabbitListener(queues = CS.MQ.TOPIC_MODIFY_ISV_INFO)
public void receive(String isvNo) { public void receive(String isvNo) {
mqReceiveServiceImpl.modifyIsvInfo(isvNo); mqReceiveCommon.modifyIsvInfo(isvNo);
} }
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.rabbitmq;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component; ...@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.RABBIT_MQ) @Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqDirect4ModifyMchInfo { public class RabbitMqDirect4ModifyMchInfo {
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon mqReceiveCommon;
/** 接收 [商户配置信息] 的消息 /** 接收 [商户配置信息] 的消息
* 已知推送节点: * 已知推送节点:
...@@ -44,7 +44,7 @@ public class RabbitMqDirect4ModifyMchInfo { ...@@ -44,7 +44,7 @@ public class RabbitMqDirect4ModifyMchInfo {
* **/ * **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_INFO) @RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_INFO)
public void receive(String mchNo) { public void receive(String mchNo) {
mqReceiveServiceImpl.modifyMchInfo(mchNo); mqReceiveCommon.modifyMchInfo(mchNo);
} }
/** 接收 [商户应用支付参数配置信息] 的消息 /** 接收 [商户应用支付参数配置信息] 的消息
...@@ -54,7 +54,7 @@ public class RabbitMqDirect4ModifyMchInfo { ...@@ -54,7 +54,7 @@ public class RabbitMqDirect4ModifyMchInfo {
* **/ * **/
@RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_APP) @RabbitListener(queues = CS.MQ.TOPIC_MODIFY_MCH_APP)
public void receiveMchApp(String mchNoAndAppId) { public void receiveMchApp(String mchNoAndAppId) {
mqReceiveServiceImpl.modifyMchApp(mchNoAndAppId); mqReceiveCommon.modifyMchApp(mchNoAndAppId);
} }
......
...@@ -13,10 +13,10 @@ ...@@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq.topic; package com.jeequan.jeepay.pay.mq.rabbitmq;
import com.jeequan.jeepay.core.constants.CS; import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.pay.mq.MqReceiveServiceImpl; import com.jeequan.jeepay.pay.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.Queue;
...@@ -38,12 +38,12 @@ import org.springframework.stereotype.Component; ...@@ -38,12 +38,12 @@ import org.springframework.stereotype.Component;
@Profile(CS.MQTYPE.RABBIT_MQ) @Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqDirect4ModifySysConfig { public class RabbitMqDirect4ModifySysConfig {
@Autowired private MqReceiveServiceImpl mqReceiveServiceImpl; @Autowired private MqReceiveCommon mqReceiveCommon;
/** 接收 更新系统配置项的消息 **/ /** 接收 更新系统配置项的消息 **/
@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))}) @RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))})
public void receive(String msg) { public void receive(String msg) {
mqReceiveServiceImpl.initDbConfig(msg); mqReceiveCommon.initDbConfig(msg);
} }
} }
...@@ -13,14 +13,15 @@ ...@@ -13,14 +13,15 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package com.jeequan.jeepay.pay.mq; package com.jeequan.jeepay.pay.mq.receive;
import cn.hutool.http.HttpException; import cn.hutool.http.HttpException;
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.MchNotifyRecord; import com.jeequan.jeepay.core.entity.MchNotifyRecord;
import com.jeequan.jeepay.core.entity.PayOrder; import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.pay.mq.queue.MqQueue4ChannelOrderQuery; import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg; import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ChannelOrderReissueService; import com.jeequan.jeepay.pay.service.ChannelOrderReissueService;
import com.jeequan.jeepay.pay.service.ConfigContextService; import com.jeequan.jeepay.pay.service.ConfigContextService;
...@@ -28,8 +29,8 @@ import com.jeequan.jeepay.service.impl.MchNotifyRecordService; ...@@ -28,8 +29,8 @@ import com.jeequan.jeepay.service.impl.MchNotifyRecordService;
import com.jeequan.jeepay.service.impl.PayOrderService; import com.jeequan.jeepay.service.impl.PayOrderService;
import com.jeequan.jeepay.service.impl.SysConfigService; import com.jeequan.jeepay.service.impl.SysConfigService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
/** /**
...@@ -41,113 +42,136 @@ import org.springframework.stereotype.Service; ...@@ -41,113 +42,136 @@ import org.springframework.stereotype.Service;
*/ */
@Slf4j @Slf4j
@Service @Service
public class MqReceiveServiceImpl { public class MqReceiveCommon {
@Autowired private SysConfigService sysConfigService; @Autowired
@Autowired private ConfigContextService configContextService; private SysConfigService sysConfigService;
@Autowired private PayOrderService payOrderService; @Autowired
@Autowired private MchNotifyRecordService mchNotifyRecordService; private ConfigContextService configContextService;
@Autowired private ChannelOrderReissueService channelOrderReissueService; @Autowired
@Autowired private MqQueue4ChannelOrderQuery mqQueue4ChannelOrderQuery; private PayOrderService payOrderService;
@Autowired
private ChannelOrderReissueService channelOrderReissueService;
@Autowired
private MchNotifyRecordService mchNotifyRecordService;
@Autowired
private MqCommonService mqCommonService;
/** 接收 [商户配置信息] 的消息 **/
public void modifyMchInfo(String mchNo) { public void modifyMchInfo(String mchNo) {
log.info("接收 [商户配置信息] 的消息, msg={}", mchNo); log.info("接收 [商户配置信息] 的消息, msg={}", mchNo);
configContextService.initMchInfoConfigContext(mchNo); configContextService.initMchInfoConfigContext(mchNo);
} }
/** 接收 [商户应用支付参数配置信息] 的消息 **/
public void modifyMchApp(String mchNoAndAppId) { public void modifyMchApp(String mchNoAndAppId) {
log.info("接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId); log.info("接收 [商户应用支付参数配置信息] 的消息, msg={}", mchNoAndAppId);
JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId); JSONObject jsonObject = (JSONObject) JSONObject.parse(mchNoAndAppId);
configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId")); configContextService.initMchAppConfigContext(jsonObject.getString("mchNo"), jsonObject.getString("appId"));
} }
/** 重置ISV信息 **/
public void modifyIsvInfo(String isvNo) { public void modifyIsvInfo(String isvNo) {
log.info("重置ISV信息, msg={}", isvNo); log.info("重置ISV信息, msg={}", isvNo);
configContextService.initIsvConfigContext(isvNo); configContextService.initIsvConfigContext(isvNo);
} }
public Integer payOrderMchNotify(String msg) { /** 接收商户订单回调通知 **/
log.info("接收商户通知MQ, msg={}", msg); public void payOrderMchNotify(String msg) {
Long notifyId = Long.parseLong(msg);
MchNotifyRecord record = mchNotifyRecordService.getById(notifyId);
if(record == null || record.getState() != MchNotifyRecord.STATE_ING){
log.info("查询通知记录不存在或状态不是通知中");
return null;
}
if( record.getNotifyCount() >= record.getNotifyCountLimit() ){
log.info("已达到最大发送次数");
return null;
}
//1. (发送结果最多6次)
Integer currentCount = record.getNotifyCount() + 1;
String notifyUrl = record.getNotifyUrl();
String res = "";
try { try {
res = HttpUtil.createPost(notifyUrl).timeout(20000).execute().body(); log.info("接收商户通知MQ, msg={}", msg);
} catch (HttpException e) { Long notifyId = Long.parseLong(msg);
log.error("http error", e); MchNotifyRecord record = mchNotifyRecordService.getById(notifyId);
} if(record == null || record.getState() != MchNotifyRecord.STATE_ING){
log.info("查询通知记录不存在或状态不是通知中");
return;
}
if( record.getNotifyCount() >= record.getNotifyCountLimit() ){
log.info("已达到最大发送次数");
return;
}
if(currentCount == 1){ //第一次通知: 更新为已通知 //1. (发送结果最多6次)
payOrderService.updateNotifySent(record.getOrderId()); Integer currentCount = record.getNotifyCount() + 1;
}
//通知成功 String notifyUrl = record.getNotifyUrl();
if("SUCCESS".equalsIgnoreCase(res)){ String res = "";
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_SUCCESS, res); try {
} res = HttpUtil.createPost(notifyUrl).timeout(20000).execute().body();
} catch (HttpException e) {
log.error("http error", e);
}
//通知次数 >= 最大通知次数时, 更新响应结果为异常, 不在继续延迟发送消息 if(currentCount == 1){ //第一次通知: 更新为已通知
if( currentCount >= record.getNotifyCountLimit() ){ payOrderService.updateNotifySent(record.getOrderId());
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_FAIL, res); }
}
// 继续发送MQ 延迟发送 //通知成功
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_ING, res); if("SUCCESS".equalsIgnoreCase(res)){
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_SUCCESS, res);
}
return currentCount; //通知次数 >= 最大通知次数时, 更新响应结果为异常, 不在继续延迟发送消息
if( currentCount >= record.getNotifyCountLimit() ){
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_FAIL, res);
}
// 继续发送MQ 延迟发送
mchNotifyRecordService.updateNotifyResult(notifyId, MchNotifyRecord.STATE_ING, res);
// 通知延时次数
// 1 2 3 4 5 6
// 0 30 60 90 120 150
mqCommonService.send(msg, currentCount * 30 * 1000, CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY);
return;
}catch (Exception e) {
log.error(e.getMessage());
return;
}
} }
/** 接收订单查单通知 **/
public void channelOrderQuery(String msg) { public void channelOrderQuery(String msg) {
try {
String [] arr = msg.split(",");
String payOrderId = arr[0];
int currentCount = Integer.parseInt(arr[1]);
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount);
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
String [] arr = msg.split(","); if(payOrder.getState() != PayOrder.STATE_ING) {
String payOrderId = arr[0]; log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId);
int currentCount = Integer.parseInt(arr[1]); return;
log.info("接收轮询查单通知MQ, payOrderId={}, count={}", payOrderId, currentCount); }
currentCount++ ;
PayOrder payOrder = payOrderService.getById(payOrderId);
if(payOrder == null) {
log.warn("查询支付订单为空,payOrderId={}", payOrderId);
return;
}
if(payOrder.getState() != PayOrder.STATE_ING) { if (payOrder == null) return;
log.warn("订单状态不是支付中,不需查询渠道.payOrderId={}", payOrderId); ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder);
return;
}
ChannelRetMsg channelRetMsg = channelOrderReissueService.processPayOrder(payOrder); //返回null 可能为接口报错等, 需要再次轮询
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){
//返回null 可能为接口报错等, 需要再次轮询 //最多查询6次
if(channelRetMsg == null || channelRetMsg.getChannelState() == null || channelRetMsg.getChannelState().equals(ChannelRetMsg.ChannelState.WAITING)){ if(currentCount <= 6){
mqCommonService.send(buildMsg(payOrderId, currentCount), 5 * 1000, CS.MQ.MQ_TYPE_CHANNEL_ORDER_QUERY); //延迟5s再次查询
}else{
//最多查询6次 //TODO 调用【撤销订单】接口
if(currentCount <= 6){
mqQueue4ChannelOrderQuery.send(buildMsg(payOrderId, currentCount), 5 * 1000); //延迟5s再次查询
}else{
//TODO 调用【撤销订单】接口 }
}else{ //其他状态, 不需要再次轮询。
} }
}catch (Exception e) {
}else{ //其他状态, 不需要再次轮询。 log.error(e.getMessage());
return;
} }
return;
} }
/** 接收系统配置修改通知 **/
public void initDbConfig(String msg) { public void initDbConfig(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg); log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg); sysConfigService.initDBConfig(msg);
......
...@@ -16,12 +16,13 @@ ...@@ -16,12 +16,13 @@
package com.jeequan.jeepay.pay.service; package com.jeequan.jeepay.pay.service;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.MchNotifyRecord; import com.jeequan.jeepay.core.entity.MchNotifyRecord;
import com.jeequan.jeepay.core.entity.PayOrder; import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.core.entity.RefundOrder; import com.jeequan.jeepay.core.entity.RefundOrder;
import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.core.utils.JeepayKit; import com.jeequan.jeepay.core.utils.JeepayKit;
import com.jeequan.jeepay.core.utils.StringKit; import com.jeequan.jeepay.core.utils.StringKit;
import com.jeequan.jeepay.pay.mq.queue.service.MqServiceImpl;
import com.jeequan.jeepay.pay.rqrs.payorder.QueryPayOrderRS; import com.jeequan.jeepay.pay.rqrs.payorder.QueryPayOrderRS;
import com.jeequan.jeepay.pay.rqrs.refund.QueryRefundOrderRS; import com.jeequan.jeepay.pay.rqrs.refund.QueryRefundOrderRS;
import com.jeequan.jeepay.service.impl.MchNotifyRecordService; import com.jeequan.jeepay.service.impl.MchNotifyRecordService;
...@@ -42,8 +43,8 @@ import org.springframework.stereotype.Service; ...@@ -42,8 +43,8 @@ import org.springframework.stereotype.Service;
public class PayMchNotifyService { public class PayMchNotifyService {
@Autowired private MchNotifyRecordService mchNotifyRecordService; @Autowired private MchNotifyRecordService mchNotifyRecordService;
@Autowired private MqServiceImpl mqService;
@Autowired private ConfigContextService configContextService; @Autowired private ConfigContextService configContextService;
@Autowired private MqCommonService mqCommonService;
/** 商户通知信息, 只有订单是终态,才会发送通知, 如明确成功和明确失败 **/ /** 商户通知信息, 只有订单是终态,才会发送通知, 如明确成功和明确失败 **/
...@@ -84,7 +85,7 @@ public class PayMchNotifyService { ...@@ -84,7 +85,7 @@ public class PayMchNotifyService {
//推送到MQ //推送到MQ
Long notifyId = mchNotifyRecord.getNotifyId(); Long notifyId = mchNotifyRecord.getNotifyId();
mqService.PayOrderMchNotify(notifyId + ""); mqCommonService.send(notifyId + "", CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY);
} catch (Exception e) { } catch (Exception e) {
log.error("推送失败!", e); log.error("推送失败!", e);
...@@ -129,7 +130,7 @@ public class PayMchNotifyService { ...@@ -129,7 +130,7 @@ public class PayMchNotifyService {
//推送到MQ //推送到MQ
Long notifyId = mchNotifyRecord.getNotifyId(); Long notifyId = mchNotifyRecord.getNotifyId();
mqService.PayOrderMchNotify(notifyId + ""); mqCommonService.send(notifyId + "", CS.MQ.MQ_TYPE_PAY_ORDER_MCH_NOTIFY);
} catch (Exception e) { } catch (Exception e) {
log.error("推送失败!", e); log.error("推送失败!", e);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment