User Tools

Site Tools


Thread pool sample

I wanted wrote very compact/light thread pool with queue. =)

Total CPU utilization is around 95%.

# top -H | grep qu
51482 user        52    0 13088K  3812K umtxn   2   0:07  40.38% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  16.16% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  15.97% queue6{queue6}
51482 user        76    0 13088K  3812K CPU1    1   0:03  15.97% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  15.77% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  15.58% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  15.38% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  15.38% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  15.19% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  14.99% queue6{queue6}
51482 user        52    0 13088K  3812K umtxn   2   0:03  14.79% queue6{queue6}
queue6.cpp
/*
 *
 * Author, Copyright: Oleg Borodin <onborodin@gmail.com>
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 * MA 02110-1301, USA.
 *
 */
 
#include <iostream>
#include <functional>
#include <condition_variable>
#include <queue>
#include <thread>
#include <mutex>
#include <chrono>
 
template<typename T>
class executor {
    private:
        std::queue<T> queue;
        std::condition_variable cv;
        std::mutex mutex;
        std::thread threads[10];
    public:
        void enqueue(T e) {
            auto func = [&]{
                std::unique_lock<std::mutex> lock(mutex);
                queue.push(e);
            };
            func();
            cv.notify_all();
        }
 
        executor() {
            auto worker = [&](int n){
                while(true) {
                    std::unique_lock<std::mutex> lock(mutex);
                    cv.wait(lock, [&]{ return !queue.empty(); });
                    while(!queue.empty()) {
                        auto req = queue.front();
                        queue.pop();
                        cv.notify_one();
                        req.function(req.arg);
                    }
                }
            };
            for (int i = 0; i < 10; i++) {
                threads[i] = std::thread(worker, i);
                threads[i].detach();
            }
        }
 
};
 
template<typename F, typename A>
class task {
    public:
        F function;
        A arg;
        task(F function, A arg)
            : function(function), arg(arg) {
        };
};
 
int main(int argc, char **argv) {
 
    using request = task<std::function<void(std::string)>, std::string>;
    executor<request> executor;
 
    auto func = [](std::string str) {
        std::cout << str << " world" << std::endl;
    };
 
    for (int i = 0; i < 2000000; i++) {
        request r(func, "hello");
        executor.enqueue(r);
    }
    while(true) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
 
    return 0;
}