User Tools

Site Tools


Differences

This shows you the differences between two versions of the page.

Link to this comparison view

c:thrsrv-kqueue [2019-06-04 12:14]
c:thrsrv-kqueue [2020-02-15 00:57] (current)
Line 1: Line 1:
 +=====Threadpool web server with kqueue()/​kevent()===== ​
 +
 +<code c thrsrv-kq.c>​
 +/*
 + * Copyright 2004-2019 Oleg Borodin ​ <​borodin@unix7.org>​
 + */
 +
 +#include <​stdlib.h>​
 +#include <​stdio.h>​
 +#include <​stdbool.h>​
 +#include <​unistd.h>​
 +#include <​string.h>​
 +#include <​fcntl.h>​
 +
 +#include <​pthread.h>​
 +#include <​sys/​event.h>​
 +
 +#include <​sys/​types.h>​
 +#include <​sys/​socket.h>​
 +#include <​arpa/​inet.h>​
 +#include <​netinet/​in.h>​
 +#include <​sys/​time.h>​
 +#include <​err.h>​
 +
 +void msleep(int dur) {
 +    usleep(dur * 1000);
 +}
 +
 +typedef struct queue {
 +    pthread_mutex_t mutex;
 +    int* items;
 +    int front;
 +    int rear;
 +    int size;
 +    int capacity;
 +} queue_t;
 +
 +queue_t* queue_create(int size) {
 +    queue_t* q = (queue_t*)malloc(sizeof(queue_t));​
 +    q->items = (int*)malloc((size + 1) * sizeof(int));​
 +    q->​capacity = size;
 +    q->front = 0;
 +    q->rear = -1;
 +    q->size = 0;
 +    pthread_mutexattr_t attr;
 +    pthread_mutexattr_init(&​attr);​
 +    pthread_mutexattr_settype(&​attr,​ PTHREAD_MUTEX_RECURSIVE);​
 +    pthread_mutex_init(&​(q->​mutex),​ NULL);
 +    pthread_mutexattr_destroy(&​attr);​
 +    return q;
 +}
 +
 +int queue_size(queue_t* q) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​(q->​mutex));​
 +    int res = q->size;
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +    return res;
 +}
 +
 +int queue_front(queue_t* q) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​(q->​mutex));​
 +    int res = -1;
 +    if (q->size > 0) {
 +        res = q->​items[q->​front];​
 +        q->front = (q->​front + 1) % q->​capacity;​
 +        q->​size--;​
 +    }
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +    return res;
 +}
 +
 +int queue_push(queue_t* q, int x) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​q->​mutex);​
 +    int res = -1;
 +    if ((q->​size + 1) > q->​capacity) {
 +        int newcapacity = q->​capacity * 2;
 +        void* items = realloc(q->​items,​ sizeof(int) * newcapacity);​
 +        if (items != NULL) {
 +            q->items = items;
 +            q->​capacity = newcapacity;​
 +        }
 +    }
 +    if ((q->​size + 1) > q->​capacity) {
 +        pthread_mutex_unlock(&​(q->​mutex));​
 +        return -1;
 +    }
 +    q->rear = (q->rear + 1) % q->​capacity;​
 +    q->​items[q->​rear] = x;
 +    q->​size++;​
 +    res = 1;
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +    return res;
 +}
 +
 +
 +void queue_lock(queue_t* q) {
 +    pthread_mutex_lock(&​(q->​mutex));​
 +}
 +
 +void queue_unlock(queue_t* q) {
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +}
 +
 +void queue_free(queue_t* q) {
 +    if (q == NULL) return;
 +    free(q->​items);​
 +    free(q);
 +}
 +
 +typedef struct worker {
 +    pthread_mutex_t mutex;
 +    pthread_t* thread;
 +    pthread_cond_t* cond;
 +    queue_t* queue;
 +} worker_t;
 +
 +worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) {
 +    worker_t* w = (worker_t*)malloc(sizeof(worker_t));​
 +    w->queue = queue;
 +    w->cond = cond;
 +    w->​thread = (pthread_t*)malloc(sizeof(pthread_t));​
 +    pthread_mutex_init(&​(w->​mutex),​ NULL);
 +    pthread_create(w->​thread,​ NULL, handler, (void*)w);
 +    pthread_detach(*(w->​thread));​
 +    return w;
 +}
 +
 +typedef struct wrkpool {
 +    int size;
 +    worker_t** workers;
 +    queue_t* queue;
 +    pthread_cond_t cond;
 +} wrkpool_t;
 +
 +
 +wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
 +    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));​
 +    wp->​queue = queue;
 +    wp->​workers = (worker_t**)malloc(sizeof(worker_t*) * (size + 1));
 +    pthread_cond_init(&​(wp->​cond),​ NULL);
 +
 +    for (int i = 0; i < size; i++) {
 +        wp->​workers[i] = worker_create(queue,​ &​(wp->​cond),​ handler);
 +    }
 +    wp->size = size;
 +    return wp;
 +}
 +
 +int wrkpool_enqueue(wrkpool_t* wp, int newsock) {
 +    queue_push(wp->​queue,​ newsock);
 +    for (int i = 0; i < wp->​size;​ i++) {
 +        worker_t* w = wp->​workers[i];​
 +        pthread_mutex_unlock(&​(w->​mutex));​
 +    }
 +    pthread_cond_broadcast(&​(wp->​cond));​
 +    return 1;
 +}
 +
 +void socket_rw(int sock) {
 +    const int reqcap = 1024;
 +    char reqbuf[reqcap] = "";​
 +    memset(reqbuf,​ 0, reqcap);
 +    int rv = 0;
 +    int trv = 0;
 +
 +    int rep = 0;
 +
 +    int rkq = kqueue();
 +    struct kevent revent;
 +    memset(&​revent,​ 0, sizeof(struct kevent));
 +    EV_SET(&​revent,​ sock, EVFILT_READ,​ EV_ADD | EV_EOF, 0, 0, NULL);
 +    if (kevent(rkq,​ &​revent,​ 1, NULL, 0, NULL) < 0) {
 +           ​close(rkq);​
 +           ​close(sock);​
 +           ​return;​
 +    }
 +
 +    while(1) {
 +        const int tmpsize = 1024 * 2;
 +        char tmpbuf[tmpsize];​
 +        memset(tmpbuf,​ 0, tmpsize);
 +
 +        struct kevent rtevent;
 +        memset(&​rtevent,​ 0, sizeof(struct kevent));
 +        kevent(rkq, NULL, 0, &​rtevent,​ 1, NULL);
 +        if (rtevent.flags & EV_EOF) {
 +            close(rkq);
 +            close(sock);​
 +            return;
 +        }
 +        if (rtevent.data > tmpsize) rtevent.data = tmpsize;
 +
 +        rv = read(sock, tmpbuf, rtevent.data);​
 +
 +        strncat(reqbuf,​ tmpbuf, rv);
 +        trv += rv;
 +        if (strnstr(reqbuf,​ "​\r\n\r\n",​ trv) != NULL) break;
 +        if (++rep > 4) {
 +            close(rkq);
 +            close(sock);​
 +            return;
 +        }
 +    }
 +    close(rkq);
 +
 +    if (trv > 0) {
 +        char content[] = "<​html>​\r\n"​
 +                         "<​head><​title>​Hello</​title></​head>​\r\n"​
 +                         "<​body>​\r\n"​
 +                         "<​center><​h1>​Hello,​ World!</​h1></​center>​\r\n"​
 +                         "<​hr><​center>​Srv11/​0.1</​center>​\r\n"​
 +                         "</​body>​\r\n"​
 +                         "</​html>​\r\n";​
 +        char header[] =
 +            "​HTTP/​1.1 200 OK\r\n"​
 +            "​Accept-Ranges:​ bytes\r\n"​
 +            "​Content-Type:​ text/​html\r\n"​
 +            "​Content-Length:​ 142\r\n"​
 +            "​Conection:​ close\r\n"​
 +            "​Server:​ Srv11/​0.1\r\n"​
 +            "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"​
 +            "​\r\n";​
 +
 +        const int rescap = 1024;
 +        char res[rescap] = "";​
 +        strcat(res, header);
 +        strcat(res, content);
 +
 +         int wkq = kqueue();
 +         ​struct kevent wevent;
 +         ​memset(&​wevent,​ 0, sizeof(struct kevent));
 +
 +         ​EV_SET(&​wevent,​ sock, EVFILT_WRITE,​ EV_ADD | EV_EOF, 0, 0, NULL);
 +         if (kevent(wkq,​ &​wevent,​ 1, NULL, 0, NULL) < 0) {
 +             ​fprintf(stderr,​ "​unable set write kevent\n"​);​
 +         }
 +         ​struct kevent wtevent;
 +         ​memset(&​wtevent,​ 0, sizeof(struct kevent));
 +         ​kevent(wkq,​ NULL, 0, &​wtevent,​ 1, NULL);
 +         if (wtevent.flags & EV_EOF) {
 +            close(rkq);
 +            close(sock);​
 +            return;
 +        }
 +        write(sock, res, strlen(res));​
 +        shutdown(sock,​ SHUT_RDWR);
 +        close(wkq);
 +    }
 +    close(sock);​
 +}
 +
 +void* handler(void* argp) {
 +    worker_t* w = (worker_t*)argp;​
 +    queue_t* queue = w->​queue;​
 +
 +    while(true) {
 +        pthread_mutex_lock(&​(w->​mutex));​
 +        while(queue_size(queue) == 0) {
 +            pthread_cond_wait(w->​cond,​ &​(w->​mutex));​
 +        }
 +
 +        queue_lock(queue);​
 +        if (queue_size(queue) == 0) {
 +            continue;
 +            queue_unlock(queue);​
 +        }
 +        int sock = queue_front(queue);​
 +        queue_unlock(queue);​
 +        socket_rw(sock);​
 +        pthread_mutex_unlock(&​(w->​mutex));​
 +    }
 +    return NULL;
 +}
 +
 +int socket_create() {
 +    int sock;
 +    if((sock = socket(PF_INET,​ SOCK_STREAM,​ 0)) < 0) {
 +        fprintf(stderr,​ "​cannot create socket, exit\n"​);​
 +        exit(1);
 +    }
 +    int optval;
 +    optval = 1;
 +    if (setsockopt(sock,​ SOL_SOCKET, SO_REUSEADDR,​ &​optval,​ sizeof(optval)) < 0) {
 +        fprintf(stderr,​ "​cannot set socket option, exit\n"​);​
 +        exit(1);
 +    }
 +
 +    struct sockaddr addr;
 +    const int port = 1024;
 +    struct sockaddr_in* paddr = (struct sockaddr_in*)&​addr;​
 +    paddr->​sin_family = AF_INET;
 +    paddr->​sin_addr.s_addr = INADDR_ANY;
 +    paddr->​sin_port = htons(port);​
 +    paddr->​sin_len = sizeof(struct sockaddr_in);​
 +
 +    if (bind(sock, (struct sockaddr*)paddr,​ paddr->​sin_len) < 0) {
 +        fprintf(stderr,​ "​cannot bind socket, exit\n"​);​
 +        exit(1);
 +    }
 +    if (listen(sock,​ 1024 * 4) < 0) {
 +        fprintf(stderr,​ "​cannot listen socket, exit\n"​);​
 +        exit(1);
 +    }
 +    return sock;
 +}
 +
 +
 +int main(int argc, char **argv) {
 +
 +    queue_t* queue = queue_create(1024 * 1024);
 +    wrkpool_t* wrkpool = wrkpool_create(50,​ queue, handler);
 +
 +    int sock = socket_create();​
 +
 +    int flags = fcntl(sock, F_GETFL, 0);
 +    fcntl(sock, F_SETFL, flags | O_NONBLOCK);​
 +
 +    int kq = kqueue();
 +    struct kevent event;
 +    memset(&​event,​ 0, sizeof(struct kevent));
 +
 +    EV_SET(&​event,​ sock, EVFILT_READ,​ EV_ADD, 0, 0, NULL);
 +    if (kevent(kq, &event, 1, NULL, 0, NULL) < 0) {
 +        fprintf(stderr,​ "​unable set kevent\n"​);​
 +    }
 +
 +    while (1) {
 +        int ec = 0;
 +
 +        struct kevent tevent;
 +        memset(&​tevent,​ 0, sizeof(struct kevent));
 +        if ((ec = kevent(kq, NULL, 0, &​tevent,​ 1, NULL)) < 0) {
 +            fprintf(stderr,​ "​kevent error\n"​);​
 +            continue;
 +        }
 +
 +        int newsock = 0;
 +        if (tevent.ident == sock) {
 +            if ((newsock = accept(sock,​ NULL, 0)) > 3) {
 +
 +                int flags = fcntl(newsock,​ F_GETFL, 0);
 +                fcntl(newsock,​ F_SETFL, flags | O_NONBLOCK);​
 +
 +                if (wrkpool_enqueue(wrkpool,​ newsock) < 0) {
 +                    close(newsock);​
 +                }
 +            }
 +        }
 +    }
 +    close(kq);
 +    close(sock);​
 +    return 0;
 +}
 +</​code>​
 +
 +===Out===
 +
 +
 +<​file>​
 +$ ab -s 20 -c 10000 -n100000 '​http://​192.168.56.16:​1024/'​
 +This is ApacheBench,​ Version 2.3 <​$Revision:​ 1826891 $>
 +Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://​www.zeustech.net/​
 +Licensed to The Apache Software Foundation, http://​www.apache.org/​
 +
 +Benchmarking 192.168.56.16 (be patient)
 +Completed 10000 requests
 +Completed 20000 requests
 +Completed 30000 requests
 +Completed 40000 requests
 +Completed 50000 requests
 +Completed 60000 requests
 +Completed 70000 requests
 +Completed 80000 requests
 +Completed 90000 requests
 +Completed 100000 requests
 +Finished 100000 requests
 +
 +
 +Server Software: ​       Srv11/0.1
 +Server Hostname: ​       192.168.56.16
 +Server Port:            1024
 +
 +Document Path:          /
 +Document Length: ​       142 bytes
 +
 +Concurrency Level: ​     10000
 +Time taken for tests: ​  5.760 seconds
 +Complete requests: ​     100000
 +Failed requests: ​       0
 +Total transferred: ​     30300000 bytes
 +HTML transferred: ​      ​14200000 bytes
 +Requests per second: ​   17362.55 [#/sec] (mean)
 +Time per request: ​      ​575.952 [ms] (mean)
 +Time per request: ​      0.058 [ms] (mean, across all concurrent requests)
 +Transfer rate:          5137.55 [Kbytes/​sec] received
 +
 +Connection Times (ms)
 +              min  mean[+/-sd] median ​  max
 +Connect: ​       0  262 345.0    192    3198
 +Processing: ​   63  277  62.5    275    1226
 +Waiting: ​       0  199  79.3    198    1199
 +Total: ​       292  539 344.3    460    3434
 +
 +Percentage of the requests served within a certain time (ms)
 +  50%    460
 +  66%    472
 +  75%    479
 +  80%    481
 +  90%    769
 +  95%    882
 +  98%    942
 +  99%   3258
 + ​100% ​  3434 (longest request)
 +</​file>​
 +
 +----
 +[<>]