unpubsyn取消同步数据的实现
前言
之前skynet_fly的subsyn监听同步机制只支持发布和更新,但实际业务中经常会遇到需要取消已发布数据的场景。比如一个玩家下线了,它的状态信息不再有效,需要通知所有监听方”这个数据已经作废了”。之前的做法只能发布一个空数据或者约定一个特殊值表示”无效”,这样不够优雅也不够明确。所以决定给subsyn加上unpubsyn取消同步的能力,让推送端能主动通知所有监听端“数据已取消”,监听端可以通过独立的cancel handler来处理取消逻辑。
设计思路
需求分析
- 推送端需要一个
unpubsyn(channel_name)接口来取消已发布的同步数据。 - 取消后,所有正在订阅该channel的监听端都需要收到通知。
- 监听端收到取消通知后,不应该退出订阅循环,而是继续等待下一次
pubsyn(因为取消只是”数据暂时无效”,不代表以后不会再发布)。 - 监听端的cancel回调应该和普通数据回调分离,用独立的handler处理,避免逻辑混杂。
- 对于
psubsyn模式匹配订阅,某个channel被取消时也需要通知对应的pwatch监听者。
通信链路
整个取消同步的通信链路如下:
1 | |
关键设计决策
1. cancel后循环不退出
订阅循环收到cancel后,只是清空version和luamsg,然后继续下一轮循环重新subsyn。这样如果推送端后面又pubsyn了新数据,监听端能立即收到。
2. 独立的cancel handler
cancel handler和普通watch handler完全分离。用户通过watch_cancel、pwatch_cancel等API注册取消回调,跟watch、pwatch互不干扰。这样业务代码可以清晰地分离”数据更新处理”和”数据取消处理”两种逻辑。
3. psubsyn的cancel处理
对于psubsyn模式匹配订阅,有两种取消场景:
- 整个psubsyn模式被取消:直接cancel分支处理,
channel_name参数为nil - 模式中某个channel被unpubsyn:通过增量diff(name_map变更)检测到channel被移除,触发pwatch_cancel,
channel_name参数为被移除的具体channel名
4. frpc_server端清理
CMD.unpubsyn执行时会:
- 删除
g_subsyn_channel_info_map[channel_name] - 调用
unset_syn_pchannel_name从psubsyn的name_map中移除该channel - 通知所有等待中的agent(
cancel_subsynpack_id) - 清理
g_subsyn_map[channel_name]中的agent订阅关系
使用例子
推送端
1 | |
监听端
1 | |
pwatch模式匹配 + cancel
1 | |
注销cancel handler
1 | |
断连重连的行为
这个问题在实现中需要特别注意,这里做下说明:
场景1:先断连后取消
- 监听端和frpc_server断连,订阅循环收到
disconnect退出。 - 推送端执行
unpubsyn,由于agent已不在订阅列表中,cancel消息不会发给已断连的监听端。 - 监听端重连后
watch_up重新拉起订阅循环,发现channel数据已被删除,进入等待状态。 - cancel handler不会触发(消息已丢失)。
场景2:先取消后断连
- 监听端收到cancel通知,cancel handler被调用。
- 订阅循环继续,重新发起subsyn请求。
- 如果此时连接断开,subsyn请求失败,循环退出。
- 重连后
watch_up再次拉起。
重入安全
g_cluster_reqing_map确保同一channel不会有两个订阅循环并发运行。source_map单次响应机制确保cancel和disconnect不会对同一请求回复两次。- cancel分支不break循环,与disconnect/reconnect逻辑不冲突。
新增API一览
推送端 (watch_server)
| API | 说明 |
|---|---|
M.unpubsyn(channel_name) |
取消同步数据 |
监听端 (watch_syn_client)
| API | 说明 |
|---|---|
M.watch_cancel(svr_name, channel_name, handle_name, handler) |
监听取消通知 |
M.unwatch_cancel(svr_name, channel_name, handle_name) |
注销取消通知 |
M.watch_cancel_byid(svr_name, svr_id, channel_name, handle_name, handler) |
指定节点监听取消通知 |
M.unwatch_cancel_byid(svr_name, svr_id, channel_name, handle_name) |
指定节点注销取消通知 |
M.pwatch_cancel(svr_name, pchannel_name, handle_name, handler) |
模式匹配取消通知 |
M.unpwatch_cancel(svr_name, pchannel_name, handle_name) |
注销模式匹配取消通知 |
M.pwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name, handler) |
指定节点模式匹配取消通知 |
M.unpwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name) |
指定节点注销模式匹配取消通知 |
API文档
测试代码
unpubsyn取消同步数据的实现
https://huahua132.github.io/2026/05/15/skynet_fly_ss/unpubsyn/