Commit d7a69ea5 authored by cbwleft's avatar cbwleft
Browse files

使用策略模式将业务与具体的MQ操作分离

parent d1a86eb2
......@@ -2,18 +2,12 @@ package org.xxpay.boot.service.mq;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.activemq.ScheduledMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import org.xxpay.common.util.MyLog;
import org.xxpay.boot.service.BaseService;
import javax.jms.*;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.Date;
......@@ -25,44 +19,22 @@ import java.util.Date;
* @version V1.0
* @Copyright: www.xxpay.org
*/
@Component
public class Mq4PayNotify extends BaseService {
public abstract class Mq4PayNotify extends BaseService {
@Autowired
private Queue payNotifyQueue;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private RestTemplate restTemplate;
private static final MyLog _log = MyLog.getLog(Mq4PayNotify.class);
protected static final MyLog _log = MyLog.getLog(Mq4PayNotify.class);
public void send(String msg) {
_log.info("发送MQ消息:msg={}", msg);
this.jmsTemplate.convertAndSend(this.payNotifyQueue, msg);
}
public abstract void send(String msg);
/**
* 发送延迟消息
* @param msg
* @param delay
*/
public void send(String msg, long delay) {
_log.info("发送MQ延时消息:msg={},delay={}", msg, delay);
jmsTemplate.send(this.payNotifyQueue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
}
});
}
public abstract void send(String msg, long delay);
@JmsListener(destination = MqConfig.PAY_NOTIFY_QUEUE_NAME)
public void receive(String msg) {
_log.info("do notify task, msg={}", msg);
JSONObject msgObj = JSON.parseObject(msg);
......
......@@ -3,6 +3,7 @@ package org.xxpay.boot.service.mq;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import javax.jms.Queue;
......@@ -17,10 +18,10 @@ import javax.jms.Queue;
public class MqConfig {
public static final String PAY_NOTIFY_QUEUE_NAME = "pay.notify.queue";
@Bean
public Queue payNotifyQueue() {
return new ActiveMQQueue(PAY_NOTIFY_QUEUE_NAME);
public static class Impl{
public static final String ACTIVE_MQ = "activeMQ";
public static final String RABBIT_MQ = "rabbitMQ";
}
}
package org.xxpay.boot.service.mq.impl;
import javax.jms.*;
import org.apache.activemq.ScheduledMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Profile;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
import org.xxpay.boot.service.mq.Mq4PayNotify;
import org.xxpay.boot.service.mq.MqConfig;
import static org.xxpay.boot.service.mq.MqConfig.PAY_NOTIFY_QUEUE_NAME;
@Component
@Profile(MqConfig.Impl.ACTIVE_MQ)
public class ActiveMq4PayNotify extends Mq4PayNotify{
@Bean
public Queue payNotifyQueue() {
return new ActiveMQQueue(PAY_NOTIFY_QUEUE_NAME);
}
@Autowired
private Queue payNotifyQueue;
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void send(String msg) {
_log.info("发送MQ消息:msg={}", msg);
jmsTemplate.convertAndSend(payNotifyQueue, msg);
}
@Override
public void send(String msg, long delay) {
_log.info("发送MQ延时消息:msg={},delay={}", msg, delay);
jmsTemplate.send(this.payNotifyQueue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage tm = session.createTextMessage(msg);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1*1000);
tm.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 1);
return tm;
}
});
}
@JmsListener(destination = PAY_NOTIFY_QUEUE_NAME)
public void onMessage(String msg) {
receive(msg);
}
}
......@@ -24,6 +24,7 @@ spring:
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000 # 通过connectProperties属性来打开mergeSql功能;慢SQL记录
profiles:
active: prod
include: activeMQ
activemq:
broker-url: failover:(tcp://127.0.0.1:61616?wireFormat.maxInactivityDuration=0)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment