User Tools

Site Tools


standalone Asio thread pool server

Based on C++03 Server3

I rewrote the sample with C++11 lambda functions and std::, excluding any boost::

My regards to the Asio author, Christopher M. Kohlhoff.

main.cpp

main.cpp
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <iostream>
#include <string>
#include <asio.hpp>
 
#include "server.hpp"
 
// Program entry point: starts a server3::server listening on 0.0.0.0:1026
// with a pool of 10 worker threads and blocks in run() until it returns.
//
// Returns 0 on a normal shutdown and 1 when the server could not be set up
// or run() threw, so that callers (shell scripts, service managers) can
// detect the failure. Command-line arguments are not used.
int main(int /*argc*/, char* /*argv*/[]) {
    try {
        const std::size_t num_threads = 10;
        server3::server s("0.0.0.0", "1026", num_threads);
        s.run();
    } catch (const std::exception& e) {
        std::cerr << "exception: " << e.what() << "\n";
        // Report the failure through the exit status instead of exiting 0.
        return 1;
    }
    return 0;
}

server.cpp

server.cpp
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <vector>
#include <thread>
#include <functional>
 
#include "server.hpp"
 
namespace server3 {
 
// Builds the server: registers shutdown signals, binds a listening socket to
// address:port and queues the first asynchronous accept.
//
//   address          - host name or numeric address to listen on
//   port             - service name or numeric port
//   thread_pool_size - number of worker threads run() will start
//
// Resolver/acceptor failures are reported through the exceptions thrown by
// the corresponding asio calls.
server::server(const std::string& address, const std::string& port, std::size_t thread_pool_size)
    : thread_pool_size(thread_pool_size),
      signals(io_context),
      acceptor(io_context),
      new_connection()
{
    // SIGINT and SIGTERM stop the io_context; SIGQUIT is not registered.
    signals.add(SIGINT);
    signals.add(SIGTERM);
    signals.async_wait([this](std::error_code, int) { io_context.stop(); });

    // Use the first endpoint the resolver reports for address:port.
    asio::ip::tcp::resolver name_resolver(io_context);
    const auto resolved = name_resolver.resolve(address, port);
    const asio::ip::tcp::endpoint listen_endpoint = *resolved.begin();

    acceptor.open(listen_endpoint.protocol());
    acceptor.set_option(asio::ip::tcp::acceptor::reuse_address(true));
    acceptor.bind(listen_endpoint);
    acceptor.listen();

    start_accept();
}
 
void server::run() {
    std::vector<std::shared_ptr<asio::thread>> threads;
 
    for (std::size_t i = 0; i < thread_pool_size; ++i) {
 
        auto f = [this]{ io_context.run(); };
        auto t = new asio::thread(std::move(f));
        std::shared_ptr<asio::thread> thread(std::move(t));
 
        threads.push_back(thread);
    }
    for (std::size_t i = 0; i < threads.size(); ++i) {
        threads[i]->join();
    }
}
 
void server::start_accept() {
    new_connection.reset(new connection(io_context));
    auto accept_handler = [this](asio::error_code err) {
        if (!err) {
            new_connection->start();
        }
        start_accept();
    };
    acceptor.async_accept(new_connection->get_socket(), accept_handler);
}
 
} // namespace server3

server.hpp

server.hpp
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#ifndef HTTP_SERVER3_SERVER_HPP
#define HTTP_SERVER3_SERVER_HPP
 
#include <asio.hpp>
#include <string>
#include <vector>
 
#include "connection.hpp"
 
namespace server3 {
 
// TCP server that accepts connections on one listening socket and services
// them from a pool of threads that all run the same io_context.
class server {
  public:
    // Binds to address:port; thread_pool_size is the number of worker
    // threads run() starts.
    explicit server(const std::string& address, const std::string& port, std::size_t thread_pool_size);

    // Starts the worker threads and blocks until all of them have finished.
    void run();

  private:
    // Queues the next asynchronous accept.
    void start_accept();

    // Data members: io_context is declared before signals and acceptor
    // because the constructor initializes both of them from it.
    std::size_t thread_pool_size;                 // number of worker threads
    asio::io_context io_context;                  // event loop shared by all workers
    asio::signal_set signals;                     // signals that stop the server
    asio::ip::tcp::acceptor acceptor;             // listening socket
    std::shared_ptr<connection> new_connection;   // connection awaiting the next accept
};
 
} // namespace server3
 
#endif // HTTP_SERVER3_SERVER_HPP

connection.cpp

connection.cpp
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <ctime>
#include <iomanip>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <asio.hpp>
#include "connection.hpp"
 
namespace server3 {
 
// Creates a connection whose strand and socket are both bound to io_context.
connection::connection(asio::io_context& io_context)
    : strand(io_context),
      socket(io_context)
{}
 
// Gives access to the underlying socket (the server accepts into it).
asio::ip::tcp::socket& connection::get_socket()
{
    return this->socket;
}
 
void connection::start() {
    do_read();
}
 
void connection::do_read() {
    try {
        auto self(shared_from_this());
        auto read_handler = [this, self](std::error_code err, std::size_t size) {
            if (err) {
                socket.close();
                return;
            }
            asio::ip::tcp::endpoint remote = socket.remote_endpoint();
            asio::ip::address remote_address = remote.address();
            //std::cerr << remote_address.to_string() << std::endl;
            do_write(err, size);
        };
        std::string delimiter = "\r\n\r\n";
        auto buffer = asio::dynamic_buffer(input_buffer);
        asio::async_read_until(socket, std::move(buffer), delimiter,
            asio::bind_executor(strand, read_handler));
    } catch (std::exception& e) {
        std::cerr << "# do_read exception: " << e.what() << "\n";
    }
}
 
// Returns the current time as an HTTP-date in UTC, e.g.
// "Sun, 06 Nov 1994 08:49:37 GMT".
//
// Uses the re-entrant gmtime_r/gmtime_s instead of std::gmtime, because
// std::gmtime returns a pointer to shared static storage and this function
// is reached from several worker threads. The zone is written as the literal
// "GMT" because the text produced by %Z differs between platforms.
std::string timestamp() {
    const std::time_t now = std::time(nullptr);
    std::tm utc{};
#if defined(_WIN32)
    gmtime_s(&utc, &now);
#else
    gmtime_r(&now, &utc);
#endif
    std::stringstream ss;
    ss << std::put_time(&utc, "%a, %d %b %Y %H:%M:%S GMT");
    return ss.str();
}
 
void connection::do_write(const std::error_code& err, std::size_t bytes_transferred) {
    try {
        if (!err) {
            std::stringstream response;
            response << "HTTP/1.1 200 OK\r\n";
            response << "Date: " << timestamp() << "\r\n";
            response << "Server: Srv3/0.1\r\n";
            response << "Content-Length: 7\r\n";
            response << "Content-Type: text/plain\r\n";
            response << "Connection: close\r\n";
            response << "\r\n";
            response << "hello!\n";
 
            auto self(shared_from_this());
            auto write_handler = [this, self](const asio::error_code ec, std::size_t size) {
                socket.shutdown(asio::ip::tcp::socket::shutdown_both);
                socket.close();
            };
            asio::async_write(socket, asio::buffer(response.str()),
                asio::bind_executor(strand, write_handler));
        }
    } catch (std::exception& e) {
        std::cerr << "# do_write exception: " << e.what() << "\n";
    }
}
 
} // namespace server3

connection.hpp

connection.hpp
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#ifndef HTTP_SERVER3_CONNECTION_HPP
#define HTTP_SERVER3_CONNECTION_HPP
 
#include <memory>
 
#include <asio.hpp>
 
namespace server3 {
 
class connection : public std::enable_shared_from_this<connection> {
    private:
        asio::io_context::strand strand;
        asio::ip::tcp::socket socket;
        std::string input_buffer;
 
        void do_read();
        void do_write(const asio::error_code& err, std::size_t bytes_transferred);
 
    public:
        explicit connection(asio::io_context& io_context);
        asio::ip::tcp::socket& get_socket();
        void start();
};
 
} // namespace server3
 
#endif // HTTP_SERVER3_CONNECTION_HPP

Result

Stress test

  • with 30 preallocated threads
  • FreeBSD x64, Intel i5 2500 MHz, 2 real CPU
  • compiled Clang, -O2
$ ab -c 9999 -n500000 'http://localhost:1026/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Finished 500000 requests

Server Software:        Srv3/0.1
Server Hostname:        localhost
Server Port:            1026

Document Path:          /
Document Length:        7 bytes

Concurrency Level:      9999
Time taken for tests:   35.566 seconds
Complete requests:      500000
Failed requests:        0
Total transferred:      72500000 bytes
HTML transferred:       3500000 bytes
Requests per second:    14058.23 [#/sec] (mean)
Time per request:       711.256 [ms] (mean)
Time per request:       0.071 [ms] (mean, across all concurrent requests)
Transfer rate:          1990.67 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0  374 454.9    320    6440
Processing:   111  324  65.1    336    1050
Waiting:        0  247  87.5    258     758
Total:        228  698 457.1    643    6784

Percentage of the requests served within a certain time (ms)
  50%    643
  66%    702
  75%    719
  80%    729
  90%    789
  95%    821
  98%   3343
  99%   3550
 100%   6784 (longest request)

Memory usage

PID USERNAME     THR PRI NICE   SIZE    RES STATE   C   TIME    WCPU COMMAND
35520 www         31  23    0   197M   175M uwait   0   1:33 155.12% srv3
36598 www          1  94    0   189M   138M CPU0    0   0:16  85.51% ab