..

pulsar 队列实践

Architecture

架构

pul-system-architecture.png

消息订阅方式

pul-subscribe

消息的生命周期

生产数据
被不同的消费者, 并进行 ACK
过期 和 清理

更多消息相关, 见 这里

Event 和 元数据

Event 治理

在 Ease 的后端服务中, 我们是通过 common-lib 包进行封装和处理的

该部分的组织形式如下

事件定义
生产者
消费者
event
payload
eventType
eventCode
desc
eventProducer
Topic: pulsar-media
SendEvent
eventConsumer
handler1
handler2
onEvent(event)

当用户完成一个音频的播放时候的操作

音频播放完成事件
生产者
消费者
event
MediaLogFinishPayload
mediaId
memberId
playTo
MEDIA_LOG_FINISH
eventProducer
Topic: media
发送事件: MEDIA_LOG_FINISH
eventConsumer
DE完播计算 handler
用户播放数据缓存刷新 handler
XXX Handler

元数据

  1. orderingKey

    这里支持传 orderingKey, 用来作为 有序消费。我们经常的场景是: 同一个用户id 的某种类型数据,希望可以有序地消费

    比如用户的统计数据, 如果是队列本身可以保证有序, 则大概率是有序的, 可以避免过多的数据库的锁竞争问题

  2. 消费次数和消息丢失的问题

    pulsar 理论上是 at-least-once semantics, 一些重要的业务需要自己能保证是可重复执行的

    更多内容见 concepts-messaging

  3. 事件和事务的问题

    有时候会因为业务本身的复杂性, 在很多个业务逻辑的 上下文,都会发送对应的事件

     @Transactional
     void doSth() {
       // ① 更新用户信息
       updateMember
       sendMemberUpdateEvent
    
       // ② 更新Vip状态
       updateMemberVip
       sendMemberVipUpdateEvent
    
       // ③ 记录日志
       updateMemberVipLog
     }
    

    如果上面在 ③ 记录日志的时候报错了,那么会导致 ① 和 ② 回滚, 此时事件理论上被发出去了, 会导致其他的业务逻辑的问题。

    故这里发送事件的逻辑专门进行了处理。注意

    所有的事件,都是在 事务提交 之后,才会进行发送。

    代码实现逻辑见这里

部署方式

deploy1

如何跟踪和查询

方式1: 根据用户id 或者 deviceId 进行查询

example1

方式2: 查看用户的消费 上下文

example2

Reference

未找到相关的 Issues 进行评论

请联系 @dengqinghua 初始化创建