User Tools

Site Tools


Thread-pool web (or non-web) server, pure C++, variant with blocking sockets

thrsrv-bl.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <cerrno>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>
 
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <unistd.h>
 
namespace srv {
 
/**
 * Thin wrapper around an already-connected socket descriptor.
 *
 * The descriptor is not released automatically: nothing is shut down or
 * closed until close() is called.
 */
class stream {
  private:
    int sock;
  public:
    stream(int sock) : sock(sock) {
    }

    /**
     * Reads at most `size` bytes from the socket.
     *
     * @param size upper bound on the number of bytes to read; a value <= 0
     *             yields an empty string without touching the socket.
     * @return exactly the bytes received. The string is empty on
     *         end-of-stream and on a read error.
     */
    std::shared_ptr<std::string> read(const int size) {
        if (size <= 0) {
            return std::make_shared<std::string>();
        }
        auto str = std::make_shared<std::string>(size, 0);
        ssize_t rv;
        do {
            // &(*str)[0] is the writable buffer; c_str() is read-only.
            rv = ::read(sock, &(*str)[0], str->size());
        } while (rv < 0 && errno == EINTR);
        // Keep only what was really received (no padding byte); an error
        // (rv < 0) collapses to an empty result.
        str->resize(rv > 0 ? static_cast<std::string::size_type>(rv) : 0);
        return str;
    }

    /**
     * Writes the whole string, looping over short writes.
     *
     * @param str bytes to send.
     * @return number of bytes written, or -1 if ::write() failed.
     */
    int write(const std::string& str) {
        std::string::size_type sent = 0;
        while (sent < str.length()) {
            const ssize_t rv = ::write(sock, str.data() + sent, str.length() - sent);
            if (rv < 0) {
                if (errno == EINTR) { continue; }
                return -1;
            }
            if (rv == 0) { break; }  // no progress possible; report what was sent
            sent += static_cast<std::string::size_type>(rv);
        }
        return static_cast<int>(sent);
    }

    /**
     * Shuts down both directions and closes the descriptor.
     * Calling it again is a no-op, so a recycled descriptor number is never
     * closed by accident.
     */
    void close() {
        if (sock < 0) { return; }
        ::shutdown(sock, SHUT_RDWR);
        ::close(sock);
        sock = -1;
    }
};
 
 
class listener {
  private:
    int sock;
  public:
    listener(int port, int backlog) {
        if((sock = ::socket(PF_INET, SOCK_STREAM, 0)) < 0) {
            throw std::runtime_error("cannot create socket");
        }
        int optval;
        optval = 1;
        if (::setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
            throw std::runtime_error("cannot set socket option");
        }
 
        struct timeval tv;
        tv.tv_sec = 10;
        tv.tv_usec = 0;
        if (::setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) < 0) {
            throw std::runtime_error("cannot set socket option");
        }
 
        struct sockaddr addr;
        struct sockaddr_in* paddr = (struct sockaddr_in*)&addr;
        paddr->sin_family = AF_INET;
        paddr->sin_addr.s_addr = INADDR_ANY;
        paddr->sin_port = htons(port);
        paddr->sin_len = sizeof(sockaddr_in);
 
        if (::bind(sock, (struct sockaddr*)paddr, paddr->sin_len) < 0) {
            throw std::runtime_error("cannot bind socket");
        }
        if (::listen(sock, backlog) < 0) {
            throw std::runtime_error("cannot listen socket");
        }
    }
 
    std::shared_ptr<srv::stream> accept() {
        int newsock;
        if ((newsock = ::accept(sock, NULL, 0)) != -1) {
            return std::make_shared<srv::stream>(newsock);
        }
        return nullptr;
    }
};
 
 
/**
 * Mutex-protected FIFO of accepted connections.
 * Every member function takes the internal mutex for the duration of the call.
 */
class queue {
  private:
    std::mutex mutex;
    std::queue<std::shared_ptr<srv::stream>> squeue;
  public:
    /** Appends a connection to the tail of the queue. */
    void push(std::shared_ptr<srv::stream> item) {
        std::unique_lock<std::mutex> guard(mutex);
        squeue.push(item);
    }

    /**
     * Removes and returns the oldest connection.
     * @return the head element, or nullptr when the queue is empty.
     */
    std::shared_ptr<srv::stream> front() {
        std::unique_lock<std::mutex> guard(mutex);
        if (squeue.empty()) {
            return nullptr;
        }
        std::shared_ptr<srv::stream> head = squeue.front();
        squeue.pop();
        guard.unlock();
        return head;
    }

    /** @return the number of queued connections. */
    int size() {
        std::unique_lock<std::mutex> guard(mutex);
        const auto count = squeue.size();
        return count;
    }

    /** @return true when no connection is queued. */
    bool empty() {
        std::unique_lock<std::mutex> guard(mutex);
        const bool none = squeue.empty();
        return none;
    }
};
 
/**
 * One pool thread: waits for connections on the shared queue and answers
 * each one that sent some data with a fixed "Hello, World!" HTTP reply.
 *
 * The constructor starts a detached thread running handler() on `this`, so
 * the worker object must stay alive for as long as that thread runs.
 */
class worker {
  private:
    std::shared_ptr<srv::queue> queue;
    std::condition_variable cond;
    std::mutex mutex;

    // Builds the canned reply once. Content-Length is derived from the body
    // so the two can never disagree; the Date value is a fixed placeholder,
    // not the current time.
    static const std::string& response() {
        static const std::string message = [] {
            const std::string content =
                "<html>\r\n"
                "<head><title>Hello</title></head>\r\n"
                "<body>\r\n"
                "<center><h1>Hello, World!</h1></center>\r\n"
                "<hr><center>Srv11/0.1</center>\r\n"
                "</body>\r\n"
                "</html>\r\n";
            const std::string header =
                std::string(
                    "HTTP/1.1 200 OK\r\n"
                    "Accept-Ranges: bytes\r\n"
                    "Content-Type: text/html\r\n"
                    "Content-Length: ") + std::to_string(content.length()) + "\r\n"
                    "Connection: close\r\n"
                    "Server: Srv11/0.1\r\n"
                    "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"
                    "\r\n";
            return header + content;
        }();
        return message;
    }
  public:
    /** @param q queue shared with the pool from which connections are taken. */
    worker(std::shared_ptr<srv::queue> q) : queue(q) {
        std::thread thread(&srv::worker::handler, this);
        thread.detach();
    }

    /**
     * Thread body: loops forever taking connections off the queue.
     *
     * The worker mutex is held only while waiting and dequeuing. It is
     * released before any socket I/O so that notify(), which the accepting
     * thread calls, never has to wait for a request in progress.
     */
    void handler() {
        while (true) {
            std::unique_lock<std::mutex> lock(mutex);
            cond.wait(lock, [this]{
                return !queue->empty();
            });
            auto stream = queue->front();
            lock.unlock();

            // Another worker may have taken the item first.
            if (stream == nullptr) continue;

            auto req = stream->read(1024);
            if (req->length() > 0) {
                // Header and body go out in a single write.
                stream->write(response());
            }
            stream->close();
        }
    }

    /**
     * Wakes the worker thread if it is waiting. The mutex is taken so the
     * wake-up cannot fall between the worker's predicate check and its wait.
     */
    void notify() {
        std::unique_lock<std::mutex> lock(mutex);
        cond.notify_one();
    }
};
 
/**
 * Fixed-size set of workers that all share one connection queue.
 */
class pool {
  private:
    std::shared_ptr<srv::queue> queue;
    std::vector<std::shared_ptr<srv::worker>> workers;
  public:
    /** Creates the shared queue and then `size` workers attached to it. */
    pool(int size) : queue(std::make_shared<srv::queue>()) {
        for (int left = size; left > 0; --left) {
            workers.push_back(std::make_shared<srv::worker>(queue));
        }
    }

    /** Queues a connection and then notifies every worker in turn. */
    void enqueu(std::shared_ptr<srv::stream> item) {
        queue->push(item);
        for (auto it = workers.begin(); it != workers.end(); ++it) {
            (*it)->notify();
        }
    }
};
 
 
/**
 * Accept loop: connects a listener to a worker pool.
 * Both are held by reference and must outlive the server.
 */
class server {
  private:
    srv::listener& listener;
    srv::pool& pool;
  public:
    server(srv::listener& a, srv::pool& p) : listener(a), pool(p) {}

    /**
     * Runs forever: accepts a connection and hands it to the pool.
     * A failed accept (nullptr) is skipped and the loop tries again.
     */
    void run() {
        for (;;) {
            auto conn = listener.accept();
            if (conn == nullptr) {
                continue;
            }
            pool.enqueu(std::move(conn));
        }
    }
};
 
} // namespace srv
 
/**
 * Entry point: ignores SIGPIPE, listens on port 1024 with a backlog of 4096,
 * starts 30 workers and runs the accept loop.
 * Returns 1 after printing the message if setup throws.
 */
int main(int argc, char **argv) {
    const int port = 1024;
    const int backlog = 1024*4;
    const int threads = 30;

    try {
        // With SIGPIPE ignored, a write to a closed peer fails with an error
        // instead of raising the signal.
        std::signal(SIGPIPE, SIG_IGN);

        srv::listener listener(port, backlog);
        srv::pool pool(threads);
        srv::server server(listener, pool);
        server.run();
        return 0;
    } catch (const std::exception& e) {
        std::cerr << "#exception: " << e.what() << ", exit." << std::endl;
    }
    return 1;
}

Benchmark

$ ab -s 20 -c 2000 -n100000 'http://127.0.0.1:1024/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        Srv11/0.1
Server Hostname:        127.0.0.1
Server Port:            1024

Document Path:          /
Document Length:        142 bytes

Concurrency Level:      2000
Time taken for tests:   11.675 seconds
Complete requests:      100000
Failed requests:        0
Total transferred:      30300000 bytes
HTML transferred:       14200000 bytes
Requests per second:    8565.30 [#/sec] (mean)
Time per request:       233.500 [ms] (mean)
Time per request:       0.117 [ms] (mean, across all concurrent requests)
Transfer rate:          2534.46 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    4  24.1      0     200
Processing:    69  227  35.5    212     447
Waiting:        0  227  36.4    212     447
Total:         69  231  42.0    212     530

Percentage of the requests served within a certain time (ms)
  50%    212
  66%    214
  75%    233
  80%    253
  90%    287
  95%    291
  98%    379
  99%    441
 100%    530 (longest request)

vs Nginx

on the same system

$ ab -s 20 -c 2000 -n100000 'http://127.0.0.1:80/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
apr_socket_recv: Connection reset by peer (54)
Total of 1 requests completed

$ ab -s 20 -c 1000 -n100000 'http://127.0.0.1:80/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        nginx/1.14.2
Server Hostname:        127.0.0.1
Server Port:            80

Document Path:          /
Document Length:        185 bytes

Concurrency Level:      1000
Time taken for tests:   3.765 seconds
Complete requests:      100000
Failed requests:        0
Non-2xx responses:      100000
Total transferred:      37700000 bytes
HTML transferred:       18500000 bytes
Requests per second:    26557.00 [#/sec] (mean)
Time per request:       37.655 [ms] (mean)
Time per request:       0.038 [ms] (mean, across all concurrent requests)
Transfer rate:          9777.33 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0   14 150.9      5    3013
Processing:     5   15  24.0     11     658
Waiting:        0   13  22.2     10     467
Total:          7   29 152.9     15    3430

Percentage of the requests served within a certain time (ms)
  50%     15
  66%     19
  75%     20
  80%     21
  90%     25
  95%     56
  98%     75
  99%    237
 100%   3430 (longest request)