User Tools

Site Tools


Thread-pool web server with blocking I/O

thrsrv-bl.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
 
#include <pthread.h>
#include <sys/event.h>
 
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <err.h>
 
 
/*
 * Suspend the calling thread for roughly `dur` milliseconds.
 *
 * dur: delay in milliseconds; zero or negative values return immediately.
 *
 * The delay is a select() call with no descriptors and only a timeout.
 * Unlike usleep(), whose argument must stay below one second's worth of
 * microseconds to be portable, this accepts any positive `dur`.  The call
 * may return early if a signal is delivered.
 */
void msleep(int dur) {
    if (dur <= 0) return;

    struct timeval tv;
    tv.tv_sec = dur / 1000;
    tv.tv_usec = (dur % 1000) * 1000;
    (void)select(0, NULL, NULL, NULL, &tv);
}
 
/*
 * FIFO queue of int values kept in a circular buffer.
 * The queue_* functions lock `mutex` around their accesses to the fields.
 */
typedef struct queue {
    pthread_mutex_t mutex;  /* locked by the queue_* functions while they touch the fields */
    int* items;             /* heap-allocated storage for the queued values */
    int front;              /* index of the oldest item, i.e. the next one removed */
    int rear;               /* index of the most recently added item; -1 in a new queue */
    int size;               /* number of items currently stored */
    int capacity;           /* slot count used for index wrap-around and the "full" check */
} queue_t;
 
/*
 * Allocate and initialise an empty queue with room for `size` items.
 *
 * size: initial capacity; must not be negative.
 *
 * Returns the new queue, or NULL if `size` is negative, memory cannot be
 * allocated, or the mutex cannot be initialised.  Release the queue with
 * queue_free().
 *
 * The mutex is created recursive so that a thread holding queue_lock() can
 * still call the other queue_* functions, which lock the same mutex.
 */
queue_t* queue_create(int size) {
    if (size < 0) return NULL;

    queue_t* q = malloc(sizeof(*q));
    if (q == NULL) return NULL;

    q->items = malloc(((size_t)size + 1) * sizeof(int));
    if (q->items == NULL) {
        free(q);
        return NULL;
    }
    q->capacity = size;
    q->front = 0;
    q->rear = -1;
    q->size = 0;

    pthread_mutexattr_t attr;
    if (pthread_mutexattr_init(&attr) != 0) {
        free(q->items);
        free(q);
        return NULL;
    }
    int rc = pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    if (rc == 0) {
        /* Pass the attribute object; passing NULL would silently create a default mutex. */
        rc = pthread_mutex_init(&(q->mutex), &attr);
    }
    pthread_mutexattr_destroy(&attr);

    if (rc != 0) {
        free(q->items);
        free(q);
        return NULL;
    }
    return q;
}
 
/*
 * Report how many items the queue currently holds.
 *
 * Returns the item count read under the queue mutex, or -1 when `q` is NULL.
 */
int queue_size(queue_t* q) {
    int count = -1;

    if (q != NULL) {
        pthread_mutex_lock(&q->mutex);
        count = q->size;
        pthread_mutex_unlock(&q->mutex);
    }
    return count;
}
 
/*
 * Remove the oldest item from the queue and return it.
 *
 * Despite the name this is a pop: the item is taken out of the queue.
 * Returns -1 when `q` is NULL or the queue is empty.
 */
int queue_front(queue_t* q) {
    if (!q) {
        return -1;
    }

    int item = -1;

    pthread_mutex_lock(&q->mutex);
    if (q->size > 0) {
        const int head = q->front;

        item = q->items[head];
        /* Step the head forward, wrapping at the end of the buffer. */
        q->front = (head + 1) % q->capacity;
        q->size -= 1;
    }
    pthread_mutex_unlock(&q->mutex);

    return item;
}
 
/*
 * Append `x` at the tail of the queue, doubling the storage when it is full.
 *
 * Returns 1 on success, or -1 when `q` is NULL or the queue is full and
 * cannot be grown.
 *
 * Growing copies the items into a fresh buffer in FIFO order and resets
 * front/rear.  Enlarging the old buffer in place is not enough: once the
 * ring has wrapped, the slot after `rear` is the oldest item, and the next
 * push would overwrite it.
 */
int queue_push(queue_t* q, int x) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&(q->mutex));

    if (q->size >= q->capacity) {
        /* Largest int value, computed without needing <limits.h>. */
        const unsigned int int_max = (unsigned int)-1 / 2u;
        unsigned int wanted = (q->capacity > 0) ? (unsigned int)q->capacity * 2u : 1u;

        int* grown = NULL;
        if (wanted <= int_max) {
            grown = malloc(sizeof(int) * (size_t)wanted);
        }
        if (grown == NULL) {
            pthread_mutex_unlock(&(q->mutex));
            return -1;
        }

        /* Unroll the ring so the oldest item lands at index 0. */
        for (int i = 0; i < q->size; i++) {
            grown[i] = q->items[(q->front + i) % q->capacity];
        }
        free(q->items);
        q->items = grown;
        q->capacity = (int)wanted;
        q->front = 0;
        q->rear = q->size - 1;
    }

    q->rear = (q->rear + 1) % q->capacity;
    q->items[q->rear] = x;
    q->size++;

    pthread_mutex_unlock(&(q->mutex));
    return 1;
}
 
/*
 * Acquire the queue's mutex on behalf of the caller.
 * Does nothing when `q` is NULL, matching the other queue_* functions.
 */
void queue_lock(queue_t* q) {
    if (q == NULL) return;
    pthread_mutex_lock(&(q->mutex));
}
 
/*
 * Release the queue's mutex previously taken with queue_lock().
 * Does nothing when `q` is NULL, matching the other queue_* functions.
 */
void queue_unlock(queue_t* q) {
    if (q == NULL) return;
    pthread_mutex_unlock(&(q->mutex));
}
 
/*
 * Destroy a queue: release its mutex, its item storage and the queue itself.
 *
 * Does nothing when `q` is NULL.  The caller must make sure no other thread
 * is still using the queue.
 */
void queue_free(queue_t* q) {
    if (q == NULL) return;
    /* The mutex was initialised when the queue was created; release it too. */
    pthread_mutex_destroy(&(q->mutex));
    free(q->items);
    free(q);
}
 
/*
 * State of one pool worker; a pointer to it is passed to the worker thread
 * as its start argument.
 */
typedef struct worker {
    pthread_mutex_t mutex;  /* per-worker mutex, initialised by worker_create() */
    pthread_t* thread;      /* heap-allocated id of the worker's (detached) thread */
    pthread_cond_t* cond;   /* condition variable supplied by the creator; not owned here */
    queue_t* queue;         /* queue supplied by the creator; not owned here */
} worker_t;
 
/*
 * Create one worker and start its detached thread.
 *
 * queue:   queue the worker will read from (borrowed, not owned).
 * cond:    condition variable the worker waits on (borrowed, not owned).
 * handler: thread entry point; it receives the new worker_t* as argument.
 *
 * Returns the worker, or NULL if memory, the mutex or the thread could not
 * be created; nothing is leaked in that case.
 */
worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) {
    worker_t* w = malloc(sizeof(*w));
    if (w == NULL) return NULL;

    w->queue = queue;
    w->cond = cond;
    w->thread = malloc(sizeof(*w->thread));
    if (w->thread == NULL) {
        free(w);
        return NULL;
    }

    if (pthread_mutex_init(&(w->mutex), NULL) != 0) {
        free(w->thread);
        free(w);
        return NULL;
    }

    /* Every field must be set before this call: the new thread reads `w` immediately. */
    if (pthread_create(w->thread, NULL, handler, (void*)w) != 0) {
        pthread_mutex_destroy(&(w->mutex));
        free(w->thread);
        free(w);
        return NULL;
    }
    /* Detach only a thread that really exists; nobody ever joins the workers. */
    pthread_detach(*(w->thread));

    return w;
}
 
/*
 * A pool of worker threads that share one queue and one condition variable.
 */
typedef struct wrkpool {
    int size;               /* number of entries stored in `workers` */
    worker_t** workers;     /* heap-allocated array of worker pointers */
    queue_t* queue;         /* queue handed to every worker; supplied by the creator */
    pthread_cond_t cond;    /* its address is given to every worker; broadcast on enqueue */
} wrkpool_t;
 
 
/*
 * Create a pool of up to `size` workers that all read from `queue`.
 *
 * size:    number of worker threads requested; must not be negative.
 * queue:   queue shared by all workers (borrowed, not owned).
 * handler: thread entry point of type void* (*)(void*), passed as void*.
 *
 * Returns the pool, or NULL if an argument is invalid, memory or the
 * condition variable cannot be set up, or not a single requested worker
 * could be started.  `wp->size` counts only the workers that actually exist.
 */
wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
    if (size < 0 || queue == NULL || handler == NULL) return NULL;

    wrkpool_t* wp = malloc(sizeof(*wp));
    if (wp == NULL) return NULL;
    wp->queue = queue;
    wp->size = 0;

    wp->workers = malloc(sizeof(worker_t*) * ((size_t)size + 1));
    if (wp->workers == NULL) {
        free(wp);
        return NULL;
    }

    if (pthread_cond_init(&(wp->cond), NULL) != 0) {
        free(wp->workers);
        free(wp);
        return NULL;
    }

    /* Make the object-pointer to function-pointer conversion explicit. */
    void* (*entry)(void*) = (void* (*)(void*))handler;

    int created = 0;
    for (int i = 0; i < size; i++) {
        worker_t* w = worker_create(queue, &(wp->cond), entry);
        /* Keep the array dense: a worker that failed to start is not recorded. */
        if (w != NULL) {
            wp->workers[created++] = w;
        }
    }
    wp->size = created;

    if (size > 0 && created == 0) {
        pthread_cond_destroy(&(wp->cond));
        free(wp->workers);
        free(wp);
        return NULL;
    }
    return wp;
}
 
/*
 * Queue an accepted socket for the workers and wake them up.
 *
 * Returns 1 when the socket was queued, or -1 when `wp` is NULL or the
 * queue rejected the item.  On -1 the caller still owns `newsock` and
 * should close it.
 */
int wrkpool_enqueue(wrkpool_t* wp, int newsock) {
    if (wp == NULL) return -1;

    /* Report a failed push instead of claiming success and leaking the socket. */
    if (queue_push(wp->queue, newsock) < 0) {
        return -1;
    }

    /*
     * Only wake the workers here.  Their mutexes are locked by the worker
     * threads themselves; unlocking them from this thread would be undefined
     * behaviour, so this function does not touch them.
     */
    pthread_cond_broadcast(&(wp->cond));
    return 1;
}
 
/* Return true once buf[0..len) contains the blank line that ends an HTTP request head. */
static bool request_head_complete(const char* buf, size_t len) {
    for (size_t i = 0; i + 4 <= len; i++) {
        if (memcmp(buf + i, "\r\n\r\n", 4) == 0) return true;
    }
    return false;
}

/* Send all `len` bytes, continuing after short writes; returns false on error. */
static bool send_all(int sock, const char* data, size_t len) {
#ifdef MSG_NOSIGNAL
    /* Where available, get an error instead of SIGPIPE if the peer has gone away. */
    const int flags = MSG_NOSIGNAL;
#else
    const int flags = 0;
#endif
    while (len > 0) {
        ssize_t n = send(sock, data, len, flags);
        if (n <= 0) return false;
        data += n;
        len -= (size_t)n;
    }
    return true;
}

/*
 * Serve one HTTP connection: read the request head, answer with a fixed
 * "Hello, World!" page, then shut down and close the socket.
 *
 * sock: connected socket; it is always closed before this function returns.
 *
 * The connection is closed without a reply when the end of the request head
 * ("\r\n\r\n") has not been seen after five reads, when the head does not
 * fit the request buffer, or when read() reports end-of-file or an error.
 */
void socket_rw(int sock) {
    enum { REQCAP = 1024, MAXREADS = 5, RESCAP = 1024 };

    char reqbuf[REQCAP];
    size_t used = 0;
    bool complete = false;

    for (int attempt = 0; attempt < MAXREADS && !complete; attempt++) {
        if (used >= sizeof(reqbuf)) break;      /* request head too large */

        /* Read straight into the free tail of the buffer so it cannot overflow. */
        ssize_t rv = read(sock, reqbuf + used, sizeof(reqbuf) - used);
        if (rv <= 0) break;                     /* EOF, timeout or error */

        used += (size_t)rv;
        complete = request_head_complete(reqbuf, used);
    }
    if (!complete) {
        close(sock);
        return;
    }

    static const char content[] = "<html>\r\n"
                                  "<head><title>Hello</title></head>\r\n"
                                  "<body>\r\n"
                                  "<center><h1>Hello, World!</h1></center>\r\n"
                                  "<hr><center>Srv11/0.1</center>\r\n"
                                  "</body>\r\n"
                                  "</html>\r\n";

    /*
     * Content-Length is derived from the body so the two cannot drift apart.
     * NOTE(review): the Date header is a fixed placeholder, not the current time.
     */
    char res[RESCAP];
    int reslen = snprintf(res, sizeof(res),
                          "HTTP/1.1 200 OK\r\n"
                          "Accept-Ranges: bytes\r\n"
                          "Content-Type: text/html\r\n"
                          "Content-Length: %zu\r\n"
                          "Connection: close\r\n"
                          "Server: Srv11/0.1\r\n"
                          "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"
                          "\r\n"
                          "%s",
                          sizeof(content) - 1, content);

    if (reslen > 0 && (size_t)reslen < sizeof(res)) {
        /* Best effort: the connection is closed below whether or not this succeeds. */
        (void)send_all(sock, res, (size_t)reslen);
    }

    shutdown(sock, SHUT_RDWR);
    close(sock);
}
 
void* handler(void* argp) {
 
    worker_t* w = (worker_t*)argp;
    queue_t* queue = w->queue;
 
    while(true) {
        pthread_mutex_lock(&(w->mutex));
        while(queue_size(queue) == 0) {
            pthread_cond_wait(w->cond, &(w->mutex));
        }
 
        queue_lock(queue);
        if (queue_size(queue) == 0) {
            continue;
            queue_unlock(queue);
        }
        int sock = queue_front(queue);
        queue_unlock(queue);
 
        socket_rw(sock);
 
        pthread_mutex_unlock(&(w->mutex));
    }
    return NULL;
}
 
/*
 * Create a TCP socket listening on every local IPv4 address.
 *
 * port:    TCP port to bind, 0..65535 (0 lets the system choose one).
 * backlog: backlog passed to listen().
 *
 * Returns the listening socket.  Any failure prints a message to stderr and
 * terminates the process with exit status 1.
 */
int socket_create(int port, int backlog) {
    if (port < 0 || port > 65535) {
        /* htons() would silently truncate an out-of-range value. */
        fprintf(stderr, "invalid port %d, exit\n", port);
        exit(1);
    }

    int sock = socket(PF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        fprintf(stderr, "cannot create socket, exit\n");
        exit(1);
    }

    int optval = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        fprintf(stderr, "cannot set socket option, exit\n");
        exit(1);
    }

    /*
     * Use a zeroed sockaddr_in, so no padding is left uninitialised.  The
     * BSD-only sin_len member is not named, which keeps this portable; the
     * address length is passed to bind() explicitly.
     */
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons((unsigned short)port);

    if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
        fprintf(stderr, "cannot bind socket, exit\n");
        exit(1);
    }
    if (listen(sock, backlog) < 0) {
        fprintf(stderr, "cannot listen socket, exit\n");
        exit(1);
    }
    return sock;
}
 
 
/*
 * Entry point: create the queue and a pool of 50 workers, listen on port
 * 1024 and hand every accepted connection to the pool.
 *
 * The accept loop never ends; the process runs until it is killed.
 */
int main(int argc, char **argv) {
    (void)argc;
    (void)argv;

    queue_t* queue = queue_create(1024 * 4);
    if (queue == NULL) {
        fprintf(stderr, "cannot create queue, exit\n");
        exit(1);
    }
    wrkpool_t* wrkpool = wrkpool_create(50, queue, handler);
    if (wrkpool == NULL) {
        fprintf(stderr, "cannot create worker pool, exit\n");
        exit(1);
    }

    int sock = socket_create(1024, 4096);

    while (1) {
        /* Any non-negative descriptor is a valid connection. */
        int newsock = accept(sock, NULL, NULL);
        if (newsock < 0) {
            continue;
        }

        /*
         * The receive timeout belongs on the accepted connection, so a silent
         * client cannot block a worker forever.  Setting it on the listening
         * socket would make accept() time out instead.
         */
        struct timeval tv;
        tv.tv_sec = 3;
        tv.tv_usec = 0;
        if (setsockopt(newsock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) < 0) {
            /* A failure on one connection must not bring the server down. */
            fprintf(stderr, "cannot set socket option\n");
            close(newsock);
            continue;
        }

        if (wrkpool_enqueue(wrkpool, newsock) < 0) {
            close(newsock);
        }
    }
    close(sock);
    return 0;
}

benchmark

$ ab -s 20 -c 1000 -n100000 'http://127.0.0.1:1024/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        Srv11/0.1
Server Hostname:        127.0.0.1
Server Port:            1024

Document Path:          /
Document Length:        142 bytes

Concurrency Level:      1000
Time taken for tests:   3.819 seconds
Complete requests:      100000
Failed requests:        0
Total transferred:      30300000 bytes
HTML transferred:       14200000 bytes
Requests per second:    26183.30 [#/sec] (mean)
Time per request:       38.192 [ms] (mean)
Time per request:       0.038 [ms] (mean, across all concurrent requests)
Transfer rate:          7747.60 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0   12   7.4     10      55
Processing:     5   20  41.3     11     616
Waiting:        0   15  39.9      8     487
Total:         13   32  42.9     21     632

Percentage of the requests served within a certain time (ms)
  50%     21
  66%     22
  75%     23
  80%     23
  90%     59
  95%     80
  98%    205
  99%    269
 100%    632 (longest request)

Nginx on the same system

$ ab -s 20 -c 1000 -n100000 'http://127.0.0.1:80/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        nginx/1.14.2
Server Hostname:        127.0.0.1
Server Port:            80

Document Path:          /
Document Length:        185 bytes

Concurrency Level:      1000
Time taken for tests:   3.841 seconds
Complete requests:      100000
Failed requests:        0
Non-2xx responses:      100000
Total transferred:      37700000 bytes
HTML transferred:       18500000 bytes
Requests per second:    26035.91 [#/sec] (mean)
Time per request:       38.408 [ms] (mean)
Time per request:       0.038 [ms] (mean, across all concurrent requests)
Transfer rate:          9585.49 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0   10 111.9      5    3011
Processing:     5   18  27.0     12     542
Waiting:        0   16  25.1     11     534
Total:          9   28 115.0     17    3019

Percentage of the requests served within a certain time (ms)
  50%     17
  66%     19
  75%     20
  80%     21
  90%     37
  95%     65
  98%     78
  99%    241
 100%   3019 (longest request)

First PagePrevious PageBack to overviewNext PageLast Page