I recently studied the Disruptor, so I implemented an extremely lightweight multi-producer, multi-consumer C++ version of it and used it to optimize the top-ranked ThreadPool project on GitHub. Efficiency improved measurably, especially when the executed function is cheap relative to the time a mutex lock takes.
Source code: https://github.com/WoBuShiXiaoKang/DisruptorThreadPool/tree/master

1. Disruptor Overview

For a detailed introduction, read the Disruptor paper (a Chinese translation is available at https://www.cnblogs.com/daoqidelv/p/7043696.html ). A summary follows; additions and corrections are welcome:
(1) The Disruptor's core goal is to eliminate locks (a mutex is expensive). It does so by isolating the buffer operations of producers from consumers, of producers from each other, and of consumers from each other. (With a plain single-queue buffer, every producer and consumer must lock the queue, since both push and pop are writes to it.) Concretely, the buffer is pre-allocated at a fixed size; each producer or consumer only claims a slot index and then writes or reads its own slot, so no lock is needed around the queue's read/write path. A minimal sketch follows this list.
(2) It lightens the load on the GC. With a queue, every pop effectively deletes a node, which burdens the GC (or, in C++, the allocator); a pre-allocated ring buffer allocates once.
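
To make the contrast concrete, here is a minimal sketch (my own illustration with made-up names, not code from the paper) of a locked queue push versus claiming a pre-allocated slot through a single atomic counter:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <queue>

//Locked queue: every push serializes all producers on one mutex
std::mutex gMutex;
std::queue<int> gQueue;
void LockedPush(int v)
{
	std::lock_guard<std::mutex> lock(gMutex);
	gQueue.push(v); //may also allocate a node: allocator/GC pressure
}

//Disruptor style: the only shared write is a single fetch_add;
//each producer then fills its own slot with no lock held
constexpr std::size_t kRingSize = 1024; //must be a power of two
int gRing[kRingSize];
std::atomic<int64_t> gNextSeq{0};
void ClaimAndWrite(int v)
{
	const int64_t seq = gNextSeq.fetch_add(1); //claim a slot index
	gRing[seq & (kRingSize - 1)] = v;          //write only my own slot
	//(publishing the slot and wrap protection are omitted; see section 2)
}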

2. C++ Implementation

(1) The sequence classes Sequence and AtomicSequence

These classes wrap int64_t and std::atomic_int64_t respectively, with padding that keeps _seq in a cache line of its own to prevent false sharing (http://ifeve.com/falsesharing/):

#include <atomic>
#include <cstdint>

#define CACHELINE_SIZE_BYTES 64
#define CACHELINE_PADDING_FOR_ATOMIC_INT64_SIZE (CACHELINE_SIZE_BYTES - sizeof(std::atomic_int64_t))
#define CACHELINE_PADDING_FOR_INT64_SIZE (CACHELINE_SIZE_BYTES - sizeof(int64_t))

namespace Kang {

	//Wraps std::atomic_int64_t, padded so that _seq sits in a cache line of its own
	class AtomicSequence
	{
	public:
		AtomicSequence(int64_t num = 0L) : _seq(num) {};
		~AtomicSequence() {};
		AtomicSequence(const AtomicSequence&) = delete;
		AtomicSequence(const AtomicSequence&&) = delete;
		void operator=(const AtomicSequence&) = delete;

		void store(const int64_t val)//, std::memory_order _order = std::memory_order_seq_cst)
		{
			_seq.store(val);//,_order);
		}

		int64_t load()//std::memory_order _order = std::memory_order_seq_cst)
		{
			return _seq.load();// _order);
		}

		int64_t fetch_add(const int64_t increment)//, std::memory_order _order = std::memory_order_seq_cst)
		{
			return _seq.fetch_add(increment);// _order);
		}

	private:
		//Pad on both sides so _seq never shares a cache line with another variable
		char _frontPadding[CACHELINE_PADDING_FOR_ATOMIC_INT64_SIZE];
		std::atomic_int64_t _seq;
		char _backPadding[CACHELINE_PADDING_FOR_ATOMIC_INT64_SIZE];
	};

	//Wraps int64_t, padded so that _seq sits in a cache line of its own
	class Sequence
	{
	public:
		Sequence(int64_t num = 0L) : _seq(num) {};
		~Sequence() {};
		Sequence(const Sequence&) = delete;
		Sequence(const Sequence&&) = delete;
		void operator=(const Sequence&) = delete;

		void store(const int64_t val)
		{
			_seq = val;
		}

		int64_t load()
		{
			return _seq;
		}

	private:
		//Pad on both sides so _seq never shares a cache line with another variable
		char _frontPadding[CACHELINE_PADDING_FOR_INT64_SIZE];
		int64_t _seq;
		char _backPadding[CACHELINE_PADDING_FOR_INT64_SIZE];
	};
}
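
As a quick sanity check (my addition, not code from the repo): on mainstream platforms where sizeof(std::atomic_int64_t) is 8, each padded class spans 56 + 8 + 56 = 120 bytes, which keeps at least 56 bytes between _seq and any neighboring variable. Since C++17, the hard-coded 64 could also be derived from std::hardware_destructive_interference_size:

#include <new> //std::hardware_destructive_interference_size (C++17)

//Both padded classes should span two cache lines minus one counter
static_assert(sizeof(Kang::AtomicSequence) ==
	2 * CACHELINE_SIZE_BYTES - sizeof(std::atomic_int64_t),
	"unexpected AtomicSequence layout");
static_assert(sizeof(Kang::Sequence) ==
	2 * CACHELINE_SIZE_BYTES - sizeof(int64_t),
	"unexpected Sequence layout");

//A portable alternative to the hard-coded 64 (C++17):
//constexpr size_t kCacheLine = std::hardware_destructive_interference_size;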

(2) The Disruptor class

_ringBuf holds the contents (it is simply a pre-allocated array).

Whenever a producer wants to write into the buffer, it fetches the value of _writableSeq and increments it by 1. After the producer finishes writing, it waits until _lastWrote equals its own seq - 1 and then advances _lastWrote to its own seq (this guarantees that everything at or before _lastWrote has been written by a producer).

Whenever a consumer wants to read from the buffer, it increments _lastDispatch (the last slot dispatched to a consumer) by 1 and takes the result. After the consumer finishes reading, it waits until _lastRead equals its own seq - 1 and then advances _lastRead to its own seq (this guarantees that everything at or before _lastRead has been read by a consumer).

_stopWorking shuts the disruptor down. Once it is set to true, no more writes into the buffer are allowed, and once the buffer has drained, a consumer asking for a slot only receives the invalid index -1.

#include <array>
#include <cstdint>
#include <stdexcept>
//(plus the Sequence and AtomicSequence definitions above)

namespace Kang {

	//Default ring buffer size; the size must be a power of two so the modulo reduces to a bitmask
	constexpr size_t DefaultRingBufferSize = 262144;

	//This class provides the operations on the ring buffer
	//Write interface: WriteInBuf()
	//Read interface:  (1) GetReadableSeq() claims a readable slot index
	//                 (2) ReadFromBuf() reads the slot's contents
	//                 (3) FinishReading() hands the slot back when done
	//Note: the read interface is clumsy to use directly, so it is wrapped by the RAII class BufConsumer
	template<class ValueType, size_t N = DefaultRingBufferSize>
	class Disruptor
	{
	public:
		Disruptor() : _lastRead(-1L), _lastWrote(-1L), _stopWorking(0L), _lastDispatch(-1L), _writableSeq(0L) {};
		~Disruptor() {};

		Disruptor(const Disruptor&) = delete;
		Disruptor(const Disruptor&&) = delete;
		void operator=(const Disruptor&) = delete;

		static_assert(((N > 0) && ((N & (~N + 1)) == N)),
			"RingBuffer's size must be a positive power of 2");

		//Write into the buffer (move overload)
		void WriteInBuf(ValueType&& val)
		{
			const int64_t writableSeq = _writableSeq.fetch_add(1);
			while (writableSeq - _lastRead.load() > N)
			{//wait strategy: spin until the slot is free
				if (_stopWorking.load())
					throw std::runtime_error("writing to a stopped disruptor");
				//std::this_thread::yield();
			}
			//write the slot
			_ringBuf[writableSeq & (N - 1)] = std::move(val);

			while (writableSeq - 1 != _lastWrote.load())
			{//wait strategy: spin until all earlier writes are published
			}
			_lastWrote.store(writableSeq);
		};

		//Write into the buffer (copy overload)
		void WriteInBuf(ValueType& val)
		{
			const int64_t writableSeq = _writableSeq.fetch_add(1);
			while (writableSeq - _lastRead.load() > N)
			{//wait strategy: spin until the slot is free
				if (_stopWorking.load())
					throw std::runtime_error("writing to a stopped disruptor");
				//std::this_thread::yield();
			}
			//write the slot
			_ringBuf[writableSeq & (N - 1)] = val;

			while (writableSeq - 1 != _lastWrote.load())
			{//wait strategy: spin until all earlier writes are published
			}
			_lastWrote.store(writableSeq);
		};

		//Claim a readable slot index
		const int64_t GetReadableSeq()
		{
			const int64_t readableSeq = _lastDispatch.fetch_add(1) + 1;
			while (readableSeq > _lastWrote.load())
			{//wait strategy: spin until the slot has been written
				if (_stopWorking.load() && empty())
				{
					return -1L;
				}
			}
			return readableSeq;
		};

		//Read the buffer contents at the given slot index
		ValueType& ReadFromBuf(const int64_t readableSeq)
		{
			if (readableSeq < 0)
			{
				throw std::runtime_error("error: invalid seq for ring buffer in ReadFromBuf(seq)!");
			}
			return _ringBuf[readableSeq & (N - 1)];
		}

		//Hand the slot back after reading its contents
		void FinishReading(const int64_t seq)
		{
			if (seq < 0)
			{
				return;
			}

			while (seq - 1 != _lastRead.load())
			{//wait strategy: spin until all earlier reads have finished
			}
			_lastRead.store(seq);
		};

		//The buffer is empty when the next writable slot directly follows the last slot read
		bool empty()
		{
			return _writableSeq.load() - _lastRead.load() == 1;
		}

		//Tell the disruptor to stop working; after this call, once the buffer has drained, claiming a readable index only returns -1L
		void stop()
		{
			_stopWorking.store(1L);
		}

	private:
		//Last slot fully read
		Sequence _lastRead;

		//Last slot fully written
		Sequence _lastWrote;

		//Whether the disruptor has stopped working
		Sequence _stopWorking;

		//Last slot dispatched to a consumer
		AtomicSequence _lastDispatch;

		//Next writable slot sequence
		AtomicSequence _writableSeq;

		//The ring buffer; N must be a power of two so the modulo reduces to a bitmask
		std::array<ValueType, N> _ringBuf;
	};
}
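
Before wrapping the read side in RAII, here is a minimal single-threaded sketch (my own, not from the repo) of the raw three-step read interface; a small ring size is chosen so the example buffer fits comfortably on the stack:

#include <cstdio>

int main()
{
	Kang::Disruptor<int, 1024> disruptor; //small ring for the example

	disruptor.WriteInBuf(42); //producer side: claim, write, publish

	//Consumer side: claim a readable slot, read it, then hand it back
	const int64_t seq = disruptor.GetReadableSeq();
	if (seq >= 0)
	{
		printf("slot %lld holds %d\n", (long long)seq,
			disruptor.ReadFromBuf(seq));
		disruptor.FinishReading(seq);
	}
	return 0;
}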

(3) The BufConsumer class

Since the Disruptor's read interface is cumbersome to use directly, BufConsumer wraps it with RAII; it just forwards to the Disruptor's interface: the constructor calls GetReadableSeq() to claim a readable index, empty() tells whether there is anything to read, GetContent() fetches the buffer contents, and the destructor calls FinishReading() to hand the ring buffer slot back to the disruptor.

#include <cstdint>
//(plus the Disruptor definition above)

namespace Kang {

	//RAII wrapper around the Disruptor's read operations:
	//the constructor calls GetReadableSeq() to claim a readable index
	//empty() tells whether there is anything to read
	//GetContent() fetches the buffer contents
	//the destructor calls FinishReading() to hand the ring buffer slot back to the disruptor
	//
	//Usage example:
	//	std::function<void()> task;
	//	{
	//     BufConsumer<std::function<void()>> consumer(this->_tasks);
	//     if (consumer.empty())
	//     {
	//	     return;
	//     }
	//     task = std::move(consumer.GetContent());
	//	}
	//	task();
	template<class ValueType>
	class BufConsumer
	{
	public:

		BufConsumer(Disruptor<ValueType>* disruptor) : _disruptor(disruptor), _seq(-1L) {
			_seq = _disruptor->GetReadableSeq();
		};

		~BufConsumer()
		{
			_disruptor->FinishReading(_seq);
		};

		BufConsumer(const BufConsumer&) = delete;
		BufConsumer(const BufConsumer&&) = delete;
		void operator=(const BufConsumer&) = delete;

		bool empty()
		{
			return _seq < 0;
		}

		ValueType& GetContent()
		{
			return _disruptor->ReadFromBuf(_seq);
		}

	private:
		Disruptor<ValueType>* _disruptor;
		int64_t _seq;
	};

}

3. Applying It to a Thread Pool

I optimized the top-ranked GitHub thread pool project, https://github.com/progschj/ThreadPool , with the disruptor:

#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <type_traits>
#include <vector>
//(plus the Disruptor and BufConsumer definitions from section 2)

namespace Kang {

	class ThreadPool {
	public:
		ThreadPool(size_t);
		template<class F, class... Args>
		auto enqueue(F&& f, Args&&... args)
			->std::future<typename std::result_of<F(Args...)>::type>;
		~ThreadPool();
	private:
		// need to keep track of threads so we can join them
		std::vector< std::thread > _workers;

		//Disruptor
		Disruptor<std::function<void()>> *_tasks;

	};

	// the constructor just launches some amount of workers
	inline ThreadPool::ThreadPool(size_t threads)
	{
		_tasks = new Disruptor<std::function<void()>>();
		for (size_t i = 0; i < threads; ++i)
		{
			_workers.emplace_back(
				[this]
				{
					for (;;)
					{
						std::function<void()> task;

						{
							BufConsumer<std::function<void()>> consumer(this->_tasks);
							if (consumer.empty())
							{
								return;
							}
							task = std::move(consumer.GetContent());
						}

						task();
					}
				}
				);
		}
	}

	// add new work item to the pool
	template<class F, class... Args>
	auto ThreadPool::enqueue(F&& f, Args&&... args)
		-> std::future<typename std::result_of<F(Args...)>::type>
	{
		using return_type = typename std::result_of<F(Args...)>::type;

		auto task = std::make_shared< std::packaged_task<return_type()> >(
			std::bind(std::forward<F>(f), std::forward<Args>(args)...)
			);

		std::future<return_type> res = task->get_future();

		_tasks->WriteInBuf(std::move([task]() { (*task)(); }));
		return res;
	}

	// the destructor joins all threads
	inline ThreadPool::~ThreadPool()
	{
		_tasks->stop();
		for (std::thread& worker : _workers)
			worker.join();
		delete _tasks;
	}

}
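
A hypothetical usage example (not part of the original project): tasks go in through enqueue(), and results come back through the returned futures:

#include <cstdio>

int main()
{
	Kang::ThreadPool pool(4); //four worker threads

	//enqueue() returns a std::future carrying each task's result
	auto addFut = pool.enqueue([](int a, int b) { return a + b; }, 2, 3);
	auto constFut = pool.enqueue([] { return 42; });

	printf("%d %d\n", addFut.get(), constFut.get()); //prints "5 42"
	return 0;
} //~ThreadPool() stops the disruptor and joins the workers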

I had previously reworked this GitHub thread pool with a double-buffered queue, so all three versions are compared together here. Two times are printed:
1. Time for all enqueue (write) operations to complete.
2. Time for all tasks to finish executing.
The enqueue time of the Disruptor-based pool is bounded mainly by the ring buffer size (the write itself is usually fast, but if the buffer is too small, writers must wait for readers to free up slots), so the tests all use a very large ring buffer. Below is a 3-producer / 3-consumer result; the first pool is the double-buffered-queue version, the second the original GitHub pool, and the third the Disruptor version:
[Screenshot: benchmark output of the three thread pools]
As the numbers show, efficiency improved considerably.

The test code follows, using the gtest framework:

#include <cstdio>
#include <ctime>
#include <thread>
#include <vector>
#include <gtest/gtest.h>
//(plus the headers of the three thread pools under test)

#define THREADPOOL_SIZE 3
#define PRODUCER_SIZE 3
#define FUNC_TIMES 2000
#define ENQUEUE_NUMS 60000

int sum(int n)
{
	int ret = 0;
	for (int i = 1; i < n; ++i)
	{
		ret += i;
	}
	return ret;
}

void MyKangWrite(Kang::ThreadPool* threadPool)
{
	for (int i = 0; i < ENQUEUE_NUMS; ++i)
		threadPool->enqueue(sum, FUNC_TIMES);
}

void MyCCWrite(CCThreadPool* threadPool)
{
	for (int i = 0; i < ENQUEUE_NUMS; ++i)
		threadPool->enqueue(sum, FUNC_TIMES);
}

void MyGitWrite(ThreadPool* threadPool)
{
	for (int i = 0; i < ENQUEUE_NUMS; ++i)
		threadPool->enqueue(sum, FUNC_TIMES);
}

TEST(ThreadPoolTest, Dual_QueueThreadPool) {

	clock_t enstart, enfinish, finish;
	double   duration;
	{
		CCThreadPool threadPool(THREADPOOL_SIZE);
		std::vector< std::thread > writers;

		enstart = clock();

		for (size_t i = 0; i < PRODUCER_SIZE; ++i)
		{
			writers.emplace_back(std::thread(MyCCWrite, &threadPool));
		}
		for (size_t i = 0; i < PRODUCER_SIZE; ++i)
		{
			writers[i].join();
		}

		enfinish = clock();
		duration = ((double)(enfinish - enstart) / CLOCKS_PER_SEC) * 1000;
		printf("Dul-Queue Threadpool enqueue %f ms!\n", duration);
	}
	finish = clock();
	duration = ((double)(finish - enstart) / CLOCKS_PER_SEC) * 1000;
	printf("Dul-Queue Threadpool Run %f ms!\n", duration);
}

TEST(ThreadPoolTest, GithubThreadPool) {

	clock_t enstart, enfinish, finish;
	double   duration;
	{
		ThreadPool threadPool(THREADPOOL_SIZE);
		std::vector< std::thread > writers;

		enstart = clock();

		for (size_t i = 0; i < PRODUCER_SIZE; ++i)
		{
			writers.emplace_back(std::thread(MyGitWrite, &threadPool));
		}
		for (size_t i = 0; i < PRODUCER_SIZE; ++i)
		{
			writers[i].join();
		}

		enfinish = clock();
		duration = ((double)(enfinish - enstart) / CLOCKS_PER_SEC) * 1000;
		printf("Github Threadpool Multi enqueue %f ms!\n", duration);
	}
	finish = clock();
	duration = ((double)(finish - enstart) / CLOCKS_PER_SEC) * 1000;
	printf("Github Threadpool Multi Run %f ms!\n", duration);
}

TEST(ThreadPoolTest, DisruptorThreadPool) {

	clock_t enstart, enfinish, finish;
	double   duration;
	{
		Kang::ThreadPool threadPool(THREADPOOL_SIZE);
		std::vector< std::thread > writers;

		enstart = clock();

		for (size_t i = 0; i < PRODUCER_SIZE; ++i)
		{
			writers.emplace_back(std::thread(MyKangWrite, &threadPool));
		}
		for (size_t i = 0; i < PRODUCER_SIZE; ++i)
		{
			writers[i].join();
		}

		enfinish = clock();
		duration = ((double)(enfinish - enstart) / CLOCKS_PER_SEC) * 1000;
		printf("Disruptor Threadpool Multi enqueue %f ms!\n", duration);
	}
	finish = clock();
	duration = ((double)(finish - enstart) / CLOCKS_PER_SEC) * 1000;
	printf("Disruptor Threadpool Multi Run %f ms!\n", duration);
}

4. Shortcomings

(1) There is no wait-strategy class. To keep the implementation lightweight, every wait is a busy spin. Spinning is fast, but it also puts a real load on the CPU, so this is unsuitable when there are many producers and consumers; with many of them, the waits should yield or block instead. A sketch of such a strategy follows.
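
As a sketch of what such a wait-strategy class could look like (my own illustration with made-up names): spin a bounded number of times, then start yielding the time slice:

#include <cstdint>
#include <thread>

//Hypothetical strategy: pure spinning for short waits, then yield
class SpinThenYieldStrategy
{
public:
	void Wait()
	{
		if (++_spins > kSpinLimit)
			std::this_thread::yield(); //hand the core to another thread
	}
	void Reset() { _spins = 0; }

private:
	static constexpr uint32_t kSpinLimit = 1000;
	uint32_t _spins = 0;
};

//The wait loops in WriteInBuf() etc. would then read:
//	SpinThenYieldStrategy waiter;
//	while (writableSeq - _lastRead.load() > N) { waiter.Wait(); }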

(2) Some of the spinning could actually be optimized away. For instance, when the number of consumers is fixed, as in a thread pool, one can keep an array holding each consumer's lastRead index and derive _lastRead by scanning that array, so no thread has to wait for the others to read up to the slot just before its own before it can update _lastRead. The same applies when the producer count is fixed. A sketch of this idea follows.
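
Here is a sketch of that idea (hypothetical, assuming a fixed consumer count kConsumers): each consumer publishes its progress into its own padded sequence, and the producer's wrap check takes the minimum across all of them instead of funneling every consumer through _lastRead:

#include <algorithm>
#include <cstddef>
#include <cstdint>

constexpr std::size_t kConsumers = 4; //fixed consumer count (assumed known)

//One padded Sequence (from section 2) per consumer; consumer i writes
//only its own, so finishing a read no longer waits on other consumers
Kang::Sequence gConsumerSeqs[kConsumers];

void FinishReading(std::size_t i, int64_t seq)
{
	gConsumerSeqs[i].store(seq); //no spin on _lastRead
}

//The producer's wrap check scans for the slowest consumer:
//it waits while writableSeq - SlowestConsumer() > N
int64_t SlowestConsumer()
{
	int64_t minSeq = INT64_MAX;
	for (std::size_t i = 0; i < kConsumers; ++i)
		minSeq = std::min(minSeq, gConsumerSeqs[i].load());
	return minSeq;
}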

(3) With a single producer and a single consumer, even the atomic variables become unnecessary, though that is not really a shortcoming.