RocketMQ的消费者分配MessageQueue的六种负载均衡算法
RocketMQ 的 Consumer 在指定消费哪些 Message Queue 时,有几种分配方式,下面是总结: 主要有六种方式:
- 平均分配
- 循环分配
- 用户自定义,指定 ConsumerGroup 对应的 MessageQueue.
- 用户按机房自定义,指定 ConsumerGroup 消费哪些机房,从而只消费对应机房里的 MessageQueue
- 机房就近分配。
- 一致性 hash 算法的方式来分配
详细解释及核心源码片段
1.平均分配策略
按照以下步骤进行分配
1.把消费者进行排序;
2.计算每个消费者可以平均分配的 MessageQueue 数量; 3.如果消费者数量大于 MessageQueue 数量,多出的消费者就分不到; 4.如果不可以平分,就使用 MessageQueue 总数量对消费者数量求余数 mod; 5.对前 mod 数量消费者,每个消费者加一个,这样就获取到了每个消费者分配的 MessageQueue 数量。
比如 4 个 MessageQueue 和 3 个消费者的情况:
源代码的逻辑非常简单,如下:
// AllocateMessageQueueAveragely 这个类
// 4 个 MessageQueue 和 3 个消费者的情况,假如第一个,index = 0
int index = cidAll.indexOf(currentCID);
// mod = 1
int mod = mqAll.size() % cidAll.size();
// averageSize = 2
int averageSize =
mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
+ 1 : mqAll.size() / cidAll.size());
// startIndex = 0
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// range = 2,所以第一个消费者分配到了2个
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
result.add(mqAll.get((startIndex + i) % mqAll.size()));
}
2.循环分配策略
这个很容易理解,遍历消费者,把 MessageQueue 分一个给遍历到的消费者,如果 MessageQueue 数量比消费者多,需要进行多次遍历,遍历次数等于 (MessageQueue 数量/消费者数量),还是以 4 个 MessageQueue 和 3 个消费者的情况,如下图:
源代码如下:
//AllocateMessageQueueAveragelyByCircle 这个类
//4 个 MessageQueue 和 3 个消费者的情况,假如第一个,index = 0
int index = cidAll.indexOf(currentCID);
for (int i = index; i < mqAll.size(); i++) {
if (i % cidAll.size() == index) {
//i == 0 或者 i == 3 都会走到这里
result.add(mqAll.get(i));
}
}
3.用户自定义 message queue
这种策略在消费者启动的时候可以指定消费哪些 MessageQueue。可以参考下面代码:
AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
//绑定消费 messageQueue1
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1","broker1",0)));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
consumer.start();
4.用户按机房自定义分配策略
这种方式 Consumer 只消费指定机房的 MessageQueue,如下图:Consumer0、Consumer1、Consumer2 绑定 room1 和 room2 这两个机房,而 room3 这个机房没有消费者。
Consumer 启动的时候需要绑定机房名称。可以参考下面代码:
AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
//绑定消费 room1 和 room2 这两个机房
allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1","room2")));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
consumer.start();
这种策略 broker 的命名必须按照格式:机房名@brokerName,因为消费者分配队列的时候,首先按照机房名称过滤出所有的 MessageQueue,然后再按照平均分配策略进行分配。
//AllocateMessageQueueByMachineRoom 这个类
List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
String[] temp = mq.getBrokerName().split("@");
if (temp.length == 2 && consumeridcs.contains(temp[0])) {
premqAll.add(mq);
}
}
//上面按照机房名称过滤出所有的 MessageQueue 放入premqAll,后面就是平均分配策略
5.机房就近分配策略
跟按照机房分配原则相比,就近分配的好处是可以对没有消费者的机房进行分配。如下图,机房 3 的 MessageQueue 也分配到了消费者:
如果一个机房没有消费者,则会把这个机房的 MessageQueue 分配给集群中所有的消费者。
源码所在类:AllocateMachineRoomNearby。
6.一致性 hash 算法分配策略
把所有的消费者经过 Hash 计算分布到 Hash 环上,对所有的 MessageQueue 进行 Hash 计算,找到顺时针方向最近的消费者节点进行绑定。如下图:
源代码如下:
//所在类 AllocateMessageQueueConsistentHash
Collection<ClientNode> cidNodes = new ArrayList<ClientNode>();
for (String cid : cidAll) {
cidNodes.add(new ClientNode(cid));
}
//使用消费者构建 Hash 环,把消费者分布在 Hash 环节点上
final ConsistentHashRouter<ClientNode> router; //for building hash ring
if (customHashFunction != null) {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt, customHashFunction);
} else {
router = new ConsistentHashRouter<ClientNode>(cidNodes, virtualNodeCnt);
}
//对 MessageQueue 做 Hash 运算,找到环上距离最近的消费者
List<MessageQueue> results = new ArrayList<MessageQueue>();
for (MessageQueue mq : mqAll) {
ClientNode clientNode = router.routeNode(mq.toString());
if (clientNode != null && currentCID.equals(clientNode.getKey())) {
results.add(mq);
}
}