一、开源协程库调研

1、golang语言自带协程

    Golang的协程是对称协程,调度器使用了GMP模型。使用go语言写一个并发程序极其简单,例如go func(x,y)即可并发执行一个函数f(x,y),这也是netco的目标:只需要调用接口co_go(func)即可并发执行func函数。
    Golang还有一个通道的概念,不同的协程可以往通道中写内容读内容。
    当然,重点还是GMP模型,其中G代表的是goroutine协程,Go1.11中协程栈默认是2KB。M代表的是machine,对应的是一个线程。P代表的是processor,当P有任务时需要创建或者唤醒一个系统线程来执行它队列里的任务。所以P/M需要进行绑定,构成一个执行单元。
    首先创建一个G对象,G对象保存到P本地队列或者是全局队列。P此时去唤醒一个M。P继续执行它的执行序。M寻找是否有空闲的P,如果有则将该G对象移动到它本身。接下来M执行一个调度循环(调用G对象->执行->清理线程→继续找新的Goroutine执行)。
    M执行过程中,随时会发生上下文切换。当发生上下文切换时,需要对执行现场进行保护,以便下次被调度执行时进行现场恢复。Go调度器M的栈保存在G对象上,只需要将M所需要的寄存器(SP、PC等)保存到G对象上就可以实现现场保护。当这些寄存器数据被保护起来,就随时可以做上下文切换了,在中断之前把现场保存起来。如果此时G任务还没有执行完,M可以将任务重新丢到P的任务队列,等待下一次被调度执行。

2、云风的coroutine协程库

    风云的协程库是使用C语言实现的使用共享栈的一个非对称协程库,即调用者和被调用者的关系是固定的,协程A调用B,则B完成后必定返回到A。因为云风不希望使用他的协程库的人太考虑栈的大小,并且认为,进行上下文切换的大多时候,栈的使用实际并不大,所以使用共享栈每次进行上下文切换时拷贝的开销其实可以接受。另外,该库的上下文切换使用的是glibc的ucontext。

3、腾讯的libco协程库

    腾讯的libco协程库是一个非对称的协程库,结合了epoll机制,其接口风格类似pthread,使用起来实际上已经有了使用线程的感觉。其栈空间(Separate coroutine stacks)的固定大小为128K,也可以使用共享栈(Copying the stack),但默认还是使用固定的栈空间。libco算是给了我极其之大的震撼,因为还是第一次看到结合epoll和hook系统调用的技术,有些叹为观止。另外,该库是自己使用汇编写的上下文切换方法。

4、魅族的libgo协程库

    libgo是一个go风格的c++11对称协程库,它的命名结构分为Scheduler,Processer和Task(协程),schedule 负责整个系统的协程调度,协程的运行依赖于执行器 Processer(简称 P),因此在调度器初始化的时候会选择创建 P 的数量(支持动态增长),所有的执行器会添加到双端队列中。该库的命名结构很有意思,所以我在我的netco也采用了类似的命名:Scheduler->Processor->Coroutine。另外,该库使用的是boost库的上下文切换方法。

二、netco协程库概述

    基于对上述协程库的调研,我写了netco协程库,它是一个线程风格的纯C++11对称协程库,并且可以用于高并发网络编程。
    在使用上,受golang的影响很大,所以我尽可能地减少使用接口,让使用更加轻便简洁。目前和协程相关的接口只有三个:co_go(func),运行一个协程,co_wait(time)等待time毫秒后继续执行当前协程,co_join()等待协程运行结束。
    对于上下文切换,我使用的是glibc的ucontext,这个上下文切换方法有一个缺点,就是执行了一次系统调用,有一定的性能损失,但是因为对各种机器的了解不足,我还是决定使用成熟的上下文切换方案,以屏蔽机器的差异。
    对于栈空间,使用的是Separate coroutine stacks,默认为8K大小,使用co_go接口时候可指定当前协程栈的大小,也可以在parameter.h中修改默认的协程栈大小重新编译。没有使用共享栈主要是为了性能的考虑。
    源码地址:https://github.com/YukangLiu/netco 。

三、netco的实现

1、框架

微信截图_20210111095423.png
    模型框架如上图,netco会根据计算机的核心数开对应的线程数运行协程,其中每一个线程对应一个Processor实例,协程Coroutine实例运行在Processor的主循环中,Processor使用epoll和定时器timer进行任务调度。而Scheduler则并不存在一个循环,它是一个全局单例,当某个线程中调用co_go()运行一个新协程后,实际会调用该实例的方法,选择一个协程最少的Processor接管新的协程,当然,用户也可以指定具体某一个Processor来接管新的协程。
    类图如下:
1.png

2、Context

    这里的context类封装了ucontext上下文切换的一些操作,使所有其他需要使用上下文切换的地方都使用Context类而不去使用被封装的ucontext,目的是将来想用自己写的上下文切换或者其他库的上下文切换方法的时候,只需要实现该类中的方法即可,而不需要修改netco中的其他部分。

3、Coroutine

    协程对象,主要实现协程的几个关键方法:resume,yield,实际真正的yield由Processor执行,这里的yield只是修改当前协程的状态。
    当然,用户是无法感知到Coroutine的,因为其只是更高层封装的组件。

4、对象池

    对象池可以为用户使用,在库中主要用在Coroutine的实例的创建上。
    对象池创建对象时,首先会从内存池中取出相应大小的块,内存池是与对象大小强相关的,其中有一个空闲链表,每次分配空间都从空闲链表上取,若空闲链表没有内容时,首先会分配(40 + 分配次数)* 对象大小的空间,然后分成一个个块挂在空闲链表上,这里空闲链表节点没有使用额外的空间:效仿的stl的二级配置器中的方法,将数据和next指针放在了一个union中。从内存池取出所需内存块后,会判断对象是否拥有non-trivial构造函数,没有的话直接返回,有的话使用placement new构造对象。

5、Epoller

    该类功能很简单,一个是监视epoll中是否有事件发生,一个是向epoll中添加、修改、删除监视的fd。值得注意的是,该类并不存储任何协程对象实体,也不维护任何协程对象实体的生命期。另外,该类使用的是LT。

6、Timer

    定时器主要使用的linux的timerfd_create创建的时钟fd配合一个优先队列(小根堆)实现的,原因是要求效率而没有移除协程的需求。
    这里的小根堆中存放的是时间(任务要执行的时刻)和协程对象的pair。
    首先,程序初始化时会timerfd_create一个timefd,然后将该fd放进epoll中,当有地方调用RunAt或RunAfter函数时候,会先将新来的任务函数插入到小根堆中,然后判断它是不是最近的任务,如果是的话调用timerfd_settime更新时间。
    若出现超时时间,则epoll_wait必然会跳出阻塞,而在Processor的主循环中,第一个处理的就是超时事件,方法就是与当前时间对比并取出小根堆中的协程,直到小根堆中所有任务的时间都比当前大,另外,取出来的协程会放在一个数组中,用于在Processor循环中执行。
    定时器还有另外一个功能,就是唤醒epoll_wait,当有新的协程加入时,实际就是通过定时器来唤醒的processor主循环,并执行新接受的协程。

7、Processor

    Processor意为处理器,对应一个CPU的核心,在netco中即对应一个线程。Processor负责存放协程Coroutine的实体并管理其生命期。更重要地,Processor中存在以下几个队列:
(1)newCoroutines_新协程双缓冲队列。使用一个队列来存放新来的协程,另一个队列给Processor主循环用于执行新来的协程,消费完后就交换队列。这里每加入一个新协程就会唤醒一次Processor主循环,以立即执行新来的协程。
(2)actCoroutines_被epoll激活的协程队列。当epoll_wait被激活时,Processor主循环会尝试从Epoller中获取活跃的协程,存放在actCoroutine队列中,然后依次恢复执行。
(3)timerExpiredCo_超时的协程队列。当epoll_wait被激活时,Processor主循环会首先尝试从Timer中获取活跃的协程,存放在timerExpiredCo队列中,然后依次恢复执行。
(4)removedCo_被移除的协程队列。执行完的协程首先会放在该队列中,在Processor主循环的最后一次性统一清理。
    至于Processor主循环的执行序,首先执行超时的协程,因为这个对时间要求是最敏感的,其次执行新接管的协程,然后执行epoller中被激活的协程,最后清理上述removedCo中的协程。

8、Scheduler

    Scheduler意为调度器,这里的调度并非OS中常说的调度(即决定当前执行哪个进程,实际这个工作是在Processor中做的),而是指协程应该运行在哪个计算机核心(线程,或者说Processor)上,netco中的该类为全局单例,所执行的调度也相对比较简单,其可以让用户指定协程运行在某个Processor上,若用户没有指定,则挑选协程数量最少的Processor接管新的协程。
    在libgo中,scheduler还有一个steal的操作,可以将一个协程从一个Processor中偷到另一个Processor中,因为其Processor的主循环是允许阻塞的,并且协程的运行完全由库决定。而netco可以让用户指定某个协程一直运行在某个Processor上,故没有实现该功能,未来性能若因为这个而出现瓶颈时再实现该功能。

9、netco_api

    虽然netco是c++11实现的协程库,但是为了使用尽量简单,将Scheduler进一步地封装成了函数接口而不是一个对象,所以只需要包含netco_api.h,即可调用netco函数风格的协程接口,而无需关心任何库中的对象。

10、Socket

    Socket类封装了socket族函数,用户直接使用其中的accept,connect,read,send,即可享受协程带来的便利。

11、RWMutex

    RWMutex是用于协程同步的读写锁,读锁互相不互斥而与写锁互斥,写锁与其它的均互斥。原理是类中维护了一个队列,若互斥了则将当前协程放入队列中等待另一协程解锁时的唤醒。

四、使用

    简单的example,包含了协程的使用,服务端,客户端的socket函数使用,读写锁的使用:

void single_acceptor_server_test()
{
	netco::co_go(
		[]{
			netco::Socket listener;
			if (listener.isUseful())
			{
				listener.setTcpNoDelay(true);
				listener.setReuseAddr(true);
				listener.setReusePort(true);
				if (listener.bind(8099) < 0)
				{
					return;
				}
				listener.listen();
			}
			while (1){
				netco::Socket* conn = new netco::Socket(listener.accept());
				conn->setTcpNoDelay(true);
				netco::co_go(
					[conn]
					{
						std::string hello("HTTP/1.0 200 OK\r\nServer: netco/0.1.0\r\nContent-Length: 72\r\nContent-Type: text/html\r\n\r\n<HTML><TITLE>hello</TITLE>\r\n<BODY><P>hello word!\r\n</BODY></HTML>\r\n");
						char buf[1024];
						if (conn->read((void*)buf, 1024) > 0)
						{
							conn->send(hello.c_str(), hello.size());
							netco::co_sleep(50);//需要等一下,否则还没发送完毕就关闭了
						}
						delete conn;
					}
					);
			}
		}
	);
}

//作为客户端的测试,可配合上述server测试
void client_test(){
	netco::co_go(
				[]
				{
					char buf[1024];
					while(1){
						netco::co_sleep(2000);
						netco::Socket s;
						s.connect("127.0.0.1", 8099);
						s.send("ping", 4);
						s.read(buf, 1024);
						std::cout << std::string(buf) << std::endl;
					}
				}
				);
}

//读写锁测试
void mutex_test(netco::RWMutex& mu){
	for(int i = 0; i < 10; ++i)
	if(i < 5){
		netco::co_go(
		[&mu, i]{
			mu.rlock();
			std::cout << i << " : start reading" << std::endl;
			netco::co_sleep(100);
			std::cout << i << " : finish reading" << std::endl;
			mu.runlock();
			mu.wlock();
			std::cout << i << " : start writing" << std::endl;
			netco::co_sleep(100);
			std::cout << i << " : finish writing" << std::endl;
			mu.wunlock();
		}
		);
	}else{
		netco::co_go(
		[&mu, i]{
			mu.wlock();
			std::cout << i << " : start writing" << std::endl;
			netco::co_sleep(100);
			std::cout << i << " : finish writing" << std::endl;
			mu.wunlock();
			mu.rlock();
			std::cout << i << " : start reading" << std::endl;
			netco::co_sleep(100);
			std::cout << i << " : finish reading" << std::endl;
			mu.runlock();
		}
	);
	}
	
}

int main()
{
	netco::RWMutex mu;
	mutex_test(mu);
	single_acceptor_server_test();
	client_test();
	netco::sche_join();
	std::cout << "end" << std::endl;
	return 0;
}

    测试环境:4核CPU3.70GHz,8G内存3200MHz,本机测试,服务端运行的single_acceptor_server_test()函数。
2020011423383535.png

五、后续

    bug fix;       
    另外有其它问题或bug可以联系390161495@qq.com。