User Tools

Site Tools


Threadpool web server with kqueue()/kevent()

thrsrv-kq.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
 
#include <pthread.h>
#include <sys/event.h>
 
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <err.h>
 
void msleep(int dur) {
    usleep(dur * 1000);
}
 
typedef struct queue {
    pthread_mutex_t mutex;
    int* items;
    int front;
    int rear;
    int size;
    int capacity;
} queue_t;
 
queue_t* queue_create(int size) {
    queue_t* q = (queue_t*)malloc(sizeof(queue_t));
    q->items = (int*)malloc((size + 1) * sizeof(int));
    q->capacity = size;
    q->front = 0;
    q->rear = -1;
    q->size = 0;
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&(q->mutex), NULL);
    pthread_mutexattr_destroy(&attr);
    return q;
}
 
int queue_size(queue_t* q) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&(q->mutex));
    int res = q->size;
    pthread_mutex_unlock(&(q->mutex));
    return res;
}
 
int queue_front(queue_t* q) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&(q->mutex));
    int res = -1;
    if (q->size > 0) {
        res = q->items[q->front];
        q->front = (q->front + 1) % q->capacity;
        q->size--;
    }
    pthread_mutex_unlock(&(q->mutex));
    return res;
}
 
int queue_push(queue_t* q, int x) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&q->mutex);
    int res = -1;
    if ((q->size + 1) > q->capacity) {
        int newcapacity = q->capacity * 2;
        void* items = realloc(q->items, sizeof(int) * newcapacity);
        if (items != NULL) {
            q->items = items;
            q->capacity = newcapacity;
        }
    }
    if ((q->size + 1) > q->capacity) {
        pthread_mutex_unlock(&(q->mutex));
        return -1;
    }
    q->rear = (q->rear + 1) % q->capacity;
    q->items[q->rear] = x;
    q->size++;
    res = 1;
    pthread_mutex_unlock(&(q->mutex));
    return res;
}
 
 
void queue_lock(queue_t* q) {
    pthread_mutex_lock(&(q->mutex));
}
 
void queue_unlock(queue_t* q) {
    pthread_mutex_unlock(&(q->mutex));
}
 
void queue_free(queue_t* q) {
    if (q == NULL) return;
    free(q->items);
    free(q);
}
 
typedef struct worker {
    pthread_mutex_t mutex;
    pthread_t* thread;
    pthread_cond_t* cond;
    queue_t* queue;
} worker_t;
 
worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) {
    worker_t* w = (worker_t*)malloc(sizeof(worker_t));
    w->queue = queue;
    w->cond = cond;
    w->thread = (pthread_t*)malloc(sizeof(pthread_t));
    pthread_mutex_init(&(w->mutex), NULL);
    pthread_create(w->thread, NULL, handler, (void*)w);
    pthread_detach(*(w->thread));
    return w;
}
 
typedef struct wrkpool {
    int size;
    worker_t** workers;
    queue_t* queue;
    pthread_cond_t cond;
} wrkpool_t;
 
 
wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));
    wp->queue = queue;
    wp->workers = (worker_t**)malloc(sizeof(worker_t*) * (size + 1));
    pthread_cond_init(&(wp->cond), NULL);
 
    for (int i = 0; i < size; i++) {
        wp->workers[i] = worker_create(queue, &(wp->cond), handler);
    }
    wp->size = size;
    return wp;
}
 
int wrkpool_enqueue(wrkpool_t* wp, int newsock) {
    queue_push(wp->queue, newsock);
    for (int i = 0; i < wp->size; i++) {
        worker_t* w = wp->workers[i];
        pthread_mutex_unlock(&(w->mutex));
    }
    pthread_cond_broadcast(&(wp->cond));
    return 1;
}
 
void socket_rw(int sock) {
    const int reqcap = 1024;
    char reqbuf[reqcap] = "";
    memset(reqbuf, 0, reqcap);
    int rv = 0;
    int trv = 0;
 
    int rep = 0;
 
    int rkq = kqueue();
    struct kevent revent;
    memset(&revent, 0, sizeof(struct kevent));
    EV_SET(&revent, sock, EVFILT_READ, EV_ADD | EV_EOF, 0, 0, NULL);
    if (kevent(rkq, &revent, 1, NULL, 0, NULL) < 0) {
           close(rkq);
           close(sock);
           return;
    }
 
    while(1) {
        const int tmpsize = 1024 * 2;
        char tmpbuf[tmpsize];
        memset(tmpbuf, 0, tmpsize);
 
        struct kevent rtevent;
        memset(&rtevent, 0, sizeof(struct kevent));
        kevent(rkq, NULL, 0, &rtevent, 1, NULL);
        if (rtevent.flags & EV_EOF) {
            close(rkq);
            close(sock);
            return;
        }
        if (rtevent.data > tmpsize) rtevent.data = tmpsize;
 
        rv = read(sock, tmpbuf, rtevent.data);
 
        strncat(reqbuf, tmpbuf, rv);
        trv += rv;
        if (strnstr(reqbuf, "\r\n\r\n", trv) != NULL) break;
        if (++rep > 4) {
            close(rkq);
            close(sock);
            return;
        }
    }
    close(rkq);
 
    if (trv > 0) {
        char content[] = "<html>\r\n"
                         "<head><title>Hello</title></head>\r\n"
                         "<body>\r\n"
                         "<center><h1>Hello, World!</h1></center>\r\n"
                         "<hr><center>Srv11/0.1</center>\r\n"
                         "</body>\r\n"
                         "</html>\r\n";
        char header[] =
            "HTTP/1.1 200 OK\r\n"
            "Accept-Ranges: bytes\r\n"
            "Content-Type: text/html\r\n"
            "Content-Length: 142\r\n"
            "Conection: close\r\n"
            "Server: Srv11/0.1\r\n"
            "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"
            "\r\n";
 
        const int rescap = 1024;
        char res[rescap] = "";
        strcat(res, header);
        strcat(res, content);
 
         int wkq = kqueue();
         struct kevent wevent;
         memset(&wevent, 0, sizeof(struct kevent));
 
         EV_SET(&wevent, sock, EVFILT_WRITE, EV_ADD | EV_EOF, 0, 0, NULL);
         if (kevent(wkq, &wevent, 1, NULL, 0, NULL) < 0) {
             fprintf(stderr, "unable set write kevent\n");
         }
         struct kevent wtevent;
         memset(&wtevent, 0, sizeof(struct kevent));
         kevent(wkq, NULL, 0, &wtevent, 1, NULL);
         if (wtevent.flags & EV_EOF) {
            close(rkq);
            close(sock);
            return;
        }
        write(sock, res, strlen(res));
        shutdown(sock, SHUT_RDWR);
        close(wkq);
    }
    close(sock);
}
 
void* handler(void* argp) {
    worker_t* w = (worker_t*)argp;
    queue_t* queue = w->queue;
 
    while(true) {
        pthread_mutex_lock(&(w->mutex));
        while(queue_size(queue) == 0) {
            pthread_cond_wait(w->cond, &(w->mutex));
        }
 
        queue_lock(queue);
        if (queue_size(queue) == 0) {
            continue;
            queue_unlock(queue);
        }
        int sock = queue_front(queue);
        queue_unlock(queue);
        socket_rw(sock);
        pthread_mutex_unlock(&(w->mutex));
    }
    return NULL;
}
 
int socket_create() {
    int sock;
    if((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
        fprintf(stderr, "cannot create socket, exit\n");
        exit(1);
    }
    int optval;
    optval = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        fprintf(stderr, "cannot set socket option, exit\n");
        exit(1);
    }
 
    struct sockaddr addr;
    const int port = 1024;
    struct sockaddr_in* paddr = (struct sockaddr_in*)&addr;
    paddr->sin_family = AF_INET;
    paddr->sin_addr.s_addr = INADDR_ANY;
    paddr->sin_port = htons(port);
    paddr->sin_len = sizeof(struct sockaddr_in);
 
    if (bind(sock, (struct sockaddr*)paddr, paddr->sin_len) < 0) {
        fprintf(stderr, "cannot bind socket, exit\n");
        exit(1);
    }
    if (listen(sock, 1024 * 4) < 0) {
        fprintf(stderr, "cannot listen socket, exit\n");
        exit(1);
    }
    return sock;
}
 
 
int main(int argc, char **argv) {
 
    queue_t* queue = queue_create(1024 * 1024);
    wrkpool_t* wrkpool = wrkpool_create(50, queue, handler);
 
    int sock = socket_create();
 
    int flags = fcntl(sock, F_GETFL, 0);
    fcntl(sock, F_SETFL, flags | O_NONBLOCK);
 
    int kq = kqueue();
    struct kevent event;
    memset(&event, 0, sizeof(struct kevent));
 
    EV_SET(&event, sock, EVFILT_READ, EV_ADD, 0, 0, NULL);
    if (kevent(kq, &event, 1, NULL, 0, NULL) < 0) {
        fprintf(stderr, "unable set kevent\n");
    }
 
    while (1) {
        int ec = 0;
 
        struct kevent tevent;
        memset(&tevent, 0, sizeof(struct kevent));
        if ((ec = kevent(kq, NULL, 0, &tevent, 1, NULL)) < 0) {
            fprintf(stderr, "kevent error\n");
            continue;
        }
 
        int newsock = 0;
        if (tevent.ident == sock) {
            if ((newsock = accept(sock, NULL, 0)) > 3) {
 
                int flags = fcntl(newsock, F_GETFL, 0);
                fcntl(newsock, F_SETFL, flags | O_NONBLOCK);
 
                if (wrkpool_enqueue(wrkpool, newsock) < 0) {
                    close(newsock);
                }
            }
        }
    }
    close(kq);
    close(sock);
    return 0;
}

Out

$ ab -s 20 -c 10000 -n100000 'http://192.168.56.16:1024/'
This is ApacheBench, Version 2.3 <$Revision: 1826891 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 192.168.56.16 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        Srv11/0.1
Server Hostname:        192.168.56.16
Server Port:            1024

Document Path:          /
Document Length:        142 bytes

Concurrency Level:      10000
Time taken for tests:   5.760 seconds
Complete requests:      100000
Failed requests:        0
Total transferred:      30300000 bytes
HTML transferred:       14200000 bytes
Requests per second:    17362.55 [#/sec] (mean)
Time per request:       575.952 [ms] (mean)
Time per request:       0.058 [ms] (mean, across all concurrent requests)
Transfer rate:          5137.55 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0  262 345.0    192    3198
Processing:    63  277  62.5    275    1226
Waiting:        0  199  79.3    198    1199
Total:        292  539 344.3    460    3434

Percentage of the requests served within a certain time (ms)
  50%    460
  66%    472
  75%    479
  80%    481
  90%    769
  95%    882
  98%    942
  99%   3258
 100%   3434 (longest request)

First PagePrevious PageBack to overviewNext PageLast Page