<p>在 Pulsar broker 中, 消息的 Retention, Expiry 和 Backlog quota 是比较重要的功能,它们表现的是 Pulsar 对于流经它的数据的管理。 但是受限于复杂度和文档语言等因素,开发者可能无法在第一时间很直观的了解它们。</p>
本系列上篇为大家介绍了 Retention 和 Expiry 的概念、行为、应用、实现和注意事项技术文档 | Pulsar 中的消息保留、过期及积压机制解析(上),本文将带来关于 Backlog quota 的解析。
Backlog quota
1. 概念
Backlog 意为消息积压,指未被消费的消息;quota 意为配额,指对于未消费消息的限制。因此 Backlog quota 是为了限制消息堆积。
当消费者的消费速率跟不上生产者的生产速率时,会出现消息堆积的情况,这在日常开发过程中非常常见。尽管**相比于其他消息队列,Pulsar 提供了几乎可以无限扩容消费者数量的机制 **(Shared, Key_Shared订阅模式) 来提高消费速率, 但是在实际的业务场景中,消息堆积的情况也时有发生。
为了应对这种情况,Pulsar 提供了 Backlog quota 机制来在一定程度治理它。当然,这种治理无法提高消费者的消费速率,只是在生产速率和消费速率之间做出一种平衡,比如说它的一种治理策略是自动清理 Backlog 消息。
2. 行为
Pulsar 在 Topic 级别和 Subscription 级别都有 Backlog 的概念。Topic 级别的 Backlog 是指该 Topic 下所有 Subscription 的 Backlog 总和 (pulsar_msg_backlog 和 pulsar_storage_backlog_size 的含义略有差异,这里使用 pulsar_msg_backlog 的含义),Subscription 级别的 Backlog 是指当前 Subscription 的 Backlog。
Backlog quota 机制实际工作在 Subscription 级别,它和 Expiry 机制略有相似,但更加强大。它对于 Backlog 有两项限制、两种作用域和三种治理策略:
2.1 两项限制
-
limitTime:Backlog 的最大存活时间,单位是秒,超过这个时间的 Backlog 会进入治理流程;
-
limitSize:Backlog 的最大大小,单位是字节,超过这个大小的 Backlog 会进入治理流程;
2.2 两种作用域
-
destination_storage:针对 Topic 的 Backlog 的存储空间,和 limitSize 搭配使用;
-
message_age:针对 Topic 的 Backlog 的消息存活时间,和 limitTime 搭配使用;
2.3 三种治理策略
-
producer_request_hold:当 Backlog 超过限制,Pulsar 会挂起 Producer 的链接请求,直到 Backlog 降到限制以下;
-
consumer_backlog_eviction:当 Backlog 超过了限制,Pulsar 会自动移动所有超限的 Subscription 的游标(相当于自动确认这些消息,使得这些消息对 Consumer 不可见),将 Backlog 降低到限制以下;
-
producer_exception:当 Backlog 超过了限制,客户端创建 Producer 会抛出异常,直到 Backlog 降到限制以下。
3. 应用
3.1 监控
-
Pulsar 在 Prometheus 上提供了 pulsar_msg_backlog 和 pulsar_storage_backlog_size 来分别观测 Topic 级别的未消费的消息数量、未消费消息的总大小。如果这两个指标数值较高,说明该 Topic 消息积压严重。
另外,Pulsar 也提供了 pulsar_subscription_back_log 这一 Subscription 级别的指标,当我们发现某个 Topic 的 backlog 数值较高时,可以通过查看该 Topic 下的 pulsar_subscription_back_log 指标来找到具体的 Subscription;
-
通过 Topic stats 来监控 Backlog quota 的情况:
pulsar-admin topics stats persistent://my-tenant/my-ns/my-topic
3.2 设置
和 Retention 和 Expiry 一样,Backlog quota 的设置也分为两个级别:namespace 和 topic 级别。在 Namespace 级别设置了之后,该 Namespace 的所有 Topic 都会继承该策略;在 Topic 级别设置了之后,该 Topic 会覆盖 Namespace 的设置。
Namespace 级别
-
查看当前 Namespace 的 Backlog quota
pulsar-admin namespaces get-backlog-quotas my-tenant/my-ns
-
设置 Namespace 的 Backlog quota
pulsar-admin namespaces set-backlog-quota my-tenant/my-ns –limitTime 3600 –policy producer_request_hold –type message_age
-
删除 Namespace 的 Backlog quota
pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns
Topic 级别
-
查看当前 Topic 的 Backlog quota
pulsar-admin topics get-backlog-quotas persistent://my-tenant/my-ns/my-topic
-
设置 Topic 的 Backlog quota
pulsar-admin topics set-backlog-quota persistent://my-tenant/my-ns/my-topic –limitTime 3600 –policy producer_request_hold –type message_age
-
删除 Topic 的 Backlog quota
pulsar-admin topics remove-backlog-quota persistent://my-tenant/my-ns/my-topic
4. 实现
Backlogquota 机制的触发有两个入口,分别是 ServerCnx#handleProducer(https://github.com/apache/pulsar/blob/v3.0.4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L1448) 和 BrokerService#startBacklogQuotaChecker()(https://github.com/apache/pulsar/blob/v3.0.4/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L657) ,前者和后者略有差距,这里不做详细解释。仅以 BrokerService#startBacklogQuotaChecker() 为例,简单介绍 Backlog quota 的执行流程。
4.1 Backlog Quota Checker 初始化
在 Pulsar 启动时, BrokerService#startBacklogQuotaChecker() 会检查当前 Broker 是否允许 Backlog quota 检查(broker.conf的backlogQuotaCheckEnabled(default=true))。如果允许,向线程池注册一个定时任务,定时任务的执行周期是 broker.conf 的 backlogQuotaCheckIntervalInSeconds(default=60s)。Pulsar 每隔 60s 检查一次所有 Topic 的 Backlog quota,如果 Topic 设置了 Backlog quota,执行后续流程。
4.2 Backlog Quota 执行流程
-
遍历所有 Topic,如果 Topic 设置了 Backlog quota,执行后续流程。否则,跳过该 Topic;
-
优先根据 limitSize 检查该 Topic 消费最慢的 Subscription 的 Backlog 是否超过了限制,如果未超限,再根据 limitTime 检查;
-
如果 Backlog 超过了限制,根据 policy 执行相应的治理策略:
-
producer_request_hold:挂起 Producer 的链接请求,直到 Backlog 降到限制以下;
-
consumer_backlog_eviction:自动移动所有超限的 Subscription 的游标,将 Backlog 降低到限制以下;
-
producer_exception:客户端创建 Producer 会抛出异常,直到 Backlog 降到限制以下。
-
5. 注意事项
-
Pulsar 暴露出的 Prometheus 指标中的 pulsar_storage_backlog_size 并不完全精准,它只是一个近似值。在 Shared 和 Key_Shared 模式下,允许消息单独确认消息,但是这些单独确认的消息不会加入到 Backlog 的计算中,因此这个指标并不会精准反映 Backlog 情况,它通常会比实际数值大;
-
pulsar_msg_backlog 一般也是近似值,不会将 Ack 空洞计算在内,但是如果将 broker.conf 的 exposePreciseBacklogInPrometheus 设置为 true,则会将单独确认的消息计算在内,pulsar_msg_backlog 会更加精准;
-
由于 EntryFilter(https://pulsar.apache.org/docs/3.2.x/develop-plugin/#entry-filter) 机制的存在,在消费消息时可以根据 EntryFilter 过滤掉一些消息,这些被过滤掉的消息严格来说并不算 Backlog,但是我们在计算 Backlog 时,不可能将 Bookkeeper 中的所有消息都拉取出来计算。因此如果 Broker 挂载了 EntryFilter 插件,pulsar_msg_backlog 和 pulsar_storage_backlog_size 以及 pulsar_subscription_back_log 都无法精准反映实际的 Backlog 情况,它们通常会比实际数据大一些;
-
不管是根据 limitSize 还是 limitTime 来限制 Backlog,当 policy=consumer_backlog_eviction 时,都无法完全精准的清理 Backlog。理想情况下,会将 Backlog 降低到原来的 10%;
-
如果在 Broker 端禁用 Backlog quota checker (将 broker.conf 中 backlogQuotaCheckEnabled 设置为 false),并且设置的 Backlog quota 的 policy=consumer_backlog_eviction,Pulsar 将不会自动清理 Backlog;
-
如果使用 Backlog quota,然后 Backlog 达到了阈值,并且 policy=producer_request_hold 或 producer_exception ,在 Broker 重启或自动重平衡时,会导致所有的 Producer 无法链接到 Broker,进而无法生产消息,直到 Backlog 降到限制以下;
-
如果使用 limitTime 限制 Backlog,需要注意 Client 和 Broker 的时间同步,否则可能会导致 Backlog 无法正确的清理。因为此时 Backlog 的判断是以 Broker 的时间为基准的;
-
如果该 Topic 设置了 Retention,Backlog quota 必须要小于 Retention。假设 Retention 设置了 10GB,Backlog quota 必须要小于 10GB。
总结
在文章最后,对 Pulsar 的 Retention, Expiry 和 Backlog quota 做一个总结:
-
Retention 是 Pulsar 对于过期数据的保留和清理策略,它工作在 Topic 级别,通过定时任务清理过期数据,将全部 Subscription 都消费过后的数据从存储介质上删除来清理存储空间;
-
Expiry 即为 Message TTL,它工作在 Subscription 级别,通过定时任务来检查 Subscription 中超时未消费的消息,并自动的将这些消息确认,使其对消费者不可见;
-
Backlog quota 是对未被消费的消息的限制,它实际工作在 Subscription 级别,通过定时任务来检查 Subscription 中的 Backlog,如果 Backlog 超过了限制,会执行相应的治理策略,拒绝新的 Producer 链接或者自动确认消息。
这三个功能并不冲突,它们可以组合使用,我们可以**通过 Retention 删除过期数据,通过 Expiry 处理超时未消费的数据,通过 Backlog quota 治理消息堆积。**但是由于他们三者都涉及到了对数据的操作,大家在使用时应当谨慎,在使用前根据实际业务仔细评估,避免数据丢失或者数据不一致的情况。
社区将持续输出更多 Pulsar 的技术内容;欢迎加入社群讨论或在评论区留言,与我们交流更多关于 Pulsar 的问题。
</div>