关于skynet_fly gate,ws_gate消息包设计

前言

最近skynet_fly被群友用来写项目,并且上线了,感觉还是很欣慰的,群友用了我的房间类游戏架构,大概三周的时间,游戏就直接上线了,过程中咨询过我一些问题,其中关于协议这一块,解释的比较久,并且最后是我把我写好的中国象棋客户端上传了github,让他给客户端参考,才较快解决了前后端协议对接。所以想写这一篇来拆解一下协议封装。

协议图

skynet gate协议包

skynet协议包就是上图红色部分,大端2字节的包体长度拼接包体

skynet_fly 协议包

skynet_fly的gate和ws_gate协议基于skynet协议把包体拆分成了大端2字节包名长度包名消息体

为啥这么做

因为通常消息派发都会在框架层处理,业务层只需要监听处理就行。这样业务层只需要监听处理对应的包名,消息体

实现

包体处理

这个是对于包体的打包解包,通过函数式编程,抽象了两个基础方法。只需要提供具体的编码解码方式就可以创建对应的打包解包函数。
代码路径 lualib/netpack/netpack_base

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
local assert = assert
local spack = string.pack

local M = {}

function M.create_pack(encode)
return function(name,tab)
assert(name)
assert(tab)

local ok,str = encode(name,tab)
if not ok then
return nil,str
end

local pbmsgbuff = spack(">I2",name:len()) .. name .. str
return pbmsgbuff
end
end

function M.create_unpack(decode)
return function(msgbuff)
assert(msgbuff)
local name_sz = (msgbuff:byte(1) << 8) + msgbuff:byte(2)
local packname = msgbuff:sub(3,3 + name_sz - 1)
local pack_str = msgbuff:sub(3 + name_sz)

local ok,tab = decode(packname,pack_str)
if not ok then
return nil,tab
end

return packname,tab
end
end

return M

例如json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
local json = require "cjson"
local netpack_base = require "netpack_base"

local pcall = pcall
local assert = assert

local M = {}


function M.encode(name,tab)
assert(tab)
return pcall(json.encode,tab)
end

function M.decode(name,pstr)
assert(pstr)
return pcall(json.decode,pstr)
end

M.pack = netpack_base.create_pack(M.encode)

M.unpack = netpack_base.create_unpack(M.decode)

return M

整包处理

这是对于整包的处理,包含包体长度,也是采用了函数式编程,提供了单发消息广播消息解包客户端收包处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
local skynet = require "skynet"
local socket = require "socket"
local netpack = require "netpack"
local websocket = require "websocket"
local log = require "log"

local pcall = pcall
local string = string
local assert = assert

local M = {}

-------------------------------------------------------
--基于skynet gate 的消息发送
-------------------------------------------------------
function M.create_gate_send(pack)
return function(gate,fd,name,tab)
assert(fd)
assert(name)
assert(tab)

local msg,err = pack(name,tab)
if not msg then
log.error("util_net_base.pack err ",name,tab,err)
return
end

return socket.write(fd,netpack.pack(msg))
end
end

-------------------------------------------------------
--基于skynet gate 的消息广播
-------------------------------------------------------
function M.create_gate_broadcast(pack)
return function(gate_list,fd_list,name,tab)
assert(fd_list and #fd_list > 0)
assert(name)
assert(tab)

local msg,err = pack(name,tab)
if not msg then
log.error("util_net_base.pack err ",name,tab,err)
return
end

for i = 1,#fd_list do
--netpack.pack会分配内存,write会释放内存,所以必须一个write一个包
socket.write(fd_list[i], netpack.pack(msg))
end
end
end
--------------------------------------------------------
--基于skynet gate 的消息解包
--------------------------------------------------------
function M.create_gate_unpack(unpack)
return function(msg,sz)
assert(msg)
assert(sz)

local msgstr = skynet.tostring(msg,sz)
if sz < 2 then
log.info("unpack invalid msg ",msgstr,sz)
return nil
end

local packname,tab = unpack(msgstr)
if not packname then
log.error("unpack err ",tab)
return
end

return packname,tab
end
end
--------------------------------------------------------
--通用的客户端消息接送处理
--------------------------------------------------------
function M.create_recv(read,unpack)
return function(fd,dispatch)
assert(fd)
assert(dispatch)

local is_cancel = false
local total_msg = ""

local function unpack_msg()
local sz = total_msg:len()
if sz < 2 then
return nil
end

local pack_sz = (total_msg:byte(1) << 8) + total_msg:byte(2)
if sz < pack_sz + 2 then
return nil
end

local offset = 2
local msg = total_msg:sub(offset + 1,offset + pack_sz)
total_msg = total_msg:sub(offset + 1 + pack_sz)
return msg
end

skynet.fork(function()
while not is_cancel do
local ok,msg = pcall(read,fd)
if not ok or msg == false then
log.error("read faild ",fd,msg)
break
end

total_msg = total_msg .. msg
while total_msg:len() > 0 do
local one_pack = unpack_msg()
if not one_pack then break end
skynet.fork(dispatch,fd,unpack(one_pack))
end
end

socket.close(fd)
end)

return function()
is_cancel = true
end
end
end

--------------------------------------------------------
--基于skynet ws_gate 的消息发送
--------------------------------------------------------
local function create_ws_gate_send(type)
local send_type = 'send_' .. type
return function(pack)
return function(gate,fd,name,tab)
assert(fd)
assert(name)
assert(tab)

local msg,err = pack(name,tab)
if not msg then
log.error("util_net_base.pack err ",name,tab,err)
return
end

--大端2字节表示包长度
local send_buffer = string.pack(">I2",msg:len()) .. msg

if not gate then
if websocket.is_close(fd) then
log.warn("send not exists fd ",fd)
else
websocket.write(fd,send_buffer,type)
end
else
skynet.send(gate,'lua',send_type,fd,send_buffer)
end
end
end
end

local function create_ws_gate_broadcast(type)
local send_type = 'send_' .. type
return function(pack)
return function(gate_list,fd_list,name,tab)
assert(gate_list and #gate_list > 0)
assert(fd_list and #fd_list > 0)
assert(name)
assert(tab)

local msg,err = pack(name,tab)
if not msg then
log.error("util_net_base.pack err ",name,tab,err)
return
end

--大端2字节表示包长度
local send_buffer = string.pack(">I2",msg:len()) .. msg

for i = 1,#fd_list do
local fd = fd_list[i]
local gate = gate_list[i]
skynet.send(gate,'lua',send_type,fd,send_buffer)
end
end
end
end

M.create_ws_gate_send_text = create_ws_gate_send('text')

M.create_ws_gate_send_binary = create_ws_gate_send('binary')

M.create_ws_gate_broadcast_text = create_ws_gate_broadcast('text')

M.create_ws_gate_broadcast_binary = create_ws_gate_broadcast('binary')

--------------------------------------------------------
--基于skynet ws_gate 的消息解包
--------------------------------------------------------
function M.create_ws_gate_unpack(unpack)
return function(msg,sz)
assert(msg)
local msgstr = skynet.tostring(msg,sz)
if sz < 2 then
log.info("unpack invalid msg ",msg,sz)
return
end

local msgsz = (msgstr:byte(1) << 8) + msgstr:byte(2)
msgstr = msgstr:sub(3)
sz = msgstr:len()
if msgsz ~= sz then
log.info("unpack invalid msg ",msgsz,sz)
return
end

local packname,tab = unpack(msgstr)
if not packname then
log.error("unpack err ",tab)
return
end

return packname,tab
end
end

return M

由于gate版本的消息已经经过skynet.netpack处理,传递出来的是包体长度包体,所以gate版本在打包解包不需要处理2字节的包体长度了。ws_gate需要处理。
ws_gate区分了text消息和binary经过查资料,区别是text是人类可读的内容,比如json格式的数据,pb就是binary人类不可读的数据。
需要什么格式消息处理,只需要实现对于的打包解包函数即可。

例如 json pb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
local socket = require "socket"
local json_netpack = require "json_netpack"
local util_net_base = require "util_net_base"

--------------------------------------------------------
--这是给skynet gate网关服务处理消息用的 基于json协议
--------------------------------------------------------

local M = {}

--给fd发送socket消息
M.send = util_net_base.create_gate_send(json_netpack.pack)

--给fd_list发送socket消息
M.broadcast = util_net_base.create_gate_broadcast(json_netpack.pack)

--解包
M.unpack = util_net_base.create_gate_unpack(json_netpack.unpack)

--客户端读取消息包
M.recv = util_net_base.create_recv(socket.read,json_netpack.unpack)

return M
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
local socket = require "socket"
local pb_netpack = require "pb_netpack"
local util_net_base = require "util_net_base"

--------------------------------------------------------
--这是给skynet gate网关服务处理消息用的 基于protobuf协议
--------------------------------------------------------

local M = {}

--给fd发送socket消息
M.send = util_net_base.create_gate_send(pb_netpack.pack)

--给fd_list发送socket消息
M.broadcast = util_net_base.create_gate_broadcast(pb_netpack.pack)

--解包
M.unpack = util_net_base.create_gate_unpack(pb_netpack.unpack)

--客户端读取消息包
M.recv = util_net_base.create_recv(socket.read,pb_netpack.unpack)

return M

总结

消息包这一块的代码实现还是挺清晰明了,使用者也可以基于base扩展出其他不同的编码解码方式。

扩展实现想法

这一版用的packname的方式做映射,其实大多都是用消息码,然后代码里写对于映射关系。因为我一开始想着把packname打进去,pb消息能直接解析,不需要写映射关系,非常方便。
后续也可以实现一个消息码版本的,这种方式业务层需要多写一个对于消息码的映射,不过好处是包体更小。

底层优化想法

因为ws_gate是lua实现了,socket产生fd是slave节点接收管理的,所以每次发消息需要先发给slave,slave再发给客户端,这里可以想想能不能直接发。

skynetfly源码地址


关于skynet_fly gate,ws_gate消息包设计
https://huahua132.github.io/2023/12/10/skynet_fly_ss/pack/
作者
huahua132
发布于
2023年12月10日
许可协议