给demo hallserver大厅服加上队列

前言

因为skynet中单进程写业务也容易遇到异步重入问题,写简单个人系统也得警惕此问题,着实让人难受。索性直接用queue包裹一些执行入口,这样简单个人系统没有跨服的业务根本无需考虑此问题了。

实现方案

大厅服的业务函数处理通常以player_id为第一参数用来处理玩家个人系统的数据。我们以player_id来区分不同的队列,这样既可以有效利用skynet多携程优势,又能避免个人系统的重入问题。
而对于需要处理多个玩家数据的函数,如果想简单的避免重入问题,我们需要等所有玩家queue处理完后,再执行,并且阻塞住后续的玩家queue请求。之前实现mult_queue刚好符合这样的需求。

mult_queue

包裹入口处理函数

大厅服常用的处理入口有玩家消息luaCMD消息GM消息登录,重连,掉线,登出动作定时器skynet.fockhotfix热更
不常用的处理入口有订阅订阅同步整点报时

基于这些需求封装了queue_helper,然后在对应入口函数处,使用对应queue方法包裹住对应处理函数即可确保避免异步重入问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
local mult_queue = require "skynet-fly.mult_queue":new()
local timer = require "skynet-fly.timer"
local skynet = require "skynet"
local hotfix = require "skynet-fly.hotfix.hotfix"
local log = require "skynet-fly.log"

local M = {}

--并发玩家函数,player_id相同串行,不同可并发
function M.multi_player_func(player_id, func, ...)
return mult_queue:multi(player_id, func, ...)
end

--调用单发执行,执行单发时,并发都暂时等待,此函数适用于操作所有玩家数据时
function M.unique(func, ...)
return mult_queue:unique(func, ...)
end

--玩家定时器操作
function M.new_player_timer(player_id, expire, times, callback, ...)
return timer:new(expire, times, M.multi_player_func, player_id, callback, ...)
end

--单发定时器 调用单发执行,执行单发时,并发都暂时等待,此函数适用于操作所有玩家数据时
function M.new_unique_timer(expire, times, callback, ...)
return timer:new(expire, times, M.unique, callback, ...)
end

local function player_fork_func(player_id, func, ...)
M.multi_player_func(player_id, func, ...)
end

--玩家fork操作
function M.player_fork(player_id, func, ...)
skynet.fork(player_fork_func, player_id, func, ...)
end

local function fork(func, ...)
M.unique(func, ...)
end

--单发fork操作
function M.fork(func, ...)
skynet.fork(fork, func, ...)
end

--重写hotfix
local old_hotfix_func = hotfix.hotfix
function hotfix.hotfix(hotfixmods)
return M.unique(old_hotfix_func, hotfixmods)
end

return M

玩家消息处理

修改 common/plug/hall_plug.lua,在注册消息处理时做相应包裹。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   --消息前置处理函数
local function msg_handle_pre(func)
return function(player_id, pack_id, pack_body)
--加入队列执行
return queue_helper.multi_player_func(player_id, func, player_id, pack_id, pack_body)
end
end

--注册handle
for _, m in ipairs(g_modules_list) do
local handle = m.handle
if handle then
for pack_id,func in pairs(handle) do
g_interface_mgr:handle(pack_id, msg_handle_pre(func))
end
end
local switch_ignores = m.switch_ignores
if switch_ignores then
for pack_id in pairs(switch_ignores) do
g_switch_ignore_map[pack_id] = true
end
end
end

luaCMD消息

跟玩家消息差不多,只是得规范第一参数是player_id,如果不是就会用unique

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
   --CMD消息前置处理函数
local function cmd_handle_pre(func)
return function(key, ...)
--加入队列执行
local k_type = type(key)
if k_type == 'number' then
return queue_helper.multi_player_func(key, func, key, ...)
else
return queue_helper.unique(func, key, ...)
end
end
end

--设置CMD命令
for _, m in ipairs(g_modules_list) do
local register_cmd = m.register_cmd
if register_cmd then
for cmdname,func in pairs(register_cmd) do
assert(not M.register_cmd[cmdname], "exists cmdname: " .. cmdname)
M.register_cmd[cmdname] = cmd_handle_pre(func)
end
end
end

GM消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
--注册gm 命令
local function reg_gm_cmd(cmd_name, func, help_des)
assert(not g_gm_cmd_map[cmd_name], "exists cmd_name:" .. cmd_name)
assert(type(func) == 'function', "not func")
assert(type(help_des) == 'string', "not help_des") --描述信息
if cmd_name ~= 'help' then
g_gm_cmd_map[cmd_name] = {
func = cmd_handle_pre(func),
help_des = help_des
}
else
g_gm_cmd_map[cmd_name] = {
func = func,
help_des = help_des
}
end
end

登录,重连,掉线,登出动作

都是player_id相关的,包裹即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--登录
local function queue_on_login(player_id, is_jump_join)
g_interface_mgr:queue(player_id, queue_helper.multi_player_func, player_id, on_login, player_id, is_jump_join)
end

--掉线
function M.disconnect(player_id)
g_interface_mgr:queue(player_id, queue_helper.multi_player_func, player_id, on_disconnect, player_id)
end

--重连
local function queue_on_reconnect(player_id)
g_interface_mgr:queue(player_id, queue_helper.multi_player_func, player_id, on_reconnect, player_id)
end

--登出
function M.goout(player_id, is_jump_exit)
g_interface_mgr:queue(player_id, queue_helper.multi_player_func, player_id, on_goout, player_id, is_jump_exit)
end

定时器

定时器使用queue_helper中包裹的。

skynet.fock

fock也使用queue_helper包裹的。

hotfix热更

重写了hotfix动作,加了queue包裹。

其他未处理的入口

比如time_point回调,watch,watch_syn等等回调,需要使用者在恰当的时机用queue_helper包裹相关处理函数。

使用注意点

慎用unique

因为执行完前会阻塞所有mult包裹的处理。

注意避免环队列问题

队列中的处理函数调用call最终调用到进入相同队列的处理函数,就会形成环队列。

如何避免,检查call调用链会不会回归。


给demo hallserver大厅服加上队列
https://huahua132.github.io/2025/05/24/skynet_fly_ss/demo_hall_queue/
作者
huahua132
发布于
2025年5月24日
许可协议