Commit f4644430 by 裴大威

delay queue

parent a22ba5f5
......@@ -10,7 +10,10 @@ package com.pcloud.common.core.constant;
* @date:2018年8月17日,上午10:48:18
*/
public class MQTopicProducer {
/**
* 个人号发送文字消息
*/
public static final String SELF_ROBOT_SEND_TEXT = "topic.SelfRobotSendText";
/**
* 监听小号上下线
*/
......
package com.pcloud.common.core.mq;
import java.io.Serializable;
import lombok.Builder;
import lombok.Data;
/**
* 延时队列通用DTO
*/
@Data
@Builder
public class DelayQueueDTO implements Serializable {
private static final long serialVersionUID = -5631122429661157876L;
/**
* 消息键
*/
private String key;
/**
* 消息类型
*/
private String type;
/**
* 消息内容
*/
private Object msg;
/**
* 消息过期时间(单位:ms)
*/
private Integer timeout;
}
package com.pcloud.common.core.mq;
public class MQExchangeConstants {
/**
* rays 通用交换机
*/
public static final String DEAD_LETTER_EXCHANGE = "rays.dlx";
/**
* rays 通用routing key
*/
public static final String DEAD_ROUTING_KEY = "rays.dlk";
/**
* book 延时队列交换机
*/
public static final String BOOK_DELAYED_EXCHANGE = "book.exchange.delay";
/**
* book 延时队列 routing key
*/
public static final String BOOK_DELAY_ROUTING_KEY = "book.routingkey.delay";
/**
* book 延时队列交换机
*/
public static final String WECHATGROUP_DELAYED_EXCHANGE = "wechatgroup.exchange.delay";
/**
* book 延时队列 routing key
*/
public static final String WECHATGROUP_DELAY_ROUTING_KEY = "wechatgroup.routingkey.delay";
}
package com.pcloud.common.core.mq;
public class MQQueueConstants {
/**
* 延时队列后缀
*/
public static final String IMMEDIATE_QUEUE_FOR_DELAY = "queue.delay.immediate";
/**
* book延时队列
*/
public static final String BOOK_IMMEDIATE_QUEUE_FOR_DELAY = "book." + IMMEDIATE_QUEUE_FOR_DELAY;
/**
* wechatgroup延时队列
*/
public static final String WECHATGROUP_IMMEDIATE_QUEUE_FOR_DELAY = "wechatgroup." + IMMEDIATE_QUEUE_FOR_DELAY;
}
......@@ -24,23 +24,23 @@ import com.pcloud.common.core.constant.MQTopicProducer;
@Configuration
public class RabbitMQFactory {
private static final String DEAD_LETTER_EXCHANGE = "rays.dlx";
private static final String DEAD_ROUTING_KEY = "rays.dlk";
public static final String X_DELAYED_TYPE ="x-delayed-type";
public static final String X_DELAYED_MESSAGE ="x-delayed-message";
public static final String DIRECT ="direct";
public static final String TOPIC ="topic";
private static TopicExchange topicExchange = null;
/**
* 声明业务队列同时与死信队列绑定,当业务队列的消息失败时会转发到死信队列中在进行处理,防止信息丢失
*
*
* @param queueName
* 队列名称
* @return
*/
public static Queue queueBuilder(String queueName) {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);// 设置死信交换机
map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);// 设置死信routingKey
map.put("x-dead-letter-exchange", MQExchangeConstants.DEAD_LETTER_EXCHANGE);// 设置死信交换机
map.put("x-dead-letter-routing-key", MQExchangeConstants.DEAD_ROUTING_KEY);// 设置死信routingKey
return QueueBuilder.durable(queueName).withArguments(map).build();
}
......@@ -51,22 +51,21 @@ public class RabbitMQFactory {
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
return new DirectExchange(MQExchangeConstants.DEAD_LETTER_EXCHANGE, true, false);
}
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadLetterExchange()).with(DEAD_ROUTING_KEY);
return BindingBuilder.bind(deadQueue()).to(deadLetterExchange()).with(MQExchangeConstants.DEAD_ROUTING_KEY);
}
/**
* Topic模式下生产者与消费者到交换机的绑定
*
*
* @param queue
* 消费者队列
* @param producer
* 生产者名称
* @return
*/
public static Binding bindingExchange(Queue queue, String producer) {
return BindingBuilder.bind(queue).to(getTopicExchange()).with(producer);
......@@ -77,10 +76,6 @@ public class RabbitMQFactory {
return getTopicExchange();
}
/**
*
* @return
*/
private static TopicExchange getTopicExchange() {
if (topicExchange == null) {
topicExchange = new TopicExchange(MQTopicProducer.EXCHAGE);
......@@ -88,4 +83,4 @@ public class RabbitMQFactory {
return topicExchange;
}
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment