远程订阅发布模式
watch_server.lua
sever
M.publish(channel_name, …)
描述: 发布名为channel_name的事件
参数:
- channel_name (string): 事件名M.pubsyn(channel_name, …)
描述: 发布名为channel_name的同步事件
参数:
- channel_name (string): 事件名M.unpubsyn(channel_name)
描述: 取消同步数据,通知所有订阅该channel的监听端数据已取消
参数:
- channel_name (string): 事件名
说明:
- 调用后,frpc_server 会删除该 channel 的同步数据
- 所有正在订阅该 channel 的监听端会收到 cancel 通知
- 如果该 channel 匹配某个 psubsyn 模式,也会从模式的 name_map 中移除
- 取消后可以重新调用 pubsyn 发布新数据
watch_client.lua
sub/pub client
M.watch(svr_name, channel_name, handle_name, handler)
描述: watch监听 svr_name 的所有结点
参数:
- svr_name (string): 集群svr_name
- channel_name (string): 事件名
- handle_name (string): 回调处理名
- handler (function): 回调函数M.unwatch(svr_name, channel_name, handle_name, handler)
描述: 取消监听 svr_name 的所有结点
参数:
- svr_name (string): 集群svr_name
- channel_name (string): 事件名
- handle_name (string): 回调处理名M.watch_byid(svr_name, svr_id, channel_name, handle_name, handler)
描述: 指定svr_id监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- channel_name (string): 事件名
- handle_name (string): 回调处理名
- handler (function): 回调函数M.unwatch_byid(svr_name, svr_id, channel_name, handle_name)
描述: 指定svr_id取消监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- channel_name (string): 事件名
- handle_name (string): 回调处理名
watch_syn_client.lua
subsyn/pubsyn client
M.watch(svr_name, channel_name, handle_name, handler)
描述: watch监听 svr_name 的所有结点
参数:
- svr_name (string): 集群svr_name
- channel_name (string): 事件名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, ...)M.unwatch(svr_name, channel_name, handle_name, handler)
描述: 取消监听 svr_name 的所有结点
参数:
- svr_name (string): 集群svr_name
- channel_name (string): 事件名
- handle_name (string): 回调处理名M.watch_byid(svr_name, svr_id, channel_name, handle_name, handler)
描述: 指定svr_id监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- channel_name (string): 事件名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, ...)M.unwatch_byid(svr_name, svr_id, channel_name, handle_name)
描述: 指定svr_id取消监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- channel_name (string): 事件名
- handle_name (string): 回调处理名
cancel 监听(取消同步通知)
当推送端调用 M.unpubsyn(channel_name) 取消同步数据时,订阅端可以通过独立的 cancel handler 接收取消通知。cancel handler 与普通的 watch handler 是独立的,互不影响。
M.watch_cancel(svr_name, channel_name, handle_name, handler)
描述: 监听 svr_name 所有结点的 channel 取消通知
参数:
- svr_name (string): 集群svr_name
- channel_name (string): 事件名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, channel_name)
说明:
- 当对应的 channel 被 unpubsyn 时,handler 会被调用
- 取消后订阅循环不会退出,会继续等待下一次 pubsynM.unwatch_cancel(svr_name, channel_name, handle_name)
描述: 注销 svr_name 所有结点的 channel 取消通知监听
参数:
- svr_name (string): 集群svr_name
- channel_name (string): 事件名
- handle_name (string): 回调处理名M.watch_cancel_byid(svr_name, svr_id, channel_name, handle_name, handler)
描述: 监听指定 svr_id 的 channel 取消通知
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- channel_name (string): 事件名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, channel_name)M.unwatch_cancel_byid(svr_name, svr_id, channel_name, handle_name)
描述: 注销指定 svr_id 的 channel 取消通知监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- channel_name (string): 事件名
- handle_name (string): 回调处理名
pwatch 模式匹配监听
M.pwatch(svr_name, pchannel_name, handle_name, handler)
描述: 批量模式匹配监听 svr_name 所有结点
参数:
- svr_name (string): 集群svr_name
- pchannel_name (string): 模式匹配名(如*:age:address)
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, ...)M.unpwatch(svr_name, pchannel_name, handle_name)
描述: 取消批量模式匹配监听 svr_name 所有结点
参数:
- svr_name (string): 集群svr_name
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名M.pwatch_byid(svr_name, svr_id, pchannel_name, handle_name, handler)
描述: 指定 svr_id 批量模式匹配监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, ...)M.unpwatch_byid(svr_name, svr_id, pchannel_name, handle_name)
描述: 取消指定 svr_id 批量模式匹配监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名
pwatch cancel 监听(模式匹配取消通知)
当匹配模式中的某个 channel 被 unpubsyn 取消时,pwatch_cancel handler 会被触发。
M.pwatch_cancel(svr_name, pchannel_name, handle_name, handler)
描述: 监听 svr_name 所有结点的模式匹配 channel 取消通知
参数:
- svr_name (string): 集群svr_name
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, pchannel_name, channel_name)
说明:
- channel_name: 被取消的具体 channel 名称(单个 channel 被 unpubsyn 时有值)
- channel_name 为 nil 时表示整个 psubsyn 模式被取消M.unpwatch_cancel(svr_name, pchannel_name, handle_name)
描述: 注销 svr_name 所有结点的模式匹配 channel 取消通知监听
参数:
- svr_name (string): 集群svr_name
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名M.pwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name, handler)
描述: 监听指定 svr_id 的模式匹配 channel 取消通知
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名
- handler (function): 回调函数function(cluster_name, pchannel_name, channel_name)M.unpwatch_cancel_byid(svr_name, svr_id, pchannel_name, handle_name)
描述: 注销指定 svr_id 的模式匹配 channel 取消通知监听
参数:
- svr_name (string): 集群svr_name
- svr_id (number): 指定svr_id
- pchannel_name (string): 模式匹配名
- handle_name (string): 回调处理名
使用示例
推送端(watch_server)
1 | |
订阅端(watch_syn_client)
1 | |
断连重连与取消同步的行为说明
场景1:取消同步前断连又重连
- 断连时订阅循环收到
disconnect,循环退出 - frpc_server 端清理 agent 的订阅关系
- 推送端执行
unpubsyn时,已断连的 agent 不在推送列表中,cancel 通知丢失 - 重连后
watch_up回调重新拉起订阅循环 - 由于 channel 数据已被删除,订阅者进入等待状态(等待下一次 pubsyn)
- cancel handler 不会被触发(cancel 消息已丢失)
场景2:cancel_subsyn 先到达,随后断连
- 订阅者收到 cancel 通知,cancel handler 被调用
- 订阅循环不退出,继续等待下一次 pubsyn
- 如果此时连接断开,subsyn 请求失败 → 循环退出
watch_up回调重新拉起
关于重入安全
g_cluster_reqing_map机制确保同一 channel 不会有两个订阅循环同时运行source_map单次响应机制确保 cancel 和 disconnect 不会对同一请求响应两次- cancel 分支处理后循环继续运行(不 break),不会与重连逻辑冲突