iOS/Mac 平台下 apple 提供了非常好用的 dispatch_queue
能够很方便的进行线程的管理以及各个线程之间的切换(当然还有很多其他特性)。虽说 C++ 的标准库中提供了很多线程管理的方法,但相比于 dispatch_queue
还是弱爆了。由于项目中会经常用到,GitHub
上找了一些类似的实现都不太理想,于是自己实现了一版简单的主要支持一下特性:
- 支持并发调用,并支持并发的任务处理
- 支持任务的同步执行和异步执行
- 可创建指定数量的任务线程
- 执行同步任务时,在任务线程中可继续执行同步任务而不卡死
- 同一任务可重复执行
- 支持 lambda 表达式
- 支持任务线程的安全退出
结构设计
上图是 DispatchQueue
类结构图,结构比较简单,DispatchQueue
是核心抽象类;QueueTask
就是任务的抽象基类了,ClosureTask
类是一个模板类,主要用来实现 lambda 表达式;DisruptorImp
与 MutexQueueImp
则是两个具体的 DispatchQueue
的实现,disruptor
包是一个第三方库,后面会有详细介绍,。
DispatchQueue 抽象类的定义
以下是 DispatchQueue
接口的定义和实现:
|
|
DispatchQueue
是基于建造者模式进行设计,将同步方法、异步方法对 lambda 表达式的支持,放在父类中实现,因为这部分代码相对固定,而具体的同步与异步的实现都推迟到子类中实现,因为这部分可以有不同的实现方式。抽象的父类相对简单,只实现了对 lambda 的支持,在来看看 QueueTask
的类设计如下:
|
|
virtual void run() = 0
是纯虚函数用来给子类重载,signal()/wait()
是两个具体方法用来支持任务的同步执行,在生产线程中执行 wait()
方法用来等待当前任务执行完成,而在任务线程中 QueueTask
执行完成后会调用 signal
来通知生产线程完成等待。reset
方法可以让该任务执行完成后重置内部状态,以便可继续将当前任务添加都队列中。ClosureTask
模板类则用来包装 lambda 表达式。
基于 std::mutex/std::queue 的实现
基于标准库实现的思路很简单,使用标准库中提供的 std::mutex
和 std::queue
,在进行插入任务和执行任务时,对任务队列进行加锁操作,这里使用递归锁 std::recursive_mutex
而非 std::mutex
。具体实现代码如下:
|
|
该类除了实现父类的sync_imp/async_imp
方法外,还有用来创建线程的 create_thread
方法,该方法每调用一次可以产生一个新的任务线程;在类的析构方法中会停止当前所有线程,并等待线程的安全退出。
在 MutexQueueImp
实现中,是典型的 生产者-消费者
线程模型,在 async_imp
方法中将任务插入到队列中,并通知任务线程;任务线程接收到任务加入队列的信号后,循环的从任务队列中取出任务,当所有任务处理完成,进入到休眠模式,直到下一个任务加入队列。
在 sync_imp
方法中会判断当前线程是否为任务线程,如果是任务线程,并且只有一个任务线程时,则直接执行任务,以免造成当前线程等待自己的情况,以免造成死锁。
基于 disruptor 的实现
Disruptor
最初是在 Java 上被发明的,这里使用它的 C++ 实现版本原理和 Java 版本是一致的。但由于 Disruptor
是基于 发布者-订阅者
的分发模型,所以当一个任务来到时,所以等待的线程,都将被唤醒,该任务可能被多个线程同时执行,在当前我们的 Dispatch Queue
中是不被允许的,所以只有在单线程时,才会使用 Disruptor
来作为 Dispatch Queue
的实现,以确保高效和正确性。
C++ 版本的阻塞等待工具类 BlockingStrategy 的实现有错误,无法唤醒,所以不要使用 BlockingStrategy 作为等待策略。
以下是 disruptor
实现的主要代码:
|
|
DisruptorImp
类的结构与 MutexQueueImp
基本一致,只有换成了 Disruptor
实现,至于对 Disruptor
的使用可参看文章后面的链接即可。最后 DispatchQueue
接口对象的创建使用一下方法来创建:
|
|
总结
DispatchQueue
可以说满足了基本的线程管理的需要,配合 lambda 表达式,使用起来也非常方便了。当然你也把它当做线程池来使用。后续如果有需要可以方便的扩展其他特性例如:延迟执行、任务之间的依赖关系等,完整的代码可在我的 GitHub 上找到。