关于redis的封装

为什么要封装redis?

  1. 适用skynet_fly生态。
  2. redis连接配置可以写在mod_config中。

对于实现方式的思考

  • 方式一
    用skynet_fly 热更模块的方式,这样连接配置就可以写在mod_config,还可以配置连接多个不同的redis。
    优势
    1. 配置redis连接方便。
    2. 固定redis连接数量。
    劣势
    1. 多了一次消息的打包解包。
    2. redis的sub/pub,订阅通知不好处理。
    3. redis的scan命令,数据存在被刷新的可能。

  • 方式二
    按lua模块实现,去配置中心拿取配置。
    优势
    1. 配置redis连接方便。
    2. 独占订阅连接。
    3. 直连redis。
    4. 独立的scan命令游标。
    劣势
    1. 连接随服务数量增长。

用脚思考,直接选二。

基于方式二的实现

先弄个配置中心,简单的key-value形式,非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
local skynet = require "skynet"
local timer = require "timer"

local g_config = nil
local CMD = {}

function CMD.start(config)
g_config = config
return true
end

function CMD.query(k)
return g_config[k]
end

function CMD.exit()
timer:new(timer.minute,1,skynet.exit)
end

return CMD

配合contriner_client.lua模块,就可以轻松拿取配置。

1
2
local cli = contriner_client:new('share_config_m')
local conf_map = cli:mod_call('query','redis')

接下来就可以收割redisf.lua的实现了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
local skynet = require "skynet"
local contriner_client = require "contriner_client"
local redis = require "skynet.db.redis"
local sha2 = require "sha2"
local log = require "log"

local setmetatable = setmetatable
local assert = assert
local pcall = pcall
local ipairs = ipairs
local type = type
local string = string

local g_sha_map = {}

local M = {}
--[[
函数作用域:M的成员函数
函数名称:new_client
描述:新建一个在share_config_m 中写的key为redis表的名为db_name的连接配置
参数:
- db_name (string): 连接配置名称
]]
function M.new_client(db_name)
local cli = contriner_client:new('share_config_m')
local conf_map = cli:mod_call('query','redis')
assert(conf_map and conf_map[db_name],"not redis conf")

local conf = conf_map[db_name]
local ok,conn = pcall(redis.connect,conf)
if not ok then
log.fatal("redisf new_client err ",conn,conf)
return nil
end

return conn
end

--[[
函数作用域:M的成员函数
函数名称:script_run
描述:运行redis脚本命令
参数:
- conn (redis_conn): new_client返回的连接对象
- script_str (string):redis lua 脚本
- ... 脚本传递参数
]]
function M.script_run(conn,script_str,...)
assert(conn)
assert(type(script_str) == 'string','script_str not string')

local sha = g_sha_map[script_str]
if not sha then
sha = sha2.sha1(script_str)
g_sha_map[script_str] = sha
end

local ok,ret = pcall(conn.evalsha,conn,sha,...)
if not ok then
if string.find(ret,"NOSCRIPT",nil,true) then
ret = conn:eval(script_str,...)
end
end

return ret
end

--[[
函数作用域:M的成员函数
函数名称:new_watch
描述:redis订阅
参数:
- db_name (string): 连接的redis名称
- subscribe_list (table): 订阅的固定key
- psubscribe_list (table): 订阅的匹配key
- call_back (function): 消息回调函数

返回值
- 取消订阅函数
]]
function M.new_watch(db_name,subscribe_list,psubscribe_list,call_back)
local cli = contriner_client:new('share_config_m')
local conf_map = cli:mod_call('query','redis')
assert(conf_map and conf_map[db_name],"not redis conf")
local conf = conf_map[db_name]

local ok,watch = pcall(redis.watch,conf)
if not ok then
log.fatal("redisf new_watch err ",conf)
return nil
end

for _,key in ipairs(subscribe_list) do
watch:subscribe(key)
end

for _,key in ipairs(psubscribe_list) do
watch:psubscribe(key)
end

local is_cancel = false

skynet.fork(function()
while not is_cancel do
local ok,msg,key,psubkey = pcall(watch.message,watch)
if ok then
call_back(msg,key,psubkey)
else
if not is_cancel then
log.fatal("watch.message err :",msg,key,psubkey)
end
break
end
end
end)

return function()
for _,key in ipairs(subscribe_list) do
watch:unsubscribe(key)
end

for _,key in ipairs(psubscribe_list) do
watch:punsubscribe(key)
end
watch:disconnect()
is_cancel = true
return true
end
end

return M

版本修改

第一个版本有一些诟病的地方:

  1. 没有考虑首次连接失败的情况

    • 问题
      redis可能挂掉,重启啥的,上一个版本没有考虑到服务启动时连不上的问题,首次连接失败后,后续不会再尝试连接。
    • 解决方案
      每次调用命令的时候都去检测连接是否存在,不存在先尝试连接。
  2. 保护执行

    • 问题
      调用redis命令,有可能因为网络原因调用失败,导致lua层面断言。
    • 解决方案
      调用命令都经过pcall,通过__index方法对命令进行pcall包裹再缓存函数。
  3. 自定义命令扩展性

    • 问题
      原生命令产生的结果可能需要进行二次处理,业务层才能更方便的使用。
    • 解决方案
      利用__index方法检查自定义command表是否有函数,有的话用pcall进行包裹再缓存函数。
      增加add_command函数,外部想扩展自定义函数只需要调用该命令即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
 local skynet = require "skynet"
local contriner_client = require "contriner_client"
local redis = require "skynet.db.redis"
local string_util = require "string_util"
local sha2 = require "sha2"
local log = require "log"

local setmetatable = setmetatable
local assert = assert
local pcall = pcall
local ipairs = ipairs
local pairs = pairs
local type = type
local string = string
local select = select
local tunpack = table.unpack
local debug_getinfo = debug.getinfo

local g_sha_map = {}

local M = {}
local command = {} --自定义命令函数

local cmdfuncs = {} --命令函数缓存

--[[
函数作用域:M的成员函数
函数名称:script_run
描述:运行redis脚本命令
参数:
- self (redis_conn): new_client返回的连接对象
- script_str (string):redis lua 脚本
- ... 脚本传递参数
]]
function command:script_run(script_str,...)
local conn = self.conn
assert(conn,"not connect redis ")
assert(type(script_str) == 'string','script_str not string')

local sha = g_sha_map[script_str]
if not sha then
sha = sha2.sha1(script_str)
g_sha_map[script_str] = sha
end

local isok,ret = pcall(conn.evalsha,conn,...)
if not isok then
if string.find(ret,"NOSCRIPT",nil,true) then
ret = conn:eval(script_str,...)
end
end

return ret
end

local function get_line_info()
local info = debug_getinfo(3,"Sl")
local lineinfo = info.short_src .. ":" .. info.currentline
end

--给redis命令施加保护执行
local mt = {
__index = function(t,k)
local f = cmdfuncs[k]
if f then
t[k] = f
return f
end

local f = function (self,...)
if not self.conn then
local ok,conn = pcall(redis.connect,self.conf)
if not ok then
log.error("connect redis err ",get_line_info(),conn,k,self.conf)
return
else
self.conn = conn
end
end

local cmd = command[k]
if cmd then
local ret = {pcall(cmd,self,...)}
local isok = ret[1]
local err = ret[2]
if not isok then
log.error("call redis command faild ",get_line_info(),err,k,...)
return
else
return select(2,tunpack(ret))
end
else
local isok,ret = pcall(self.conn[k],self.conn,...)
if not isok then
log.error("call redis faild ",get_line_info(),ret,k,...)
return
end
return ret
end
end

t[k] = f
--缓存命令函数
cmdfuncs[k] = f
return f
end}

--[[
函数作用域:M的成员函数
函数名称:new_client
描述:新建一个在share_config_m 中写的key为redis表的名为db_name的连接配置
参数:
- db_name (string): 连接配置名称
]]
function M.new_client(db_name)
local cli = contriner_client:new('share_config_m')
local conf_map = cli:mod_call('query','redis')
assert(conf_map and conf_map[db_name],"not redis conf")

local conf = conf_map[db_name]
local t_conn = {
conf = conf,
conn = false
}
setmetatable(t_conn,mt)
t_conn:get("ping") --尝试调一下
return t_conn
end

--[[
函数作用域:M的成员函数
函数名称:add_command
描述:增加自定义command命令
参数:
- M (table): 定义的函数模块
]]
function M.add_command(M)
for k,func in pairs(M) do
assert(not command[k],"command is exists " .. k)
command[k] = func
end
end
--[[
函数作用域:M的成员函数
函数名称:new_watch
描述:redis订阅
参数:
- db_name (string): 连接的redis名称
- subscribe_list (table): 订阅的固定key
- psubscribe_list (table): 订阅的匹配key
- call_back (function): 消息回调函数

返回值
- 取消订阅函数
]]
function M.new_watch(db_name,subscribe_list,psubscribe_list,call_back)
local cli = contriner_client:new('share_config_m')
local conf_map = cli:mod_call('query','redis')
assert(conf_map and conf_map[db_name],"not redis conf")
local conf = conf_map[db_name]

local is_cancel = false
local ok,watch

skynet.fork(function()
while not watch and not is_cancel do
ok,watch = pcall(redis.watch,conf)
if not ok then
log.error("redisf connect watch err ",conf)
end
skynet.sleep(100)
end
for _,key in ipairs(subscribe_list) do
if not is_cancel then
watch:subscribe(key)
end
end

for _,key in ipairs(psubscribe_list) do
if not is_cancel then
watch:psubscribe(key)
end
end

while not is_cancel do
local ok,msg,key,psubkey = pcall(watch.message,watch)
if ok then
call_back(msg,key,psubkey)
else
if not is_cancel then
log.error("watch.message err :",msg,key,psubkey)
end
end
end
end)

return function()
is_cancel = true
if watch then
for _,key in ipairs(subscribe_list) do
watch:unsubscribe(key)
end

for _,key in ipairs(psubscribe_list) do
watch:punsubscribe(key)
end
watch:disconnect()
watch = nil
end
return true
end
end

return M

外部扩展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
local redisf = require "redisf"

local tinsert = table.insert
local tunpack = table.unpack
local pairs = pairs

local M = {}

function M:hgetall(key)
local conn = self.conn
local ret = conn:hgetall(key)
local res = {}

for i = 1,#ret,2 do
local k = ret[i]
local v = ret[i + 1]
res[k] = v
end

return res
end

function M:hmset(key,map)
local conn = self.conn
local args_list = {}
for k,v in pairs(map) do
tinsert(args_list,k)
tinsert(args_list,v)
end
return conn:hmset(key,tunpack(args_list))
end

redisf.add_command(M)

return redisf

测试

测试可以看skynet_fly源码下的test模块。
skynetfly源码地址


关于redis的封装
https://huahua132.github.io/2023/07/08/skynet_fly_ss/redis/
作者
huahua132
发布于
2023年7月8日
许可协议