Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
jinli gu
Jeepay
Commits
a307feb4
Commit
a307feb4
authored
Dec 14, 2021
by
大森林
Committed by
Gitee
Dec 14, 2021
Browse files
!7 支持阿里云RocketMQ商业版
Merge pull request !7 from 皮生/dev
parents
21814d03
91d06832
Changes
15
Hide whitespace changes
Inline
Side-by-side
conf/devCommons/config/application.yml
View file @
a307feb4
...
...
@@ -76,6 +76,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意:aliyun-rocketmq配置项请放置到根目录, 不是spring的二级配置! )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。
...
...
@@ -111,5 +119,5 @@ isys:
access-key-secret
:
SECRET_SECRET_SECRET
#AccessKeySecret
mq
:
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ
aliYunRocketMQ
】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
conf/manager/application.yml
View file @
a307feb4
...
...
@@ -86,6 +86,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意:aliyun-rocketmq配置项请放置到根目录, 不是spring的二级配置! )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。
...
...
@@ -124,5 +132,5 @@ isys:
access-key-secret
:
SECRET_SECRET_SECRET
#AccessKeySecret
mq
:
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ
aliYunRocketMQ
】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
conf/merchant/application.yml
View file @
a307feb4
...
...
@@ -86,6 +86,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意:aliyun-rocketmq配置项请放置到根目录, 不是spring的二级配置! )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。
...
...
@@ -124,5 +132,5 @@ isys:
access-key-secret
:
SECRET_SECRET_SECRET
#AccessKeySecret
mq
:
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ
aliYunRocketMQ
】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
conf/payment/application.yml
View file @
a307feb4
...
...
@@ -86,6 +86,14 @@ spring:
# producer:
# group: JEEPAY-GROUP
## 阿里云rocketmq配置 ( 注意:aliyun-rocketmq配置项请放置到根目录, 不是spring的二级配置! )
#aliyun-rocketmq:
# namesrvAddr: 127.0.0.1:9876
# accessKey: accessKey
# secretKey: secretKey
# consumerId: JEEPAY-GROUP
# producerId: JEEPAY-GROUP
#日志配置参数。
# 当存在logback-spring.xml文件时: 该配置将引进到logback配置, springboot配置不生效。
# 不存在logback-spring.xml 文件时, 使用springboot的配置, 同样可用。
...
...
@@ -122,5 +130,5 @@ isys:
access-key-secret
:
SECRET_SECRET_SECRET
#AccessKeySecret
mq
:
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ 】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
vender
:
activeMQ
# 切换MQ厂商, 支持:【 activeMQ rabbitMQ rocketMQ
aliYunRocketMQ
】, 需正确配置 【对应的yml参数】 和 【jeepay-components-mq项目下pom.xml中的依赖包】。
jeepay-components/jeepay-components-mq/pom.xml
View file @
a307feb4
...
...
@@ -16,6 +16,10 @@
<version>
Final
</version>
</parent>
<properties>
<aliyun-openservices-ons-client.version>
1.8.8.1.Final
</aliyun-openservices-ons-client.version>
</properties>
<!-- 项目依赖声明 -->
<dependencies>
...
...
@@ -66,6 +70,14 @@
<scope>
provided
</scope>
</dependency>
<!-- AliyunRocketMQ -->
<dependency>
<groupId>
com.aliyun.openservices
</groupId>
<artifactId>
ons-client
</artifactId>
<version>
${aliyun-openservices-ons-client.version}
</version>
<scope>
provided
</scope>
</dependency>
<!-- ↑↑↑↑↑↑ MQ依赖包 ↑↑↑↑↑↑ -->
</dependencies>
...
...
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/constant/MQVenderCS.java
View file @
a307feb4
...
...
@@ -29,5 +29,6 @@ public class MQVenderCS {
public
static
final
String
ACTIVE_MQ
=
"activeMQ"
;
public
static
final
String
RABBIT_MQ
=
"rabbitMQ"
;
public
static
final
String
ROCKET_MQ
=
"rocketMQ"
;
public
static
final
String
ALIYUN_ROCKET_MQ
=
"aliYunRocketMQ"
;
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AbstractAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq
;
import
com.aliyun.openservices.ons.api.*
;
import
com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum
;
import
com.jeequan.jeepay.components.mq.vender.IMQMsgReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.InitializingBean
;
import
org.springframework.beans.factory.annotation.Autowired
;
@Slf4j
public
abstract
class
AbstractAliYunRocketMQReceiver
implements
IMQMsgReceiver
,
InitializingBean
{
@Autowired
private
AliYunRocketMQFactory
aliYunRocketMQFactory
;
/**
* 获取topic名称
*
* @return
*/
public
abstract
String
getMQName
();
/**
* 获取业务名称
*
* @return
*/
public
abstract
String
getCusumerName
();
/**
* 发送类型
*
* @return
*/
public
MQSendTypeEnum
getMQType
()
{
// QUEUE - 点对点 (只有1个消费者可消费。 ActiveMQ的queue模式 )
return
MQSendTypeEnum
.
QUEUE
;
}
@Override
public
void
afterPropertiesSet
()
throws
Exception
{
Consumer
consumerClient
=
MQSendTypeEnum
.
BROADCAST
.
equals
(
getMQType
())
?
// 广播订阅模式
aliYunRocketMQFactory
.
broadcastConsumerClient
()
:
aliYunRocketMQFactory
.
consumerClient
();
consumerClient
.
subscribe
(
this
.
getMQName
(),
AliYunRocketMQFactory
.
defaultTag
,
new
MessageListener
()
{
@Override
public
Action
consume
(
Message
message
,
ConsumeContext
context
)
{
try
{
receiveMsg
(
new
String
(
message
.
getBody
()));
log
.
info
(
"【{}】MQ消息消费成功topic:{}, messageId:{}"
,
getCusumerName
(),
message
.
getTopic
(),
message
.
getMsgID
());
return
Action
.
CommitMessage
;
}
catch
(
Exception
e
)
{
log
.
error
(
"【{}】MQ消息消费失败topic:{}, messageId:{}"
,
getCusumerName
(),
message
.
getTopic
(),
message
.
getMsgID
(),
e
);
}
return
Action
.
ReconsumeLater
;
}
});
consumerClient
.
start
();
log
.
info
(
"初始化[{}]消费者topic: {},tag: {}成功"
,
getCusumerName
(),
this
.
getMQName
(),
AliYunRocketMQFactory
.
defaultTag
);
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQFactory.java
0 → 100644
View file @
a307feb4
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq
;
import
com.aliyun.openservices.ons.api.*
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.stereotype.Service
;
import
java.util.Properties
;
@Service
public
class
AliYunRocketMQFactory
{
public
static
final
String
defaultTag
=
"Default"
;
@Value
(
"${aliyun-rocketmq.namesrvAddr:}"
)
public
String
namesrvAddr
;
@Value
(
"${aliyun-rocketmq.accessKey}"
)
private
String
accessKey
;
@Value
(
"${aliyun-rocketmq.secretKey}"
)
private
String
secretKey
;
@Value
(
"${aliyun-rocketmq.consumerId}"
)
private
String
consumerId
;
@Value
(
"${aliyun-rocketmq.producerId}"
)
private
String
producerId
;
@Bean
(
name
=
"producerClient"
)
public
Producer
producerClient
()
{
Properties
properties
=
new
Properties
();
properties
.
put
(
PropertyKeyConst
.
ProducerId
,
producerId
);
properties
.
put
(
PropertyKeyConst
.
AccessKey
,
accessKey
);
properties
.
put
(
PropertyKeyConst
.
SecretKey
,
secretKey
);
// 判断是否为空(生产环境走k8s集群公共配置,不获取本地配置文件的值)
if
(
StringUtils
.
isNotEmpty
(
namesrvAddr
))
{
properties
.
put
(
PropertyKeyConst
.
NAMESRV_ADDR
,
namesrvAddr
);
}
return
ONSFactory
.
createProducer
(
properties
);
}
@Bean
(
name
=
"consumerClient"
)
public
Consumer
consumerClient
()
{
Properties
properties
=
new
Properties
();
properties
.
put
(
PropertyKeyConst
.
ConsumerId
,
consumerId
);
properties
.
put
(
PropertyKeyConst
.
AccessKey
,
accessKey
);
properties
.
put
(
PropertyKeyConst
.
SecretKey
,
secretKey
);
// 判断是否为空(生产环境走k8s集群公共配置,不获取本地配置文件的值)
if
(
StringUtils
.
isNotEmpty
(
namesrvAddr
))
{
properties
.
put
(
PropertyKeyConst
.
NAMESRV_ADDR
,
namesrvAddr
);
}
return
ONSFactory
.
createConsumer
(
properties
);
}
@Bean
(
name
=
"broadcastConsumerClient"
)
public
Consumer
broadcastConsumerClient
()
{
Properties
properties
=
new
Properties
();
properties
.
put
(
PropertyKeyConst
.
ConsumerId
,
consumerId
);
properties
.
put
(
PropertyKeyConst
.
AccessKey
,
accessKey
);
properties
.
put
(
PropertyKeyConst
.
SecretKey
,
secretKey
);
// 广播订阅方式设置
properties
.
put
(
PropertyKeyConst
.
MessageModel
,
PropertyValueConst
.
BROADCASTING
);
// 判断是否为空(生产环境走k8s集群环境变量自动注入,不获取本地配置文件的值)
if
(
StringUtils
.
isNotEmpty
(
namesrvAddr
))
{
properties
.
put
(
PropertyKeyConst
.
NAMESRV_ADDR
,
namesrvAddr
);
}
return
ONSFactory
.
createConsumer
(
properties
);
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/AliYunRocketMQSender.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq
;
import
com.alibaba.fastjson.JSONObject
;
import
com.aliyun.openservices.ons.api.Message
;
import
com.aliyun.openservices.ons.api.Producer
;
import
com.aliyun.openservices.ons.api.SendResult
;
import
com.jeequan.jeepay.components.mq.model.AbstractMQ
;
import
com.jeequan.jeepay.components.mq.vender.IMQSender
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.stereotype.Component
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.TreeMap
;
/**
* 阿里云rocketMQ 消息发送器的实现
*/
@Slf4j
@Component
public
class
AliYunRocketMQSender
implements
IMQSender
{
private
static
final
List
<
Integer
>
DELAY_TIME_LEVEL
=
new
ArrayList
<>();
static
{
// 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
DELAY_TIME_LEVEL
.
add
(
1
);
DELAY_TIME_LEVEL
.
add
(
5
);
DELAY_TIME_LEVEL
.
add
(
10
);
DELAY_TIME_LEVEL
.
add
(
30
);
DELAY_TIME_LEVEL
.
add
(
60
*
1
);
DELAY_TIME_LEVEL
.
add
(
60
*
2
);
DELAY_TIME_LEVEL
.
add
(
60
*
3
);
DELAY_TIME_LEVEL
.
add
(
60
*
4
);
DELAY_TIME_LEVEL
.
add
(
60
*
5
);
DELAY_TIME_LEVEL
.
add
(
60
*
6
);
DELAY_TIME_LEVEL
.
add
(
60
*
7
);
DELAY_TIME_LEVEL
.
add
(
60
*
8
);
DELAY_TIME_LEVEL
.
add
(
60
*
9
);
DELAY_TIME_LEVEL
.
add
(
60
*
10
);
DELAY_TIME_LEVEL
.
add
(
60
*
20
);
DELAY_TIME_LEVEL
.
add
(
60
*
30
);
DELAY_TIME_LEVEL
.
add
(
60
*
60
*
1
);
DELAY_TIME_LEVEL
.
add
(
60
*
60
*
2
);
}
@Autowired
private
AliYunRocketMQFactory
aliYunRocketMQFactory
;
private
Producer
producerClient
;
@Override
public
void
send
(
AbstractMQ
mqModel
)
{
Message
message
=
new
Message
(
mqModel
.
getMQName
(),
AliYunRocketMQFactory
.
defaultTag
,
mqModel
.
toMessage
().
getBytes
());
sendMessage
(
message
);
}
@Override
public
void
send
(
AbstractMQ
mqModel
,
int
delaySeconds
)
{
Message
message
=
new
Message
(
mqModel
.
getMQName
(),
AliYunRocketMQFactory
.
defaultTag
,
mqModel
.
toMessage
().
getBytes
());
if
(
delaySeconds
>
0
)
{
long
delayTime
=
System
.
currentTimeMillis
()
+
getNearDelayLevel
(
delaySeconds
)
*
1000
;
// 设置消息需要被投递的时间。
message
.
setStartDeliverTime
(
delayTime
);
}
sendMessage
(
message
);
}
private
void
sendMessage
(
Message
message
)
{
if
(
producerClient
==
null
)
{
producerClient
=
aliYunRocketMQFactory
.
producerClient
();
}
producerClient
.
start
();
SendResult
sendResult
=
producerClient
.
send
(
message
);
log
.
info
(
"消息队列推送返回结果:{}"
,
JSONObject
.
toJSONString
(
sendResult
));
}
/**
* 获取最接近的节点值
**/
private
int
getNearDelayLevel
(
int
delay
)
{
// 如果包含则直接返回
if
(
DELAY_TIME_LEVEL
.
contains
(
delay
))
{
return
DELAY_TIME_LEVEL
.
indexOf
(
delay
)
+
1
;
}
//两个时间的绝对值 - 位置
TreeMap
<
Integer
,
Integer
>
resultMap
=
new
TreeMap
<>();
DELAY_TIME_LEVEL
.
stream
().
forEach
(
time
->
resultMap
.
put
(
Math
.
abs
(
delay
-
time
),
DELAY_TIME_LEVEL
.
indexOf
(
time
)
+
1
));
return
resultMap
.
firstEntry
().
getValue
();
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/receive/CleanMchLoginAuthCacheAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive
;
import
com.jeequan.jeepay.components.mq.constant.MQVenderCS
;
import
com.jeequan.jeepay.components.mq.executor.MqThreadExecutor
;
import
com.jeequan.jeepay.components.mq.model.CleanMchLoginAuthCacheMQ
;
import
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnBean
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.scheduling.annotation.Async
;
import
org.springframework.stereotype.Component
;
/**
* AliYunRocketMQ消息接收器:仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 清除商户登录信息
*/
@Component
@ConditionalOnProperty
(
name
=
MQVenderCS
.
YML_VENDER_KEY
,
havingValue
=
MQVenderCS
.
ALIYUN_ROCKET_MQ
)
@ConditionalOnBean
(
CleanMchLoginAuthCacheMQ
.
IMQReceiver
.
class
)
@Slf4j
public
class
CleanMchLoginAuthCacheAliYunRocketMQReceiver
extends
AbstractAliYunRocketMQReceiver
{
private
static
final
String
cusumerName
=
"清除商户登录消息"
;
@Autowired
private
CleanMchLoginAuthCacheMQ
.
IMQReceiver
mqReceiver
;
/**
* 接收 【 queue 】 类型的消息
**/
@Override
@Async
(
MqThreadExecutor
.
EXECUTOR_PAYORDER_MCH_NOTIFY
)
public
void
receiveMsg
(
String
msg
)
{
mqReceiver
.
receive
(
CleanMchLoginAuthCacheMQ
.
parse
(
msg
));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public
String
getMQName
()
{
return
CleanMchLoginAuthCacheMQ
.
MQ_NAME
;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public
String
getCusumerName
()
{
return
cusumerName
;
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/receive/PayOrderDivisionAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive
;
import
com.jeequan.jeepay.components.mq.constant.MQVenderCS
;
import
com.jeequan.jeepay.components.mq.model.PayOrderDivisionMQ
;
import
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnBean
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.stereotype.Component
;
/**
* AliYunRocketMQ消息接收器:仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 支付订单分账通知
*/
@Slf4j
@Component
@ConditionalOnProperty
(
name
=
MQVenderCS
.
YML_VENDER_KEY
,
havingValue
=
MQVenderCS
.
ALIYUN_ROCKET_MQ
)
@ConditionalOnBean
(
PayOrderDivisionMQ
.
IMQReceiver
.
class
)
public
class
PayOrderDivisionAliYunRocketMQReceiver
extends
AbstractAliYunRocketMQReceiver
{
private
static
final
String
cusumerName
=
"支付订单分账消息"
;
@Autowired
private
PayOrderDivisionMQ
.
IMQReceiver
mqReceiver
;
/**
* 接收 【 queue 】 类型的消息
**/
@Override
public
void
receiveMsg
(
String
msg
)
{
mqReceiver
.
receive
(
PayOrderDivisionMQ
.
parse
(
msg
));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public
String
getMQName
()
{
return
PayOrderDivisionMQ
.
MQ_NAME
;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public
String
getCusumerName
()
{
return
cusumerName
;
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/receive/PayOrderMchNotifyAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive
;
import
com.jeequan.jeepay.components.mq.constant.MQVenderCS
;
import
com.jeequan.jeepay.components.mq.executor.MqThreadExecutor
;
import
com.jeequan.jeepay.components.mq.model.PayOrderMchNotifyMQ
;
import
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnBean
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.scheduling.annotation.Async
;
import
org.springframework.stereotype.Component
;
/**
* AliYunRocketMQ消息接收器:仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 支付订单商户通知
*/
@Slf4j
@Component
@ConditionalOnProperty
(
name
=
MQVenderCS
.
YML_VENDER_KEY
,
havingValue
=
MQVenderCS
.
ALIYUN_ROCKET_MQ
)
@ConditionalOnBean
(
PayOrderMchNotifyMQ
.
IMQReceiver
.
class
)
public
class
PayOrderMchNotifyAliYunRocketMQReceiver
extends
AbstractAliYunRocketMQReceiver
{
private
static
final
String
cusumerName
=
"支付订单商户消息"
;
@Autowired
private
PayOrderMchNotifyMQ
.
IMQReceiver
mqReceiver
;
/**
* 接收 【 queue 】 类型的消息
**/
@Override
@Async
(
MqThreadExecutor
.
EXECUTOR_PAYORDER_MCH_NOTIFY
)
public
void
receiveMsg
(
String
msg
)
{
mqReceiver
.
receive
(
PayOrderMchNotifyMQ
.
parse
(
msg
));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public
String
getMQName
()
{
return
PayOrderMchNotifyMQ
.
MQ_NAME
;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public
String
getCusumerName
()
{
return
cusumerName
;
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/receive/PayOrderReissueAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive
;
import
com.jeequan.jeepay.components.mq.constant.MQVenderCS
;
import
com.jeequan.jeepay.components.mq.executor.MqThreadExecutor
;
import
com.jeequan.jeepay.components.mq.model.PayOrderReissueMQ
;
import
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnBean
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.scheduling.annotation.Async
;
import
org.springframework.stereotype.Component
;
/**
* AliYunRocketMQ消息接收器:仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 支付订单补单(一般用于没有回调的接口,比如微信的条码支付)
*/
@Slf4j
@Component
@ConditionalOnProperty
(
name
=
MQVenderCS
.
YML_VENDER_KEY
,
havingValue
=
MQVenderCS
.
ALIYUN_ROCKET_MQ
)
@ConditionalOnBean
(
PayOrderReissueMQ
.
IMQReceiver
.
class
)
public
class
PayOrderReissueAliYunRocketMQReceiver
extends
AbstractAliYunRocketMQReceiver
{
private
static
final
String
cusumerName
=
"支付订单补单消息"
;
@Autowired
private
PayOrderReissueMQ
.
IMQReceiver
mqReceiver
;
/**
* 接收 【 queue 】 类型的消息
**/
@Override
@Async
(
MqThreadExecutor
.
EXECUTOR_PAYORDER_MCH_NOTIFY
)
public
void
receiveMsg
(
String
msg
)
{
mqReceiver
.
receive
(
PayOrderReissueMQ
.
parse
(
msg
));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public
String
getMQName
()
{
return
PayOrderReissueMQ
.
MQ_NAME
;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public
String
getCusumerName
()
{
return
cusumerName
;
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/receive/ResetAppConfigAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive
;
import
com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum
;
import
com.jeequan.jeepay.components.mq.constant.MQVenderCS
;
import
com.jeequan.jeepay.components.mq.model.ResetAppConfigMQ
;
import
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnBean
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.stereotype.Component
;
/**
* AliYunRocketMQ消息接收器:仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 更新系统配置参数
*/
@Slf4j
@Component
@ConditionalOnProperty
(
name
=
MQVenderCS
.
YML_VENDER_KEY
,
havingValue
=
MQVenderCS
.
ALIYUN_ROCKET_MQ
)
@ConditionalOnBean
(
ResetAppConfigMQ
.
IMQReceiver
.
class
)
public
class
ResetAppConfigAliYunRocketMQReceiver
extends
AbstractAliYunRocketMQReceiver
{
private
static
final
String
cusumerName
=
"更新系统配置参数消息"
;
@Autowired
private
ResetAppConfigMQ
.
IMQReceiver
mqReceiver
;
/**
* 接收 【 MQSendTypeEnum.BROADCAST 】 广播类型的消息
* <p>
* 注意:
* AliYunRocketMQ的广播模式(fanout)交换机 --》全部的Queue
* 如果queue包含多个消费者, 【例如,manager和payment的监听器是名称相同的queue下的消费者(Consumers) 】, 两个消费者是工作模式且存在竞争关系, 导致只能一个来消费。
* 解决:
* 每个topic的QUEUE都声明一个FANOUT交换机, 消费者声明一个系统产生的【随机队列】绑定到这个交换机上,然后往交换机发消息,只要绑定到这个交换机上都能收到消息。
* 参考: https://bbs.csdn.net/topics/392509262?list=70088931
**/
@Override
public
void
receiveMsg
(
String
msg
)
{
mqReceiver
.
receive
(
ResetAppConfigMQ
.
parse
(
msg
));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public
String
getMQName
()
{
return
ResetAppConfigMQ
.
MQ_NAME
;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public
String
getCusumerName
()
{
return
cusumerName
;
}
/**
* 发送类型
*
* @return
*/
@Override
public
MQSendTypeEnum
getMQType
()
{
// RocketMQ的广播模式
return
MQSendTypeEnum
.
BROADCAST
;
}
}
jeepay-components/jeepay-components-mq/src/main/java/com/jeequan/jeepay/components/mq/vender/aliyunrocketmq/receive/ResetIsvMchAppInfoAliYunRocketMQReceiver.java
0 → 100644
View file @
a307feb4
/*
* Copyright (c) 2021-2031, 河北计全科技有限公司 (https://www.jeequan.com & jeequan@126.com).
* <p>
* Licensed under the GNU LESSER GENERAL PUBLIC LICENSE 3.0;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.gnu.org/licenses/lgpl.html
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.receive
;
import
com.jeequan.jeepay.components.mq.constant.MQSendTypeEnum
;
import
com.jeequan.jeepay.components.mq.constant.MQVenderCS
;
import
com.jeequan.jeepay.components.mq.model.ResetIsvMchAppInfoConfigMQ
;
import
com.jeequan.jeepay.components.mq.vender.aliyunrocketmq.AbstractAliYunRocketMQReceiver
;
import
lombok.extern.slf4j.Slf4j
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnBean
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
org.springframework.stereotype.Component
;
/**
* AliYunRocketMQ消息接收器:仅在vender=AliYunRocketMQ时 && 项目实现IMQReceiver接口时 进行实例化
* 业务: 更新服务商/商户/商户应用配置信息
*/
@Slf4j
@Component
@ConditionalOnProperty
(
name
=
MQVenderCS
.
YML_VENDER_KEY
,
havingValue
=
MQVenderCS
.
ALIYUN_ROCKET_MQ
)
@ConditionalOnBean
(
ResetIsvMchAppInfoConfigMQ
.
IMQReceiver
.
class
)
public
class
ResetIsvMchAppInfoAliYunRocketMQReceiver
extends
AbstractAliYunRocketMQReceiver
{
private
static
final
String
cusumerName
=
"更新服务商/商户/商户应用配置信息消息"
;
@Autowired
private
ResetIsvMchAppInfoConfigMQ
.
IMQReceiver
mqReceiver
;
/**
* 接收 【 MQSendTypeEnum.BROADCAST 】 广播类型的消息
* <p>
* 注意:
* AliYunRocketMQ的广播模式(fanout)交换机 --》全部的Queue
* 如果queue包含多个消费者, 【例如,manager和payment的监听器是名称相同的queue下的消费者(Consumers) 】, 两个消费者是工作模式且存在竞争关系, 导致只能一个来消费。
* 解决:
* 每个topic的QUEUE都声明一个FANOUT交换机, 消费者声明一个系统产生的【随机队列】绑定到这个交换机上,然后往交换机发消息,只要绑定到这个交换机上都能收到消息。
* 参考: https://bbs.csdn.net/topics/392509262?list=70088931
**/
@Override
public
void
receiveMsg
(
String
msg
)
{
mqReceiver
.
receive
(
ResetIsvMchAppInfoConfigMQ
.
parse
(
msg
));
}
/**
* 获取topic名称
*
* @return
*/
@Override
public
String
getMQName
()
{
return
ResetIsvMchAppInfoConfigMQ
.
MQ_NAME
;
}
/**
* 获取业务名称
*
* @return
*/
@Override
public
String
getCusumerName
()
{
return
cusumerName
;
}
/**
* 发送类型
*
* @return
*/
@Override
public
MQSendTypeEnum
getMQType
()
{
// RocketMQ的广播模式
return
MQSendTypeEnum
.
BROADCAST
;
}
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment