简介
Actor模型由状态(State)、行为(Behavior)和邮箱(MailBox,可以认为是一个消息队列)三部分组成:
状态
Actor模型中的状态指Actor对象的变量信息,状态由Actor模型自己管理,避免了并发环境下的锁和内存原子性等问题。
行为
Actor模型中的计算逻辑,通过Actor模型接收到的消息来改变Actor模型的状态。
邮箱
邮箱是Actor和Actor之间的通信桥梁,邮箱内部通过FIFO(先入先出)消息队列来存储发送方Actor的消息,接收方Actor再从邮箱队列中获取消息。
特点
隔离性
每个actor实例的状态只能通过自己的行为修改。
原子性
单个actor实例同时只会有一个线程在执行,类似redis只用单线程去读写内存数据,避免了多线程下的并发问题,actor修改内部状态不需要加锁。
异步消息处理
actor发消息并不是直接调用接口,而是类似邮件在对方邮箱放一个邮件,之后actor实例会按照先入先出的规则去处理邮件。
生命周期
actor实例可以在运行中动态创建和释放。
容错
actor实例的断言不会导致整个程序崩溃。
模型抽象
每个actor模型可以对自己的状态,行为进入定义。
skynet对actor的应用
skynet通过抽象skynet_context结构体实现对actor模型的应用。
skynet_context
贴一段skynet_context结构部分定义。
1 2 3 4 5 6 7 8
| struct skynet_context { void * instance; struct skynet_module * mod; void * cb_ud; skynet_cb cb; struct message_queue *queue; uint32_t handle; };
|
状态
skynet_context中的instance和cb_ud指向的是实例数据,通常cb_ud指向的就是instance指向的数据,或者是instance中的成员数据,可以认为instance和cb_ud就是对actor模型的状态定义。
行为
woker线程把消息传入cb回调函数处理,跟着传入的还有cb_ud。
邮箱
message_queue消息队列,存放待处理消息。
总结
skynet通过actor模型抽象skynet服务,赋予了服务之间强隔离性和原子性,避免了频繁加锁,使得在开发中能够高效简单实现业务,同时还能高效利用多核。
延伸问题
actor模块是如何定义的?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static void * get_api(struct skynet_module *mod, const char *api_name) { size_t name_size = strlen(mod->name); size_t api_size = strlen(api_name); char tmp[name_size + api_size + 1]; memcpy(tmp, mod->name, name_size); memcpy(tmp+name_size, api_name, api_size+1); char *ptr = strrchr(tmp, '.'); if (ptr == NULL) { ptr = tmp; } else { ptr = ptr + 1; } return dlsym(mod->module, ptr); }
static int open_sym(struct skynet_module *mod) { mod->create = get_api(mod, "_create"); mod->init = get_api(mod, "_init"); mod->release = get_api(mod, "_release"); mod->signal = get_api(mod, "_signal");
return mod->init == NULL; }
|
skynet c服务模块约定需要实现create
init
release
signal
四个API接口。
create
负责创建实例。
init
负责初始化,一般是绑定cb
cb_ud
,返回0表示初始成功,其他表示失败。
release
释放实例时调用。
signal
通过API直接调用,接口实现需要保证线程安全。
actor实例(状态)的生命周期?
skynet_context实例的由skynet_context_new
函数创建,该函数先后调用了create
,init
。
由skynet_context_release
释放,调用skynet_context_release并不是立马释放ctx,而是原子操作ctx.ref
减一以后为0才释放,每个操作ctx的api都需要先给ctx引用计数+1,操作完以后调用skynet_context_release释放引用计数-1。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| struct skynet_context * skynet_handle_grab(uint32_t handle) { struct handle_storage *s = H; struct skynet_context * result = NULL;
rwlock_rlock(&s->lock);
uint32_t hash = handle & (s->slot_size-1); struct skynet_context * ctx = s->slot[hash]; if (ctx && skynet_context_handle(ctx) == handle) { result = ctx; skynet_context_grab(result); }
rwlock_runlock(&s->lock);
return result; }
void skynet_context_grab(struct skynet_context *ctx) { ATOM_INC(&ctx->ref); }
struct skynet_context * skynet_context_release(struct skynet_context *ctx) { if (ATOM_DEC(&ctx->ref) == 0) { delete_context(ctx); return NULL; } return ctx; }
int skynet_context_push(uint32_t handle, struct skynet_message *message) { struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { return -1; } skynet_mq_push(ctx->queue, message); skynet_context_release(ctx);
return 0; }
|
为什么要用引用计数的方式管理ctx实例的生命周期?
skynet是多线程的框架,试想一下如下场景:2个线程同时操作ctx实例。A线程释放,B线程想给ctx插入一条消息。就会出现非常严重的问题访问野指针,也就是指针指向的内存已经释放了。
actor队列(邮箱)消息的投递与消费?
消息通过skynet_context_push
API投递。
由ctx在init中绑定的cb函数处理。
这里以skynet自带的logger服务为例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| struct skynet_message smsg; if (context == NULL) { smsg.source = 0; } else { smsg.source = skynet_context_handle(context); } smsg.session = 0; smsg.data = data; smsg.sz = len | ((size_t)PTYPE_TEXT << MESSAGE_TYPE_SHIFT); skynet_context_push(logger, &smsg);
static int logger_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) { struct logger * inst = ud; switch (type) { case PTYPE_SYSTEM: if (inst->filename) { inst->handle = freopen(inst->filename, "a", inst->handle); } break; case PTYPE_TEXT: if (inst->filename) { char tmp[SIZETIMEFMT]; int csec = timestring(ud, tmp); fprintf(inst->handle, "%s.%02d ", tmp, csec); } fprintf(inst->handle, "[:%08x] ", source); fwrite(msg, sz , 1, inst->handle); fprintf(inst->handle, "\n"); fflush(inst->handle); break; }
return 0; }
int logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) { const char * r = skynet_command(ctx, "STARTTIME", NULL); inst->starttime = strtoul(r, NULL, 10); if (parm) { inst->handle = fopen(parm,"a"); if (inst->handle == NULL) { return 1; } inst->filename = skynet_malloc(strlen(parm)+1); strcpy(inst->filename, parm); inst->close = 1; } else { inst->handle = stdout; } if (inst->handle) { skynet_callback(ctx, inst, logger_cb); return 0; } return 1; }
|
init
绑定logger_cb
函数,之后如果有通过skynet_context_push
投递消息给ctx实例,将异步调用logger_cb
函数。
如何新建一个actor模块并且处理消息(行为)?
我以新建一个dber模块为例子。
这个服务非常简单,用于存储最新的字符串数据(状态),提供set
get
方法(行为)。
我们需要先定义状态数据。
1 2 3
| struct dber { char * value; };
|
然后实现 _create
_release
_init
_signal
接口。
其中 _signal
接口暂时用不到,可以不实现。
编写模块对模块的文件名和接口前缀名有强约束性。
比如我们dber模块的文件名只能写成:service_dber.c
,文件名格式是service_
拼接模块名称dber
。
接口函数名就是模块名称拼接接口名称,比如_create
就要写成dber_create
。
这样做的目的是为了方便编译、加载模块动态库、接口函数查找绑定、actor模块启动。
1 2 3 4 5
| define CSERVICE_TEMP $$(CSERVICE_PATH)/$(1).so : service-src/service_$(1).c | $$(CSERVICE_PATH) $$(CC) $$(CFLAGS) $$(SHARED) $$< -o $$@ -Iskynet/skynet-src endef $(foreach v, $(CSERVICE), $(eval $(call CSERVICE_TEMP,$(v))))
|
编译service,通过文件名的约束,可以做到统一编译,只需要增加CSERVICE模块名称
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| static void * get_api(struct skynet_module *mod, const char *api_name) { size_t name_size = strlen(mod->name); size_t api_size = strlen(api_name); char tmp[name_size + api_size + 1]; memcpy(tmp, mod->name, name_size); memcpy(tmp+name_size, api_name, api_size+1); char *ptr = strrchr(tmp, '.'); if (ptr == NULL) { ptr = tmp; } else { ptr = ptr + 1; } return dlsym(mod->module, ptr); }
static int open_sym(struct skynet_module *mod) { mod->create = get_api(mod, "_create"); mod->init = get_api(mod, "_init"); mod->release = get_api(mod, "_release"); mod->signal = get_api(mod, "_signal");
return mod->init == NULL; }
|
加载接口动态库,通过模块名称拼接接口名称查找API并绑定到skynet_module
结构实例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| static struct skynet_module * _query(const char * name) { int i; for (i=0;i<M->count;i++) { if (strcmp(M->m[i].name,name)==0) { return &M->m[i]; } } return NULL; }
struct skynet_module * skynet_module_query(const char * name) { struct skynet_module * result = _query(name); if (result) return result;
SPIN_LOCK(M)
result = _query(name);
if (result == NULL && M->count < MAX_MODULE_TYPE) { int index = M->count; void * dl = _try_open(M,name); if (dl) { M->m[index].name = name; M->m[index].module = dl;
if (open_sym(&M->m[index]) == 0) { M->m[index].name = skynet_strdup(name); M->count ++; result = &M->m[index]; } } }
SPIN_UNLOCK(M)
return result; }
|
我们在启动一个该模块actor需要通过模块名称查找对应模块的API接口。
明白了为何要做这种名称约束之后,我们再来实现相关接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| #include "skynet.h" #include "skynet_mq.h" #include "skynet_server.h"
#include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <string.h> #include <time.h>
struct dber { char * value; };
struct dber * dber_create(void) { struct dber * inst = skynet_malloc(sizeof(*inst)); inst->value = NULL;
return inst; }
void dber_release(struct dber * inst) { if (NULL != inst->value) { skynet_free(inst->value); } skynet_free(inst); }
static int dber_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source, const void * msg, size_t sz) { struct dber * inst = ud; switch (type) { case PTYPE_SYSTEM: if (NULL != inst->value) { skynet_free(inst->value); } inst->value = skynet_malloc(strlen(msg) + 1); strcpy(inst->value,msg); break; case PTYPE_TEXT: { struct skynet_message smsg; smsg.source = skynet_context_handle(context); smsg.session = 0; size_t len = strlen(inst->value) + 1; smsg.data = skynet_malloc(len); strcpy(smsg.data,inst->value); smsg.sz = len; skynet_context_push(source,&smsg); break; } }
return 0; }
int dber_init(struct dber * inst, struct skynet_context *ctx, const char * parm) { skynet_callback(ctx, inst, dber_cb); return 0; }
|
我们在create
中创建数据(状态)
init
中注册回调函数dber_cb
;
release
中去释放内存占用。
dber_cb中我简单的约束PTYPE_SYSTEM
消息类型为set
方法(行为)
PTYPE_TEXT
消息类型为get
方法(行为)
到此我们把这个服务模块写好了。
我们把它放在skynet/service-src
目录下。
1
| CSERVICE = snlua logger gate harbor dber
|
在makefile中CSERVICE下添加dber
之后编译一下skynet,如果编译成功,就会在skynet/cservice
目录下看见dber.so动态库文件。
之后我们编写一个非常简单的skynet lua服务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| local skynet = require "skynet" require "skynet.manager"
skynet.register_protocol({ name = "system", id = skynet.PTYPE_SYSTEM, pack = skynet.packstring, unpack = function(...) return ... end, dispatch = function(...) skynet.error("recv system msg:",...) end, })
skynet.register_protocol({ name = "text", id = skynet.PTYPE_TEXT, pack = function(...) return ... end, unpack = skynet.tostring, dispatch = function(...) skynet.error("recv text msg:",...) end, })
skynet.start(function() skynet.error("dber_test main run") local dber = skynet.launch("dber") skynet.error("handle:",dber) skynet.error(skynet.send(dber,skynet.PTYPE_SYSTEM,"hello dber")) skynet.error(skynet.send(dber,skynet.PTYPE_TEXT,"")) end)
|
先注册system
,text
消息处理函数。
在start中我们launch dber
模块服务。
然后给dber服务发送PTYPE_SYSTEM
消息传递设置 hello dber
字符串。
再发送PTYPE_TEXT
消息获取字符串。
dber
服务接收到以后,会给PTYPE_TEXT
消息发送者发送PTYPE_TEXT
消息。
至此我们完成对skynet actor c模块服务的创建使用😊