unpubsyn取消同步数据的实现

前言

之前skynet_flysubsyn监听同步机制只支持发布更新,但实际业务中经常会遇到需要取消已发布数据的场景。比如一个玩家下线了,它的状态信息不再有效,需要通知所有监听方”这个数据已经作废了”。之前的做法只能发布一个空数据或者约定一个特殊值表示”无效”,这样不够优雅也不够明确。所以决定给subsyn加上unpubsyn取消同步的能力,让推送端能主动通知所有监听端“数据已取消”,监听端可以通过独立的cancel handler来处理取消逻辑。

设计思路

需求分析

  1. 推送端需要一个unpubsyn(channel_name)接口来取消已发布的同步数据。
  2. 取消后,所有正在订阅该channel的监听端都需要收到通知。
  3. 监听端收到取消通知后,不应该退出订阅循环,而是继续等待下一次pubsyn(因为取消只是”数据暂时无效”,不代表以后不会再发布)。
  4. 监听端的cancel回调应该和普通数据回调分离,用独立的handler处理,避免逻辑混杂。
  5. 对于psubsyn模式匹配订阅,某个channel被取消时也需要通知对应的pwatch监听者。

通信链路

整个取消同步的通信链路如下:

1
2
3
4
5
6
7
8
9
推送端 watch_server.unpubsyn(channel_name)
↓ (skynet.call到frpc_server)
frpc_server CMD.unpubsyn(channel_name)
↓ (删除channel数据,通知等待中的agent)
pub_message(cancel_subsyn pack_id)
frpc_client_m watch消息循环
↓ (收到cancel_subsyn,rsp_source_map返回WATCH_SYN_RET.cancel)
watch_syn_client 订阅循环
↓ (cancel分支,调用独立的cancel handler)

关键设计决策

1. cancel后循环不退出

订阅循环收到cancel后,只是清空version和luamsg,然后继续下一轮循环重新subsyn。这样如果推送端后面又pubsyn了新数据,监听端能立即收到。

2. 独立的cancel handler

cancel handler和普通watch handler完全分离。用户通过watch_cancelpwatch_cancel等API注册取消回调,跟watchpwatch互不干扰。这样业务代码可以清晰地分离”数据更新处理”和”数据取消处理”两种逻辑。

3. psubsyn的cancel处理

对于psubsyn模式匹配订阅,有两种取消场景:

  • 整个psubsyn模式被取消:直接cancel分支处理,channel_name参数为nil
  • 模式中某个channel被unpubsyn:通过增量diff(name_map变更)检测到channel被移除,触发pwatch_cancel,channel_name参数为被移除的具体channel名

4. frpc_server端清理

CMD.unpubsyn执行时会:

  1. 删除g_subsyn_channel_info_map[channel_name]
  2. 调用unset_syn_pchannel_name从psubsyn的name_map中移除该channel
  3. 通知所有等待中的agent(cancel_subsyn pack_id)
  4. 清理g_subsyn_map[channel_name]中的agent订阅关系

使用例子

推送端

1
2
3
4
5
6
7
8
9
10
local watch_server = require "skynet-fly.rpc.watch_server"

-- 发布同步数据
watch_server.pubsyn("player:1001:status", {online = true, level = 50})

-- 玩家下线了,取消同步
watch_server.unpubsyn("player:1001:status")

-- 玩家又上线了,重新发布
watch_server.pubsyn("player:1001:status", {online = true, level = 51})

监听端

1
2
3
4
5
6
7
8
9
10
11
12
local watch_syn_client = require "skynet-fly.rpc.watch_syn_client"

-- 监听数据变更
watch_syn_client.watch("game_svr", "player:1001:status", "status_watcher", function(cluster_name, data)
log.info("player status updated:", cluster_name, data.online, data.level)
end)

-- 独立的cancel handler
watch_syn_client.watch_cancel("game_svr", "player:1001:status", "status_cancel", function(cluster_name, channel_name)
log.info("player status cancelled:", cluster_name, channel_name)
-- 做清理逻辑,比如标记玩家离线
end)

pwatch模式匹配 + cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
-- 模式匹配监听所有玩家状态
watch_syn_client.pwatch("game_svr", "player:*:status", "all_status", function(cluster_name, data)
log.info("some player status:", cluster_name, data.online)
end)

-- 某个玩家被unpubsyn时触发
watch_syn_client.pwatch_cancel("game_svr", "player:*:status", "player_offline", function(cluster_name, pchannel_name, channel_name)
if channel_name then
-- channel_name = "player:1001:status" 具体哪个被取消了
log.info("player offline:", channel_name)
else
-- 整个模式被取消(不常见)
log.info("all player status cancelled")
end
end)

注销cancel handler

1
2
3
-- 不再关心取消通知
watch_syn_client.unwatch_cancel("game_svr", "player:1001:status", "status_cancel")
watch_syn_client.unpwatch_cancel("game_svr", "player:*:status", "player_offline")

断连重连的行为

这个问题在实现中需要特别注意,这里做下说明:

场景1:先断连后取消

  1. 监听端和frpc_server断连,订阅循环收到disconnect退出。
  2. 推送端执行unpubsyn,由于agent已不在订阅列表中,cancel消息不会发给已断连的监听端。
  3. 监听端重连后watch_up重新拉起订阅循环,发现channel数据已被删除,进入等待状态。
  4. cancel handler不会触发(消息已丢失)。

场景2:先取消后断连

  1. 监听端收到cancel通知,cancel handler被调用。
  2. 订阅循环继续,重新发起subsyn请求。
  3. 如果此时连接断开,subsyn请求失败,循环退出。
  4. 重连后watch_up再次拉起。

重入安全

  • g_cluster_reqing_map确保同一channel不会有两个订阅循环并发运行。
  • source_map单次响应机制确保cancel和disconnect不会对同一请求回复两次。
  • cancel分支不break循环,与disconnect/reconnect逻辑不冲突。

新增API一览

推送端 (watch_server)

API 说明
M.unpubsyn(channel_name) 取消同步数据

监听端 (watch_syn_client)

API 说明
M.watch_cancel(svr_name, channel_name, handle_name, handler) 监听取消通知
M.unwatch_cancel(svr_name, channel_name, handle_name) 注销取消通知
M.watch_cancel_byid(svr_name, svr_id, channel_name, handle_name, handler) 指定节点监听取消通知
M.unwatch_cancel_byid(svr_name, svr_id, channel_name, handle_name) 指定节点注销取消通知
M.pwatch_cancel(svr_name, pchannel_name, handle_name, handler) 模式匹配取消通知
M.unpwatch_cancel(svr_name, pchannel_name, handle_name) 注销模式匹配取消通知
M.pwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name, handler) 指定节点模式匹配取消通知
M.unpwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name) 指定节点注销模式匹配取消通知

API文档

测试代码

skynetfly源码地址


unpubsyn取消同步数据的实现
https://huahua132.github.io/2026/05/15/skynet_fly_ss/unpubsyn/
作者
huahua132
发布于
2026年5月15日
许可协议