下面介绍一个简单的任务队列,。
在实现任务队列前需要定义一个接口与一个工具类
- 任务接口:子类实现接口的run方法来处理具体任务。
- 自旋锁类:用于保护任务队列的并发访问(用C++11原子操作实现)。
任务接口源码如下
//任务接口class WorkItem{public: //接口方法必须在子类实现 virtual void run() = 0;public: //任务清理接口 virtual void clean() { } //判断任务是否可执行(返回真时任务才会执行) virtual bool runnable() { return true; }};
自旋锁源码如下
//自旋锁类class SpinMutex{private: atomic_flag flag = ATOMIC_FLAG_INIT;public: void lock() { while (flag.test_and_set(memory_order_acquire)); } void unlock() { flag.clear(std::memory_order_release); }};
任务队列源码如下
//任务队列class AsyncExecQueue{private: size_t maxsz; size_t threads; mutable SpinMutex mtx; std::queue> que; AsyncExecQueue() { this->maxsz = 0; } bool pop(shared_ptr & item) { std::lock_guard lk(mtx); if (que.empty()) return false; item = que.front(); que.pop(); return true; }public: //实现单例模式 static AsyncExecQueue* Instance() { static AsyncExecQueue obj; return &obj; }public: //中止任务处理 void stop() { threads = 0; } //清空队列 void clear() { std::lock_guard lk(mtx); while (que.size() > 0) que.pop(); } //判断队列是否为空 bool empty() const { std::lock_guard lk(mtx); return que.empty(); } //获取队列深度 size_t size() const { std::lock_guard lk(mtx); return que.size(); } //获取任务线程线 size_t getThreads() const { return threads; } //任务对象入队 bool push(shared_ptr item) { std::lock_guard lk(mtx); if (maxsz > 0 && que.size() >= maxsz) return false; que.push(item); return true; } //启动任务队列(启动处理线程) void start(size_t threads = 4, size_t maxsz = 10000) { this->threads = threads; this->maxsz = maxsz; for (size_t i = 0; i < threads; i++) { std::thread(std::bind(&AsyncExecQueue::run, this)).detach(); } } public: //这个方法里面处理具体任务 void run() { shared_ptr item; while (threads > 0) { if (pop(item)) { if (item->runnable()) { item->run(); item->clean(); } else { std::lock_guard lk(mtx); que.push(item); } } else { std::chrono::milliseconds dura(1); std::this_thread::sleep_for(dura); } } }};
下面给出任务队列的测试代码
//实现一个任务接口class Task : public WorkItem{ string name;public: //这个方法里面实现具体任务 void run() { cout << "异步处理任务[" << name << "]..." << endl; }public: Task(const string& name) { this->name = name; }};int main(int argc, char** argv){ //使用智能指针 shared_ptrA = make_shared ("A"); shared_ptr B = make_shared ("B"); shared_ptr C = make_shared ("C"); //启动任务队列 AsyncExecQueue::Instance()->start(); //将任务放入任务队列 AsyncExecQueue::Instance()->push(A); AsyncExecQueue::Instance()->push(B); AsyncExecQueue::Instance()->push(C); //等待所有任务执行完毕 while (AsyncExecQueue::Instance()->size() > 0) { std::chrono::milliseconds dura(1); std::this_thread::sleep_for(dura); } return 0;}
编译执行以下代码输出如下
异步处理任务[A]...异步处理任务[B]...异步处理任务[C]...