User Tools

Site Tools


Differences

This shows you the differences between two versions of the page.

Link to this comparison view

cpp:thread-pool [2019-05-28 15:26]
cpp:thread-pool [2020-02-15 00:57] (current)
Line 1: Line 1:
 +=====Thread pool sample=====
 +
 +I wanted wrote very compact/​light thread pool with queue. ​ =)
 +
 +Total CPU utilization is around 95%.
 +
 +<​file>​
 +# top -H | grep qu
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​07 ​ 40.38% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 16.16% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 15.97% queue6{queue6}
 +51482 user        76    0 13088K ​ 3812K CPU1    1   ​0:​03 ​ 15.97% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 15.77% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 15.58% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 15.38% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 15.38% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 15.19% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 14.99% queue6{queue6}
 +51482 user        52    0 13088K ​ 3812K umtxn   ​2 ​  ​0:​03 ​ 14.79% queue6{queue6}
 +</​file> ​
 +
 +<code c++ queue6.cpp>​
 +/*
 + *
 + * Copyright 2004-2019 Oleg Borodin ​ <​borodin@unix7.org>​
 + *
 + * This program is free software; you can redistribute it and/or modify
 + * it under the terms of the GNU General Public License as published by
 + * the Free Software Foundation; either version 2 of the License, or
 + * (at your option) any later version.
 + *
 + * This program is distributed in the hope that it will be useful,
 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. ​ See the
 + * GNU General Public License for more details.
 + *
 + * You should have received a copy of the GNU General Public License
 + * along with this program; if not, write to the Free Software
 + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
 + * MA 02110-1301, USA.
 + *
 + */
 +
 +#include <​iostream>​
 +#include <​functional>​
 +#include <​condition_variable>​
 +#include <​queue>​
 +#include <​thread>​
 +#include <​mutex>​
 +#include <​chrono>​
 +
 +template<​typename T>
 +class executor {
 +    private:
 +        std::​queue<​T>​ queue;
 +        std::​condition_variable cv;
 +        std::mutex mutex;
 +        std::thread threads[10];​
 +    public:
 +        void enqueue(T e) {
 +            auto func = [&]{
 +                std::​unique_lock<​std::​mutex>​ lock(mutex);​
 +                queue.push(e);​
 +            };
 +            func();
 +            cv.notify_all();​
 +        }
 +
 +        executor() {
 +            auto worker = [&](int n){
 +                while(true) {
 +                    std::​unique_lock<​std::​mutex>​ lock(mutex);​
 +                    cv.wait(lock,​ [&]{ return !queue.empty();​ });
 +                    while(!queue.empty()) {
 +                        auto req = queue.front();​
 +                        queue.pop();​
 +                        cv.notify_one();​
 +                        req.function(req.arg);​
 +                    }
 +                }
 +            };
 +            for (int i = 0; i < 10; i++) {
 +                threads[i] = std::​thread(worker,​ i);
 +                threads[i].detach();​
 +            }
 +        }
 +
 +};
 +
 +template<​typename F, typename A>
 +class task {
 +    public:
 +        F function;
 +        A arg;
 +        task(F function, A arg)
 +            : function(function),​ arg(arg) {
 +        };
 +};
 +
 +int main(int argc, char **argv) {
 +
 +    using request = task<​std::​function<​void(std::​string)>,​ std::​string>;​
 +    executor<​request>​ executor;
 +
 +    auto func = [](std::​string str) {
 +        std::cout << str << " world" << std::endl;
 +    };
 +
 +    for (int i = 0; i < 2000000; i++) {
 +        request r(func, "​hello"​);​
 +        executor.enqueue(r);​
 +    }
 +    while(true) {
 +        std::​this_thread::​sleep_for(std::​chrono::​milliseconds(1000));​
 +    }
 +
 +    return 0;
 +}
 +</​code>​