【RocketMQ】顺序消息实现总结
全局有序
在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。
局部有序
假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID,比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID,在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列,这样同一个订单的数据就可以保证发送到一个消息队列中,消费者端使用MessageListenerOrderly
处理有序消息,这就是RocketMQ的局部有序,保证消息在某个消息队列中有序。
接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):
生产者
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
// 创建生产者
DefaultMQProducer producer = new DefaultMQProducer("生产者组");
// 启动
producer.start();
// 创建TAG
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
// 生成订单ID
int orderId = i % 10;
// 创建消息
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 获取订单ID
Integer id = (Integer) arg;
// 对消息队列个数取余
int index = id % mqs.size();
// 根据取余结果选择消息要发送给哪个消息队列
return mqs.get(index);
}
}, orderId); // 这里传入了订单ID
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 订阅主题
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
// 注册消息监听器,使用的是MessageListenerOrderly
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
// 打印消息
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID相关消息有序发送,接下来就看消费者是如何保证消息的顺序消费的。
定时任务对消息队列加锁
消费者在启动的时候,会对是否是顺序消费进行判断(监听器是否是MessageListenerOrderly
类型来判断),如果是顺序消费,会使用ConsumeMessageOrderlyService
,并调用它的start方法进行启动,在集群模式模式下,start方法中会启动一个定时加锁的任务,周期性的对该消费者负责的消息队列进行加锁。
为什么集群模式下需要加锁?
因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在顺序消费情况下,集群模式下需要对消息队列加锁,当某个消息队列被锁定时,其他的消费者不能进行消费。
加锁的具体逻辑如下,首先获取当前消费者负责的所有消息队列MessageQueue
,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:
(1)根据broker名称查找broker的详细信息;
(2)创建加锁请求,在请求中设置要加锁的消息队列,将请求发送给broker,表示要对这些消息队列进行加锁;
(3)Broker返回请求处理结果,响应结果中包含了加锁成功的消息队列,对于加锁成功的消息队列将消息队列MessageQueue
,将其对应的ProcessQueue
中的locked属性置为true表示该消息队列已加锁成功,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue
对象中的locked属性置为false表示加锁失败;
顺序消息拉取
上面可知,在使用顺序消息时,定时任务会周期性的对当前消费者负责的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配了新的消息队列,此时还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果是新分配到当前消费者的消息队列,同样会向Broker发送请求,对MessageQueue
进行加锁,加锁成功将其对应的ProcessQueue
中的locked属性置为true才可以拉取消息。
顺序消息消费
消息拉取成功之后,会将消息提交到线程池中进行处理,对于顺序消费处理逻辑如下:
获取消息队列
MessageQueue
的对象锁,每个MessageQueue
对应了一把Object
对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,由多个线程同时进行消费,所以某个线程在处理某个消息队列的消息时需要对该消息队列MessageQueue
加锁,防止其他线程并发消费该消息队列的锁,破坏消息的顺序性;public class MessageQueueLock { private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>(); public Object fetchLockObject(final MessageQueue mq) { // 获取消息队列对应的对象锁,也就是一个Object类型的对象 Object objLock = this.mqLockTable.get(mq); // 如果获取为空 if (null == objLock) { // 创建对象 objLock = new Object(); // 加入到Map中 Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock); if (prevLock != null) { objLock = prevLock; } } return objLock; } }
上一步获取锁成功之后,会再次校验该
MessageQueue
对应的ProcessQueue
中的锁(locked状态),看是否过期或者已经失效,过期或者失效稍后会重新进行加锁;获取
ProcessQueue
的中的consumeLock
消费锁,获取成功之后调用消息监听器的consumeMessage
方法开始消费消费;public class ProcessQueue { // 消息消费锁 private final Lock consumeLock = new ReentrantLock(); public Lock getConsumeLock() { // 获取消息消费锁 return consumeLock; } }
消息消费完毕,释放
ProcessQueue
的consumeLock
消费锁;方法执行完毕,释放
MessageQueue
对应的Object
对象锁;
在第1步中就已经获取了MessageQueue
对应的Object
对象锁对消息队列进行加锁了,那么为什么在第3步消费消息之前还要再加一个消费锁呢?
猜测有可能是在消费者进行负载均衡时,当前消费者负责的消息队列发生变化,可能移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueue
的consumeLock
消费锁进行加锁,相当于锁住ProcessQueue
,防止正在消费的过程中,ProcessQueue
被负载均衡移除。
既然如此,负载均衡的时候为什么不使用MessageQueue
对应的Object
对象锁进行加锁而要使用ProcessQueue
中的consumeLock
消费锁?
这里应该是为了减小锁的粒度,因为消费者在MessageQueue
对应的Object
加锁后,还进行了一系列的判断,校验都成功之后获取ProcessQueue
中的consumeLock
加锁,之后开始消费消息,消费完毕释放所有的锁,如果负载均衡使用MessageQueue
的Object
对象锁需要等待整个过程结束,锁的粒度较粗,这样显然会降低性能,而如果使用消息消费锁,只需要等待第3步和第4步结束就可以获取锁,减少等待的时间,而且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage
方法在执行的过程中(消息被消费的过程)ProcessQueue
不被移除即可。
总结
消费者端,是通过加锁来保证消息的顺序消费,一共有三把锁:
向Broker申请的消息队列锁
集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue
中的locked
变量中。消息队列锁
对应MessageQueue
对应的Object
对象锁,消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前需要对MessageQueue
加锁,锁住要处理的消息队列,主要是处理多线程之间的竞争,保证消息的顺序性。消息消费锁
对应ProcessQueue
中的consumeLock
,消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除。
对应的相关源码可参考: