协程-有栈协程(libco)

ronald1年前职场6781

libco  

    还有一个广泛使用的协程库就是libco,libco是被由微信开发并大规模应用的协程库,自2013年起稳定运行于数万台微信后台机器上;具备以下特性:

    • 高性能,号称可以调度千万级协程

    • 在IO阻塞时,可以自动切换,利用hook技术+epoll事件循环实现阻塞逻辑IO化改造

    • 支持嵌套创建

    • 既支持共享栈模式也支持独立栈模式

    • 提供超时管理

    • ... ...

    而libco基于性能优化考虑,没有使用ucontext,而是自己用汇编写了一套上下文切换的代码,在文件coctx_swap.S里面,他这里面只保存和恢复了寄存器内存和栈内容,相比于ucontext,少了浮点数上下文和sigmask,因为:

    • sigmask会引发一次syscall(需要从用户态进入到内核态并返回),性能上有损耗

    • 取消浮点数上下文,是因为服务端编程几乎用不到浮点数计算

    此外libco的上下文切换只支持i386和x86架构,因为后台服务器大多是x86架构的,总结来看,libco牺牲了通用性,把一些不常用的场景下上下文保存的操作去掉了,实现代码性能的提升,有人进行过性能比对,libco的性能是ucontext的3.6倍。

    一个libco的示例:

struct stTask_t {
	int id;
};
struct stEnv_t {
	stCoCond_t* cond;
	queue<stTask_t*> task_queue;
};
void* Producer(void* args) {
	co_enable_hook_sys();
	stEnv_t* env=  (stEnv_t*)args;
	int id = 0;
	while (true) {
		stTask_t* task = (stTask_t*)calloc(1, sizeof(stTask_t));
		task->id = id++;
		env->task_queue.push(task);
		co_cond_signal(env->cond);
		poll(NULL, 0, 1000);
	}
	return NULL;
}
void* Consumer(void* args) {
	co_enable_hook_sys();
	stEnv_t* env = (stEnv_t*)args;
	while (true) {
		if (env->task_queue.empty())
		{
			co_cond_timedwait(env->cond, -1);
			continue;
		}
		stTask_t* task = env->task_queue.front();
		env->task_queue.pop();
		free(task);
	}
	return NULL;
}
int main() {
	stEnv_t* env = new stEnv_t;
	env->cond = co_cond_alloc();

	stCoRoutine_t* consumer_routine;
	co_create(&consumer_routine, NULL, Consumer, env);
	co_resume(consumer_routine);

	stCoRoutine_t* producer_routine;
	co_create(&producer_routine, NULL, Producer, env);
	co_resume(producer_routine);
	
	co_eventloop(co_get_epoll_ct(), NULL, NULL);
	return 0;
}

    从代码可以看到:

    ·这是一个生产者消费者示例,一个负责生产的producer_routine协程,负责把生产的数据放到env的task_queue中;和一个负责消费的consumer_routine负责从task_queue取数据

    ·libco使用结构体stCoRoutine_t描述一个协程,接口co_create负责创建协程,接口co_resume负责拉起协程

    ·co_eventloop表示进入协程的执行循环

    结合示例涉及的接口看一下相关的数据结构及接口原理:

co_create

struct stCoRoutineAttr_t {
	int stack_size;
	stShareStack_t*  share_stack;
	stCoRoutineAttr_t()
	{
		stack_size = 128 * 1024;
		share_stack = NULL;
	}
}__attribute__ ((packed));

struct stCoRoutineEnv_t{
	stCoRoutine_t *pCallStack[ 128 ];
	int iCallStackSize;
	stCoEpoll_t *pEpoll;
	stCoRoutine_t* pending_co;
	stCoRoutine_t* occupy_co;
};

struct stCoRoutine_t
{
	stCoRoutineEnv_t *env;
	pfn_co_routine_t pfn;
	void *arg;
	coctx_t ctx;
	char cStart;
	char cEnd;
	char cIsMain;
	char cEnableSysHook;
	char cIsShareStack;
	void *pvEnv;
	stStackMem_t* stack_mem;
	//save satck buffer while confilct on same stack_buffer;
	char* stack_sp; 
	unsigned int save_size;
	char* save_buffer;
	stCoSpec_t aSpec[1024];
};

struct coctx_t
{
#if defined(__i386__)
	void *regs[ 8 ];
#else
	void *regs[ 14 ];
#endif
	size_t ss_size;
	char *ss_sp;
	
};

stCoRoutineEnv_t *co_get_curr_thread_env() {
	return gCoEnvPerThread;
}

void co_init_curr_thread_env() {
	gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
	stCoRoutineEnv_t *env = gCoEnvPerThread;
	env->iCallStackSize = 0;
	struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
	self->cIsMain = 1;
	env->pending_co = NULL;
	env->occupy_co = NULL;
	coctx_init( &self->ctx );
	env->pCallStack[ env->iCallStackSize++ ] = self;
	stCoEpoll_t *ev = AllocEpoll();
	SetEpoll( env,ev );
}


int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg ) {
	if( !co_get_curr_thread_env() ) {
		co_init_curr_thread_env();
	}
	stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
	*ppco = co;
	return 0;
}

struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,
		pfn_co_routine_t pfn,void *arg ) {
	stCoRoutineAttr_t at;
	if( attr ) {
		memcpy( &at,attr,sizeof(at) );
	}
	if( at.stack_size <= 0 ) {
		at.stack_size = 128 * 1024; 
	}
	else if( at.stack_size > 1024 * 1024 * 8 ) {
		at.stack_size = 1024 * 1024 * 8;
	}
	if( at.stack_size & 0xFFF )  {
		at.stack_size &= ~0xFFF;
		at.stack_size += 0x1000;
	}
	stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
	memset( lp,0,(long)(sizeof(stCoRoutine_t))); 
	lp->env = env;
	lp->pfn = pfn;
	lp->arg = arg;
	stStackMem_t* stack_mem = NULL;
	if( at.share_stack ) {
		... ...
	}
	else {
		// 如果没有采用共享栈,则分配内存
		stack_mem = co_alloc_stackmem(at.stack_size);
	}
	lp->stack_mem = stack_mem;
	// 设置该协程的context
	lp->ctx.ss_sp = stack_mem->stack_buffer; // 栈地址
	lp->ctx.ss_size = at.stack_size; // 栈大小
	lp->cStart = 0;
	lp->cEnd = 0;
	lp->cIsMain = 0;
	lp->cEnableSysHook = 0;	// 默认不开启hook
	lp->cIsShareStack = at.share_stack != NULL;
	return lp;
}

逐个来看:

    (1)结构体stCoRoutineAttr_t,用于描述协程的属性信息,目前这个属性信息只包括共享栈的信息

    (2)结构体stCoRoutineEnv_t,用于描述协程的运行环境,每个线程都有唯一一个该结构的变量用于对该线程下协程进行管理

        ·pCallStack协程的调用栈,最后一位是当前运行的协程,上一位是当前协程的父协程;可以看到libco最多只支持128层的协程嵌套

        ·pEpoll,libco主要用epoll作为协程调度器

    (3)结构体stCoRoutine_t用于描述一个协程,其中:

        ·stCoRoutineEnv_t *env,指向协程所属的运行环境,可以理解为协程所属的协程管理器

        ·pfn_co_routine_t pfn/void *arg,协程对应的函数及参数

        ·coctx_t ctx,协程的上下文信息,包括寄存器和栈

        ·cStart/cEnd/cIsMain/cEnableSysHook/cIsShareStack:一系列标志变量

        ·stStackMem_t* stack_mem;,栈空间指针

    (4)创建协程的接口co_create,其中

        参数部分:

        ·ppco:输出变量,协程的地址

        ·attr:协程的属性信息,目前只有一个属性就是是否是共享栈

        ·pfn:协程的入口函数

        ·arg:协程的入口函数的参数

        核心功能通过接口co_create_env完成

    (5)co_create_env本质上就是创建一个stCoRoutine_t并初始化

co_resume

#if defined(__i386__)
... ...
#elif defined(__x86_64__)
int coctx_make( coctx_t *ctx,coctx_pfn_t pfn,const void *s,const void *s1 )
{
	char *sp = ctx->ss_sp + ctx->ss_size;
	sp = (char*) ((unsigned long)sp & -16LL  );
	memset(ctx->regs, 0, sizeof(ctx->regs));
	ctx->regs[ kRSP ] = sp - 8;
	ctx->regs[ kRETAddr] = (char*)pfn;
	ctx->regs[ kRDI ] = (char*)s;
	ctx->regs[ kRSI ] = (char*)s1;
	return 0;
}

void co_resume( stCoRoutine_t *co )
{
	stCoRoutineEnv_t *env = co->env;
	// 找到当前运行的协程, 从数组最后一位拿出当前运行的协程,如果目前没有协程,那就是主线程
	stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
	if( !co->cStart ) {
		// 如果当前协程还没有开始运行,为其构建上下文
		coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co, 0 );
		co->cStart = 1;
	}
	// 将指定协程放入线程的协程队列末尾
	env->pCallStack[ env->iCallStackSize++ ] = co;
	// 将当前运行的上下文保存到lpCurrRoutine中,同时将协程co的上下文替换进去
	co_swap( lpCurrRoutine, co );
}

注意到:

    (1)当协程被创建之后还没有运行的情况下,cStart标识为0,第一次调度之后,这个标志位为1,表示已经运行了

    (2)在第一次调度协程时,会先通过接口coctx_make先为这个协程构建一个上下文

    (3)而coctx_make做了什么?

        1)把协程入口地址写入到kRETAddr寄存器中

        2)把协程对象的地址写到kRDI寄存器中

    (4)完成上下文创建之后,当前协程入栈(env->pCallStack[ env->iCallStackSize++ ] = co),并切换到子协程执行

co_swap

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
 	stCoRoutineEnv_t* env = co_get_curr_thread_env();
	//get curr stack sp
	//这里非常重要!!!: 这个c变量的实现,作用是为了找到目前的栈底,因为c变量是最后一个放入栈中的内容。
	char c;
	curr->stack_sp= &c;
	if (!pending_co->cIsShareStack) {  
		env->pending_co = NULL;
		env->occupy_co = NULL;
	}
	else { // 如果采用了共享栈
	    ... ...
	}
	// swap context
	coctx_swap(&(curr->ctx),&(pending_co->ctx) );
	// 上一步coctx_swap会进入到pending_co的协程环境中运行
	// 但是pengdin_co也是这一步换出的,所以被换入的时候也是从这一步继续往后执行
	// 而yield回此协程之前,env->pending_co会被上一层协程设置为此协程
	// 因此可以顺利执行: 将之前保存起来的栈内容,恢复到运行栈上
	//stack buffer may be overwrite, so get again;
	stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
	stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
	stCoRoutine_t* update_pending_co = curr_env->pending_co;
	// 将栈的内容恢复,如果不是共享栈的话,每个协程都有自己独立的栈空间,则不用恢复。
	if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) {
         ... ...
	}
}
.globl coctx_swap
// coctx_swap(coctx_t curr,coctx_t pending)

#if defined(__i386__)
	... ...
#elif defined(__x86_64__)
	leaq 8(%rsp),%rax   // 父函数栈帧中除返回地址外栈帧顶的位置
	
	leaq 112(%rdi),%rsp ;rdi寄存器存的是第一个参数的地址,其112个字节后,是regs数组后的位置。
	                    ;所以这一步是把栈指针设置为regs数组地址
	// 将寄存器保存到入栈,因为此时栈的地址指向数组,因此实际上就是讲各个寄存器填充到数组中
	pushq %rax  // rax -> regs[13],也就是当前的rsp -> regs[13]
	pushq %rbx  // rbx -> regs[12]
	pushq %rcx  // rcx -> regs[11]
	pushq %rdx  // rdx -> regs[10]
	pushq -8(%rax) // ret func addr  返回地址 -> regs[9]
	pushq %rsi  // rsi -> regs[8]
	pushq %rdi  // rdi -> regs[7]
	pushq %rbp  // rbp -> regs[6]
	pushq %r8   // r8 -> regs[5]
	pushq %r9   // r9 -> regs[4]
	pushq %r12   // r12 -> regs[3]
	pushq %r13    // r13 -> regs[2]
	pushq %r14   // r14 -> regs[1]
	pushq %r15   // r15 -> regs[0]  // r15是程序计数器
	// 截止到此,所有的协程上下文保存成功 
	// rsi中是第二个参数,我们需要将第二个参数的上下文载入到寄存器和栈里面
	// rsi的首地址就是整个regs[]参数的地址,从0开始,所以当pop的时候,将从0将参数pop出来。
	movq %rsi, %rsp
	popq %r15         ;以下为倒序还原
	popq %r14
	popq %r13
	popq %r12
	popq %r9
	popq %r8
	popq %rbp
	popq %rdi
	popq %rsi
	popq %rax //ret func addr
	popq %rdx
	popq %rcx
	popq %rbx
	popq %rsp
	pushq %rax // 将返回地址入栈
	// 将eax寄存器清零,eax是rax的低32位,也就是将rax的低32位清零。也就是return 0的意思。
	xorl %eax, %eax
	// 返回函数
	ret
#endif

    co_swap主要完成了协程切换的功能,将执行流从curr指向的当前协程切换为pending_co指向的协程,在coctx_swap中:

    ·根据函数入栈规则,当调用到coctx_swap时,依次将pending、curr和ret-addr入栈了,所以第一句leaq 8(%rsp),%rax实际是让rax指向除返回地址以外的栈顶处

    ·leaq 112(%rdi),%rsp是将栈的首地址指向了curr->regs起始位置处,后面的push本质上就是push到regs中,所以后面的一串push实际上就是存寄存器内容的

    ·rsi是用来存第二个参数的,movq %rsi, %rsp实际就是把栈指针指向rsi的regs数组,然后一波pop用来填充寄存器的内容

    ·pushq %rax将返回地址入栈(对应于leaq 8(%rsp),%rax),因为返回时会从栈顶取返回地址,作为下一条指令的地址,通过这步操作就从上次让出CPU的位置继续执行了

    ·而栈信息通过rbp和rsp指向,而栈顶指针在coctx_make里面已经设置为coctx_t::ss_sp的存储空间,所以在pop序列进行环境恢复的时候,实际就把对应协程空间的栈空间恢复了

    ·关于ebp的话,因为在coctx_make一开始,我们就把sp指向我们的栈空间,所以在协程内发生函数调用的时候,被调函数的函数头会有mov rsp rbp和push rbp,此时实际rbp也是指向我们分配的栈空间,所以真正栈空间里面是ebp和esp都是在我们管理的存储空间里面

    ·而在第一次调用协程时,co_resume执行完毕协程栈和寄存器状态如下图所示

image-20211011232144263.png

    这其中:

    ·当执行完毕时popq %rax是把协程入口地址存入rax寄存器中,并pushq把这个入口地址压入栈顶

    ·执行到ret指令时,会把栈顶出栈,并填充到RIP寄存器中

    ·而一开始栈顶指针-8,就是预留空间给这个返回地址的,加上八个字节正好放满栈空间

co_yield

/*
*
* 主动将当前运行的协程挂起,并恢复到上一层的协程
*
* @param env 协程管理器 
*/
void co_yield_env( stCoRoutineEnv_t *env )
{
	// 这里直接取了iCallStackSize - 2,那么万一icallstacksize < 2呢?
	// 所以这里实际上有个约束,就是co_yield之前必须先co_resume, 这样就不会造成这个问题了
	// last就是 找到上次调用co_resume(curr)的协程
	stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
	// 当前栈
	stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
	env->iCallStackSize--;
	// 把上下文当前的存储到curr中,并切换成last的上下文
	co_swap( curr, last);
}

void co_yield( stCoRoutine_t *co )
{
	co_yield_env( co->env );
}

    co_yield主要功能由co_yield_env完成,主要就是让出执行权限,主要就是:

    ·从env->pCallStack取出当前协程和调度此协程的父协程,然后把执行权限通过接口co_swap交还给父协程即可,并将当前协程退栈。

参考资料

        函数调用过程

        ucontext manual pages

        swapcontext() — Save and restore user context

        云风协程库源码

        编程沉思录——云风协程库源码分析

        编程沉思录——libco源码分析

        libco源码地址

        libco性能对比

        达夫设备

        Label As Values标签变量

        ucontext族函数的使用及原理分析

        FSTENV

        Intel x86 MXCSR Register

        SSE-维基百科

        libco源码注释版


相关文章

    栈(stack)一种操作受限的线性结构,限定仅能在尾部进行插入和删除,能进行插入和删除操作的一端叫做栈顶,而另一端叫做栈底;我们将插入栈的操作叫做入栈,从栈中删除元素的操作叫做出栈,栈是一个先入后出(FILO)的数据结构,即先入栈的元素会后出栈。    栈最常被我们接触的场景就是函数调用了,在进程地址空间中,有...

后台开发人员面试资源汇总(C&C++方向)

后台开发人员面试资源汇总(C&C++方向)

    汇总一下C/C++后台开发方向的学习资料,为避免广告之嫌,只罗列书名不贴链接了1.编程语言类《C++ Primer》《C 专家编程》《深度探索C++对象模型》《Effective C++:改善程序与设计的55个具体做法》《STL源码剖析》2.操作系统类《鸟哥的Linux私房菜 基础学习篇》《深入理解LINUX内核(第3版)》3.软件工程类《大话设计模...

K8S实战技巧(一)

一. 概述        K8S是谷歌开源的一个容器编排管理工具,可以帮助业务实现自动化部署、故障发现、容灾、扩缩容、流量管理等,大大提升业务的服务能力;k8s通过yaml描述文件实现容器的部署,本文介绍一些实际应用过程中可能用到一些k8s的特性及配置方法,适用于对k8s有一定了解的读者。二. 共享进程命名空间&nbs...

协程-无栈协程(下)

无栈协程库——protothread    ProtoThread源码如下所示:#define LC_INIT(s) s = 0; #define LC_RESUME(s) switch(s) { case 0: #define LC_SET(s)...

Lua热更新机制(上)

Lua热更新机制一个Lua热更新demo    Lua在游戏开发中能广泛使用不仅由于其轻量易嵌入的特性,还有一个重要的点是易于热更新,设想在产品线上运营过程中,出现bug需要修复,频繁停机对于产品体验影响大,也影响口碑;所以实际运营我们是希望能尽量避免停止服务进行代码更新的操作,下面先从一段比较简单的代码看Lua的热更新机制:require &qu...

协程-无栈协程(上)

协程-无栈协程(上)

无栈协程    有栈协程是基于函数切换上下文恢复的思路实现被中断协程的继续执行,但是这个上下文里面有返回地址,即下一条指令的地址,所以当程序发生改动重新编译生成,指令地址有可能发生改变,这种对于需要重新编译生成发布的发布场景支持并不友好,会因为程序指令地址的变化导致协程执行流的错乱。这时另外一种不基于上下文恢复的协程机制提供了一种新的思路。达夫设备 ...

评论列表

bob
2022-06-12 17:33:31

来感受一下阿里最新开源的协程库和I/O库吧:https://github.com/alibaba/PhotonLibOS
支持io_uring作为事件引擎和I/O引擎哦

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。