User Tools

Site Tools


This is an old revision of the document!


Threadpool web server with blocked IO

thrsrv-bl.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <fcntl.h>
 
#include <pthread.h>
#include <sys/event.h>
 
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <err.h>
 
 
void msleep(int dur) {
    usleep(dur * 1000);
}
 
typedef struct queue {
    pthread_mutex_t mutex;
    int* items;
    int front;
    int rear;
    int size;
    int capacity;
} queue_t;
 
queue_t* queue_create(int size) {
    queue_t* q = (queue_t*)malloc(sizeof(queue_t));
    q->items = (int*)malloc((size + 1) * sizeof(int));
    q->capacity = size;
    q->front = 0;
    q->rear = -1;
    q->size = 0;
 
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&(q->mutex), NULL);
    pthread_mutexattr_destroy(&attr);
 
    return q;
}
 
int queue_size(queue_t* q) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&(q->mutex));
    int res = q->size;
    pthread_mutex_unlock(&(q->mutex));
    return res;
}
 
int queue_front(queue_t* q) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&(q->mutex));
    int res = -1;
    if (q->size > 0) {
        res = q->items[q->front];
        q->front = (q->front + 1) % q->capacity;
        q->size--;
    }
    pthread_mutex_unlock(&(q->mutex));
    return res;
}
 
int queue_back(queue_t* q, int x) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&q->mutex);
    int res = -1;
    if (x + 2 > q->capacity) {
        int newcapacity = q->capacity * 2;
        void* items = realloc(q->items, sizeof(int) * newcapacity);
        if (items != NULL) {
            q->items = items;
            q->capacity = newcapacity;
        }
    }
    if (x < q->capacity) {;
        q->rear = (q->rear + 1) % q->capacity;
        q->items[q->rear] = x;
        q->size++;
        res = 1;
    }
    pthread_mutex_unlock(&(q->mutex));
    return res;
}
 
void queue_lock(queue_t* q) {
    pthread_mutex_lock(&(q->mutex));
}
 
void queue_unlock(queue_t* q) {
    pthread_mutex_unlock(&(q->mutex));
}
 
void queue_free(queue_t* q) {
    if (q == NULL) return;
    free(q->items);
    free(q);
}
 
typedef struct worker {
    pthread_mutex_t mutex;
    pthread_t* thread;
    pthread_cond_t* cond;
    queue_t* queue;
} worker_t;
 
worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) {
    worker_t* w = (worker_t*)malloc(sizeof(worker_t));
    w->queue = queue;
    w->cond = cond;
    w->thread = (pthread_t*)malloc(sizeof(pthread_t));
 
    pthread_mutex_init(&(w->mutex), NULL);
 
    pthread_create(w->thread, NULL, handler, (void*)w);
    pthread_detach(*(w->thread));
 
    return w;
}
 
typedef struct wrkpool {
    int size;
    worker_t** workers;
    queue_t* queue;
    pthread_cond_t cond;
} wrkpool_t;
 
 
wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));
    wp->queue = queue;
 
    wp->workers = (worker_t**)malloc(sizeof(worker_t*) * (size + 1));
 
    pthread_cond_init(&(wp->cond), NULL);
 
    for (int i = 0; i < size; i++) {
        wp->workers[i] = worker_create(queue, &(wp->cond), handler);
    }
    wp->size = size;
    return wp;
}
 
int wrkpool_enqueue(wrkpool_t* wp, int newsock) {
    queue_back(wp->queue, newsock);
    for (int i = 0; i < wp->size; i++) {
        worker_t* w = wp->workers[i];
        pthread_mutex_unlock(&(w->mutex));
    }
    pthread_cond_broadcast(&(wp->cond));
    return 1;
}
 
void socket_rw(int sock) {
 
    const int reqcap = 1024;
    char reqbuf[reqcap] = "";
    memset(reqbuf, 0, reqcap);
    int rv = 0;
    int trv = 0;
    int rep = 0;
 
    while(1) {
        const int tmpsize = 1024 * 2;
        char tmpbuf[tmpsize];
        memset(tmpbuf, 0, tmpsize);
 
        rv = read(sock, tmpbuf, tmpsize);
 
        strncat(reqbuf, tmpbuf, rv);
        trv += rv;
        if (strnstr(reqbuf, "\r\n\r\n", trv) != NULL) break;
        if (++rep > 4) {
            close(sock);
            return;
        }
    }
    if (trv > 0) {
 
        char content[] = "<html>\r\n"
                         "<head><title>Hello</title></head>\r\n"
                         "<body>\r\n"
                         "<center><h1>Hello, World!</h1></center>\r\n"
                         "<hr><center>Srv11/0.1</center>\r\n"
                         "</body>\r\n"
                         "</html>\r\n";
        char header[] = "HTTP/1.1 200 OK\r\n"
                        "Accept-Ranges: bytes\r\n"
                        "Content-Type: text/html\r\n"
                        "Content-Length: 142\r\n"
                        "Conection: close\r\n"
                        "Server: Srv11/0.1\r\n"
                        "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"
                        "\r\n";
        const int rescap = 1024;
        char res[rescap] = "";
        strcat(res, header);
        strcat(res, content);
 
        write(sock, res, strlen(res));
 
        shutdown(sock, SHUT_RDWR);
    }
    close(sock);
}
 
void* handler(void* argp) {
 
    worker_t* w = (worker_t*)argp;
    queue_t* queue = w->queue;
 
    while(true) {
        pthread_mutex_lock(&(w->mutex));
        while(queue_size(queue) == 0) {
            pthread_cond_wait(w->cond, &(w->mutex));
        }
 
        queue_lock(queue);
        if (queue_size(queue) == 0) {
            continue;
            queue_unlock(queue);
        }
        int sock = queue_front(queue);
        queue_unlock(queue);
 
        socket_rw(sock);
 
        pthread_mutex_unlock(&(w->mutex));
    }
    return NULL;
}
 
int socket_create(int port, int backlog) {
    int sock;
    if((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
        fprintf(stderr, "cannot create socket, exit\n");
        exit(1);
    }
    int optval;
    optval = 1;
    if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval)) < 0) {
        fprintf(stderr, "cannot set socket option, exit\n");
        exit(1);
    }
 
 
 
    struct sockaddr addr;
    struct sockaddr_in* paddr = (struct sockaddr_in*)&addr;
    paddr->sin_family = AF_INET;
    paddr->sin_addr.s_addr = INADDR_ANY;
    paddr->sin_port = htons(port);
    paddr->sin_len = sizeof(struct sockaddr_in);
 
    if (bind(sock, (struct sockaddr*)paddr, paddr->sin_len) < 0) {
        fprintf(stderr, "cannot bind socket, exit\n");
        exit(1);
    }
    if (listen(sock, backlog) < 0) {
        fprintf(stderr, "cannot listen socket, exit\n");
        exit(1);
    }
    return sock;
}
 
 
int main(int argc, char **argv) {
 
    queue_t* queue = queue_create(1024 * 4);
    wrkpool_t* wrkpool = wrkpool_create(50, queue, handler);
 
    int sock = socket_create(1024, 4096);
 
    while (1) {
        int newsock = 0;
        if ((newsock = accept(sock, NULL, 0)) > 3) {
 
            struct timeval tv;
            tv.tv_sec = 3;
            tv.tv_usec = 0;
            if (setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(struct timeval)) < 0) {
                fprintf(stderr, "cannot set socket option, exit\n");
                exit(1);
            }
 
            if (wrkpool_enqueue(wrkpool, newsock) < 0) {
                close(newsock);
            }
        }
    }
    close(sock);
    return 0;
}

First PagePrevious PageBack to overviewNext PageLast Page