协程-有栈协程(coroutine)

ronald2年前职场10530

概述

    后台架构的微服务化,原先的单体应用被按照功能模块切分为若干进程组承担,此种架构演化带来的收益诸如:

  • 单进程复杂度降低,代码维护成本降低

  • 发布影响范围缩小,发布灵活性提升

  • 计算资源更精准的分配

  • ... ...

    但是这种架构带来的另外的变化就是,原先由单进程承载的事务,可能涉及几个甚至十几个进程;在这种情况下,采取异步回调的程序架构会导致代码零散不易维护;而采取多线程的话,一方面由于线程切换效率比较低,另一方面部分系统对线程个数的上限是有限制的,因此多线程的结构并不利于算力的充分利用。因此协程技术应运而生。

    所谓协程,即用户级线程,一种用于将异步代码同步化的编程机制,使得程序的执行流可以在多个并行事务之间切换但又不必承担切换带来的过高的性能损耗。当前很多的编程语言都内置协程特性或者有自己的协程库,如C/C++的libco、golang的goroutine等。而在实现机制上,又可以划分为有栈协程和无栈协程,我们分别进行介绍。

有栈协程

    所谓有栈协程是指执行环境的恢复是通过函数栈(即运行时上下文)的恢复实现的,在此之前我们先回忆一下函数调用的基础知识。

1.函数调用过程

    当可执行文件并载入到内存中,进程地址空间被划分为代码段、数据段、堆、栈等,如下图所示:

image-20211009104602015.png

        而程序运行过程中需要的内存是在栈和堆空间分配的,当进程内发生函数调用时,需要保存一些上下文信息以及为函数内局部变量分配存储空间,这些存储空间是在栈上分配的,具体来说:在函数调用之前主调函数会将函数参数和返回地址入栈,被调函数在执行之前会先将主调函数的ebp入栈,并在栈上新开辟一块内存用于存放局部变量等信息;如下图所示:

image-20211009113535676.png

        

    当被调函数执行完毕之后,会执行退栈操作,找到函数退出之前的下一条指令的地址并将栈中存放的局部变量信息恢复,即可恢复原来主调函数被中断的执行过程。

    在这里需要关注到几点:

    ·不是所有的寄存器都需要入栈的,只有ebp和eip

    ·我们在反汇编的时候是看不到返回地址入栈的操作,这个是由call指令完成的

    ·参数入栈顺序主流是从右向左入栈的

    通过对函数调用过程分析我们可以看到,函数通过函数返回语句实现执行权限的归还&通过栈中返回地址实现被中断执行流程的恢复,而有栈协程正是基于这一朴素的想法实现的:在有栈协程中,将每个并行事务看成是一个函数调用,而协程库负责把让出执行权时的协程的上下文缓存起来(即当时的栈包括局部变量、返回地址等),当协程被重新调度时,就把切出时的栈重新装载进去即可。我们选用两个协程库进行介绍,云风的基于ucontext函数簇的协程库和libco。

基于ucontext协程库

    ucontext即user thread context,用户线程上下文,是Linux系统自带的一套用于获取、修改和切换当前线程上下文的结构和相关方法;主要包括:ucontext_t结构体和context函数簇。

1.关于ucontext函数簇

    其中ucontext_t结构体:

typedef struct ucontext
  {
    unsigned long int uc_flags;
    struct ucontext *uc_link;
    stack_t uc_stack;
    mcontext_t uc_mcontext;
    __sigset_t uc_sigmask;
    struct _libc_fpstate __fpregs_mem;
  } ucontext_t;

    主要包括:

    ·uc_links:当前context执行结束后要执行的下一个context,若为空,表示执行完毕后退出程序;在协程库设计中一般用于存主协程的上下文

    ·uc_stack:当前上下文的栈信息

    ·uc_mcontext:用于保存当前上下文的寄存器内容

    ·uc_sigmask:当前线程的信号屏蔽码

    context函数簇包括:

    ·int getcontext(ucontext_t *ucp),将当前执行上下文信息保存在ucp指向的ucontext结构体中

    ·int setcontext(const ucontext_t *ucp),将ucp中保存的寄存器信息恢复到CPU中,用于将当前程序切换到目标上下文

    ·void makecontext(ucontext_t ucp, void (func)(), int argc, ...),初始化一个ucontext_t,并设置入口函数为func

    ·int swapcontext(ucontext_t *oucp, const ucontext_t *ucp),切换上下文,保存当前上下文到oucp中,并激活ucp中的上下文

    context底层原理差不多,我们挑一个看就行了,这里主要看swapcontext接口,因为既包含了上下文保存又包含上下文切换的内容,相对比较全面,其实现流程如下:

int swapcontext(ucontext_t *oucp, const ucontext_t *ucp);
ENTRY(__swapcontext)
    /* Save the preserved registers, the registers used for passing args,
       and the return address.  */
    movq    %rbx, oRBX(%rdi)
    movq    %rbp, oRBP(%rdi)
    movq    %r12, oR12(%rdi)
    movq    %r13, oR13(%rdi)
    movq    %r14, oR14(%rdi)
    movq    %r15, oR15(%rdi)

    movq    %rdi, oRDI(%rdi)
    movq    %rsi, oRSI(%rdi)
    movq    %rdx, oRDX(%rdi)
    movq    %rcx, oRCX(%rdi)
    movq    %r8, oR8(%rdi)
    movq    %r9, oR9(%rdi)         //这一波是把寄存器内容缓存起来,这些寄存器是传参和传递返回地址用的

    movq    (%rsp), %rcx           //因为这里函数调用,被调函数没有执行入栈操作,所以当前栈指针是执向返回地址的
    movq    %rcx, oRIP(%rdi)       //因此这里入栈其实是把返回地址给存器来了
    leaq    8(%rsp), %rcx       
    movq    %rcx, oRSP(%rdi)       //这两句是存返回地址
    leaq    oFPREGSMEM(%rdi), %rcx  //栈上面有独立的存储空间存放浮点数环境,我们用结构体中__fpregs_mem进行存储
    movq    %rcx, oFPREGS(%rdi)
    /* Save the floating-point environment.  */
    fnstenv (%rcx)                   //存浮点数环境
    stmxcsr oMXCSR(%rdi)             //存MXCSR寄存器内容
    /* The syscall destroys some registers, save them.  */
    movq    %rsi, %r12

    /* Save the current signal mask and install the new one with
       rt_sigprocmask (SIG_BLOCK, newset, oldset,_NSIG/8).  */
    leaq    oSIGMASK(%rdi), %rdx
    leaq    oSIGMASK(%rsi), %rsi
    movl    $SIG_SETMASK, %edi
    movl    $_NSIG8,%r10d
    movl    $__NR_rt_sigprocmask, %eax 
    syscall                       //这里其实是一个系统调用过程,将系统调用号存入rax中,参数存入rdx等寄存器中,并通过
    cmpq    $-4095, %rax          //syscall触发系统调用,调用完毕后通过eax获取返回值,上面的过程其实就是将当前的系统  
    jae SYSCALL_ERROR_LABEL       //调用号存起来并把新的传进去,通过rax寄存器拿到返回值判断是否成功

    /* Restore destroyed registers.  */
    movq    %r12, %rsi

    /* Restore the floating-point context.  Not the registers, only the
       rest.  */
    movq    oFPREGS(%rsi), %rcx
    fldenv  (%rcx)
    ldmxcsr oMXCSR(%rsi)

    /* Load the new stack pointer and the preserved registers.  */
    movq    oRSP(%rsi), %rsp
    movq    oRBX(%rsi), %rbx
    movq    oRBP(%rsi), %rbp
    movq    oR12(%rsi), %r12
    movq    oR13(%rsi), %r13
    movq    oR14(%rsi), %r14
    movq    oR15(%rsi), %r15

    /* The following ret should return to the address set with
    getcontext.  Therefore push the address on the stack.  */
    movq    oRIP(%rsi), %rcx
    pushq   %rcx

    /* Setup registers used for passing args.  */
    movq    oRDI(%rsi), %rdi
    movq    oRDX(%rsi), %rdx
    movq    oRCX(%rsi), %rcx
    movq    oR8(%rsi), %r8
    movq    oR9(%rsi), %r9

    /* Setup finally  %rsi.  */
    movq    oRSI(%rsi), %rsi    //要拉起的进程的寄存器恢复

    /* Clear rax to indicate success.  */
    xorl    %eax, %eax
    ret
PSEUDO_END(__swapcontext)

一些基础知识:

(1)FPU(floating point unit,浮点运算器)用于进行浮点运算的单元,在x86中,FPU有一套自己的运行时环境包括控制字、状态字、指令指针、数据指针等。

    fstenv/fnstenv用于将浮点环境缓存到指定的内存空间中;

    (2)MXCSR register:用于存储SSE寄存器中的控制和状态信息

        SSE(streaming SIMD Extensions,单指令多数据流扩展指令集),SSE加入新的8个128位寄存器,以及新的32位控制/状态寄存器(MXCSR);

        SIMD,单指令多数据流技术,即用一个控制器控制多个平行的处理单元,如GPU;

    (3)syscall

        我们可以在汇编程序中使用Linux系统调用,通常的步骤是:

        1)将系统调用号放在EAX寄存器中

        2)将参数存储在寄存器EBX、ECX等中

        3)调用相关的中断

        4)在EAX寄存器中获取返回

    主要包括以下几方面的内容:

    (1)swapcontext用于将当前执行流的上下文保存到oucp所指向的结构体,同时将ucp所指向的用户上下文重新进行装载实现执行流的切换

    (2)保存的信息包括三个部分:

        1)第一大块,一堆mov指令,实际上是把寄存器存起来

        2)第二大块,浮点数环境缓存起来

        3)第三大块,信号码相关信息

    (3)恢复的时候也是遵循这个顺序反向来的

    而云风基于coroutine协程库就是基于ucontext函数簇达到上下文切换的目的。

2.云风的coroutine协程库

    我们先通过一个例子看下这个协程库的用法:

struct args { int n;};
static void foo(struct schedule * S, void *ud) {
	struct args * arg = ud;
	int start = arg->n;
	int i;
	for (i=0;i<5;i++) {
		printf("coroutine %d : %d\n",coroutine_running(S) , start + i);
		coroutine_yield(S);
	}
}
static void test(struct schedule *S) {
	struct args arg1 = {0};
	struct args arg2 = {100};
	// 创建两个协程
	int co1 = coroutine_new(S, foo, &arg1);
	int co2 = coroutine_new(S, foo, &arg2);
	while (coroutine_status(S,co1) && coroutine_status(S,co2)) {
		coroutine_resume(S,co1);// 使用协程 co1
		coroutine_resume(S,co2);// 使用协程 co2
	}
}

int main() {
	struct schedule * S = coroutine_open();// 创建一个协程调度器
	test(S);
	coroutine_close(S);// 关闭协程调度器
	return 0;
}

    示例代码比较简单:首先是创建了一个调度器schedule(本质上也是一个协程),并且通过接口coroutine_resume在调度器下面创建了两个协程co1和co2,然后通过调度器循环去拉起其中一个协程,直到一个协程执行完毕。通过示例代码,我们可以看到这个协程库主要包括以下几个接口:

    ·struct schedule * coroutine_open(void):创建调度器

    ·int coroutine_new(struct schedule *, coroutine_func, void *ud):创建一个协程,入口函数func,ud是用户参数

    ·void coroutine_resume(struct schedule *, int id):拉起指定的协程,用id对协程进行标识

    ·void coroutine_yield(struct schedule *):当前协程阻塞,让出执行权限给调度器

    通过示例代码,从直观上,我们可以看出来,这个协程库采用的非对称的协程模型,当协程阻塞或者退出后,并不指定将执行权限移交给哪个协程,而是统一交还给调度器(主协程),由调度器选择指定的子协程进行调度和拉起。而更细节的,我们从源码去看下这个协程库是怎么组织协程、以及怎么通过context函数簇实现协程的执行序列的切换,部分源码如下:

struct schedule {
	char stack[STACK_SIZE];
	ucontext_t main;
	int nco;
	int cap;
	int running;
	struct coroutine **co;
};

struct coroutine {
	coroutine_func func;
	void *ud;
	ucontext_t ctx;
	struct schedule * sch;
	ptrdiff_t cap;
	ptrdiff_t size;
	int status;
	char *stack;
};

static void mainfunc(uint32_t low32, uint32_t hi32) {
	uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hi32 << 32);
	struct schedule *S = (struct schedule *)ptr;
	int id = S->running;
	struct coroutine *C = S->co[id];
	C->func(S,C->ud);
	_co_delete(C);
	S->co[id] = NULL;
	--S->nco;
	S->running = -1;
}

void  coroutine_resume(struct schedule * S, int id) {
	struct coroutine *C = S->co[id];
	switch(C->status) {
	case COROUTINE_READY:
		getcontext(&C->ctx);
		C->ctx.uc_stack.ss_sp = S->stack;
		C->ctx.uc_stack.ss_size = STACK_SIZE;
		C->ctx.uc_link = &S->main;
		S->running = id;
		C->status = COROUTINE_RUNNING;
		uintptr_t ptr = (uintptr_t)S;
		makecontext(&C->ctx,(void (*)(void))mainfunc,2,(uint32_t)ptr,(uint32_t(ptr>>32));
		swapcontext(&S->main, &C->ctx);
		break;
	case COROUTINE_SUSPEND:
		memcpy(S->stack + STACK_SIZE - C->size, C->stack, C->size);
		S->running = id;
		C->status = COROUTINE_RUNNING;
		swapcontext(&S->main, &C->ctx);
		break;
	default:
		assert(0);
	}
}

static void _save_stack(struct coroutine *C, char *top) {
	char dummy = 0;
	assert(top - &dummy <= STACK_SIZE);
	if (C->cap < top - &dummy) {
		free(C->stack);
		C->cap = top-&dummy;
		C->stack = malloc(C->cap);
	}
	C->size = top - &dummy;
	memcpy(C->stack, &dummy, C->size);
}
                    
void coroutine_yield(struct schedule * S) {
	int id = S->running;
	struct coroutine * C = S->co[id];
	_save_stack(C,S->stack + STACK_SIZE);
	C->status = COROUTINE_SUSPEND;
	S->running = -1;
	swapcontext(&C->ctx , &S->main);
}

    在了解完context函数簇之后,我们对于理解这个协程库会简单很多,我们逐个来看:

    (1)结构体schedule,用于描述调度器的结构体,其中

     ·stack是运行时协程栈,用于存放当前运行的协程的栈信息

     ·main,用于存放调度器的上下文信息

     ·coroutine,用于存放此调度器下管理的协程信息,每个协程

    (2)结构体coroutine,用于描述协程信息,其中:

      ·func,一个函数指针,指向

     ·ctx,协程的上下文信息

     ·stack,协程的栈信息,在堆空间上分配出来的

      ·cap/size:栈的容量和当前使用栈空间的大小

     ·status,协程的状态信息,是处于就绪状态(COROUTINE_READY)?运行状态(COROUTINE_RUNNING )?还是挂起状态(COROUTINE_SUSPEND )?

    (3)接口coroutine_resume,用于拉起指定id的子协程,具体来说有以下几点需要注意:

       1)协程的id是协程在调度器协程数组里面的下标

       2)对于COROUTINE_READY状态的协程被拉起时:

        ①子协程的ss_sp指向调度器的栈空间,表示子协程是共享stack内存空间的(意味着所有子协程的栈指针都是指向这块内存空间的)

        ②会通过getcontext和makecontext设置协程的入口函数

        ③最后通过swapcontext完成从调度器到子协程的执行流的切换

相关文章

协程-无栈协程(上)

协程-无栈协程(上)

无栈协程    有栈协程是基于函数切换上下文恢复的思路实现被中断协程的继续执行,但是这个上下文里面有返回地址,即下一条指令的地址,所以当程序发生改动重新编译生成,指令地址有可能发生改变,这种对于需要重新编译生成发布的发布场景支持并不友好,会因为程序指令地址的变化导致协程执行流的错乱。这时另外一种不基于上下文恢复的协程机制提供了一种新的思路。达夫设备 ...

协程-无栈协程(下)

无栈协程库——protothread    ProtoThread源码如下所示:#define LC_INIT(s) s = 0; #define LC_RESUME(s) switch(s) { case 0: #define LC_SET(s)...

协程-有栈协程(libco)

协程-有栈协程(libco)

libco      还有一个广泛使用的协程库就是libco,libco是被由微信开发并大规模应用的协程库,自2013年起稳定运行于数万台微信后台机器上;具备以下特性:高性能,号称可以调度千万级协程在IO阻塞时,可以自动切换,利用hook技术+epoll事件循环实现阻塞逻辑IO化改造支持嵌套创建既支持共享栈模式也支持独立栈模式提供超时管...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。