持久化方式可以分成两大类
一般来讲性能对比上:文件系统>关系型数据库DB
RocketMq的文件存储系统有两点优化以保证性能:
顺序写
,保证了消息存储的速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度,但是磁盘随机写的速度只有大概100KB/s零拷贝技术有个是不能超过2G,所以RocketMQ默认设置单个CommitLog日志数据文件为1G
RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的
CommitLog
:消息真正的物理存储文件是CommitLog,默认一个文件一个G,存储的是Topic,QueueId和Message,一个存储满了会自动创建一个新的。ConsumeQueue
:是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址,为了加快消息的读取速度。消费者消费某条消息时,先查询索引获取CommitLog的对应的物理地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件,文件很小,通常会加载到内存中。如果该文件丢失或者损坏,可以通过CommitLog恢复IndexFile
:也是个索引文件,为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程RocketMq是天生支持分布式的,可以配置主从以及水平扩展
Master角色的Broker支持读和写,Slave角色的Broker仅支持读,也就是 Producer只能和Master角色的Broker连接写入消息;Consumer可以连接 Master角色的Broker,也可以连接Slave角色的Broker来读取消息。
如果一个Broker组有Master和Slave,消息需要从Master复制到Slave 上,有同步和异步两种复制方式。
Producer端,每个实例在发消息的时候,默认会轮询所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下,如下图:
如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让queue的总数量大于等于consumer的数量。
消费者的集群模式–启动多个消费者就可以保证消费者的负载均衡(均摊队列)
默认使用的是均摊队列:会按照queue的数量和实例的数量平均分配queue给每个实例,这样每个消费者可以均摊消费的队列,如下图所示6个队列和三个生产者。
对于广播模式并不是负载均衡的,要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。
因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。
无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
消息队列 RocketMQ 默认允许每条消息最多重试 16 次
,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,如果依然失败就会进入死信队列。
一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
也可以通过配置,让其不再重试,但是不建议这样
public class MessageListenerImpl implements MessageListener {
@Override
public Action consume(Message message, ConsumeContext context) {
try {
doConsumeMessage(message);
} catch (Throwable e) {
//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;
return Action.CommitMessage;
}
//消息处理正常,直接返回 Action.CommitMessage;
return Action.CommitMessage;
}
}
死信消息具有以下特性:
死信队列具有以下特性:
查看死信队列
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
消息队列 RocketMQ 消费者在接收到消息以后,有必要根据业务上的唯一 Key 对消息做幂等处理的必要性。
在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:
发送时消息重复
当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
消费时消息重复
消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)
当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。
因为 Message ID 有可能出现冲突(重复)的情况,所以真正安全的幂等处理,不建议以 Message ID 作为处理依据。 最好的方式是以业务唯一标识作为幂等处理的关键依据,而业务的唯一标识可以通过消息 Key 进行设置:
Message message = new Message();
message.setKey("ORDERID_100");
SendResult sendResult = producer.send(message);
订阅方收到消息时可以根据消息的 Key 进行幂等处理:
consumer.subscribe("ons_test", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
String key = message.getKey()
// 根据业务唯一标识的 key 做幂等处理
}
});
接下来,就需要根据业务进行处理:
这个可能出现在消费端出了问题,不消费了,或者消费的极其极其慢,导致大量的消息无法消费,最后消息队列集群的磁盘都快写满了
首先最重要的就是修复消费者,接着最大的问题就是在消费者重启后,如何快速处理积压的消息?
是上面内容的一个总结,需要分点进行设计
扩展性
高可用性
消息重试机制
心跳检测机制
负载均衡模式
顺序消息
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- nryq.cn 版权所有 赣ICP备2024042798号-6
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务