User Tools

Site Tools


Differences

This shows you the differences between two versions of the page.

Link to this comparison view

c:thrsrv-bs [2019-06-04 12:14]
c:thrsrv-bs [2020-02-15 00:57] (current)
Line 1: Line 1:
 +
 +=====Threadpool web server with blocked IO=====
 +
 +<code c thrsrv-bl.c>​
 +/*
 + * Copyright 2004-2019 Oleg Borodin <borodin@unix7.org>
 + */
 +
 +#include <stdlib.h>
 +#include <stdio.h>
 +#include <stdbool.h>
 +#include <unistd.h>
 +#include <string.h>
 +#include <fcntl.h>
 +
 +#include <pthread.h>
 +#include <sys/event.h>
 +
 +#include <sys/types.h>
 +#include <sys/socket.h>
 +#include <arpa/inet.h>
 +#include <netinet/in.h>
 +#include <sys/time.h>
 +#include <err.h>
 +
 +
/* Block the calling thread for roughly `dur` milliseconds. */
void msleep(int dur) {
    int micros = dur * 1000;
    usleep(micros);
}
 +
/*
 * Thread-safe circular FIFO of int values (accepted socket descriptors).
 * Every field is protected by `mutex`; use the queue_* functions rather
 * than touching fields directly.
 */
typedef struct queue {
    pthread_mutex_t mutex;  /* guards all other fields */
    int* items;             /* circular buffer; reallocated on growth */
    int front;              /* index of the next item to pop */
    int rear;               /* index of the most recently pushed item; -1 when never pushed */
    int size;               /* number of items currently queued */
    int capacity;           /* usable slot count of `items` */
} queue_t;
 +
 +queue_t* queue_create(int size) {
 +    queue_t* q = (queue_t*)malloc(sizeof(queue_t));​
 +    q->items = (int*)malloc((size + 1) * sizeof(int));​
 +    q->​capacity = size;
 +    q->front = 0;
 +    q->rear = -1;
 +    q->size = 0;
 +
 +    pthread_mutexattr_t attr;
 +    pthread_mutexattr_init(&​attr);​
 +    pthread_mutexattr_settype(&​attr,​ PTHREAD_MUTEX_RECURSIVE);​
 +    pthread_mutex_init(&​(q->​mutex),​ NULL);
 +    pthread_mutexattr_destroy(&​attr);​
 +
 +    return q;
 +}
 +
 +int queue_size(queue_t* q) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​(q->​mutex));​
 +    int res = q->size;
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +    return res;
 +}
 +
 +int queue_front(queue_t* q) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​(q->​mutex));​
 +    int res = -1;
 +    if (q->size > 0) {
 +        res = q->​items[q->​front];​
 +        q->front = (q->​front + 1) % q->​capacity;​
 +        q->​size--;​
 +    }
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +    return res;
 +}
 +
 +int queue_push(queue_t* q, int x) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​q->​mutex);​
 +    int res = -1;
 +    if ((q->​size + 1) > q->​capacity) {
 +        int newcapacity = q->​capacity * 2;
 +        void* items = realloc(q->​items,​ sizeof(int) * newcapacity);​
 +        if (items != NULL) {
 +            q->items = items;
 +            q->​capacity = newcapacity;​
 +        }
 +    }
 +    if ((q->​size + 1) > q->​capacity) {
 +        pthread_mutex_unlock(&​(q->​mutex));​
 +        return -1;
 +    }
 +    q->rear = (q->rear + 1) % q->​capacity;​
 +    q->​items[q->​rear] = x;
 +    q->​size++;​
 +    res = 1;
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +    return res;
 +}
 +
 +void queue_lock(queue_t* q) {
 +    pthread_mutex_lock(&​(q->​mutex));​
 +}
 +
 +void queue_unlock(queue_t* q) {
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +}
 +
 +void queue_free(queue_t* q) {
 +    if (q == NULL) return;
 +    free(q->​items);​
 +    free(q);
 +}
 +
/*
 * One pool thread.  The detached thread runs handler() with this struct
 * as its argument; `cond` and `queue` are shared across the whole pool.
 */
typedef struct worker {
    pthread_mutex_t mutex;   /* per-worker mutex paired with the shared cond */
    pthread_t* thread;       /* heap-allocated, detached worker thread */
    pthread_cond_t* cond;    /* points at the pool-wide wakeup condition */
    queue_t* queue;          /* shared queue of accepted sockets */
} worker_t;
 +
 +worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) {
 +    worker_t* w = (worker_t*)malloc(sizeof(worker_t));​
 +    w->queue = queue;
 +    w->cond = cond;
 +    w->​thread = (pthread_t*)malloc(sizeof(pthread_t));​
 +
 +    pthread_mutex_init(&​(w->​mutex),​ NULL);
 +
 +    pthread_create(w->​thread,​ NULL, handler, (void*)w);
 +    pthread_detach(*(w->​thread));​
 +
 +    return w;
 +}
 +
/*
 * Fixed-size pool of worker threads draining one shared socket queue.
 */
typedef struct wrkpool {
    int size;                /* number of live workers in `workers` */
    worker_t** workers;      /* array of `size` worker pointers */
    queue_t* queue;          /* shared queue the workers consume */
    pthread_cond_t cond;     /* broadcast by the producer on every enqueue */
} wrkpool_t;
 +
 +
 +wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
 +    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));​
 +    wp->​queue = queue;
 +
 +    wp->​workers = (worker_t**)malloc(sizeof(worker_t*) * (size + 1));
 +
 +    pthread_cond_init(&​(wp->​cond),​ NULL);
 +
 +    for (int i = 0; i < size; i++) {
 +        wp->​workers[i] = worker_create(queue,​ &​(wp->​cond),​ handler);
 +    }
 +    wp->size = size;
 +    return wp;
 +}
 +
 +int wrkpool_enqueue(wrkpool_t* wp, int newsock) {
 +    queue_push(wp->​queue,​ newsock);
 +    for (int i = 0; i < wp->​size;​ i++) {
 +        worker_t* w = wp->​workers[i];​
 +        pthread_mutex_unlock(&​(w->​mutex));​
 +    }
 +    pthread_cond_broadcast(&​(wp->​cond));​
 +    return 1;
 +}
 +
/*
 * Serve one HTTP exchange on `sock` with blocking I/O: read until the
 * blank line that ends the request headers, then write a fixed
 * "Hello, World!" response.  `sock` is always closed on return.
 *
 * BUG FIXES vs. the original:
 *  - `char reqbuf[reqcap] = ""` was a VLA with an initializer (invalid C);
 *    sizes are now enum constants.
 *  - read() errors were not checked; rv == -1 fed to strncat became a
 *    huge size_t length (buffer overflow / UB).
 *  - each pass read up to 2 KiB into a 1 KiB request buffer via strncat,
 *    overflowing it; reads are now bounded by the remaining space.
 *  - strnstr() is BSD-only; the buffer is kept NUL-terminated so the
 *    portable strstr() suffices.
 *  - "Conection: close" header typo fixed; Content-Length is computed
 *    instead of hard-coded.
 * NOTE(review): the Date header is still a stale constant, as in the
 * original — generating it per-response needs time.h and is left out.
 */
void socket_rw(int sock) {
    enum { REQ_CAP = 1024, MAX_READS = 5, RES_CAP = 2048 };
    char reqbuf[REQ_CAP];
    size_t total = 0;
    int reads = 0;

    /* Accumulate request bytes until the header terminator appears. */
    for (;;) {
        ssize_t rv = read(sock, reqbuf + total, (REQ_CAP - 1) - total);
        if (rv <= 0) {                 /* error, EOF, or SO_RCVTIMEO expiry */
            close(sock);
            return;
        }
        total += (size_t)rv;
        reqbuf[total] = '\0';          /* keep buffer a valid C string */
        if (strstr(reqbuf, "\r\n\r\n") != NULL) break;
        if (++reads >= MAX_READS || total >= REQ_CAP - 1) {
            close(sock);               /* too fragmented or headers too large */
            return;
        }
    }

    static const char content[] =
        "<html>\r\n"
        "<head><title>Hello</title></head>\r\n"
        "<body>\r\n"
        "<center><h1>Hello, World!</h1></center>\r\n"
        "<hr><center>Srv11/0.1</center>\r\n"
        "</body>\r\n"
        "</html>\r\n";

    char res[RES_CAP];
    int len = snprintf(res, sizeof res,
                       "HTTP/1.1 200 OK\r\n"
                       "Accept-Ranges: bytes\r\n"
                       "Content-Type: text/html\r\n"
                       "Content-Length: %zu\r\n"
                       "Connection: close\r\n"
                       "Server: Srv11/0.1\r\n"
                       "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"
                       "\r\n"
                       "%s",
                       strlen(content), content);
    if (len > 0) {
        ssize_t wr = write(sock, res, (size_t)len);
        (void)wr;                      /* best-effort; socket closes next */
        shutdown(sock, SHUT_RDWR);
    }
    close(sock);
}
 +
 +void* handler(void* argp) {
 +
 +    worker_t* w = (worker_t*)argp;​
 +    queue_t* queue = w->​queue;​
 +
 +    while(true) {
 +        pthread_mutex_lock(&​(w->​mutex));​
 +        while(queue_size(queue) == 0) {
 +            pthread_cond_wait(w->​cond,​ &​(w->​mutex));​
 +        }
 +
 +        queue_lock(queue);​
 +        if (queue_size(queue) == 0) {
 +            continue;
 +            queue_unlock(queue);​
 +        }
 +        int sock = queue_front(queue);​
 +        queue_unlock(queue);​
 +
 +        socket_rw(sock);​
 +
 +        pthread_mutex_unlock(&​(w->​mutex));​
 +    }
 +    return NULL;
 +}
 +
/*
 * Create a TCP socket bound to INADDR_ANY:`port` and put it in listen
 * mode with the given `backlog`.  Exits the process on any failure
 * (matching the original's policy).  Returns the listening descriptor.
 *
 * BUG FIX: the original wrote through a sockaddr_in* into an unzeroed
 * `struct sockaddr`, leaving padding uninitialized, and relied on the
 * BSD-only sin_len field for the bind length.  A zero-initialized
 * sockaddr_in with sizeof is portable and equivalent.
 */
int socket_create(int port, int backlog) {
    int sock = socket(PF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        fprintf(stderr, "cannot create socket, exit\n");
        exit(1);
    }

    int optval = 1;   /* allow fast restart while old sockets are in TIME_WAIT */
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval) < 0) {
        fprintf(stderr, "cannot set socket option, exit\n");
        exit(1);
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_port = htons(port);

    if (bind(sock, (struct sockaddr*)&addr, sizeof addr) < 0) {
        fprintf(stderr, "cannot bind socket, exit\n");
        exit(1);
    }
    if (listen(sock, backlog) < 0) {
        fprintf(stderr, "cannot listen socket, exit\n");
        exit(1);
    }
    return sock;
}
 +
 +
 +int main(int argc, char **argv) {
 +
 +    queue_t* queue = queue_create(1024 * 4);
 +    wrkpool_t* wrkpool = wrkpool_create(50,​ queue, handler);
 +
 +    int sock = socket_create(1024,​ 4096);
 +
 +    while (1) {
 +        int newsock = 0;
 +        if ((newsock = accept(sock,​ NULL, 0)) > 3) {
 +
 +            struct timeval tv;
 +            tv.tv_sec = 3;
 +            tv.tv_usec = 0;
 +            if (setsockopt(sock,​ SOL_SOCKET, SO_RCVTIMEO,​ &tv, sizeof(struct timeval)) < 0) {
 +                fprintf(stderr,​ "​cannot set socket option, exit\n"​);​
 +                exit(1);
 +            }
 +
 +            if (wrkpool_enqueue(wrkpool,​ newsock) < 0) {
 +                close(newsock);​
 +            }
 +        }
 +    }
 +    close(sock);​
 +    return 0;
 +}
 +</​code>​
 +===benchmark===
 +
 +<​file>​
 +$ ab -s 20 -c 1000 -n100000 '​http://​127.0.0.1:​1024/'​
 +This is ApacheBench,​ Version 2.3 <​$Revision:​ 1826891 $>
 +Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://​www.zeustech.net/​
 +Licensed to The Apache Software Foundation, http://​www.apache.org/​
 +
 +Benchmarking 127.0.0.1 (be patient)
 +Completed 10000 requests
 +Completed 20000 requests
 +Completed 30000 requests
 +Completed 40000 requests
 +Completed 50000 requests
 +Completed 60000 requests
 +Completed 70000 requests
 +Completed 80000 requests
 +Completed 90000 requests
 +Completed 100000 requests
 +Finished 100000 requests
 +
 +
 +Server Software: ​       Srv11/0.1
 +Server Hostname: ​       127.0.0.1
 +Server Port:            1024
 +
 +Document Path:          /
 +Document Length: ​       142 bytes
 +
 +Concurrency Level: ​     1000
 +Time taken for tests: ​  3.819 seconds
 +Complete requests: ​     100000
 +Failed requests: ​       0
 +Total transferred: ​     30300000 bytes
 +HTML transferred: ​      ​14200000 bytes
 +Requests per second: ​   26183.30 [#/sec] (mean)
 +Time per request: ​      ​38.192 [ms] (mean)
 +Time per request: ​      0.038 [ms] (mean, across all concurrent requests)
 +Transfer rate:          7747.60 [Kbytes/​sec] received
 +
 +Connection Times (ms)
 +              min  mean[+/-sd] median ​  max
 +Connect: ​       0   ​12 ​  ​7.4 ​    ​10 ​     55
 +Processing: ​    ​5 ​  ​20 ​ 41.3     ​11 ​    616
 +Waiting: ​       0   ​15 ​ 39.9      8     487
 +Total: ​        ​13 ​  ​32 ​ 42.9     ​21 ​    632
 +
 +Percentage of the requests served within a certain time (ms)
 +  50%     21
 +  66%     22
 +  75%     23
 +  80%     23
 +  90%     59
 +  95%     80
 +  98%    205
 +  99%    269
 + ​100% ​   632 (longest request)
 +</​file>​
 +
 +===Nginx on the same system===
 +
 +<​file>​
 +$ ab -s 20 -c 1000 -n100000 '​http://​127.0.0.1:​80/'​
 +This is ApacheBench,​ Version 2.3 <​$Revision:​ 1826891 $>
 +Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://​www.zeustech.net/​
 +Licensed to The Apache Software Foundation, http://​www.apache.org/​
 +
 +Benchmarking 127.0.0.1 (be patient)
 +Completed 10000 requests
 +Completed 20000 requests
 +Completed 30000 requests
 +Completed 40000 requests
 +Completed 50000 requests
 +Completed 60000 requests
 +Completed 70000 requests
 +Completed 80000 requests
 +Completed 90000 requests
 +Completed 100000 requests
 +Finished 100000 requests
 +
 +
 +Server Software: ​       nginx/​1.14.2
 +Server Hostname: ​       127.0.0.1
 +Server Port:            80
 +
 +Document Path:          /
 +Document Length: ​       185 bytes
 +
 +Concurrency Level: ​     1000
 +Time taken for tests: ​  3.841 seconds
 +Complete requests: ​     100000
 +Failed requests: ​       0
 +Non-2xx responses: ​     100000
 +Total transferred: ​     37700000 bytes
 +HTML transferred: ​      ​18500000 bytes
 +Requests per second: ​   26035.91 [#/sec] (mean)
 +Time per request: ​      ​38.408 [ms] (mean)
 +Time per request: ​      0.038 [ms] (mean, across all concurrent requests)
 +Transfer rate:          9585.49 [Kbytes/​sec] received
 +
 +Connection Times (ms)
 +              min  mean[+/-sd] median ​  max
 +Connect: ​       0   10 111.9      5    3011
 +Processing: ​    ​5 ​  ​18 ​ 27.0     ​12 ​    542
 +Waiting: ​       0   ​16 ​ 25.1     ​11 ​    534
 +Total: ​         9   28 115.0     ​17 ​   3019
 +
 +Percentage of the requests served within a certain time (ms)
 +  50%     17
 +  66%     19
 +  75%     20
 +  80%     21
 +  90%     37
 +  95%     65
 +  98%     78
 +  99%    241
 + ​100% ​  3019 (longest request)
 +</​file>​
 +
 +----
 +[<>]