关于cluster 远程rpc调用

简介

cluster 远程rpc是依赖skynet的cluster模式实现的。
skynet_fly远程rpc调用的实现注重什么:
* 与热更模板系统完美融合。
* 简单易用的API设计。
* 支持多结点。
* 支持服务发现。

定位

因为远程rpc调用结点偶尔下线,上线都是常有的事,所以不建议用来做强生效性消息(就是调用了,一定要执行成功)。
skynet_fly cluster rpc 默认启用__nowaiting就是结点连不上也不会等待,rpc远程调用适合做查询,弱通知。

实现

基于skynet cluster mode 模式,扩展了cluster_sever,frpc_client_m,frpc_client3个文件。

  • cluster_sever.lua
    它是一个skynet服务。
    负责

    1. cluster rpc服务开启监听。
    2. 后续服务发现机制的服务注册。
    3. 担任rpc调用入口服务。
  • frpc_client_m.lua
    它是一个skynet_fly可热更模块。
    负责

    1. cluster 服务配置加载。
    2. 后续cluster服务发现。
    3. 担任远程rpc调用代理。
    4. 对接cluster_server消息。
  • frpc_client.lua
    它是一个lua模块。
    负责

    1. 封装简化远程rpc调用。
    2. 保持内部rpc调用相同的调用方法。

使用实例

examples/cluster_server_1
examples/cluster_server_2
examples/frpc_client

cluster_server_1和cluster_server_2代码一样,监听端口配置不同,为了模拟相同服务配置在2台机器上。
启动步骤

cluster_server_1

  1. 构建 sh ../../binshell/make_server.sh ../../
  2. 修改配置,打开cluster_server_1_config.lua文件,修改svr_name=cluster_server
  3. 启动script/run.sh

cluster_server_2

  1. 构建 sh ../../binshell/make_server.sh ../../
  2. 修改配置,打开cluster_server_1_config.lua文件,修改svr_name=cluster_server,修改svr_id = 2,修改debug_port改为没有被使用的端口。
  3. 启动script/run.sh

frpc_client

  1. 构建 sh ../../binshell/make_server.sh ../../
  2. 修改配置,打开cluster_server_1_config.lua文件,修改debug_port改为没有被使用的端口。
  3. 启动script/run.sh
  • 服务端

main.lua

1
2
3
4
5
6
7
8
9
10
local skynet = require "skynet"
local contriner_launcher = require "contriner_launcher"

skynet.start(function()
skynet.error("start cluster_server!!!>>>>>>>>>>>>>>>>>")
contriner_launcher.run()

skynet.uniqueservice("cluster_server")
skynet.exit()
end)

load_mods.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
return {
--共享配置
share_config_m = {
launch_seq = 1,
launch_num = 1,
default_arg = {
--cluster_server用的配置
cluster_server = {
host = "127.0.0.1:9688", --rpc监听端口
}
}
},

test_m = {
launch_seq = 2,
launch_num = 6,
mod_args = {
{instance_name = "test_one"},
{instance_name = "test_one"},
{instance_name = "test_one"},
{instance_name = "test_two"},
{instance_name = "test_two"},
{instance_name = "test_two"},
}
}
}

test_m.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
local log = require "log"
local skynet = require "skynet"
local contriner_client = require "contriner_client"

contriner_client:register("share_config_m")
local string = string

local g_config = nil

local CMD = {}

function CMD.hello(who)
log.info(string.format("%s send hello msg for me",who))
end

function CMD.ping()
local confclient = contriner_client:new("share_config_m")
local conf = confclient:mod_call('query','cluster_server')
return string.format("pong %s %s %s",g_config.instance_name,conf.host,skynet.self())
end

function CMD.start(config)
g_config = config
return true
end

function CMD.exit()
return true
end

return CMD
  • 客户端

main.lua

1
2
3
4
5
6
7
8
local skynet = require "skynet"
local contriner_launcher = require "contriner_launcher"

skynet.start(function()
skynet.error("start frpc_client!!!>>>>>>>>>>>>>>>>>")
contriner_launcher.run()
skynet.exit()
end)

load_mods.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
return {
frpc_client_m = {
launch_seq = 1,
launch_num = 1,
default_arg = {
node_map = {
['cluster_server'] = {
[1] = "127.0.0.1:9688",
[2] = "127.0.0.1:9689",
}
}
}
},

test_m = {
launch_seq = 2,
launch_num = 1,
}
}

test_m

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
local skynet = require "skynet"
local log = require "log"
local frpc_client = require "frpc_client"
local CMD = {}

function CMD.start()
skynet.fork(function()
local cli = frpc_client:new("cluster_server","test_m") --访问cluster_server的test_m模板

cli:one_balance_send("hello","one_balance_send")
cli:one_mod_send("hello","one_mod_send")
cli:set_svr_id(1):byid_balance_send("hello","byid_balance_send")
cli:set_svr_id(1):byid_mod_send("hello","byid_mod_send")

for i = 1,3 do
log.info("balance ping ",i,cli:one_balance_call("ping"))
end
for i = 1,3 do
log.info("mod ping ",i,cli:one_mod_call("ping"))
end
for i = 1,3 do
log.info("byid ping ",i,cli:set_svr_id(2):byid_balance_call("ping"))
end
for i = 1,3 do
log.info("byid ping ",i,cli:set_svr_id(1):byid_mod_call("ping"))
end

cli:all_mod_send("hello","all_mod_send")
local ret = cli:all_mod_call("ping")
log.info("all_mod_call: ",ret)

cli:all_balance_send("hello","all_balance_send")
local ret = cli:all_balance_call("ping")
log.info("all_balance_call: ",ret)

cli:one_broadcast("hello","one_broadcast")
cli:all_broadcast("hello","all_broadcast")
cli:set_svr_id(1):byid_broadcast("hello","byid_broadcast")

cli:set_instance_name("test_one")
cli:set_svr_id(2)
cli:one_balance_send_by_name("hello","one_balance_send_by_name")
cli:one_mod_send_by_name("hello","one_mod_send_by_name")
cli:byid_balance_send_by_name("hello","byid_balance_send_by_name")
cli:byid_mod_send_by_name("hello","byid_mod_send_by_name")

for i = 1,3 do
log.info("one_balance_call_by_name ping ",i,cli:one_balance_call_by_name("ping"))
end
for i = 1,3 do
log.info("one_mod_call_by_name ping ",i,cli:one_mod_call_by_name("ping"))
end
for i = 1,3 do
log.info("byid_balance_call_by_name ping ",i,cli:byid_balance_call_by_name("ping"))
end
for i = 1,3 do
log.info("byid_mod_call_by_name ping ",i,cli:byid_mod_call_by_name("ping"))
end

cli:all_mod_send_by_name("hello","all_mod_send_by_name")
local ret = cli:all_mod_call_by_name("ping")
log.info("all_mod_call_by_name: ",ret)

cli:all_balance_send_by_name("hello","all_balance_send_by_name")
local ret = cli:all_balance_call_by_name("ping")
log.info("all_balance_call_by_name: ",ret)

cli:one_broadcast_by_name("hello","one_broadcast_by_name")
cli:all_broadcast_by_name("hello","all_broadcast_by_name")
cli:byid_broadcast_by_name("hello","byid_broadcast_by_name")
end)

return true
end

function CMD.exit()
return true
end

return CMD

基于redis做的服务发现对比etcd如何

通常都是基于etcd做服务发现,etcd 之所以适合用于服务发现,主要有以下几个原因:

  1. 支持服务的自动注册和发现
    etcd允许服务在启动时主动向etcd注册自己提供的服务,而客户端可以通过监听服务变更事件来发现可用的服务实例。这样就无需人工记录服务地址。

  2. 支持服务健康检查
    etcd支持对保存的服务进行定期心跳检测,一旦服务失效就可以将其服务地址从注册表中删除或标记为不可用。

  3. 服务信息更新的事件通知
    etcd支持watch机制,服务信息有变更时,可立即通知注册的客户端。这样客户端可以快速响应服务的变化。

  4. 简单灵活的HTTP API
    etcd提供HTTP RESTful API用于读写服务注册表,使用简单且多语言都可以轻松访问。

  5. 服务注册表数据持久化
    etcd会将服务注册表数据持久保存,这样即使etcd节点重启,注册表数据也不会丢失。

  6. 支持服务负载均衡
    可以在etcd中保存服务的额外元数据,如服务的负载信息,用于实现服务负载均衡。

综上,etcd作为一个高可用的分布式KV存储系统,非常适合用于服务注册与发现场景,成为微服务架构下的配置中心和服务注册表。

那么redis做服务发现可以保证以上几点呢?

  1. 支持服务的自动注册和发现
    redis的sub/pub机制可以轻松实现。

  2. 支持服务健康检查
    通知定期对服务的host进行设置,并设置过期时间和过期事件通知,从而可以监听服务是否健康。

  3. 服务信息更新的事件通知
    redis通知开启notify-keyspace-events KA配置,客户端通过sub/pub机制可以监听key的所有事件,set,expired等等。

  4. 简单灵活的HTTP API
    redis仅仅支持tcp长连接,虽然通用性差一下,当时通信效率更好。

  5. 服务注册表数据持久化
    redis也有RDB和AOF持久化策略。

  6. 支持服务负载均衡
    redis也可以使用hash结构保存结点额外元数据,服务的负载信息等等,也可以用于实现负载均衡。

综上几点,可以说,redis也非常适合做服务发现。

redis做服务发现实现

  • 前提
    我们启动redis时,需要修改redis.conf
    设置 notify-keyspace-events KA
    这样我们可以通过watch监听某些key的所有事件,包括set(设置),expired(过期)。

  • rpc基础函数封装
    给rpc设置独特的key命名,防止以后业务key命名冲突,我基于框架命名。
    string.format("skynet_fly:rpc:%s:%s",svr_name,svr_id)

主要需要实现3个方法,register,get_node_host,watch

  • register 用于服务端服务开启时,间隔一秒去设置host信息,并设置2秒的过期时间。
  • get_node_host 用于客户端去拿取结点的host信息。
  • watch 用于客户端监听key的set和expired事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
local redisf = require "redisf"
local string_util = require "string_util"

local setmetatable = setmetatable
local assert = assert
local string = string
local tonumber = tonumber

local M = {}
local meta = {__index = M}

local g_dbindex = 0 --redis几号数据库
local g_db_name = "rpc"

function M:new()
local cli = redisf.new_client(g_db_name)
local t = {
cli = cli
}
assert(cli,"can`t connect redis",g_db_name)
setmetatable(t,meta)
return t
end

--注册,设置连接信息2秒过期时间,需要1秒调用一次
function M:register(svr_name,svr_id,host)
assert(svr_name,"not svr_name")
assert(svr_id,"not svr_id")
assert(host,"not host")

local key = string.format("skynet_fly:rpc:%s:%s",svr_name,svr_id)
self.cli:set(key,host,"EX",2)
end

--获取结点的ip和端口
function M:get_node_host(svr_name,svr_id)
assert(svr_name,"not svr_name")
assert(svr_id,"not svr_id")

local key = string.format("skynet_fly:rpc:%s:%s",svr_name,svr_id)
return self.cli:get(key)
end

--监听结点host
--redis config 需要配置 notify-keyspace-events KA
--可以监听key的所有操作事情包括过期
function M:watch(svr_name,call_back)
local k = string.format("__keyspace@%d__:skynet_fly:rpc:%s:*",g_dbindex,svr_name)
return redisf.new_watch(g_db_name,{},{k},function(event,key,psubkey)
local split_str = string_util.split(key,':')
local svr_id = tonumber(split_str[#split_str])
if event == 'set' then
local host = self:get_node_host(svr_name,svr_id)
if host then
call_back(event,svr_name,svr_id,host)
else
call_back("get_failed",svr_name,svr_id)
end
elseif event == 'expired' then
call_back(event,svr_name,svr_id,nil)
end
end)
end

return M
  • 服务端修改
    考虑到兼容性和扩展性,只是在cluster_server.lua增加了register的配置,不想使用服务发现也可以,后续想用etcd做服务发现也好扩展。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    --共享配置
    share_config_m = {
    launch_seq = 1,
    launch_num = 1,
    default_arg = {
    --cluster_server用的配置
    cluster_server = {
    host = "127.0.0.1:9688",
    register = "redis", --连接信息注册到redis
    },

    redis = {
    --rpc连接配置
    rpc = {
    host = '127.0.0.1',
    port = 6379,
    auth = '123456',
    db = 0,
    },
    },
    }
    },

    需要在redis配置中配置名称为rpc的配置。
    cluster_server增加如下代码

    1
    2
    3
    4
    5
    6
    7
    8
    local register = conf.register
    if register == 'redis' then --注册到redis
    local rpccli = rpc_redis:new()
    --一秒写一次
    timer:new(timer.second,0,function()
    rpccli:register(g_svr_name,g_svr_id,conf.host)
    end)
    end

    一秒一次是调用注册,如果服务下线了,客户端将收到key过期的通知。

  • 客户端
    考虑到兼容性和扩展性,只是在cluster_server.lua增加了watch的配置,不想使用服务发现也可以,后续想用etcd做服务发现也好扩展。
    load_mods.lua

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    share_config_m = {
    launch_seq = 1,
    launch_num = 1,
    default_arg = {
    redis = {
    --rpc连接配置
    rpc = {
    host = '127.0.0.1',
    port = 6379,
    auth = '123456',
    db = 0,
    },
    },
    }
    },

    frpc_client_m = {
    launch_seq = 2,
    launch_num = 1,
    default_arg = {
    node_map = {
    ['cluster_server_byredis'] = true, --连接cluster_server_byredis服务
    },
    watch = 'redis', --监听redis的方式做服务发现
    }
    },

frpc_client_m.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if watch == 'redis' then
--redis服务发现方式
local rpccli = rpc_redis:new()
for svr_name,node in pairs(node_map) do
g_redis_watch_cancel_map[svr_name] = rpccli:watch(svr_name,function(event,name,id,host)
if event == 'set' then --设置
local old_host = get_node_host(name,id)
if old_host ~= host then
del_node(name,id)
add_node(name,id,host)
log.error("change cluster node :",name,id,old_host,host)
end
elseif event == 'expired' then --过期
del_node(name,id)
log.error("down cluster node :",name,id)
elseif event == 'get_failed' then --拿不到配置,通常是因为redis挂了,或者key被意外删除,或者redis出现性能瓶颈了
del_node(name,id)
log.error("get_failed cluster node :",name,id)
end
end)
end
else
--本机配置方式
for svr_name,node in pairs(node_map) do
for svr_id,host in pairs(node) do
add_node(svr_name,svr_id,host)
end
end
end

完成使用实例可以查看运行,examples/cluster_client_byredisexamples/cluster_server_byredis_1,examples/cluster_server_byredis_2
跟没有使用服务发现的示例对比,区别主要在load_mods.lua配置上面。

总结

cluster_rpc API保持了易用性,服务发现机制也保留了扩展性,后续想支持ectd可以非常方便,不过cluster_rpc只方便跟skynet的项目做对接,后续看要不要实现grpc。

改动

后续使用 frpc_server 替换了cluster_server

skynetfly源码地址


关于cluster 远程rpc调用
https://huahua132.github.io/2023/07/28/skynet_fly_ss/cluster_rpc/
作者
huahua132
发布于
2023年7月28日
许可协议