协程-有栈协程(libco)
libco
还有一个广泛使用的协程库就是libco,libco是被由微信开发并大规模应用的协程库,自2013年起稳定运行于数万台微信后台机器上;具备以下特性:
高性能,号称可以调度千万级协程
在IO阻塞时,可以自动切换,利用hook技术+epoll事件循环实现阻塞逻辑IO化改造
支持嵌套创建
既支持共享栈模式也支持独立栈模式
提供超时管理
... ...
而libco基于性能优化考虑,没有使用ucontext,而是自己用汇编写了一套上下文切换的代码,在文件coctx_swap.S里面,他这里面只保存和恢复了寄存器内存和栈内容,相比于ucontext,少了浮点数上下文和sigmask,因为:
sigmask会引发一次syscall(需要从用户态进入到内核态并返回),性能上有损耗
取消浮点数上下文,是因为服务端编程几乎用不到浮点数计算
此外libco的上下文切换只支持i386和x86架构,因为后台服务器大多是x86架构的,总结来看,libco牺牲了通用性,把一些不常用的场景下上下文保存的操作去掉了,实现代码性能的提升,有人进行过性能比对,libco的性能是ucontext的3.6倍。
一个libco的示例:
struct stTask_t { int id; }; struct stEnv_t { stCoCond_t* cond; queue<stTask_t*> task_queue; }; void* Producer(void* args) { co_enable_hook_sys(); stEnv_t* env= (stEnv_t*)args; int id = 0; while (true) { stTask_t* task = (stTask_t*)calloc(1, sizeof(stTask_t)); task->id = id++; env->task_queue.push(task); co_cond_signal(env->cond); poll(NULL, 0, 1000); } return NULL; } void* Consumer(void* args) { co_enable_hook_sys(); stEnv_t* env = (stEnv_t*)args; while (true) { if (env->task_queue.empty()) { co_cond_timedwait(env->cond, -1); continue; } stTask_t* task = env->task_queue.front(); env->task_queue.pop(); free(task); } return NULL; } int main() { stEnv_t* env = new stEnv_t; env->cond = co_cond_alloc(); stCoRoutine_t* consumer_routine; co_create(&consumer_routine, NULL, Consumer, env); co_resume(consumer_routine); stCoRoutine_t* producer_routine; co_create(&producer_routine, NULL, Producer, env); co_resume(producer_routine); co_eventloop(co_get_epoll_ct(), NULL, NULL); return 0; }
从代码可以看到:
·这是一个生产者消费者示例,一个负责生产的producer_routine协程,负责把生产的数据放到env的task_queue中;和一个负责消费的consumer_routine负责从task_queue取数据
·libco使用结构体stCoRoutine_t描述一个协程,接口co_create负责创建协程,接口co_resume负责拉起协程
·co_eventloop表示进入协程的执行循环
结合示例涉及的接口看一下相关的数据结构及接口原理:
co_create
struct stCoRoutineAttr_t { int stack_size; stShareStack_t* share_stack; stCoRoutineAttr_t() { stack_size = 128 * 1024; share_stack = NULL; } }__attribute__ ((packed)); struct stCoRoutineEnv_t{ stCoRoutine_t *pCallStack[ 128 ]; int iCallStackSize; stCoEpoll_t *pEpoll; stCoRoutine_t* pending_co; stCoRoutine_t* occupy_co; }; struct stCoRoutine_t { stCoRoutineEnv_t *env; pfn_co_routine_t pfn; void *arg; coctx_t ctx; char cStart; char cEnd; char cIsMain; char cEnableSysHook; char cIsShareStack; void *pvEnv; stStackMem_t* stack_mem; //save satck buffer while confilct on same stack_buffer; char* stack_sp; unsigned int save_size; char* save_buffer; stCoSpec_t aSpec[1024]; }; struct coctx_t { #if defined(__i386__) void *regs[ 8 ]; #else void *regs[ 14 ]; #endif size_t ss_size; char *ss_sp; }; stCoRoutineEnv_t *co_get_curr_thread_env() { return gCoEnvPerThread; } void co_init_curr_thread_env() { gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) ); stCoRoutineEnv_t *env = gCoEnvPerThread; env->iCallStackSize = 0; struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL ); self->cIsMain = 1; env->pending_co = NULL; env->occupy_co = NULL; coctx_init( &self->ctx ); env->pCallStack[ env->iCallStackSize++ ] = self; stCoEpoll_t *ev = AllocEpoll(); SetEpoll( env,ev ); } int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg ) { if( !co_get_curr_thread_env() ) { co_init_curr_thread_env(); } stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg ); *ppco = co; return 0; } struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr, pfn_co_routine_t pfn,void *arg ) { stCoRoutineAttr_t at; if( attr ) { memcpy( &at,attr,sizeof(at) ); } if( at.stack_size <= 0 ) { at.stack_size = 128 * 1024; } else if( at.stack_size > 1024 * 1024 * 8 ) { at.stack_size = 1024 * 1024 * 8; } if( at.stack_size & 0xFFF ) { at.stack_size &= ~0xFFF; at.stack_size += 0x1000; } stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) ); memset( lp,0,(long)(sizeof(stCoRoutine_t))); lp->env = env; lp->pfn = pfn; lp->arg = arg; stStackMem_t* stack_mem = NULL; if( at.share_stack ) { ... ... } else { // 如果没有采用共享栈,则分配内存 stack_mem = co_alloc_stackmem(at.stack_size); } lp->stack_mem = stack_mem; // 设置该协程的context lp->ctx.ss_sp = stack_mem->stack_buffer; // 栈地址 lp->ctx.ss_size = at.stack_size; // 栈大小 lp->cStart = 0; lp->cEnd = 0; lp->cIsMain = 0; lp->cEnableSysHook = 0; // 默认不开启hook lp->cIsShareStack = at.share_stack != NULL; return lp; }
逐个来看:
(1)结构体stCoRoutineAttr_t,用于描述协程的属性信息,目前这个属性信息只包括共享栈的信息
(2)结构体stCoRoutineEnv_t,用于描述协程的运行环境,每个线程都有唯一一个该结构的变量用于对该线程下协程进行管理
·pCallStack协程的调用栈,最后一位是当前运行的协程,上一位是当前协程的父协程;可以看到libco最多只支持128层的协程嵌套
·pEpoll,libco主要用epoll作为协程调度器
(3)结构体stCoRoutine_t用于描述一个协程,其中:
·stCoRoutineEnv_t *env,指向协程所属的运行环境,可以理解为协程所属的协程管理器
·pfn_co_routine_t pfn/void *arg,协程对应的函数及参数
·coctx_t ctx,协程的上下文信息,包括寄存器和栈
·cStart/cEnd/cIsMain/cEnableSysHook/cIsShareStack:一系列标志变量
·stStackMem_t* stack_mem;,栈空间指针
(4)创建协程的接口co_create,其中
参数部分:
·ppco:输出变量,协程的地址
·attr:协程的属性信息,目前只有一个属性就是是否是共享栈
·pfn:协程的入口函数
·arg:协程的入口函数的参数
核心功能通过接口co_create_env完成
(5)co_create_env本质上就是创建一个stCoRoutine_t并初始化
co_resume
#if defined(__i386__) ... ... #elif defined(__x86_64__) int coctx_make( coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1 ) { char *sp = ctx->ss_sp + ctx->ss_size; sp = (char*) ((unsigned long)sp & -16LL ); memset(ctx->regs, 0, sizeof(ctx->regs)); ctx->regs[ kRSP ] = sp - 8; ctx->regs[ kRETAddr] = (char*)pfn; ctx->regs[ kRDI ] = (char*)s; ctx->regs[ kRSI ] = (char*)s1; return 0; } void co_resume( stCoRoutine_t *co ) { stCoRoutineEnv_t *env = co->env; // 找到当前运行的协程, 从数组最后一位拿出当前运行的协程,如果目前没有协程,那就是主线程 stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ]; if( !co->cStart ) { // 如果当前协程还没有开始运行,为其构建上下文 coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co, 0 ); co->cStart = 1; } // 将指定协程放入线程的协程队列末尾 env->pCallStack[ env->iCallStackSize++ ] = co; // 将当前运行的上下文保存到lpCurrRoutine中,同时将协程co的上下文替换进去 co_swap( lpCurrRoutine, co ); }
注意到:
(1)当协程被创建之后还没有运行的情况下,cStart标识为0,第一次调度之后,这个标志位为1,表示已经运行了
(2)在第一次调度协程时,会先通过接口coctx_make先为这个协程构建一个上下文
(3)而coctx_make做了什么?
1)把协程入口地址写入到kRETAddr寄存器中
2)把协程对象的地址写到kRDI寄存器中
(4)完成上下文创建之后,当前协程入栈(env->pCallStack[ env->iCallStackSize++ ] = co),并切换到子协程执行
co_swap
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) { stCoRoutineEnv_t* env = co_get_curr_thread_env(); //get curr stack sp //这里非常重要!!!: 这个c变量的实现,作用是为了找到目前的栈底,因为c变量是最后一个放入栈中的内容。 char c; curr->stack_sp= &c; if (!pending_co->cIsShareStack) { env->pending_co = NULL; env->occupy_co = NULL; } else { // 如果采用了共享栈 ... ... } // swap context coctx_swap(&(curr->ctx),&(pending_co->ctx) ); // 上一步coctx_swap会进入到pending_co的协程环境中运行 // 但是pengdin_co也是这一步换出的,所以被换入的时候也是从这一步继续往后执行 // 而yield回此协程之前,env->pending_co会被上一层协程设置为此协程 // 因此可以顺利执行: 将之前保存起来的栈内容,恢复到运行栈上 //stack buffer may be overwrite, so get again; stCoRoutineEnv_t* curr_env = co_get_curr_thread_env(); stCoRoutine_t* update_occupy_co = curr_env->occupy_co; stCoRoutine_t* update_pending_co = curr_env->pending_co; // 将栈的内容恢复,如果不是共享栈的话,每个协程都有自己独立的栈空间,则不用恢复。 if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) { ... ... } }
.globl coctx_swap // coctx_swap(coctx_t curr,coctx_t pending) #if defined(__i386__) ... ... #elif defined(__x86_64__) leaq 8(%rsp),%rax // 父函数栈帧中除返回地址外栈帧顶的位置 leaq 112(%rdi),%rsp ;rdi寄存器存的是第一个参数的地址,其112个字节后,是regs数组后的位置。 ;所以这一步是把栈指针设置为regs数组地址 // 将寄存器保存到入栈,因为此时栈的地址指向数组,因此实际上就是讲各个寄存器填充到数组中 pushq %rax // rax -> regs[13],也就是当前的rsp -> regs[13] pushq %rbx // rbx -> regs[12] pushq %rcx // rcx -> regs[11] pushq %rdx // rdx -> regs[10] pushq -8(%rax) // ret func addr 返回地址 -> regs[9] pushq %rsi // rsi -> regs[8] pushq %rdi // rdi -> regs[7] pushq %rbp // rbp -> regs[6] pushq %r8 // r8 -> regs[5] pushq %r9 // r9 -> regs[4] pushq %r12 // r12 -> regs[3] pushq %r13 // r13 -> regs[2] pushq %r14 // r14 -> regs[1] pushq %r15 // r15 -> regs[0] // r15是程序计数器 // 截止到此,所有的协程上下文保存成功 // rsi中是第二个参数,我们需要将第二个参数的上下文载入到寄存器和栈里面 // rsi的首地址就是整个regs[]参数的地址,从0开始,所以当pop的时候,将从0将参数pop出来。 movq %rsi, %rsp popq %r15 ;以下为倒序还原 popq %r14 popq %r13 popq %r12 popq %r9 popq %r8 popq %rbp popq %rdi popq %rsi popq %rax //ret func addr popq %rdx popq %rcx popq %rbx popq %rsp pushq %rax // 将返回地址入栈 // 将eax寄存器清零,eax是rax的低32位,也就是将rax的低32位清零。也就是return 0的意思。 xorl %eax, %eax // 返回函数 ret #endif
co_swap主要完成了协程切换的功能,将执行流从curr指向的当前协程切换为pending_co指向的协程,在coctx_swap中:
·根据函数入栈规则,当调用到coctx_swap时,依次将pending、curr和ret-addr入栈了,所以第一句leaq 8(%rsp),%rax实际是让rax指向除返回地址以外的栈顶处
·leaq 112(%rdi),%rsp是将栈的首地址指向了curr->regs起始位置处,后面的push本质上就是push到regs中,所以后面的一串push实际上就是存寄存器内容的
·rsi是用来存第二个参数的,movq %rsi, %rsp实际就是把栈指针指向rsi的regs数组,然后一波pop用来填充寄存器的内容
·pushq %rax将返回地址入栈(对应于leaq 8(%rsp),%rax),因为返回时会从栈顶取返回地址,作为下一条指令的地址,通过这步操作就从上次让出CPU的位置继续执行了
·而栈信息通过rbp和rsp指向,而栈顶指针在coctx_make里面已经设置为coctx_t::ss_sp的存储空间,所以在pop序列进行环境恢复的时候,实际就把对应协程空间的栈空间恢复了
·关于ebp的话,因为在coctx_make一开始,我们就把sp指向我们的栈空间,所以在协程内发生函数调用的时候,被调函数的函数头会有mov rsp rbp和push rbp,此时实际rbp也是指向我们分配的栈空间,所以真正栈空间里面是ebp和esp都是在我们管理的存储空间里面
·而在第一次调用协程时,co_resume执行完毕协程栈和寄存器状态如下图所示
这其中:
·当执行完毕时popq %rax是把协程入口地址存入rax寄存器中,并pushq把这个入口地址压入栈顶
·执行到ret指令时,会把栈顶出栈,并填充到RIP寄存器中
·而一开始栈顶指针-8,就是预留空间给这个返回地址的,加上八个字节正好放满栈空间
co_yield
/* * * 主动将当前运行的协程挂起,并恢复到上一层的协程 * * @param env 协程管理器 */ void co_yield_env( stCoRoutineEnv_t *env ) { // 这里直接取了iCallStackSize - 2,那么万一icallstacksize < 2呢? // 所以这里实际上有个约束,就是co_yield之前必须先co_resume, 这样就不会造成这个问题了 // last就是 找到上次调用co_resume(curr)的协程 stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ]; // 当前栈 stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ]; env->iCallStackSize--; // 把上下文当前的存储到curr中,并切换成last的上下文 co_swap( curr, last); } void co_yield( stCoRoutine_t *co ) { co_yield_env( co->env ); }
co_yield主要功能由co_yield_env完成,主要就是让出执行权限,主要就是:
·从env->pCallStack取出当前协程和调度此协程的父协程,然后把执行权限通过接口co_swap交还给父协程即可,并将当前协程退栈。