协程-无栈协程(下)

ronald9个月前职场2590

无栈协程库——protothread

    ProtoThread源码如下所示:

#define LC_INIT(s) s = 0;

#define LC_RESUME(s) switch(s) { case 0:

#define LC_SET(s) s = __LINE__; case __LINE__:

#define LC_END(s) }

typedef unsigned short lc_t;
//用于定义一个描述protothread实例的结构体,每一个无栈协程用这个结构体进行描述
struct pt {
  lc_t lc;
};

/**
 初始化一个protothread实例,无栈协程实例,核心就是将指令标签设置为0
 */
#define PT_INIT(pt)   LC_INIT((pt)->lc)

/**
 * 这里用于定义一个protothread实例的接口,name_args是一个包含函数名和形参列表的字符串
 * 且这个接口的返回值得是char型
 */
#define PT_THREAD(name_args) char name_args

/**
 * 用于定义一个protothread的起始执行位置,其实就是在prototype前面套了一个switch
 */
#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; LC_RESUME((pt)->lc)

/**
 * 用于界定protothread的终止位置,就是在后面加了一个},并对结构体进行初始化
 */
#define PT_END(pt) LC_END((pt)->lc); PT_YIELD_FLAG = 0; \
                   PT_INIT(pt); return PT_ENDED; }

/**
 *阻塞直到条件为true,实际应用中返回PT_WAITING表示当前进程阻塞让出执行权,其他表示未被阻塞继续执行
 */
#define PT_WAIT_UNTIL(pt, condition)	        \
  do {						\
    LC_SET((pt)->lc);				\
    if(!(condition)) {				\
      return PT_WAITING;			\
    }						\
  } while(0)

/**
 * 调度一个prototype协程,当返回PT_WAITING,表示调度器阻塞了,让出执行权限给里面的进程
 */
#define PT_SCHEDULE(f) ((f) == PT_WAITING)

/**
 * 让出执行权限,本质上就是在让出位置打一个标签,并直接return,把执行权限交给主调接口
 */
#define PT_YIELD(pt)				\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if(PT_YIELD_FLAG == 0) {			\
      return PT_YIELDED;			\
    }						\
  } while(0)

/**
 */
#define PT_YIELD_UNTIL(pt, cond)		\
  do {						\
    PT_YIELD_FLAG = 0;				\
    LC_SET((pt)->lc);				\
    if((PT_YIELD_FLAG == 0) || !(cond)) {	\
      return PT_YIELDED;			\
    }						\
  } while(0)

    如上述代码段所示:

    ·protothread使用结构体struct pt描述一个协程,协程里面含有lc_t类型成员变量,本质上是一个unsigned short类型

    ·整个PT协程,在创建之前需要调用PT_INIT进行初始化,初始化之后调用PT_BEGIN拉起协程,协程运行完毕之后调用PT_END关闭协程

    ·ProtoThread通过PT_THREAD封装协程执行接口

    ·ProtoThread调用PT_WAIT_UNTIL阻塞,直到condition为true。在这里若是condition为false,表示不满足条件,直接通过return交出执行权限;在交出执行权限之前,调用LC_SET,查看LC_SET的代码,看到这里我们看PT是通过记录行号给源码打标签

    ·ProtoThread通过宏PT_SCHEDULE来实现协程的调度,通常调用PT_SCHEDULE的是主控协程,主控协程决策调度哪个协程之后通过PT_SCHEDULE进行调度

    我们尝试用ProtoThread写一个多玩家登陆的代码,如下:

#include "pt.h"
struct MessageBuffer {
    int change_flag;
    string content;
} g_message_buffer;

typedef struct RoleData {
    int id;
    int step;
    string name;   
    pt  thread_inst_pt;
    int recv_message;
} tagRoleData;

std::map<std::string,RoleData> g_role_set;

void Timer() {
    printf("timer work\n");
    return;
}

MessageBuffer recv_message() {
    MessageBuffer msg = g_message_buffer;
    reset_message();
    return msg;
}

int receive_message(tagRoleData& data) {
    if(data.recv_message > 0) {
        data.recv_message = 0;
        return 1;
    }
    return 0;
}

int process_online_data(tagRoleData& data){
    printf("process online  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

int process_profile_data(tagRoleData& data){
    printf("process profile  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

static PT_THREAD(login_thread(tagRoleData& data)) {
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, receive_message(data));
        process_profile_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}

fd_set fds;
struct timeval tv;
static char c[100] = {0};
static PT_THREAD(network_thread(struct pt *pt))
{
    FD_ZERO(&fds);
    FD_SET(0,&fds);
    PT_BEGIN(pt);
    while(1) {
        PT_WAIT_UNTIL(pt,select(1,&fds,NULL,NULL,&tv) > 0);
        read(0,c,100);
        g_message_buffer.content = string(c);
        g_message_buffer.change_flag = 1;
        if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
            RoleData role_data;
            role_data.step = 0;
            role_data.name = g_message_buffer.content;
            PT_INIT(&role_data.thread_inst_pt);
            g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
        }
        std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
        role_iter->second.recv_message = 1;
        PT_SCHEDULE(login_thread(role_iter->second));
    }
    PT_EXIT(pt);
    PT_END(pt);
}

static struct timer codelock_timer, input_timer;
static PT_THREAD(timer_thread(struct pt *pt))
{
    PT_BEGIN(pt);
    timer_set(&input_timer, 1000);
    PT_WAIT_UNTIL(pt, timer_expired(&input_timer));
    PT_EXIT(pt);
    PT_END(pt);
}

static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
    PT_INIT(&network_thread_pt);
    while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
        PT_SCHEDULE(timer_thread(&timer_thread_pt));
        sleep(1);
    } 
}

int main() {
    Proc();
    return 0;
}

    这其中:

    ·代码中定义了三个执行单元,一个是network_thread网络协程,一个是timer_thread定时协程,一个是login_thread登录协程;

    ·其中timer_thread协程负责定时器任务,network_thread负责消息接收并根据消息头拉起对应的登录协程login_thread,而login_thread对应不同的登录实体的登录行为;

    ·network_thread协程,PT_WAIT_UNTIL会“阻塞”直到文件句柄直到可读(这里我们用标准输入进行替代以便于验证);

    ·当读到消息之后,对于未开启流程的玩家创建一个协程,其他的则调度对应的协程(PT_SCHEDULE(login_thread(role_iter->second)))继续往后走;

    ·对于登录协程。需要多步通信过程,一个是需要等待取在线数据并处理(process_online_data),一个是需要取角色数据并处理(process_profile_data);

    ·在本例中,我们在RoleData中封装了pt类型的成员变量thread_inst_pt用于缓存协程的状态信息,而外层用name->RoleData的映射关系管理协程及其他协程中间态数据;

    需要注意的是——以protothread来说:

    ·对于无栈协程来说,因为不存在指针等信息,所以无栈协程的所有信息是可以缓存在共享内存的,因此进程可以通过共享内存在重启的环境下,也不会导致协程中断;

    ·但是这种恢复也是有条件的,在protothread中是用行号进行协程恢复,若是用到协程的源文件的行号出现改变,则可能执行错乱,如下所示,假设中断前宏扩展后执行序列如下:

switch(Line){
    case 0:{
        state1-1;
        s=Line1-2;
    }
    case Line1-2:{
        if(!cond){
            return;
        }
        state1-3;
        s=Line1-3
    }
    case Line1-3:{
        if(!cond){
            return;
        }
        state1-4;
    }
}

    当源码修改之后,可能宏扩展之后代码就变为:

switch(Line){
    case 0:{
        state2-1;
        s=Line2-2;
    }
    case Line2-2:{
        if(!cond){
            return;
        }
        state2-3;
        s=Line2-3
    }
    case Line2-3:{
        if(!cond){
            return;
        }
        state2-4;
    }
}

    当Line1-xx和Line2-xx不相等的时候,会重新调度进来就会找不到行号了,引发执行流程错乱(所以在使用这类库的时候,应该将函数的实现和协程主流程分开,以避免因为逻辑修改导致协程不可恢复的场景);

    对于无栈协程来说,执行流的恢复只是通过找到下一条指令的执行地址,但是不包括上下文,这意味着无栈协程里面不能有局部变量,需要我们手动把后面需要用到的局部变量缓存起来。

    此外这里无栈协程是通过switch-case实现的,嵌套的switch-case会产生问题,限制比较多,所以也不适用于线上场景。

Label As Value

    标签变量(labels as values)是GCC对C语言的扩展,是指我们可以通过操作符&&得到当前函数中定义的标签地址,这个值的类型是void*,并且是常量,我们可以在任何可以使用这个类型的常量处使用;如下:

#include "stdio.h"
void* ptr = NULL;
int Test()
{
    printf("local:%d,global:%d:global2:%d\n",&&test_local,&&test_global,&&test_global2);
    if(NULL == ptr) {
         printf("here\n");
         ptr = &&test_local;
    }
    goto *ptr;
    test_local:
        ptr = &&test_global;
        printf("local test %d\n",ptr);
        return 0;
    test_global:
        ptr = &&test_global2;
        printf("global test\n");
        return 0;
    test_global2:
        ptr = &&test_local;
        printf("global2 test\n");
        return 0;
    return 0;
}
int main()
{
    Test();
    Test();
    Test();
    return 0;
}

    执行完毕后有如下执行结果:

[ronaldwang@VM-98-253-centos ~/test/label_value_test]$ ./main 
local:4196026,global:4196069:global2:4196097
here
local test 4196069
local:4196026,global:4196069:global2:4196097
global test
local:4196026,global:4196069:global2:4196097
global2 test

    受此启发,我们对protothread进行修改,可以得到如下代码:

typedef void * lc_t;

#define LC_CONCAT2(s1, s2) s1##s2
#define LC_CONCAT(s1, s2) LC_CONCAT2(s1, s2)

#define LC_RESUME(s)                            \
  do {                                          \
    if(s != NULL) {                             \
      goto *s;                                  \
    }                                           \
  } while(0)

#define LC_SET(s,label)                         \
  do {                                          \
    LC_CONCAT(label, __LINE__):                 \
    (s) = &&LC_CONCAT(label, __LINE__); \
  } while(0)
//block until 
#define PT_WAIT_UNTIL(pt, label, condition)             \
  do {                                          \
    LC_SET((pt)->lc, label);                            \
    if(!(condition)) {                          \
      return PT_WAITING;                        \
    }                                           \
  } while(0)

    对于库文件的改造:

    ·阻塞命令PT_WAIT_UNTIL新增标签字段label,当阻塞时,我们不仅指明解除阻塞所需满足的条件,也指明解除阻塞后要执行的代码段

    ·调度的指令LC_RESUME,则是根据标签的地址直接跳转的对应代码去执行goto *s

    则最终代码的使用样式如下:

static
PT_THREAD(login_thread(tagRoleData& data))
{
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label, receive_message(data));
    online_label:    process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt,profile_label, receive_message(data));
    profile_label:    process_profile_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label2, receive_message(data));
    online_label2:    process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, online_label3, receive_message(data));
    online_label3:    process_online_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}

    从这段代码可以看到:

    ·每段接口执行完毕后,都阻塞等待对应条件满足,并指明了阻塞解除后要执行的代码通过标签的形式展示出来。

问题

    上述采取标签的形式还是解决不了重启后协程恢复的问题,因为标签在内存中的位置会在重新编译的时候地址出现变化,我们遵循标签的修改方式对原先的基于行号的代码进行修改,如下:

#define LC_SET(s, evt_id) s = evt_id; case evt_id:
#define PT_WAIT_UNTIL(pt, evt_id, condition)            \
  do {                                          \
    LC_SET((pt)->lc, evt_id);                           \
    if(!(condition)) {                          \
      return PT_WAITING;                        \
    }                                           \
  } while(0)

    业务方可以通过如下形式进行使用:

using namespace std;

struct MessageBuffer {
    int change_flag;
    string content;
} g_message_buffer;

typedef struct RoleData {
    int id;
    int step;
    string name;   
    pt  thread_inst_pt;
    int recv_message;
} tagRoleData;

std::map<std::string,RoleData> g_role_set;

int receive_message(tagRoleData& data) {
    if(data.recv_message > 0) {
        data.recv_message = 0;
        return 1;
    }
    return 0;
}

int process_online_data(tagRoleData& data){
    printf("process online  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

int process_profile_data(tagRoleData& data){
    printf("process profile  name[%s] current step[%d]\n",data.name.c_str(),data.step);
    data.step += 1;
    return 0;
}

#define MSG_ONLINE_RSP 1
#define MSG_PROFILE_RSP 2
#define MSG_ONLINE_RSP_2 3
#define MSG_ONLINE_RSP_3 4
static PT_THREAD(login_thread(tagRoleData& data))
{
    PT_BEGIN(&data.thread_inst_pt);
    while(data.step < 4) {
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_PROFILE_RSP, receive_message(data));
        process_profile_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_2, receive_message(data));
        process_online_data(data);
        PT_WAIT_UNTIL(&data.thread_inst_pt, MSG_ONLINE_RSP_3, receive_message(data));
        process_online_data(data);
    }
    PT_EXIT(&data.thread_inst_pt);
    PT_END(&data.thread_inst_pt);
}


#define NETWORK_EVTID 200
fd_set fds;
struct timeval tv;
static PT_THREAD(network_thread(struct pt *pt))
{
    FD_ZERO(&fds);
    FD_SET(0,&fds);

    PT_BEGIN(pt);
    tv.tv_sec=0;
    tv.tv_usec=0;
    while(1) {
        PT_WAIT_UNTIL(pt, NETWORK_EVTID, select(1,&fds,NULL,NULL,&tv) > 0);
        read(0,d,100);
        memcpy(c,d,strlen(d)-1);
        g_message_buffer.content = string(c);
        g_message_buffer.change_flag = 1;
        if (g_role_set.find(g_message_buffer.content) == g_role_set.end()){
            RoleData role_data;
            role_data.step = 0;
            role_data.name = g_message_buffer.content;
            PT_INIT(&role_data.thread_inst_pt);
            g_role_set.insert(std::pair<std::string,RoleData>(g_message_buffer.content,role_data));
        }
        std::map<std::string,RoleData>::iterator role_iter = g_role_set.find(g_message_buffer.content);
        role_iter->second.recv_message = 1;
        PT_SCHEDULE(login_thread(role_iter->second));
    }
    PT_EXIT(pt);
    PT_END(pt);
}

#define TIMER_EVTID  100 
static PT_THREAD(timer_thread(struct pt *pt))
{
    PT_BEGIN(pt);
    timer_set(&input_timer, 1000);
    PT_WAIT_UNTIL(pt, TIMER_EVTID, timer_expired(&input_timer));
    PT_EXIT(pt);
    PT_END(pt);
}

static struct pt network_thread_pt;
static struct pt timer_thread_pt;
void Proc() {
    PT_INIT(&network_thread_pt);
    while(PT_SCHEDULE(network_thread(&network_thread_pt))) {
        PT_SCHEDULE(timer_thread(&timer_thread_pt));
        sleep(1);
    } 
}

int main() {
    Proc();
    return 0;
}

    由此可见:

    ·我们可以在协程让出执行权限的时候,指明要等待的事件,如PT_WAIT_UNTIL(pt, evt_id, condition)所示

    ·其他的如之前所示,在阻塞分支之前会按照等待的事件ID,新增一个case分支

    ·因为标签是我们自定义的,不会因为程序的重新编译发生变化,所以重启不会影响协程的恢复和执行

参考资料

        函数调用过程

        ucontext manual pages

        swapcontext() — Save and restore user context

        云风协程库源码

        编程沉思录——云风协程库源码分析

        编程沉思录——libco源码分析

        libco源码地址

        libco性能对比

        达夫设备

        Label As Values标签变量

        ucontext族函数的使用及原理分析

        FSTENV

        Intel x86 MXCSR Register

        SSE-维基百科

        libco源码注释版



相关文章

Lua和so

Lua和so

    实际在应用开发过程中,会用到很多第三方的库;Lua由于其易嵌入的特性,不仅可以使用Lua编写的库,也可以将C++的库进行二次封装供Lua调用。这里我们以实现在Lua中解析xml文件格式场景,结合C++编写的“tinyxml2” 这个库为例进行讲解:库的准备及编译第一步,下载“tinyxml2”的源码下载链接:leethomason/tinyxml2:...

Lua的Upvalue和闭包(二)

Lua的Upvalue和闭包(二)

Lua闭包和Upvalue的实现    前面文章介绍了Lua闭包和upvalue的概念,本文简单过一下Lua对于闭包和upvalue的实现以加深理解。Lua闭包结构    Lua在内存的结构如下所示:#define ClosureHeader \ CommonHeader; lu_by...

K8S入门-概念篇(上)

K8S入门-概念篇(上)

    认识到K8S的产生背景之后,我们开始进一步了解K8S,基于对K8S里面一些概念的了解之后,我们再去探讨K8S的一些原理:一. node    如前所述,K8S是一个容器编排平台,即容器的自动部署、扩展和管理;其最终的落点是把容器调度到一个运行他的节点上,在K8S中这个运行容器的节点就是node,但是需要注意的是...

几种Lua和C交叉编程的程序写法

Lua程序调用C接口//另一个待Lua调用的C注册函数。 static int sub2(lua_State* L) {     double op1 = luaL_checknumber(L,1);     double op2 ...

协程-无栈协程(上)

协程-无栈协程(上)

无栈协程    有栈协程是基于函数切换上下文恢复的思路实现被中断协程的继续执行,但是这个上下文里面有返回地址,即下一条指令的地址,所以当程序发生改动重新编译生成,指令地址有可能发生改变,这种对于需要重新编译生成发布的发布场景支持并不友好,会因为程序指令地址的变化导致协程执行流的错乱。这时另外一种不基于上下文恢复的协程机制提供了一种新的思路。达夫设备 ...

Lua的垃圾回收(上)

Lua 5.3版本的垃圾回收垃圾回收算法    垃圾回收算法一般分为两类:引用计数法和标记扫描法。引用计数法    所谓引用计数法,是指在为对象申请内存的时候,在分配的内存块预留一块区域用于存放这块内存被引用的次数,当被引用的时候增一,解引用的时候减一,当这块内存的引用次数降到0的时候,这块内存被认为不可访问,直接被回...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。