Commit cf2a0c61 authored by dingzhiwei's avatar dingzhiwei
Browse files

Merge branch 'dev'

parents 387e9627 0ec9d1c0
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mgr.mq.config;
import com.jeequan.jeepay.core.constants.CS;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
/**
* RabbitMq
* 队列交换机注册
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Configuration
@EnableRabbit
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqConfig {
@Bean("modifyIsvInfo")
public Queue modifyIsvInfo() { return new Queue(CS.MQ.TOPIC_MODIFY_ISV_INFO,true); }
@Bean("modifyMchApp")
public Queue modifyMchApp() {
return new Queue(CS.MQ.TOPIC_MODIFY_MCH_APP,true);
}
@Bean("modifyMchInfo")
public Queue modifyMchInfo() {
return new Queue(CS.MQ.TOPIC_MODIFY_MCH_INFO,true);
}
@Bean("modifySysConfig")
public Queue modifySysConfig() {
return new Queue(CS.MQ.FANOUT_MODIFY_SYS_CONFIG,true);
}
@Bean("payOrderMchNotify")
public Queue payOrderMchNotify() {
return new Queue(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY,true);
}
@Bean("mchUserRemove")
public Queue mchUserRemove() {
return new Queue(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE,true);
}
//创建 fanout 交换机
@Bean("fanoutExchange")
FanoutExchange fanoutExchange() {
return new FanoutExchange(CS.FANOUT_EXCHANGE_SYS_CONFIG,true,false);
}
//创建 direct 交换机
@Bean("directExchange")
DirectExchange directExchange() {
return new DirectExchange(CS.DIRECT_EXCHANGE,true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_ISV_INFO
@Bean
Binding bindingIsvInfo(@Qualifier("modifyIsvInfo") Queue modifyIsvInfo, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(modifyIsvInfo).to(directExchange).with(CS.MQ.TOPIC_MODIFY_ISV_INFO);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_MCH_APP
@Bean
Binding bindingMchApp(@Qualifier("modifyMchApp") Queue modifyMchApp, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(modifyMchApp).to(directExchange).with(CS.MQ.TOPIC_MODIFY_MCH_APP);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_MCH_INFO
@Bean
Binding bindingMchInfo(@Qualifier("modifyMchInfo") Queue modifyMchInfo, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(modifyMchInfo).to(directExchange).with(CS.MQ.TOPIC_MODIFY_MCH_INFO);
}
//绑定 将队列和交换机绑定
@Bean
Binding bindingSysConfig(Queue modifySysConfig, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(modifySysConfig).to(fanoutExchange);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_PAYORDER_MCH_NOTIFY
@Bean
Binding bindingPayOrderMchNotify(@Qualifier("payOrderMchNotify") Queue payOrderMchNotify, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(payOrderMchNotify).to(directExchange).with(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:QUEUE_MODIFY_MCH_USER_REMOVE
@Bean
Binding bindingMchUserRemove(@Qualifier("mchUserRemove") Queue mchUserRemove, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(mchUserRemove).to(directExchange).with(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE);
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mgr.mq.topic;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.utils.JsonKit;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/*
* 更改商户应用信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:10
*/
@Slf4j
@Component
public class MqTopic4ModifyMchApp extends ActiveMQTopic{
@Autowired private JmsTemplate jmsTemplate;
public MqTopic4ModifyMchApp(){
super(CS.MQ.TOPIC_MODIFY_MCH_APP);
}
/** 推送消息到各个节点 **/
public void push(String mchNo, String appId) {
JSONObject jsonObject = JsonKit.newJson("mchNo", mchNo);
jsonObject.put("appId", appId);
this.jmsTemplate.convertAndSend(this, jsonObject.toString());
}
}
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mgr.mq.topic;
import com.jeequan.jeepay.core.constants.CS;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
/*
* 更改商户信息
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:10
*/
@Slf4j
@Component
public class MqTopic4ModifyMchInfo extends ActiveMQTopic{
@Autowired private JmsTemplate jmsTemplate;
public MqTopic4ModifyMchInfo(){
super(CS.MQ.TOPIC_MODIFY_MCH_INFO);
}
/** 推送消息到各个节点 **/
public void push(String mchNo) {
this.jmsTemplate.convertAndSend(this, mchNo);
}
}
......@@ -50,9 +50,24 @@ spring:
timeout: 1000
database: 1 #1库:运营平台 #2库:商户系统 #3库:支付网关
password:
# 注意:以下MQ配置需注意【如需使用activeMQ则需将rabbitMQ配置注释即可】
# profiles:
# include:
# - activeMQ
# - rabbitMQ # 需要安装延迟队列插件:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
#activeMQ配置
activemq:
broker-url: tcp://localhost:61616 #连接地址
# activemq:
# broker-url: tcp://localhost:61616 #连接地址
#rabbitmq配置
# rabbitmq:
# addresses: 127.0.0.1:5672
# username: guest
# password: guest
# dynamic: true
# virtual-host: /
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
......
......@@ -89,6 +89,12 @@
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 添加对rabbitMQ的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 引入 jeepay-sdk-java -->
<dependency>
<groupId>com.jeequan</groupId>
......
......@@ -16,17 +16,19 @@
package com.jeequan.jeepay.mch.ctrl.merchant;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.jeequan.jeepay.core.aop.MethodLog;
import com.jeequan.jeepay.core.constants.ApiCodeEnum;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.MchApp;
import com.jeequan.jeepay.core.exception.BizException;
import com.jeequan.jeepay.core.model.ApiRes;
import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.core.utils.JsonKit;
import com.jeequan.jeepay.mch.ctrl.CommonCtrl;
import com.jeequan.jeepay.mch.mq.topic.MqTopic4ModifyMchApp;
import com.jeequan.jeepay.service.impl.MchAppService;
import com.jeequan.jeepay.service.impl.MchInfoService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.prepost.PreAuthorize;
......@@ -44,7 +46,7 @@ import org.springframework.web.bind.annotation.*;
public class MchAppController extends CommonCtrl {
@Autowired private MchAppService mchAppService;
@Autowired private MqTopic4ModifyMchApp mqTopic4ModifyMchApp;
@Autowired private MqCommonService mqCommonService;
/**
* @Author: ZhuXiao
......@@ -126,7 +128,9 @@ public class MchAppController extends CommonCtrl {
return ApiRes.fail(ApiCodeEnum.SYS_OPERATION_FAIL_UPDATE);
}
// 推送修改应用消息
mqTopic4ModifyMchApp.push(getCurrentMchNo(), mchApp.getAppId());
JSONObject jsonObject = JsonKit.newJson("mchNo", mchApp.getMchNo());
jsonObject.put("appId", appId);
mqCommonService.send(jsonObject.toJSONString(), CS.MQ.MQ_TYPE_MODIFY_MCH_APP);
return ApiRes.ok();
}
......@@ -148,7 +152,9 @@ public class MchAppController extends CommonCtrl {
mchAppService.removeByAppId(appId);
// 推送mq到目前节点进行更新数据
mqTopic4ModifyMchApp.push(getCurrentMchNo(), appId);
JSONObject jsonObject = JsonKit.newJson("mchNo", mchApp.getMchNo());
jsonObject.put("appId", appId);
mqCommonService.send(jsonObject.toJSONString(), CS.MQ.MQ_TYPE_MODIFY_MCH_APP);
return ApiRes.ok();
}
......
......@@ -15,6 +15,7 @@
*/
package com.jeequan.jeepay.mch.ctrl.merchant;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.aop.MethodLog;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.MchInfo;
......@@ -22,8 +23,9 @@ import com.jeequan.jeepay.core.entity.PayInterfaceConfig;
import com.jeequan.jeepay.core.entity.PayInterfaceDefine;
import com.jeequan.jeepay.core.exception.BizException;
import com.jeequan.jeepay.core.model.ApiRes;
import com.jeequan.jeepay.core.mq.MqCommonService;
import com.jeequan.jeepay.core.utils.JsonKit;
import com.jeequan.jeepay.mch.ctrl.CommonCtrl;
import com.jeequan.jeepay.mch.mq.topic.MqTopic4ModifyMchApp;
import com.jeequan.jeepay.service.impl.MchInfoService;
import com.jeequan.jeepay.service.impl.PayInterfaceConfigService;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -45,8 +47,8 @@ import java.util.List;
public class MchPayInterfaceConfigController extends CommonCtrl {
@Autowired private PayInterfaceConfigService payInterfaceConfigService;
@Autowired private MqTopic4ModifyMchApp mqTopic4ModifyMchApp;
@Autowired private MchInfoService mchInfoService;
@Autowired private MqCommonService mqCommonService;
/**
* @Author: ZhuXiao
......@@ -124,8 +126,9 @@ public class MchPayInterfaceConfigController extends CommonCtrl {
if (!result) {
throw new BizException("配置失败");
}
mqTopic4ModifyMchApp.push(getCurrentMchNo(), infoId); // 推送mq到目前节点进行更新数据
JSONObject jsonObject = JsonKit.newJson("mchNo", getCurrentMchNo());
jsonObject.put("appId", infoId);
mqCommonService.send(jsonObject.toJSONString(), CS.MQ.MQ_TYPE_MODIFY_MCH_APP);
return ApiRes.ok();
}
......
......@@ -13,14 +13,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.topic;
package com.jeequan.jeepay.mch.mq;
import com.alibaba.fastjson.JSONObject;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.utils.JsonKit;
import com.jeequan.jeepay.core.mq.MqCommonService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
......@@ -33,20 +36,37 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
public class MqTopic4ModifyMchApp extends ActiveMQTopic{
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class ActiveMqSend extends MqCommonService {
@Autowired private JmsTemplate jmsTemplate;
public MqTopic4ModifyMchApp(){
super(CS.MQ.TOPIC_MODIFY_MCH_APP);
@Bean("activeMqSendModifyMchApp")
public ActiveMQTopic mqTopic4ModifyMchApp(){
return new ActiveMQTopic(CS.MQ.TOPIC_MODIFY_MCH_APP);
}
@Lazy
@Autowired
@Qualifier("activeMqSendModifyMchApp")
private ActiveMQTopic mqTopic4ModifyMchApp;
/** 推送消息到各个节点 **/
public void push(String mchNo, String appId) {
JSONObject jsonObject = JsonKit.newJson("mchNo", mchNo);
jsonObject.put("appId", appId);
@Override
public void send(String msg, String sendType) {
if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改
topicModifyMchApp(msg);
}
}
@Override
public void send(String msg, long delay, String sendType) {
}
this.jmsTemplate.convertAndSend(this, jsonObject.toString());
/** 发送商户应用修改信息 **/
public void topicModifyMchApp(String msg) {
this.jmsTemplate.convertAndSend(mqTopic4ModifyMchApp, msg);
}
}
......@@ -13,35 +13,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mgr.mq.queue;
package com.jeequan.jeepay.mch.mq;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.mq.MqCommonService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/**
* 商户订单回调MQ通知
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/21 18:03
*/
* RabbitMq
* 商户应用修改推送
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
public class MqQueue4PayOrderMchNotify extends ActiveMQQueue{
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqSend extends MqCommonService {
@Autowired private JmsTemplate jmsTemplate;
@Autowired private RabbitTemplate rabbitTemplate;
public MqQueue4PayOrderMchNotify(){
super(CS.MQ.QUEUE_PAYORDER_MCH_NOTIFY);
/** 推送消息 **/
@Override
public void send(String msg, String sendType) {
if (sendType.equals(CS.MQ.MQ_TYPE_MODIFY_MCH_APP)) { // 商户应用修改
directModifyMchApp(msg);
}
}
/** 发送MQ消息 **/
public void send(Long notifyId) {
this.jmsTemplate.convertAndSend(this, notifyId + "");
@Override
public void send(String msg, long delay, String sendType) {
}
/** 发送商户应用修改信息 **/
public void directModifyMchApp(String msg) {
rabbitTemplate.convertAndSend(CS.DIRECT_EXCHANGE, CS.MQ.TOPIC_MODIFY_MCH_APP, msg);
}
}
......@@ -13,45 +13,43 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mgr.mq.queue;
package com.jeequan.jeepay.mch.mq.activemq.queue;
import com.alibaba.fastjson.JSONArray;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import java.util.Collection;
/**
* 商户用户信息清除
* 商户用户登录信息清除
*
* @author pangxiaoyu
* @site https://www.jeepay.vip
* @date 2021-06-07 07:15
* @date 2021-04-27 15:50
*/
@Slf4j
@Component
public class MqQueue4ModifyMchUserRemove extends ActiveMQQueue{
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqQueueReceive extends ActiveMQQueue {
@Autowired private JmsTemplate jmsTemplate;
@Autowired private MqReceiveCommon mqReceiveCommon;
public MqQueue4ModifyMchUserRemove(){
public MqQueueReceive(){
super(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE);
}
/**
* @author: pangxiaoyu
* @date: 2021/6/7 16:16
* @describe: 推送消息到各个节点
* @date: 2021/6/7 16:17
* @describe: 接收 商户用户登录信息清除消息
*/
public void push(Collection<Long> userIdList) {
if(userIdList == null || userIdList.isEmpty()){
return ;
}
this.jmsTemplate.convertAndSend(this,JSONArray.toJSONString(userIdList));
@JmsListener(destination = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE)
public void receive(String userIdStr) {
mqReceiveCommon.removeMchUser(userIdStr);
}
}
......@@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.topic;
package com.jeequan.jeepay.mch.mq.activemq.topic;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
......@@ -33,23 +34,19 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
public class MqTopic4ModifySysConfig extends ActiveMQTopic{
@Profile(CS.MQTYPE.ACTIVE_MQ)
public class MqTopicReceive extends ActiveMQTopic{
@Autowired private SysConfigService sysConfigService;
@Autowired private MqReceiveCommon mqReceiveCommon;
public MqTopic4ModifySysConfig(){
public MqTopicReceive(){
super(CS.MQ.TOPIC_MODIFY_SYS_CONFIG);
}
/** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer")
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
mqReceiveCommon.initDbConfig(msg);
}
}
......@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.topic;
package com.jeequan.jeepay.mch.mq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
......
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.config;
import com.jeequan.jeepay.core.constants.CS;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
/**
* RabbitMq
* 队列交换机注册
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021/6/25 17:10
*/
@Profile(CS.MQTYPE.RABBIT_MQ)
@Configuration
@EnableRabbit
public class RabbitMqConfig {
@Bean("modifyMchApp")
public Queue modifyMchApp() {
return new Queue(CS.MQ.TOPIC_MODIFY_MCH_APP,true);
}
//创建 direct 交换机
@Bean("directExchange")
DirectExchange directExchange() {
return new DirectExchange(CS.DIRECT_EXCHANGE,true,false);
}
//绑定 将队列和交换机绑定, 并设置用于匹配键:TOPIC_MODIFY_MCH_APP
@Bean
Binding bindingMchApp(@Qualifier("modifyMchApp") Queue modifyMchApp, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(modifyMchApp).to(directExchange).with(CS.MQ.TOPIC_MODIFY_MCH_APP);
}
}
......@@ -13,48 +13,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mgr.mq.topic;
package com.jeequan.jeepay.mch.mq.rabbitmq;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import com.jeequan.jeepay.mch.mq.receive.MqReceiveCommon;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component;
/*
* 更改系统配置参数
*
* @author terrfly
* @site https://www.jeepay.vip
* @date 2021/6/8 17:13
*/
/**
* 消息接收
* @author pangxiaoyu
* @site https://www.jeepay.vip
* @date 2021-04-27 15:50
*/
@Slf4j
@Component
public class MqTopic4ModifySysConfig extends ActiveMQTopic{
@Autowired private JmsTemplate jmsTemplate;
@Autowired private SysConfigService sysConfigService;
public MqTopic4ModifySysConfig(){
super(CS.MQ.TOPIC_MODIFY_SYS_CONFIG);
@Profile(CS.MQTYPE.RABBIT_MQ)
public class RabbitMqReceive {
@Autowired private MqReceiveCommon mqReceiveCommon;
/**
* @author: pangxiaoyu
* @date: 2021/6/7 16:17
* @describe: 接收 商户用户登录信息清除消息
*/
@RabbitListener(queues = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE)
public void receiveRemoveMchUser(String userIdStr) {
mqReceiveCommon.removeMchUser(userIdStr);
}
/** 接收 更新系统配置项的消息 **/
@JmsListener(destination = CS.MQ.TOPIC_MODIFY_SYS_CONFIG, containerFactory = "jmsListenerContainer")
public void receive(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
@RabbitListener(bindings = {@QueueBinding(value = @Queue(),exchange = @Exchange(name = CS.FANOUT_EXCHANGE_SYS_CONFIG,type = "fanout"))})
public void receiveInitDbConfig(String msg) {
mqReceiveCommon.initDbConfig(msg);
}
/** 推送消息到各个节点 **/
public void push(String msg) {
this.jmsTemplate.convertAndSend(this, msg);
}
}
......@@ -13,44 +13,33 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jeequan.jeepay.mch.mq.queue;
package com.jeequan.jeepay.mch.mq.receive;
import com.alibaba.fastjson.JSONArray;
import com.jeequan.jeepay.core.cache.RedisUtil;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.service.impl.SysConfigService;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
/**
* 更新商户配置mq
* 处理公共接收消息方法
*
* @author pangxiaoyu
* @author xiaoyu
* @site https://www.jeepay.vip
* @date 2021-04-27 15:50
* @date 2021/6/25 17:10
*/
@Slf4j
@Component
public class MqQueue4ModifyMchUserRemove extends ActiveMQQueue {
@Service
public class MqReceiveCommon {
public MqQueue4ModifyMchUserRemove(){
super(CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE);
}
/**
* @author: pangxiaoyu
* @date: 2021/6/7 16:17
* @describe: 接收 更新系统配置项的消息
*/
@JmsListener(destination = CS.MQ.QUEUE_MODIFY_MCH_USER_REMOVE)
public void receive(String userIdStr) {
@Autowired private SysConfigService sysConfigService;
public void removeMchUser(String userIdStr) {
log.info("成功接收删除商户用户登录的订阅通知, msg={}", userIdStr);
// 字符串转List<Long>
List<Long> userIdList = JSONArray.parseArray(userIdStr, Long.class);
......@@ -73,6 +62,9 @@ public class MqQueue4ModifyMchUserRemove extends ActiveMQQueue {
log.info("无权限登录用户信息已清除");
}
public void initDbConfig(String msg) {
log.info("成功接收更新系统配置的订阅通知, msg={}", msg);
sysConfigService.initDBConfig(msg);
log.info("系统配置静态属性已重置");
}
}
......@@ -50,9 +50,24 @@ spring:
timeout: 1000
database: 2 #1库:运营平台 #2库:商户系统 #3库:支付网关
password:
# 注意:以下MQ配置需注意【如需使用activeMQ则需将rabbitMQ配置注释即可】
# profiles:
# include:
# - activeMQ
# - rabbitMQ # 需要安装延迟队列插件:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/
#activeMQ配置
activemq:
broker-url: tcp://localhost:61616 #连接地址
# activemq:
# broker-url: tcp://localhost:61616 #连接地址
#rabbitmq配置
# rabbitmq:
# addresses: 127.0.0.1:5672
# username: guest
# password: guest
# dynamic: true
# virtual-host: /
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
......
......@@ -73,6 +73,12 @@
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 添加对rabbitMQ的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--wx_pay https://github.com/wechat-group/WxJava -->
<dependency>
<groupId>com.github.binarywang</groupId>
......
......@@ -71,6 +71,11 @@ public abstract class AbstractChannelNoticeService implements IChannelNoticeServ
return requestKitBean.getReqParamJSON();
}
/**request.getParameter 获取参数 并转换为JSON格式 **/
protected String getReqParamFromBody() {
return requestKitBean.getReqParamFromBody();
}
/** 获取文件路径 **/
protected String getCertFilePath(String certFilePath) {
return channelCertConfigKitBean.getCertFilePath(certFilePath);
......
......@@ -16,24 +16,24 @@
package com.jeequan.jeepay.pay.channel.wxpay;
import com.alibaba.fastjson.JSONObject;
import com.github.binarywang.wxpay.bean.ecommerce.SignatureHeader;
import com.github.binarywang.wxpay.bean.notify.SignatureHeader;
import com.github.binarywang.wxpay.bean.notify.WxPayOrderNotifyResult;
import com.github.binarywang.wxpay.bean.notify.WxPayOrderNotifyV3Result;
import com.github.binarywang.wxpay.config.WxPayConfig;
import com.github.binarywang.wxpay.constant.WxPayConstants;
import com.github.binarywang.wxpay.service.WxPayService;
import com.github.binarywang.wxpay.v3.auth.AutoUpdateCertificatesVerifier;
import com.github.binarywang.wxpay.v3.auth.PrivateKeySigner;
import com.github.binarywang.wxpay.v3.auth.WxPayCredentials;
import com.github.binarywang.wxpay.v3.util.AesUtils;
import com.github.binarywang.wxpay.v3.util.PemUtils;
import com.jeequan.jeepay.core.constants.CS;
import com.jeequan.jeepay.core.entity.PayOrder;
import com.jeequan.jeepay.core.exception.BizException;
import com.jeequan.jeepay.core.exception.ResponseException;
import com.jeequan.jeepay.pay.channel.AbstractChannelNoticeService;
import com.jeequan.jeepay.pay.model.MchAppConfigContext;
import com.jeequan.jeepay.pay.rqrs.msg.ChannelRetMsg;
import com.jeequan.jeepay.pay.service.ConfigContextService;
import com.jeequan.jeepay.pay.model.MchAppConfigContext;
import com.jeequan.jeepay.service.impl.PayOrderService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
......@@ -46,7 +46,6 @@ import org.springframework.stereotype.Service;
import javax.servlet.http.HttpServletRequest;
import java.io.FileInputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.security.PrivateKey;
/*
......@@ -87,22 +86,10 @@ public class WxpayChannelNoticeService extends AbstractChannelNoticeService {
throw new BizException("获取商户信息失败");
}
// 验签
if (!verifyNotifySign(request, mchAppConfigContext)) {
throw new BizException("验签失败");
}
// 获取加密信息
JSONObject params = getReqParamJSON();
JSONObject resource = params.getJSONObject("resource");
String cipherText = resource.getString("cipherText");
String associatedData = resource.getString("associatedData");
String nonce = resource.getString("nonce");
// 验签 && 获取订单回调数据
WxPayOrderNotifyV3Result.DecryptNotifyResult result = parseOrderNotifyV3Result(request, mchAppConfigContext);
// 解密
String result = AesUtils.decryptToString(associatedData, nonce, cipherText, mchAppConfigContext.getWxServiceWrapper().getWxPayService().getConfig().getApiV3Key());
JSONObject decryptJSON = JSONObject.parseObject(result);
return MutablePair.of(decryptJSON.getString("out_trade_no"), decryptJSON);
return MutablePair.of(result.getOutTradeNo(), result);
} else { // V2接口回调
String xmlResult = IOUtils.toString(request.getInputStream(), request.getCharacterEncoding());
......@@ -141,12 +128,12 @@ public class WxpayChannelNoticeService extends AbstractChannelNoticeService {
}else if (CS.PAY_IF_VERSION.WX_V3.equals(mchAppConfigContext.getWxServiceWrapper().getApiVersion())) { // V3
// 获取回调参数
JSONObject resultJSON = (JSONObject) params;
WxPayOrderNotifyV3Result.DecryptNotifyResult result = (WxPayOrderNotifyV3Result.DecryptNotifyResult) params;
// 验证参数
verifyWxPayParams(resultJSON, payOrder);
verifyWxPayParams(result, payOrder);
String channelState = resultJSON.getString("trade_state");
String channelState = result.getTradeState();
if ("SUCCESS".equals(channelState)) {
channelResult.setChannelState(ChannelRetMsg.ChannelState.CONFIRM_SUCCESS);
}else if("CLOSED".equals(channelState)
......@@ -155,17 +142,21 @@ public class WxpayChannelNoticeService extends AbstractChannelNoticeService {
channelResult.setChannelState(ChannelRetMsg.ChannelState.CONFIRM_FAIL); //支付失败
}
channelResult.setChannelOrderId(resultJSON.getString("transaction_id")); //渠道订单号
JSONObject payer = resultJSON.getJSONObject("payer");
channelResult.setChannelOrderId(result.getTransactionId()); //渠道订单号
WxPayOrderNotifyV3Result.Payer payer = result.getPayer();
if (payer != null) {
channelResult.setChannelUserId(StringUtils.isNotBlank(payer.getString("openid")) ? payer.getString("openid") : payer.getString("sp_openid")); //支付用户ID
channelResult.setChannelUserId(payer.getOpenid()); //支付用户ID
}
}else {
throw ResponseException.buildText("API_VERSION ERROR");
}
ResponseEntity okResponse = textResp("SUCCESS");
JSONObject resJSON = new JSONObject();
resJSON.put("code", "SUCCESS");
resJSON.put("message", "成功");
ResponseEntity okResponse = jsonResp(resJSON);
channelResult.setResponseEntity(okResponse); //响应数据
return channelResult;
......@@ -203,38 +194,42 @@ public class WxpayChannelNoticeService extends AbstractChannelNoticeService {
* @param mchAppConfigContext 商户配置
* @return true:校验通过 false:校验不通过
*/
private boolean verifyNotifySign(HttpServletRequest request, MchAppConfigContext mchAppConfigContext) throws Exception {
private WxPayOrderNotifyV3Result.DecryptNotifyResult parseOrderNotifyV3Result(HttpServletRequest request, MchAppConfigContext mchAppConfigContext) throws Exception {
SignatureHeader header = new SignatureHeader();
header.setTimeStamp(request.getHeader("Wechatpay-Timestamp"));
header.setNonce(request.getHeader("Wechatpay-Nonce"));
header.setSerialNo(request.getHeader("Wechatpay-Serial"));
header.setSigned(request.getHeader("Wechatpay-Signature"));
header.setSerial(request.getHeader("Wechatpay-Serial"));
header.setSignature(request.getHeader("Wechatpay-Signature"));
String beforeSign = String.format("%s\n%s\n%s\n",
header.getTimeStamp(),
header.getNonce(),
getReqParamJSON().toJSONString());
// 获取加密信息
String params = getReqParamFromBody();
WxPayConfig wxPayConfig = mchAppConfigContext.getWxServiceWrapper().getWxPayService().getConfig();
log.info("\n【请求头信息】:{}\n【加密数据】:{}", header.toString(), params);
WxPayService wxPayService = mchAppConfigContext.getWxServiceWrapper().getWxPayService();
WxPayConfig wxPayConfig = wxPayService.getConfig();
// 自动获取微信平台证书
PrivateKey privateKey = PemUtils.loadPrivateKey(new FileInputStream(wxPayConfig.getPrivateKeyPath()));
AutoUpdateCertificatesVerifier verifier = new AutoUpdateCertificatesVerifier(
new WxPayCredentials(wxPayConfig.getMchId(), new PrivateKeySigner(wxPayConfig.getCertSerialNo(), privateKey)),
wxPayConfig.getApiV3Key().getBytes("utf-8"));
wxPayConfig.setVerifier(verifier);
wxPayService.setConfig(wxPayConfig);
WxPayOrderNotifyV3Result result = wxPayService.parseOrderNotifyV3Result(params, header);
return verifier.verify(header.getSerialNo(),
beforeSign.getBytes(StandardCharsets.UTF_8), header.getSigned());
return result.getResult();
}
/**
* V3接口验证微信支付通知参数
* @return
*/
public void verifyWxPayParams(JSONObject result, PayOrder payOrder) {
public void verifyWxPayParams(WxPayOrderNotifyV3Result.DecryptNotifyResult result, PayOrder payOrder) {
try {
// 核对金额
Integer total_fee = result.getInteger("total"); // 总金额
Integer total_fee = result.getAmount().getTotal(); // 总金额
long wxPayAmt = new BigDecimal(total_fee).longValue();
long dbPayAmt = payOrder.getAmount().longValue();
if (dbPayAmt != wxPayAmt) {
......
......@@ -65,7 +65,7 @@ public class WxpayChannelUserService implements IChannelUserService {
oauth2Url = DEFAULT_OAUTH_URL;
}
return String.format(oauth2Url + "?appid=%s&scope=snsapi_base&state=&redirect_uri=%s", appId, callbackUrlEncode);
return String.format(oauth2Url + "?appid=%s&scope=snsapi_base&state=&redirect_uri=%s&response_type=code#wechat_redirect", appId, callbackUrlEncode);
}
@Override
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment