actor模型在sknyet中的应用

简介

Actor模型由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成:

状态

Actor模型中的状态指Actor对象的变量信息,状态由Actor模型自己管理,避免了并发环境下的锁和内存原子性等问题。

行为

Actor模型中的计算逻辑,通过Actor模型接收到的消息来改变Actor模型的状态。

邮箱

邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO(先入先出)消息队列来存储发送方Actor的消息,接收方Actor再从邮箱队列中获取消息。

特点

隔离性

每个actor实例的状态只能通过自己的行为修改。

原子性

单个actor实例同时只会有一个线程在执行,类似redis只用单线程去读写内存数据,避免了多线程下的并发问题,actor修改内部状态不需要加锁。

异步消息处理

actor发消息并不是直接调用接口,而是类似邮件在对方邮箱放一个邮件,之后actor实例会按照先入先出的规则去处理邮件。

生命周期

actor实例可以在运行中动态创建和释放。

容错

actor实例的断言不会导致整个程序崩溃。

模型抽象

每个actor模型可以对自己的状态,行为进入定义。

skynet对actor的应用

skynet通过抽象skynet_context结构体实现对actor模型的应用。

skynet_context

贴一段skynet_context结构部分定义。

1
2
3
4
5
6
7
8
struct skynet_context { 
void * instance; //实例数据
struct skynet_module * mod; //模型
void * cb_ud; //回调函数实例数据
skynet_cb cb; //消息回调函数
struct message_queue *queue;//消息队列
uint32_t handle; //实例id
};

状态

skynet_context中的instancecb_ud指向的是实例数据,通常cb_ud指向的就是instance指向的数据,或者是instance中的成员数据,可以认为instancecb_ud就是对actor模型的状态定义。

行为

woker线程把消息传入cb回调函数处理,跟着传入的还有cb_ud

邮箱

message_queue消息队列,存放待处理消息。

总结

skynet通过actor模型抽象skynet服务,赋予了服务之间强隔离性和原子性,避免了频繁加锁,使得在开发中能够高效简单实现业务,同时还能高效利用多核。

延伸问题

actor模块是如何定义的?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void *
get_api(struct skynet_module *mod, const char *api_name) {
size_t name_size = strlen(mod->name);
size_t api_size = strlen(api_name);
char tmp[name_size + api_size + 1];
memcpy(tmp, mod->name, name_size);
memcpy(tmp+name_size, api_name, api_size+1);
char *ptr = strrchr(tmp, '.');
if (ptr == NULL) {
ptr = tmp;
} else {
ptr = ptr + 1;
}
return dlsym(mod->module, ptr);
}

static int
open_sym(struct skynet_module *mod) {
mod->create = get_api(mod, "_create");
mod->init = get_api(mod, "_init");
mod->release = get_api(mod, "_release");
mod->signal = get_api(mod, "_signal");

return mod->init == NULL;
}

skynet c服务模块约定需要实现create init release signal四个API接口。
create负责创建实例。
init负责初始化,一般是绑定cb cb_ud,返回0表示初始成功,其他表示失败。
release释放实例时调用。
signal通过API直接调用,接口实现需要保证线程安全。

actor实例(状态)的生命周期?

skynet_context实例的由skynet_context_new函数创建,该函数先后调用了create,init
skynet_context_release释放,调用skynet_context_release并不是立马释放ctx,而是原子操作ctx.ref减一以后为0才释放,每个操作ctx的api都需要先给ctx引用计数+1,操作完以后调用skynet_context_release释放引用计数-1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
//从通过handle_id 拿ctx实例
struct skynet_context *
skynet_handle_grab(uint32_t handle) {
struct handle_storage *s = H;
struct skynet_context * result = NULL;

rwlock_rlock(&s->lock);

uint32_t hash = handle & (s->slot_size-1);
struct skynet_context * ctx = s->slot[hash];
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result);
}

rwlock_runlock(&s->lock);

return result;
}
//加引用计算
void
skynet_context_grab(struct skynet_context *ctx) {
ATOM_INC(&ctx->ref);//原子操作+1
}
//减引用计数
struct skynet_context *
skynet_context_release(struct skynet_context *ctx) {
if (ATOM_DEC(&ctx->ref) == 0) {//原子操作-1
delete_context(ctx);//释放ctx
return NULL;
}
return ctx;
}
//在ctx实例中插入一条消息
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle);//加引用
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message);
skynet_context_release(ctx);//操作完减引用

return 0;
}

为什么要用引用计数的方式管理ctx实例的生命周期?

skynet是多线程的框架,试想一下如下场景:2个线程同时操作ctx实例。A线程释放,B线程想给ctx插入一条消息。就会出现非常严重的问题访问野指针,也就是指针指向的内存已经释放了。

actor队列(邮箱)消息的投递与消费?

消息通过skynet_context_pushAPI投递。
由ctx在init中绑定的cb函数处理。
这里以skynet自带的logger服务为例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
	//skyent_error.c 部分代码,传递消息
struct skynet_message smsg;
if (context == NULL) {
smsg.source = 0;
} else {
smsg.source = skynet_context_handle(context);
}
smsg.session = 0;
smsg.data = data;
smsg.sz = len | ((size_t)PTYPE_TEXT << MESSAGE_TYPE_SHIFT);
skynet_context_push(logger, &smsg);

//service_logger.c 部分代码

//消息回调处理函数
static int
logger_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
struct logger * inst = ud;
switch (type) {
case PTYPE_SYSTEM:
if (inst->filename) {
inst->handle = freopen(inst->filename, "a", inst->handle);
}
break;
case PTYPE_TEXT:
if (inst->filename) {
char tmp[SIZETIMEFMT];
int csec = timestring(ud, tmp);
fprintf(inst->handle, "%s.%02d ", tmp, csec);
}
fprintf(inst->handle, "[:%08x] ", source);
fwrite(msg, sz , 1, inst->handle);
fprintf(inst->handle, "\n");
fflush(inst->handle);
break;
}

return 0;
}

int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
const char * r = skynet_command(ctx, "STARTTIME", NULL);
inst->starttime = strtoul(r, NULL, 10);
if (parm) {
inst->handle = fopen(parm,"a");
if (inst->handle == NULL) {
return 1;
}
inst->filename = skynet_malloc(strlen(parm)+1);
strcpy(inst->filename, parm);
inst->close = 1;
} else {
inst->handle = stdout;
}
if (inst->handle) {
skynet_callback(ctx, inst, logger_cb); //注册消息回调函数
return 0;
}
return 1;
}

init绑定logger_cb函数,之后如果有通过skynet_context_push投递消息给ctx实例,将异步调用logger_cb函数。

如何新建一个actor模块并且处理消息(行为)?

我以新建一个dber模块为例子。
这个服务非常简单,用于存储最新的字符串数据(状态),提供set get方法(行为)。
我们需要先定义状态数据。

1
2
3
struct dber {
char * value;
};

然后实现 _create _release _init _signal 接口。
其中 _signal接口暂时用不到,可以不实现。
编写模块对模块文件名接口前缀名有强约束性。
比如我们dber模块的文件名只能写成:service_dber.c,文件名格式是service_拼接模块名称dber
接口函数名就是模块名称拼接接口名称,比如_create就要写成dber_create
这样做的目的是为了方便编译、加载模块动态库、接口函数查找绑定、actor模块启动。

1
2
3
4
5
define CSERVICE_TEMP
$$(CSERVICE_PATH)/$(1).so : service-src/service_$(1).c | $$(CSERVICE_PATH)
$$(CC) $$(CFLAGS) $$(SHARED) $$< -o $$@ -Iskynet/skynet-src
endef
$(foreach v, $(CSERVICE), $(eval $(call CSERVICE_TEMP,$(v))))

编译service,通过文件名的约束,可以做到统一编译,只需要增加CSERVICE模块名称

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static void *
get_api(struct skynet_module *mod, const char *api_name) {
size_t name_size = strlen(mod->name);
size_t api_size = strlen(api_name);
char tmp[name_size + api_size + 1];
memcpy(tmp, mod->name, name_size);
memcpy(tmp+name_size, api_name, api_size+1);
char *ptr = strrchr(tmp, '.');
if (ptr == NULL) {
ptr = tmp;
} else {
ptr = ptr + 1;
}
return dlsym(mod->module, ptr);
}

static int
open_sym(struct skynet_module *mod) {
mod->create = get_api(mod, "_create");
mod->init = get_api(mod, "_init");
mod->release = get_api(mod, "_release");
mod->signal = get_api(mod, "_signal");

return mod->init == NULL;
}

加载接口动态库,通过模块名称拼接接口名称查找API并绑定到skynet_module结构实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
static struct skynet_module * 
_query(const char * name) {
int i;
for (i=0;i<M->count;i++) {
if (strcmp(M->m[i].name,name)==0) {
return &M->m[i];
}
}
return NULL;
}

struct skynet_module *
skynet_module_query(const char * name) {
struct skynet_module * result = _query(name);
if (result)
return result;

SPIN_LOCK(M)

result = _query(name); // double check

if (result == NULL && M->count < MAX_MODULE_TYPE) {
int index = M->count;
void * dl = _try_open(M,name);
if (dl) {
M->m[index].name = name;
M->m[index].module = dl;

if (open_sym(&M->m[index]) == 0) {
M->m[index].name = skynet_strdup(name);
M->count ++;
result = &M->m[index];
}
}
}

SPIN_UNLOCK(M)

return result;
}

我们在启动一个该模块actor需要通过模块名称查找对应模块的API接口。
明白了为何要做这种名称约束之后,我们再来实现相关接口。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include "skynet.h"
#include "skynet_mq.h"
#include "skynet_server.h"

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <time.h>

struct dber {
char * value;
};

struct dber *
dber_create(void) {
struct dber * inst = skynet_malloc(sizeof(*inst));
inst->value = NULL;

return inst;
}

void
dber_release(struct dber * inst) {
if (NULL != inst->value) {
skynet_free(inst->value);
}
skynet_free(inst);
}

static int
dber_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
struct dber * inst = ud;
switch (type) {
case PTYPE_SYSTEM:
//set
if (NULL != inst->value) {
skynet_free(inst->value);
}
inst->value = skynet_malloc(strlen(msg) + 1);
strcpy(inst->value,msg);
break;
case PTYPE_TEXT:
//get
{
struct skynet_message smsg;
smsg.source = skynet_context_handle(context);
smsg.session = 0;
size_t len = strlen(inst->value) + 1;
smsg.data = skynet_malloc(len);
strcpy(smsg.data,inst->value);
smsg.sz = len;
skynet_context_push(source,&smsg);
break;
}
}

return 0;
}

int
dber_init(struct dber * inst, struct skynet_context *ctx, const char * parm) {
skynet_callback(ctx, inst, dber_cb);
return 0;
}

我们在create中创建数据(状态)
init中注册回调函数dber_cb;
release中去释放内存占用。
dber_cb中我简单的约束PTYPE_SYSTEM消息类型为set方法(行为)
PTYPE_TEXT消息类型为get方法(行为)

到此我们把这个服务模块写好了。
我们把它放在skynet/service-src目录下。

1
CSERVICE = snlua logger gate harbor dber

在makefile中CSERVICE下添加dber
之后编译一下skynet,如果编译成功,就会在skynet/cservice目录下看见dber.so动态库文件。

之后我们编写一个非常简单的skynet lua服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
local skynet = require "skynet"
require "skynet.manager"

skynet.register_protocol({
name = "system",
id = skynet.PTYPE_SYSTEM,
pack = skynet.packstring,
unpack = function(...) return ... end,
dispatch = function(...)
skynet.error("recv system msg:",...)
end,
})

skynet.register_protocol({
name = "text",
id = skynet.PTYPE_TEXT,
pack = function(...) return ... end,
unpack = skynet.tostring,
dispatch = function(...)
skynet.error("recv text msg:",...)
end,
})

skynet.start(function()
skynet.error("dber_test main run")
local dber = skynet.launch("dber")
skynet.error("handle:",dber)
skynet.error(skynet.send(dber,skynet.PTYPE_SYSTEM,"hello dber"))
skynet.error(skynet.send(dber,skynet.PTYPE_TEXT,""))
end)

先注册system,text消息处理函数。
在start中我们launch dber模块服务。
然后给dber服务发送PTYPE_SYSTEM消息传递设置 hello dber字符串。
再发送PTYPE_TEXT消息获取字符串。
dber服务接收到以后,会给PTYPE_TEXT消息发送者发送PTYPE_TEXT消息。

至此我们完成对skynet actor c模块服务的创建使用😊


actor模型在sknyet中的应用
https://huahua132.github.io/2023/05/07/skynet_frame/actor/
作者
huahua132
发布于
2023年5月7日
许可协议