远程订阅发布模式

watch_server.lua

sever

  • M.publish(channel_name, …)
    描述: 发布名为channel_name的事件
    参数:
    - channel_name (string): 事件名

  • M.pubsyn(channel_name, …)
    描述: 发布名为channel_name的同步事件
    参数:
    - channel_name (string): 事件名

  • M.unpubsyn(channel_name)
    描述: 取消同步数据,通知所有订阅该channel的监听端数据已取消
    参数:
    - channel_name (string): 事件名
    说明:
    - 调用后,frpc_server 会删除该 channel 的同步数据
    - 所有正在订阅该 channel 的监听端会收到 cancel 通知
    - 如果该 channel 匹配某个 psubsyn 模式,也会从模式的 name_map 中移除
    - 取消后可以重新调用 pubsyn 发布新数据

watch_client.lua

sub/pub client

  • M.watch(svr_name, channel_name, handle_name, handler)
    描述: watch监听 svr_name 的所有结点
    参数:
    - svr_name (string): 集群svr_name
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数

  • M.unwatch(svr_name, channel_name, handle_name, handler)
    描述: 取消监听 svr_name 的所有结点
    参数:
    - svr_name (string): 集群svr_name
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名

  • M.watch_byid(svr_name, svr_id, channel_name, handle_name, handler)
    描述: 指定svr_id监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数

  • M.unwatch_byid(svr_name, svr_id, channel_name, handle_name)
    描述: 指定svr_id取消监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名

watch_syn_client.lua

subsyn/pubsyn client

  • M.watch(svr_name, channel_name, handle_name, handler)
    描述: watch监听 svr_name 的所有结点
    参数:
    - svr_name (string): 集群svr_name
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, ...)

  • M.unwatch(svr_name, channel_name, handle_name, handler)
    描述: 取消监听 svr_name 的所有结点
    参数:
    - svr_name (string): 集群svr_name
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名

  • M.watch_byid(svr_name, svr_id, channel_name, handle_name, handler)
    描述: 指定svr_id监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, ...)

  • M.unwatch_byid(svr_name, svr_id, channel_name, handle_name)
    描述: 指定svr_id取消监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名

cancel 监听(取消同步通知)

当推送端调用 M.unpubsyn(channel_name) 取消同步数据时,订阅端可以通过独立的 cancel handler 接收取消通知。cancel handler 与普通的 watch handler 是独立的,互不影响。

  • M.watch_cancel(svr_name, channel_name, handle_name, handler)
    描述: 监听 svr_name 所有结点的 channel 取消通知
    参数:
    - svr_name (string): 集群svr_name
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, channel_name)
    说明:
    - 当对应的 channel 被 unpubsyn 时,handler 会被调用
    - 取消后订阅循环不会退出,会继续等待下一次 pubsyn

  • M.unwatch_cancel(svr_name, channel_name, handle_name)
    描述: 注销 svr_name 所有结点的 channel 取消通知监听
    参数:
    - svr_name (string): 集群svr_name
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名

  • M.watch_cancel_byid(svr_name, svr_id, channel_name, handle_name, handler)
    描述: 监听指定 svr_id 的 channel 取消通知
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, channel_name)

  • M.unwatch_cancel_byid(svr_name, svr_id, channel_name, handle_name)
    描述: 注销指定 svr_id 的 channel 取消通知监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - channel_name (string): 事件名
    - handle_name (string): 回调处理名

pwatch 模式匹配监听

  • M.pwatch(svr_name, pchannel_name, handle_name, handler)
    描述: 批量模式匹配监听 svr_name 所有结点
    参数:
    - svr_name (string): 集群svr_name
    - pchannel_name (string): 模式匹配名(如 *:age:address
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, ...)

  • M.unpwatch(svr_name, pchannel_name, handle_name)
    描述: 取消批量模式匹配监听 svr_name 所有结点
    参数:
    - svr_name (string): 集群svr_name
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名

  • M.pwatch_byid(svr_name, svr_id, pchannel_name, handle_name, handler)
    描述: 指定 svr_id 批量模式匹配监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, ...)

  • M.unpwatch_byid(svr_name, svr_id, pchannel_name, handle_name)
    描述: 取消指定 svr_id 批量模式匹配监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名

pwatch cancel 监听(模式匹配取消通知)

当匹配模式中的某个 channel 被 unpubsyn 取消时,pwatch_cancel handler 会被触发。

  • M.pwatch_cancel(svr_name, pchannel_name, handle_name, handler)
    描述: 监听 svr_name 所有结点的模式匹配 channel 取消通知
    参数:
    - svr_name (string): 集群svr_name
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, pchannel_name, channel_name)
    说明:
    - channel_name: 被取消的具体 channel 名称(单个 channel 被 unpubsyn 时有值)
    - channel_name 为 nil 时表示整个 psubsyn 模式被取消

  • M.unpwatch_cancel(svr_name, pchannel_name, handle_name)
    描述: 注销 svr_name 所有结点的模式匹配 channel 取消通知监听
    参数:
    - svr_name (string): 集群svr_name
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名

  • M.pwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name, handler)
    描述: 监听指定 svr_id 的模式匹配 channel 取消通知
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名
    - handler (function): 回调函数 function(cluster_name, pchannel_name, channel_name)

  • M.unpwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name)
    描述: 注销指定 svr_id 的模式匹配 channel 取消通知监听
    参数:
    - svr_name (string): 集群svr_name
    - svr_id (number): 指定svr_id
    - pchannel_name (string): 模式匹配名
    - handle_name (string): 回调处理名

使用示例

推送端(watch_server)

1
2
3
4
5
6
7
8
9
10
local watch_server = require "skynet-fly.rpc.watch_server"

-- 发布同步数据
watch_server.pubsyn("player_info", {name = "test", level = 10})

-- 取消同步数据(通知所有订阅者)
watch_server.unpubsyn("player_info")

-- 取消后可以重新发布
watch_server.pubsyn("player_info", {name = "test", level = 11})

订阅端(watch_syn_client)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
local watch_syn_client = require "skynet-fly.rpc.watch_syn_client"

-- 监听数据变更
watch_syn_client.watch("frpc_s", "player_info", "my_handler", function(cluster_name, data)
log.info("data updated from", cluster_name, data.name, data.level)
end)

-- 监听取消通知(独立于 watch handler)
watch_syn_client.watch_cancel("frpc_s", "player_info", "my_cancel_handler", function(cluster_name, channel_name)
log.info("data cancelled from", cluster_name, channel_name)
end)

-- 模式匹配监听
watch_syn_client.pwatch("frpc_s", "*:age:address", "pwatch_handler", function(cluster_name, data)
log.info("pwatch data from", cluster_name)
end)

-- 模式匹配取消通知
watch_syn_client.pwatch_cancel("frpc_s", "*:age:address", "pcancel_handler", function(cluster_name, pchannel_name, channel_name)
if channel_name then
log.info("channel removed:", channel_name, "from pattern", pchannel_name)
else
log.info("entire pattern cancelled:", pchannel_name)
end
end)

断连重连与取消同步的行为说明

场景1:取消同步前断连又重连

  1. 断连时订阅循环收到 disconnect,循环退出
  2. frpc_server 端清理 agent 的订阅关系
  3. 推送端执行 unpubsyn 时,已断连的 agent 不在推送列表中,cancel 通知丢失
  4. 重连后 watch_up 回调重新拉起订阅循环
  5. 由于 channel 数据已被删除,订阅者进入等待状态(等待下一次 pubsyn)
  6. cancel handler 不会被触发(cancel 消息已丢失)

场景2:cancel_subsyn 先到达,随后断连

  1. 订阅者收到 cancel 通知,cancel handler 被调用
  2. 订阅循环不退出,继续等待下一次 pubsyn
  3. 如果此时连接断开,subsyn 请求失败 → 循环退出
  4. watch_up 回调重新拉起

关于重入安全

  • g_cluster_reqing_map 机制确保同一 channel 不会有两个订阅循环同时运行
  • source_map 单次响应机制确保 cancel 和 disconnect 不会对同一请求响应两次
  • cancel 分支处理后循环继续运行(不 break),不会与重连逻辑冲突

远程订阅发布模式
https://huahua132.github.io/2024/06/29/skynet_fly_api/frpc_watch/
作者
huahua132
发布于
2024年6月29日
许可协议