Commit 911a7f82 by songxiang

RabbitMQ迁移

parent b48a6003
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import com.pcloud.common.core.dto.ExapiMessageDto; import org.springframework.amqp.core.AmqpTemplate;
import com.pcloud.common.exceptions.BizException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.jms.JMSException; import com.pcloud.common.core.aspect.ParamLog;
import javax.jms.ObjectMessage; import com.pcloud.common.core.constant.MQQueueConstant;
import javax.jms.Session; import com.pcloud.common.core.dto.ExapiMessageDto;
@Component("exapiQueueBiz") @Component("exapiQueueBiz")
public class ExapiQueueBizImpl implements ExapiQueueBiz{ public class ExapiQueueBizImpl implements ExapiQueueBiz {
private static final Logger LOGGER = LoggerFactory.getLogger(ExapiQueueBiz.class);
@Autowired(required = false) @Autowired
@Qualifier("jmsExapiTemplate") private AmqpTemplate amqpTemplate;
private JmsTemplate jmsExapiTemplate;
@Override @Override
public void sendMessageQueue(ExapiMessageDto exapiMessageDto) { @ParamLog("发送对外API queue")
MessageCreator messageCreator = new MessageCreator() { public void sendMessageQueue(ExapiMessageDto exapiMessageDto) {
@Override amqpTemplate.convertAndSend(MQQueueConstant.EXAPI, exapiMessageDto);
public ObjectMessage createMessage(Session session) throws JMSException { }
return session.createObjectMessage(exapiMessageDto);
}
};
try {
jmsExapiTemplate.send(messageCreator);
} catch (Exception e) {
LOGGER.error("发送失败," + e.getMessage() + "," + exapiMessageDto, e);
throw BizException.SEND_QUEUE_FAIL;
}
LOGGER.info("发送成功," + exapiMessageDto);
}
} }
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import com.pcloud.common.core.dto.ConvertQueueDto;
import com.pcloud.common.core.dto.FrontEventDto; import com.pcloud.common.core.dto.FrontEventDto;
import com.pcloud.common.exceptions.BizException; import com.pcloud.common.exceptions.BizException;
...@@ -10,10 +9,10 @@ import com.pcloud.common.exceptions.BizException; ...@@ -10,10 +9,10 @@ import com.pcloud.common.exceptions.BizException;
* @description: * @description:
*/ */
public interface FrontEventBiz { public interface FrontEventBiz {
/** /**
* 发送事件消息 * 发送事件消息
* *
* @param frontEventDto * @param frontEventDto
*/ */
public void sendFrontEventQueue(FrontEventDto frontEventDto) throws BizException; public void sendFrontEventQueue(FrontEventDto frontEventDto) throws BizException;
} }
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.pcloud.common.core.aspect.ParamLog;
import com.pcloud.common.core.constant.MQQueueConstant;
import com.pcloud.common.core.dto.FundMessageDto; import com.pcloud.common.core.dto.FundMessageDto;
import com.pcloud.common.exceptions.BizException; import com.pcloud.common.exceptions.BizException;
/** /**
* @description 基金资源变更记录消息队列实现类
* @author PENG * @author PENG
* @date 2017年10月18日 下午2:11:52
* @version 1.0 * @version 1.0
* @description 基金资源变更记录消息队列实现类
* @date 2017年10月18日 下午2:11:52
*/ */
@Component("fundQueueBiz") @Component("fundQueueBiz")
public class FundQueueBizImpl implements FundQueueBiz { public class FundQueueBizImpl implements FundQueueBiz {
...@@ -29,29 +24,20 @@ public class FundQueueBizImpl implements FundQueueBiz { ...@@ -29,29 +24,20 @@ public class FundQueueBizImpl implements FundQueueBiz {
*/ */
private final static Logger logger = LoggerFactory.getLogger(FundQueueBizImpl.class); private final static Logger logger = LoggerFactory.getLogger(FundQueueBizImpl.class);
@Autowired(required = false) @Autowired
@Qualifier("jmsFundTemplate") private AmqpTemplate amqpTemplate;
private JmsTemplate jmsFundTemplate;
/** /**
* 发送队列 * 发送队列
*/ */
@Override @Override
@ParamLog("发送基金资源变更记录消息QUEUE")
public void sendFundMessage(FundMessageDto fundMessageDto) throws BizException { public void sendFundMessage(FundMessageDto fundMessageDto) throws BizException {
MessageCreator messageCreator = new MessageCreator() {
@Override
public ObjectMessage createMessage(Session session) throws JMSException {
return session.createObjectMessage(fundMessageDto);
}
};
try { try {
jmsFundTemplate.send(messageCreator); amqpTemplate.convertAndSend(MQQueueConstant.FUND, fundMessageDto);
} catch (Exception e) { } catch (Exception e) {
logger.error("发送失败," + e.getMessage() + "," + fundMessageDto.toString(), e); logger.error("发送失败," + e.getMessage() + "," + fundMessageDto.toString(), e);
throw BizException.SEND_QUEUE_FAIL; throw BizException.SEND_QUEUE_FAIL;
} }
logger.info("发送成功," + fundMessageDto.toString());
} }
} }
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import com.pcloud.common.core.dto.DynamicDto; import org.springframework.amqp.core.AmqpTemplate;
import com.pcloud.common.exceptions.BizException;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.jms.JMSException; import com.pcloud.common.core.aspect.ParamLog;
import javax.jms.ObjectMessage; import com.pcloud.common.core.constant.MQQueueConstant;
import javax.jms.Session; import com.pcloud.common.core.dto.DynamicDto;
import com.pcloud.common.exceptions.BizException;
/** /**
* @描述: * @描述:
* @作者:lucas *
* @创建时间:2017年12月14日,15:58 * @作者:lucas @创建时间:2017年12月14日,15:58 @版本:1.0
* @版本:1.0
*/ */
@Component("merchantDynamicQueueBiz") @Component("merchantDynamicQueueBiz")
public class MerchantDynamicQueueBizImpl implements MerchantDynamicQueueBiz { public class MerchantDynamicQueueBizImpl implements MerchantDynamicQueueBiz {
private final static Logger logger= LoggerFactory.getLogger(MerchantDynamicQueueBizImpl.class); @Autowired
/** private AmqpTemplate amqpTemplate;
* 消息模板
*/
@Autowired(required=false)
@Qualifier("jmsDynamicTemplate")
private JmsTemplate jmsDynamicTemplate;
@Override @Override
public void send(DynamicDto dynamicDto) throws BizException { @ParamLog("动态监控QUEUE")
MessageCreator messageCreator = new MessageCreator() { public void send(DynamicDto dynamicDto) throws BizException {
@Override amqpTemplate.convertAndSend(MQQueueConstant.MERCHANT_DYNAMIC, dynamicDto);
public ObjectMessage createMessage(Session session) throws JMSException { }
return session.createObjectMessage(dynamicDto);
}
};
try {
jmsDynamicTemplate.send(messageCreator);
} catch (Exception e) {
logger.error("发送失败,"+e.getMessage()+","+dynamicDto,e);
throw BizException.SEND_QUEUE_FAIL;
}
logger.info("发送成功,"+dynamicDto);
}
} }
...@@ -2,6 +2,7 @@ package com.pcloud.common.core.biz; ...@@ -2,6 +2,7 @@ package com.pcloud.common.core.biz;
import com.pcloud.common.core.dto.SendAppMessageDto;
import com.pcloud.common.core.dto.SendEmailDto; import com.pcloud.common.core.dto.SendEmailDto;
import com.pcloud.common.core.dto.SendMessageDto; import com.pcloud.common.core.dto.SendMessageDto;
import com.pcloud.common.core.dto.SendNotifyDto; import com.pcloud.common.core.dto.SendNotifyDto;
...@@ -37,5 +38,10 @@ public interface MessageBiz { ...@@ -37,5 +38,10 @@ public interface MessageBiz {
* @param sendSMSDto * @param sendSMSDto
*/ */
void sendSMS(SendMessageDto smsParam); void sendSMS(SendMessageDto smsParam);
/**
* 短信推送
*/
void sendAppMessage(SendAppMessageDto sendAppMessageDto);
} }
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import java.io.IOException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import com.pcloud.common.core.aspect.ParamLog;
import com.pcloud.common.core.constant.MQQueueConstant;
import com.pcloud.common.core.dto.SendAppMessageDto;
import com.pcloud.common.core.dto.SendEmailDto; import com.pcloud.common.core.dto.SendEmailDto;
import com.pcloud.common.core.dto.SendMessageDto; import com.pcloud.common.core.dto.SendMessageDto;
import com.pcloud.common.core.dto.SendNotifyDto; import com.pcloud.common.core.dto.SendNotifyDto;
...@@ -20,148 +16,82 @@ import com.pcloud.common.utils.mq.MqMessage; ...@@ -20,148 +16,82 @@ import com.pcloud.common.utils.mq.MqMessage;
import com.pcloud.common.utils.mq.MqMessageTypeEnum; import com.pcloud.common.utils.mq.MqMessageTypeEnum;
/** /**
*
* @描述:邮件业务实现 * @描述:邮件业务实现
* @作者:shichunshan * @作者:shichunshan
* @创建时间:2016年6月24日,下午2:19:27 * @创建时间:2016年6月24日,下午2:19:27 @版本:1.0
* @版本:1.0
*/ */
@Component("mailBiz") @Component("messageQueueBiz")
public class MessageBizImpl implements MessageBiz{ public class MessageBizImpl implements MessageBiz {
private final static Logger logger=LoggerFactory.getLogger(MessageBizImpl.class); private final static Logger logger = LoggerFactory.getLogger(MessageBizImpl.class);
@Autowired
private AmqpTemplate amqpTemplate;
/** /**
* 消息模板 * 发送邮件
*
* @param sendEmailDto
*/ */
@Autowired(required = false)
@Qualifier("jmsMessageTemplate")
private JmsTemplate jmsMessageTemplate;
@Override @Override
@ParamLog("发送邮件")
public void sendEmail(SendEmailDto sendEmailDto) { public void sendEmail(SendEmailDto sendEmailDto) {
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MqMessage mqMessage = null;
try {
mqMessage = formateMessage(sendEmailDto);
logger.info("mqMessage+++++++++++" + mqMessage);
} catch (IOException e) {
logger.error("邮件发送失败," + sendEmailDto);
return null;
}
return session.createObjectMessage(mqMessage);
}
};
try { try {
jmsMessageTemplate.send(messageCreator); ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendEmailDto);
MqMessage mqMessage = new MqMessage(MqMessageTypeEnum.EMAIL, content);
amqpTemplate.convertAndSend(MQQueueConstant.MESSAGE, mqMessage);
} catch (Exception e) { } catch (Exception e) {
logger.error("邮件发送失败," + e.getMessage() + "," + sendEmailDto, e); logger.error("邮件发送失败," + e.getMessage() + "," + sendEmailDto, e);
return;
} }
logger.info("邮件发送成功," + sendEmailDto);
} }
/** /**
* 格式化邮件内容 * 发送站内信
*
* @param mailParam
* 邮件参数
* @return 队列消息
* @throws IOException
* 序列化为json错误
*/ */
private MqMessage formateMessage(SendEmailDto sendEmailDto) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendEmailDto);
return new MqMessage(MqMessageTypeEnum.EMAIL, content);
}
@Override @Override
@ParamLog("发送站内信")
public void sendLetter(SendNotifyDto sendNotifyDto) { public void sendLetter(SendNotifyDto sendNotifyDto) {
MessageCreator messageCreator = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MqMessage mqMessage = null;
try {
mqMessage = formateMessage(sendNotifyDto);
logger.info("mqMessage+++++++++++" + mqMessage);
} catch (IOException e) {
logger.error("站内信发送失败," + sendNotifyDto);
return null;
}
return session.createObjectMessage(mqMessage);
}
};
try { try {
jmsMessageTemplate.send(messageCreator); ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendNotifyDto);
MqMessage mqMessage = new MqMessage(MqMessageTypeEnum.LETTER, content);
amqpTemplate.convertAndSend(MQQueueConstant.MESSAGE, mqMessage);
} catch (Exception e) { } catch (Exception e) {
logger.error("站内信发送失败," + e.getMessage() + "," + sendNotifyDto, e); logger.error("站内信发送失败," + e.getMessage() + "," + sendNotifyDto, e);
return;
} }
logger.info("站内信发送成功," + sendNotifyDto);
} }
/** /**
* 格式化站内信内容 * 发送短信
*
* @param sendNotifyDto
* 站内信参数
* @return 队列消息
* @throws IOException
* 序列化为json错误
*/ */
private MqMessage formateMessage(SendNotifyDto sendNotifyDto) throws IOException {
ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendNotifyDto);
return new MqMessage(MqMessageTypeEnum.LETTER, content);
}
@Override @Override
public void sendSMS(SendMessageDto smsParam) { @ParamLog("发送短信")
MessageCreator messageCreator = new MessageCreator() { public void sendSMS(SendMessageDto sendMessageDto) {
@Override
public Message createMessage(Session session) throws JMSException {
MqMessage mqMessage = null;
try {
mqMessage = formateMessage(smsParam);
logger.info("mqMessage+++++++++++" + mqMessage);
} catch (IOException e) {
logger.error("短信发送失败," + smsParam);
return null;
}
return session.createObjectMessage(mqMessage);
}
};
try { try {
jmsMessageTemplate.send(messageCreator); ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendMessageDto);
MqMessage mqMessage = new MqMessage(MqMessageTypeEnum.SHORT_MESSAGE, content);
amqpTemplate.convertAndSend(MQQueueConstant.MESSAGE, mqMessage);
} catch (Exception e) { } catch (Exception e) {
logger.error("短信发送失败," + e.getMessage() + "," + smsParam, e); logger.error("短信发送失败," + e.getMessage() + "," + sendMessageDto, e);
return;
} }
logger.info("短信发送成功," + smsParam);
} }
/** /**
* 格式化内容 * 发送app推送
*
* @param sendSMSDto
* 邮件参数
* @return 队列消息
* @throws IOException
* 序列化为json错误
*/ */
private MqMessage formateMessage(SendMessageDto smsParam) throws IOException { @Override
ObjectMapper objectMapper = new ObjectMapper(); @ParamLog("发送APP推送")
String content = objectMapper.writeValueAsString(smsParam); public void sendAppMessage(SendAppMessageDto sendAppMessageDto) {
return new MqMessage(MqMessageTypeEnum.SHORT_MESSAGE, content); try {
ObjectMapper objectMapper = new ObjectMapper();
String content = objectMapper.writeValueAsString(sendAppMessageDto);
MqMessage mqMessage = new MqMessage(MqMessageTypeEnum.APP_MESSAGE, content);
amqpTemplate.convertAndSend(MQQueueConstant.MESSAGE, mqMessage);
} catch (Exception e) {
logger.error("APP推送发送失败," + e.getMessage() + "," + sendAppMessageDto, e);
}
} }
} }
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import javax.jms.JMSException; import org.springframework.amqp.core.AmqpTemplate;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.pcloud.common.core.aspect.ParamLog;
import com.pcloud.common.core.constant.MQQueueConstant;
import com.pcloud.common.core.dto.OperationLogDto; import com.pcloud.common.core.dto.OperationLogDto;
import com.pcloud.common.exceptions.BizException; import com.pcloud.common.exceptions.BizException;
@Service("operationLogQueueBiz") @Service("operationLogQueueBiz")
public class OperationLogQueueBizImpl implements OperationLogQueueBiz { public class OperationLogQueueBizImpl implements OperationLogQueueBiz {
/** @Autowired
* private AmqpTemplate amqpTemplate;
*/
private final static Logger logger = LoggerFactory.getLogger(OperationLogQueueBizImpl.class);
/**
* 消息模板
*/
@Autowired(required = false)
@Qualifier("jmsLogTemplate")
private JmsTemplate jmsLogTemplate;
@Override @Override
@ParamLog("日志记录QUEUE")
public void send(OperationLogDto operationLog) throws BizException { public void send(OperationLogDto operationLog) throws BizException {
MessageCreator messageCreator = new MessageCreator() { amqpTemplate.convertAndSend(MQQueueConstant.LOG, operationLog);
@Override
public ObjectMessage createMessage(Session session) throws JMSException {
return session.createObjectMessage(operationLog);
}
};
try {
jmsLogTemplate.send(messageCreator);
} catch (Exception e) {
logger.error("发送失败," + e.getMessage() + "," + operationLog, e);
throw BizException.SEND_QUEUE_FAIL;
}
logger.info("发送成功," + operationLog);
} }
} }
package com.pcloud.common.core.biz; package com.pcloud.common.core.biz;
import com.pcloud.common.core.dto.WeektaskMessageDto; import org.springframework.amqp.core.AmqpTemplate;
import com.pcloud.common.exceptions.BizException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.jms.JMSException; import com.pcloud.common.core.aspect.ParamLog;
import javax.jms.ObjectMessage; import com.pcloud.common.core.constant.MQQueueConstant;
import javax.jms.Session; import com.pcloud.common.core.dto.WeektaskMessageDto;
import com.pcloud.common.exceptions.BizException;
/** /**
* 编辑周任务队列实现 * 编辑周任务队列实现
...@@ -21,37 +16,18 @@ import javax.jms.Session; ...@@ -21,37 +16,18 @@ import javax.jms.Session;
* @create 2017-09-21 09:33 * @create 2017-09-21 09:33
**/ **/
@Component("weektaskQueueBiz") @Component("weektaskQueueBiz")
public class WeektaskQueueBizImpl implements WeektaskQueueBiz{ public class WeektaskQueueBizImpl implements WeektaskQueueBiz {
private final static Logger logger= LoggerFactory.getLogger(WeektaskQueueBizImpl.class); @Autowired
private AmqpTemplate amqpTemplate;
@Autowired(required=false)
@Qualifier("jmsWeektaskTemplate") /**
private JmsTemplate jmsWeekstaskTemplate; * 发送队列
*/
// @Autowired(required= false) @Override
// @Qualifier("raystask") @ParamLog("红榜任务QUEUE")
// private ActiveMQQueue templateQueue; public void sendMessageQueue(WeektaskMessageDto weektaskMessageDto) throws BizException {
amqpTemplate.convertAndSend(MQQueueConstant.WEEK_TASK, weektaskMessageDto);
/** }
* 发送队列
*/
@Override
public void sendMessageQueue(WeektaskMessageDto weektaskMessageDto) throws BizException {
MessageCreator messageCreator = new MessageCreator() {
@Override
public ObjectMessage createMessage(Session session) throws JMSException {
return session.createObjectMessage(weektaskMessageDto);
}
};
try {
jmsWeekstaskTemplate.send(messageCreator);
} catch (Exception e) {
logger.error("发送失败," + e.getMessage() + "," + weektaskMessageDto, e);
throw BizException.SEND_QUEUE_FAIL;
}
logger.info("发送成功," + weektaskMessageDto);
}
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment