发布日期:2025-12-17 22:56 点击次数:200
背景在我们的项目中,业务中常常需要解决定时提醒,定时发布等需求,而我们会常常使用 pulsar 的定时消息作为我们的定时处理的方案,而 pulsar 天然支持定时消息的发送。Pulsar 原生支持定时消息发送,因此在有空时,我就想深入研究一下它内部是如何实现定时消息机制的,写本文的初衷就是因为好奇。
本文将结合源码以及图示介绍 pulsar 定时消息的处理流程。
结合源码分析我们首先必须要明确一个概念:pulsar 的延迟消息,是延迟投递到消费者 。也就是说 Producer 在发送消息时立即写入 Broker ,然后存储到 BookKeeper,延迟逻辑完全发生在 Broker 的消费路径。
第二个概念:我们知道,当我们订阅一个 topic 时 #后端 #消息队列 #开源 #每天一个知识点,我们会在 broker 上存储我们的订阅信息。
每个 Subscription 都有自己: Cursor (订阅消费指针)Dispatcher (调度器)延迟消息调度器 DelayedDeliveryTracker (按需创建,懒加载的,下文有源码)如何发送一个消息,如果我们自己使用 pursal 实现一个定时发布,会写出以下代码
producer.newMessage
.deliverAfter(10, TimeUnit.SECONDS)
.value(data)
.send;
但其实本质上是 now + 10s,计算出过期时间,然后写入消息的 deliverAtTimeStamp 字段上
@Override
public TypedMessageBuilder deliverAt(long timestamp) {
msgMetadata.setDeliverAtTime(timestamp);
return this;
}
源码位置:github.com/apache/puls…
然后消息在发送的时候会去 checkAndStartPublish
这个代码会初始化 delayedDeliveryTracker,然后将消息放到 delayedDeliveryTracker 里面
@Override
public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
if (!topic.isDelayedDeliveryEnabled) {
// If broker has the feature disabled, always deliver messages immediately
return false;
}
synchronized (this) { if (delayedDeliveryTracker.isEmpty) { if (!msgMetadata.hasDeliverAtTime) { // No need to initialize the tracker here return false; }
// Initialize the tracker the first time we need to use it delayedDeliveryTracker = Optional.of( topic.getBrokerService.getDelayedDeliveryTrackerFactory.newTracker(this)); }
delayedDeliveryTracker.get.resetTickTime(topic.getDelayedDeliveryTickTimeMillis);
// 没有设置延时时间,则将延时时间设置为-1 long deliverAtTime = msgMetadata.hasDeliverAtTime ? msgMetadata.getDeliverAtTime : -1L; return delayedDeliveryTracker.get.addMessage(ledgerId, entryId, deliverAtTime); } }
那么就有一个问题了,为什么没有延时的消息也会放在 delayedDeliveryTracker 中呢?
继续往下看,我们到 addMessage 里面
这个地方,我们发现如果延时消息小于0,即当前没有延时消息,或者延时消息已经过期,则不放入,返回 false,回到上一层
源码位置:github.com/apache/puls…
@Override
public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
// 这一步判断,是否添加message(也就是ledgerId 和 entryId)到tracker
if (deliverAt new Long2ObjectRBTreeMap) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap) .add(entryId); delayedMessagesCount.incrementAndGet;
// 这一步下面就是调用定时器,即时间轮 updateTimer;
checkAndUpdateHighest(deliverAt);
return true; }
pulsar 使用 ledgerId + entryId 定位一条消息,entryId 是自增的,ledgerId 是不重复的
接着上述方法的调用时间器的代码深入
protected void updateTimer {
// 如果当前没有任何延迟消息 if (getNumberOfDelayedMessages == 0) {
// 如果之前已经设置了一个 timeout(计时任务),现在不需要了,取消掉 if (timeout != null) { currentTimeoutTarget = -1; timeout.cancel; timeout = null; } return; }
// 找到当前延迟队列中“下一条要被投递的消息”的 deliverAt 时间(最小时间戳) long timestamp = nextDeliveryTime;
// 如果这个 timestamp 和之前设定的目标时间一样,// 说明 timer 已经为这个时间点设置好了,不需要重复调度 if (timestamp == currentTimeoutTarget) { return; }
// 如果之前已经设置了定时任务,但现在发现需要重新设置(如出现更早的延迟消息, 就先取消旧的 timeout if (timeout != null) { timeout.cancel; }
long now = clock.millis;
// 距离目标投递时间还有多久 long delayMillis = timestamp - now;
// 如果 delay
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
// 应该设置的 delay = max(消息实际 delay, tick 间隔 delay) long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);
if (log.isDebugEnabled) { log.debug("[{}] Start timer in {} millis", dispatcher.getName, calculatedDelayMillis); }
currentTimeoutTarget = timestamp;
// 注册新的 timeout(即定时任务)到时间轮中 // 到 calculatedDelayMillis 毫秒后,时间轮会回调 this.run 或 checkAndTrigger timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS); }
当我们定位到 timer 时,我们会发现,其实就是用的 netty 的 timer
netty 的时间轮在很多 java 相关的框架都有使用,比如 redisson 在实现看门狗的时候也是用的 Netty 的 HashedWheelTimer ,还有 dubbo。
时间轮每个 broker 是只有一个,这个我们可以看到 timer 在 TrackerFactory 中,即所有的 Tracker 队列 共享一个时间轮。
时间轮到底什么是时间轮呢?我看了有一篇文章写的很好,非常通俗易懂,建议收藏。zhuanlan.zhihu.com/p/609284043
完整处理流程按照上述源码流程,我画了一个图来描述 pulsar 定时消息的图。
整个流程描述如下:
Producer 发送消息时,会在消息的 metadata 中写入 deliverAt 时间(延迟投递时间)。 消息到达 Broker 后,Broker 会像普通消息一样立即将其持久化到 BookKeeper。每个 Subscription 拥有自己的 Cursor。Cursor 会按顺序从 BookKeeper 读取该 Topic 的消息,并将读取到的消息交给 Dispatcher 处理。Dispatcher 在处理消息时会解析 metadata:如果 deliverAt 大于当前时间,说明消息尚未到期。此时 Dispatcher 不会将消息投递给消费者,而是将其加入该订阅对应的延迟队列(DelayedDeliveryTracker)。DelayedDeliveryTracker 会向 Broker 的全局时间轮(HashedWheelTimer)注册一个定时检查任务。Tracker 本身负责管理所有延迟消息的时间索引。时间轮在每一次 tick 时会回调 Tracker,让 Tracker 自行判断哪些延迟消息已经到达投递时间。一旦某条消息到期,Tracker 会将其调度回 Dispatcher,由 Dispatcher 再次从 BookKeeper 读取该消息,并按照订阅策略正常投递给消费者。参考zhuanlan.zhihu.com/p/609284043github.com/apache/puls…
下一篇:没有了