技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上)


                                                                                                                                                <p>在Pulsar broker中, 消息的Retention, Expiry和Backlog quota是比较重要的功能,它们表现的是Pulsar对于流经它的数据的管理。</p> 

但是受限于复杂度和文档语言等因素,使用者可能无法在第一时间很直观的了解它们。 因此本文将对这三个功能进行详细的介绍,包括概念、行为、应用、实现和注意事项等方面,希望能够对大家有所帮助。

另外,这三个特性属于Pulsar的高级特性,阅读本文之前,建议先对Pulsar的基本概念有所了解。

文章较长,并且偏向工具类,建议大家先收藏,如果暂时没耐心看完,也可以在后续时间慢慢阅读。

话不多说,直接开冲!

一、Retention

1. 概念

Retention是Pulsar对消息的保留策略,它针对于所有消息,包含已经消费的消息和未消费的消息。

Pulsar的消息保留略有一点反直觉,我们一般会认为消息保留针对已经消费完毕的消息,在消费完毕后保留一段时间之后再进行清理,腾出磁盘空间。很多资料以及之前版本的Pulsar官网都是这样理解和表达的,但是实际并非如此。

Retention首先保证的是消息回溯,比如将保留策略设置为3天,用户一定能够回溯3天之内的消息。在此之后再考虑删除更早的消息,以腾出磁盘空间。 由于不同Subscription的消费进度可能不同,有的较快,有的较慢,因此Pulsar对数据的删除是以该Topic中消费最慢的Subscription为基准,被它消费过后的数据才是可删除的。但是,删除与否还需要看Retention如何设置。同时,无论未被消费的消息是否超出了Retention的限制,它们都会保留,不会删除。

总结一下:如果未消费的消息超出了Retention限制,则保留所有未消费的消息,删除所有已消费的消息;如果未被消费的消息没有超出Retention限制,则保留所有未消费的消息以及一部分已经消费的消息,删除其余已消费的消息。

2. 行为

Pulsar的数据清理由定时任务驱动,每隔一段时间Broker会检查当前实例上的所有Topic,如果Topic设置了Retention策略,它会根据相应的策略来检查该Topic中的数据,并清理数据。 Retention策略包含了两项限制,分别是:retentionTimeInMinutes(单位:分钟)和retentionSizeInMB(单位:MB)。如果有任意一项条件满足,Pulsar将执行清理流程。

数据清理的维度在Topic级别,粒度在Ledger级别。即:

  1. 如果旧数据清理完成,该Topic所有的Subscription的所有Consumer将无法消费已经清理的旧数据,即使调用seekAPI重置游标;
  2. Broker无法按照retentionTimeInMinutesretentionSizeInMB来完全精确地清理磁盘,Pulsar对于数据的管理是在Ledger维度,因此,如果在一个Ledger中有些数据应该被清理而另一些数据应该被保留,这个Ledger也会被保留,不做清理。在最差的情况下,会为每个Topic/Partition额外保存2GB数据(精度取决于broker.confmanagedLedgerMaxSizePerLedgerMbytes(default=2GB))。

对于消费者们来说,数据清理是在清理流程之后立即生效的,也就意味着在清理流程完成之后,Consumer便无法消费更早的数据;但是在磁盘的角度来看,Pulsar的数据清理流程和实际的磁盘空间清理之间会有一个短暂延迟,这是因为磁盘清理流程是由Bookkeeper异步调度的。这个延迟通常不会太久,在Pulsar的清理流程完成之后,很快就能看到数据磁盘空间的释放。

3. 应用

a.监控

Pulsar并未专门为Retention暴露指标,但是可以通过一些指标来间接的监控Retention的情况:

  1. pulsar_storage_size:该指标表示Pulsar的存储空间使用情况,如果该指标持续增长,可能是Retention策略没有生效,需要检查一下Retention策略是否设置正确。正确的Retention策略应该是周期性的清理旧数据,pulsar_storage_size表现为周期性的上下波动;
  2. Topic internal stats: Pulsar提供了一些Topic的内部指标,它暴露了当前Topic的Ledger大小,Ledger关闭时间等指标,可以通过这些指标间接监控Topic的数据清理情况(通过sizetimestamp计算该Ledger是否应该删除):
pulsar-admin topics stats-internal persistent://my-tenant/my-ns/my-topic
{
    ...
  "ledgers" : [ 
  {
    "ledgerId" : 20,
    "entries" : 50000,
    "size" : 500000,
    "timestamp": 1627584000000
  },
   {
    "ledgerId" : 21,
    "entries" : 50000,
    "size" : 500000,
    "timestamp": 1627585000000
   }
   ],
    ...
}

b. 设置

Retention的设置分为namespacetopic两个级别(实际上有3个,还有一种是Broker级别,本文不讨论该级别)。在Namespace级别设置了Retention之后,该Namespace的所有Topic都会继承它;但是我们可以在Topic级别对其进行覆盖,使该Topic使用自定义的Retention策略。

  1. Namespace级别
    • 查看Namespace的Retention策略
pulsar-admin namespaces get-retention my-tenant/my-ns
{
  "retentionTimeInMinutes": 10,
  "retentionSizeInMB": 500
}
  • 设置Namespace的Retention策略
pulsar-admin namespaces set-retention my-tenant/my-ns --time 10 --size 500
  • 删除Namespace的Retention策略
pulsar-admin namespaces remove-retention my-tenant/my-ns
  1. Topic级别
  • 查看Topic的Retention策略
pulsar-admin topicPolicies get-retention persistent://my-tenant/my-ns/my-topic
{
  "retentionTimeInMinutes": 10,
  "retentionSizeInMB": 500
}
  • 设置Topic的Retention策略
pulsar-admin topicPolicies set-retention persistent://my-tenant/my-ns/my-topic --time 10 --size 500
  • 删除Topic的Retention策略
pulsar-admin topicPolicies remove-retention persistent://my-tenant/my-ns/my-topic

4. 实现

Retention执行的入口是BrokerService#startConsumedLedgersMonitor(), 这里不对实现做详细分析,只是简单的介绍一下Retention策略的执行流程,如果各位感兴趣,可以自行查看源码。

a. Retention Checker初始化

Pulsar broker启动时,向线程池注册一个定时任务,执行周期是broker.confretentionCheckIntervalInSeconds(default=120s)。 Broker每120s会检查一次所有的Topic,如果Topic设置了Retention策略,则执行数据清理。

b. 清理流程

Retention的执行流程如下:

  1. 遍历所有的Topic,如果Topic设置了Retention策略,执行后续流程。否则,跳过该Topic;
  2. 找到该Topic中消费最慢的Subscription,得到它的消费位置;
  3. 根据上一步得到的消费位置,找出在它之前的所有Ledger,得到所有消费完毕的Ledger列表;
  4. 由创建时间从远到近遍历上一步得到Ledger列表,并根据当前Ledger的元数据做相应的计算。注意:当前正在写入的Ledger不会进入清理流程:
    1. 累加当前Ledger的size,如果Topic数据的总大小减去累加的size大于等于retentionSizeInMB,将该Ledger加入待清理列表;
    2. 根据当前时间和Ledger元数据中timestamp(即Ledegr的关闭时间)计算当前Ledger的存活时间(当前时间减去Ledger关闭时间),如果存活时间大于retentionTimeInMinutes,将该Ledger加入待清理列表;
  5. 待清理列表中的Ledger分为两部分,一部分是存在于Bookkeeper中,另一部分是存在于Tiered storage的中(如果设置了Ledger offload)。 针对这两部分Ledger,Pulsar会分别调用不同的清理接口(Bookkeeper或者Tiered storage接口)清理数据。
未经允许不得转载:紫竹林-程序员中文网 » 技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

    Warning: A non-numeric value encountered in /www/wwwroot/www.zc91.cn/wp-content/themes/dux1/modules/mo_comments_list.php on line 26
  1. #0

    博主,我可以拿来当毕设吗

    红客白帽5年前 (2020-06-30)回复

  2. Warning: A non-numeric value encountered in /www/wwwroot/www.zc91.cn/wp-content/themes/dux1/modules/mo_comments_list.php on line 26
  3. #0

    这他喵才是值得上热榜的文章啊

    早九晚十二5年前 (2020-06-26)回复
关于我们 免责申明 意见反馈 隐私政策
程序员中文网:公益在线网站,帮助学习者快速成长!
关注微信 技术交流
推荐文章
每天精选资源文章推送
推荐文章
随时随地碎片化学习
推荐文章
发现有趣的