Commit 593eb2e9 by 裴大威

Merge branch 'feat-1002409' into 'master'

delay queue

See merge request rays/pcloud-common-parent!78
parents a22ba5f5 f4644430
...@@ -10,7 +10,10 @@ package com.pcloud.common.core.constant; ...@@ -10,7 +10,10 @@ package com.pcloud.common.core.constant;
* @date:2018年8月17日,上午10:48:18 * @date:2018年8月17日,上午10:48:18
*/ */
public class MQTopicProducer { public class MQTopicProducer {
/**
* 个人号发送文字消息
*/
public static final String SELF_ROBOT_SEND_TEXT = "topic.SelfRobotSendText";
/** /**
* 监听小号上下线 * 监听小号上下线
*/ */
......
package com.pcloud.common.core.mq;
import java.io.Serializable;
import lombok.Builder;
import lombok.Data;
/**
* 延时队列通用DTO
*/
@Data
@Builder
public class DelayQueueDTO implements Serializable {
private static final long serialVersionUID = -5631122429661157876L;
/**
* 消息键
*/
private String key;
/**
* 消息类型
*/
private String type;
/**
* 消息内容
*/
private Object msg;
/**
* 消息过期时间(单位:ms)
*/
private Integer timeout;
}
package com.pcloud.common.core.mq;
public class MQExchangeConstants {
/**
* rays 通用交换机
*/
public static final String DEAD_LETTER_EXCHANGE = "rays.dlx";
/**
* rays 通用routing key
*/
public static final String DEAD_ROUTING_KEY = "rays.dlk";
/**
* book 延时队列交换机
*/
public static final String BOOK_DELAYED_EXCHANGE = "book.exchange.delay";
/**
* book 延时队列 routing key
*/
public static final String BOOK_DELAY_ROUTING_KEY = "book.routingkey.delay";
/**
* book 延时队列交换机
*/
public static final String WECHATGROUP_DELAYED_EXCHANGE = "wechatgroup.exchange.delay";
/**
* book 延时队列 routing key
*/
public static final String WECHATGROUP_DELAY_ROUTING_KEY = "wechatgroup.routingkey.delay";
}
package com.pcloud.common.core.mq;
public class MQQueueConstants {
/**
* 延时队列后缀
*/
public static final String IMMEDIATE_QUEUE_FOR_DELAY = "queue.delay.immediate";
/**
* book延时队列
*/
public static final String BOOK_IMMEDIATE_QUEUE_FOR_DELAY = "book." + IMMEDIATE_QUEUE_FOR_DELAY;
/**
* wechatgroup延时队列
*/
public static final String WECHATGROUP_IMMEDIATE_QUEUE_FOR_DELAY = "wechatgroup." + IMMEDIATE_QUEUE_FOR_DELAY;
}
...@@ -24,23 +24,23 @@ import com.pcloud.common.core.constant.MQTopicProducer; ...@@ -24,23 +24,23 @@ import com.pcloud.common.core.constant.MQTopicProducer;
@Configuration @Configuration
public class RabbitMQFactory { public class RabbitMQFactory {
private static final String DEAD_LETTER_EXCHANGE = "rays.dlx"; public static final String X_DELAYED_TYPE ="x-delayed-type";
public static final String X_DELAYED_MESSAGE ="x-delayed-message";
private static final String DEAD_ROUTING_KEY = "rays.dlk"; public static final String DIRECT ="direct";
public static final String TOPIC ="topic";
private static TopicExchange topicExchange = null; private static TopicExchange topicExchange = null;
/** /**
* 声明业务队列同时与死信队列绑定,当业务队列的消息失败时会转发到死信队列中在进行处理,防止信息丢失 * 声明业务队列同时与死信队列绑定,当业务队列的消息失败时会转发到死信队列中在进行处理,防止信息丢失
* *
* @param queueName * @param queueName
* 队列名称 * 队列名称
* @return
*/ */
public static Queue queueBuilder(String queueName) { public static Queue queueBuilder(String queueName) {
Map<String, Object> map = new HashMap<String, Object>(); Map<String, Object> map = new HashMap<String, Object>();
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 设置死信交换机 map.put("x-dead-letter-exchange", MQExchangeConstants.DEAD_LETTER_EXCHANGE);// 设置死信交换机
map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);// 设置死信routingKey map.put("x-dead-letter-routing-key", MQExchangeConstants.DEAD_ROUTING_KEY);// 设置死信routingKey
return QueueBuilder.durable(queueName).withArguments(map).build(); return QueueBuilder.durable(queueName).withArguments(map).build();
} }
...@@ -51,22 +51,21 @@ public class RabbitMQFactory { ...@@ -51,22 +51,21 @@ public class RabbitMQFactory {
@Bean @Bean
public DirectExchange deadLetterExchange() { public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false); return new DirectExchange(MQExchangeConstants.DEAD_LETTER_EXCHANGE, true, false);
} }
@Bean @Bean
public Binding deadBinding() { public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadLetterExchange()).with(DEAD_ROUTING_KEY); return BindingBuilder.bind(deadQueue()).to(deadLetterExchange()).with(MQExchangeConstants.DEAD_ROUTING_KEY);
} }
/** /**
* Topic模式下生产者与消费者到交换机的绑定 * Topic模式下生产者与消费者到交换机的绑定
* *
* @param queue * @param queue
* 消费者队列 * 消费者队列
* @param producer * @param producer
* 生产者名称 * 生产者名称
* @return
*/ */
public static Binding bindingExchange(Queue queue, String producer) { public static Binding bindingExchange(Queue queue, String producer) {
return BindingBuilder.bind(queue).to(getTopicExchange()).with(producer); return BindingBuilder.bind(queue).to(getTopicExchange()).with(producer);
...@@ -77,10 +76,6 @@ public class RabbitMQFactory { ...@@ -77,10 +76,6 @@ public class RabbitMQFactory {
return getTopicExchange(); return getTopicExchange();
} }
/**
*
* @return
*/
private static TopicExchange getTopicExchange() { private static TopicExchange getTopicExchange() {
if (topicExchange == null) { if (topicExchange == null) {
topicExchange = new TopicExchange(MQTopicProducer.EXCHAGE); topicExchange = new TopicExchange(MQTopicProducer.EXCHAGE);
...@@ -88,4 +83,4 @@ public class RabbitMQFactory { ...@@ -88,4 +83,4 @@ public class RabbitMQFactory {
return topicExchange; return topicExchange;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment