搜索
您的当前位置:首页正文

rocketMQ订阅关系

来源:榕意旅游网
rocketMQ订阅关系

场景:2 个消费者进程中,创建了 2 个消费者,同属于 1 个消费组,但是订阅了不同的 topic,会因为订阅信息相互覆盖,导致拉不到消息。

原因是 RocketMQ 的订阅关系是按 group 来管理的:c1 订阅 t1,c2 订阅 t2,它们同属于一个 group;当 c1 拉取 t1 的消息时,broker 发现该 group 订阅的是 t2,就不会返回消息给 c1。

client 在 MQClientInstance 中发送心跳给所有的 broker,心跳中包含 consumer 的订阅信息:

// Periodic heartbeat task: first run after 1000 ms, then repeated every
// clientConfig.getHeartbeatBrokerInterval() milliseconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            // Log and continue: an exception escaping a scheduleAtFixedRate task
            // would suppress all subsequent executions.
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

// org.apache.rocketmq.client.impl.factory.MQClientInstance#prepareHeartbeatData
/**
 * Builds the heartbeat payload for this client instance.
 *
 * <p>The payload carries the client id, one {@code ConsumerData} per non-null entry of
 * {@code consumerTable} (group name, consume type, message model, consume-from-where,
 * the consumer's subscriptions and its unit-mode flag) and one {@code ProducerData}
 * per non-null entry of {@code producerTable} (group name only).
 *
 * @return the populated heartbeat data
 */
private HeartbeatData prepareHeartbeatData() {
    HeartbeatData heartbeatData = new HeartbeatData();

    // clientID
    heartbeatData.setClientID(this.clientId);

    // Consumer
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            ConsumerData consumerData = new ConsumerData();
            consumerData.setGroupName(impl.groupName());
            consumerData.setConsumeType(impl.consumeType());
            consumerData.setMessageModel(impl.messageModel());
            consumerData.setConsumeFromWhere(impl.consumeFromWhere());
            // The subscriptions of this consumer travel with every heartbeat.
            consumerData.getSubscriptionDataSet().addAll(impl.subscriptions());
            consumerData.setUnitMode(impl.isUnitMode());

            heartbeatData.getConsumerDataSet().add(consumerData);
        }
    }

    // Producer
    for (Map.Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) {
        MQProducerInner impl = entry.getValue();
        if (impl != null) {
            ProducerData producerData = new ProducerData();
            // The table key is used as the producer group name.
            producerData.setGroupName(entry.getKey());

            heartbeatData.getProducerDataSet().add(producerData);
        }
    }

    return heartbeatData;
}

broker 处理心跳请求

// org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) { RemotingCommand response = RemotingCommand.createResponseCommand(null);

HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class); ClientChannelInfo clientChannelInfo = new ClientChannelInfo( ctx.channel(),

heartbeatData.getClientID(), request.getLanguage(), request.getVersion() );

for (ConsumerData data : heartbeatData.getConsumerDataSet()) { SubscriptionGroupConfig subscriptionGroupConfig =

this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig( data.getGroupName());

boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) {

isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0;

if (data.isUnitMode()) {

topicSysFlag = TopicSysFlag.buildSysFlag(false, true); }

String newTopic = MixAll.getRetryTopic(data.getGroupName());

this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic,

subscriptionGroupConfig.getRetryQueueNums(),

PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); }

boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo,

data.getConsumeType(), data.getMessageModel(),

data.getConsumeFromWhere(), data.getSubscriptionDataSet(),

isNotifyConsumerIdsChangedEnable );

if (changed) {

log.info(\"registerConsumer info changed {} {}\ data.toString(),

RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); } }

for (ProducerData data : heartbeatData.getProducerDataSet()) {

this.brokerController.getProducerManager().registerProducer(data.getGroupName(), clientChannelInfo); }

response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response;}

broker 比较当前 consumer group 的订阅信息,如果当前消费组加入了新的消费者,或者订阅的 topic 发生了变化,则通知消费者进行 rebalance

// org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer

/**
 * Registers (or refreshes) a consumer channel and its subscriptions under a consumer group.
 *
 * <p>Creates the {@code ConsumerGroupInfo} for the group on first use, updates the
 * channel and the subscription set, fires a {@code CHANGE} event when either update
 * reported a change and notification is enabled, and always fires a {@code REGISTER}
 * event with the subscription set.
 *
 * @param group                            consumer group name
 * @param clientChannelInfo                channel information of the registering client
 * @param consumeType                      consume type reported by the client
 * @param messageModel                     message model reported by the client
 * @param consumeFromWhere                 consume-from-where setting reported by the client
 * @param subList                          subscriptions reported by the client
 * @param isNotifyConsumerIdsChangedEnable whether a {@code CHANGE} event may be fired
 * @return {@code true} if the channel update or the subscription update reported a change
 */
public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

    ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
    if (null == consumerGroupInfo) {
        ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
        // putIfAbsent: if another caller inserted an entry first, use that one.
        ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
        consumerGroupInfo = prev != null ? prev : tmp;
    }

    boolean r1 =
        consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
            consumeFromWhere);
    // Key point for the scenario discussed in this article: the subscription data
    // stored for the whole group is overwritten here.
    boolean r2 = consumerGroupInfo.updateSubscription(subList);

    if (r1 || r2) {
        if (isNotifyConsumerIdsChangedEnable) {
            this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group,
                consumerGroupInfo.getAllChannel());
        }
    }

    this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

    return r1 || r2;
}

发生 ConsumerGroupEvent.CHANGE 事件,broker 通知 client 进行 rebalance

// org.apache.rocketmq.broker.client.DefaultConsumerIdsChangeListener#handlepublic void handle(ConsumerGroupEvent event, String group, Object... args) { if (event == null) { return; }

switch (event) { case CHANGE:

if (args == null || args.length < 1) { return; }

List channels = (List) args[0];

if (channels != null && brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {

for (Channel chl : channels) {

this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group); } }

break;

case UNREGISTER:

this.brokerController.getConsumerFilterManager().unRegister(group); break;

case REGISTER:

if (args == null || args.length < 1) { return; }

Collection subscriptionDataList = (Collection) args[0]; this.brokerController.getConsumerFilterManager().register(group, subscriptionDataList); break; default:

throw new RuntimeException(\"Unknown event \" + event); }}

broker 向 client 发送 NOTIFY_CONSUMER_IDS_CHANGED 请求

// org.apache.rocketmq.broker.client.net.Broker2Client#notifyConsumerIdsChangedpublic void notifyConsumerIdsChanged( final Channel channel,

final String consumerGroup) { if (null == consumerGroup) {

log.error(\"notifyConsumerIdsChanged consumerGroup is null\"); return; }

NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); RemotingCommand request =

RemotingCommand.createRequestCommand(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader); try {

this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); } catch (Exception e) {

log.error(\"notifyConsumerIdsChanged exception, \" + consumerGroup, e.getMessage()); }}

client 收到 NOTIFY_CONSUMER_IDS_CHANGED 请求,触发 rebalance

// org.apache.rocketmq.client.impl.ClientRemotingProcessor#notifyConsumerIdsChangedpublic RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { try {

final NotifyConsumerIdsChangedRequestHeader requestHeader =

(NotifyConsumerIdsChangedRequestHeader) request.decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); log.info(\"receive broker's notification[{}], the consumer group: {} changed, rebalance immediately\ RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup());

this.mqClientFactory.rebalanceImmediately(); } catch (Exception e) {

log.error(\"notifyConsumerIdsChanged exception\ }

return null;}

需要说明的是,client 一直在进行 rebalance,只是间隔 10s 而已。这里只是让 client 立马 rebalance,但其实并没有任何效果,因为消费者数量没有变化。

但是由于 broker 保存的订阅关系一直被修改,导致 consumer 去拉数据时,会因为订阅关系不存在,而拉不到数据。

// Look up the subscription currently stored for the group under the requested topic.
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
    // NOTE(review): getConsumerGroup() is assumed to exist on this request header
    // to fill the "group" placeholder; confirm against the header class.
    log.warn("the consumer's subscription not exist, group: {}, topic:{}",
        requestHeader.getConsumerGroup(), requestHeader.getTopic());
    // No subscription for this topic in the group's stored data: reply with
    // SUBSCRIPTION_NOT_EXIST and return without pulling messages.
    response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
    response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
    return response;
}

因篇幅问题不能全部显示,请点此查看更多更全内容

Top