User Tools

Site Tools


Differences

This shows you the differences between two versions of the page.

Link to this comparison view

c:thrsrv-bs [2019-06-01 15:56]
ziggi created
c:thrsrv-bs [2020-02-15 00:57]
Line 1: Line 1:
- 
-=====Threadpool web server with blocked IO===== 
- 
-<code c thrsrv-bl.c>​ 
-/* 
- * Copyright 2004-2019 Oleg Borodin ​ <​borodin@unix7.org>​ 
- */ 
- 
-#include <​stdlib.h>​ 
-#include <​stdio.h>​ 
-#include <​stdbool.h>​ 
-#include <​unistd.h>​ 
-#include <​string.h>​ 
-#include <​fcntl.h>​ 
- 
-#include <​pthread.h>​ 
-#include <​sys/​event.h>​ 
- 
-#include <​sys/​types.h>​ 
-#include <​sys/​socket.h>​ 
-#include <​arpa/​inet.h>​ 
-#include <​netinet/​in.h>​ 
-#include <​sys/​time.h>​ 
-#include <​err.h>​ 
- 
- 
-void msleep(int dur) { 
-    usleep(dur * 1000); 
-} 
- 
-typedef struct queue { 
-    pthread_mutex_t mutex; 
-    int* items; 
-    int front; 
-    int rear; 
-    int size; 
-    int capacity; 
-} queue_t; 
- 
-queue_t* queue_create(int size) { 
-    queue_t* q = (queue_t*)malloc(sizeof(queue_t));​ 
-    q->items = (int*)malloc((size + 1) * sizeof(int));​ 
-    q->​capacity = size; 
-    q->front = 0; 
-    q->rear = -1; 
-    q->size = 0; 
- 
-    pthread_mutexattr_t attr; 
-    pthread_mutexattr_init(&​attr);​ 
-    pthread_mutexattr_settype(&​attr,​ PTHREAD_MUTEX_RECURSIVE);​ 
-    pthread_mutex_init(&​(q->​mutex),​ NULL); 
-    pthread_mutexattr_destroy(&​attr);​ 
- 
-    return q; 
-} 
- 
-int queue_size(queue_t* q) { 
-    if (q == NULL) return -1; 
-    pthread_mutex_lock(&​(q->​mutex));​ 
-    int res = q->size; 
-    pthread_mutex_unlock(&​(q->​mutex));​ 
-    return res; 
-} 
- 
-int queue_front(queue_t* q) { 
-    if (q == NULL) return -1; 
-    pthread_mutex_lock(&​(q->​mutex));​ 
-    int res = -1; 
-    if (q->size > 0) { 
-        res = q->​items[q->​front];​ 
-        q->front = (q->​front + 1) % q->​capacity;​ 
-        q->​size--;​ 
-    } 
-    pthread_mutex_unlock(&​(q->​mutex));​ 
-    return res; 
-} 
- 
-int queue_back(queue_t* q, int x) { 
-    if (q == NULL) return -1; 
-    pthread_mutex_lock(&​q->​mutex);​ 
-    int res = -1; 
-    if (x + 2 > q->​capacity) { 
-        int newcapacity = q->​capacity * 2; 
-        void* items = realloc(q->​items,​ sizeof(int) * newcapacity);​ 
-        if (items != NULL) { 
-            q->items = items; 
-            q->​capacity = newcapacity;​ 
-        } 
-    } 
-    if (x < q->​capacity) {; 
-        q->rear = (q->rear + 1) % q->​capacity;​ 
-        q->​items[q->​rear] = x; 
-        q->​size++;​ 
-        res = 1; 
-    } 
-    pthread_mutex_unlock(&​(q->​mutex));​ 
-    return res; 
-} 
- 
-void queue_lock(queue_t* q) { 
-    pthread_mutex_lock(&​(q->​mutex));​ 
-} 
- 
-void queue_unlock(queue_t* q) { 
-    pthread_mutex_unlock(&​(q->​mutex));​ 
-} 
- 
-void queue_free(queue_t* q) { 
-    if (q == NULL) return; 
-    free(q->​items);​ 
-    free(q); 
-} 
- 
-typedef struct worker { 
-    pthread_mutex_t mutex; 
-    pthread_t* thread; 
-    pthread_cond_t* cond; 
-    queue_t* queue; 
-} worker_t; 
- 
-worker_t* worker_create(queue_t* queue, pthread_cond_t* cond, void* handler(void*)) { 
-    worker_t* w = (worker_t*)malloc(sizeof(worker_t));​ 
-    w->queue = queue; 
-    w->cond = cond; 
-    w->​thread = (pthread_t*)malloc(sizeof(pthread_t));​ 
- 
-    pthread_mutex_init(&​(w->​mutex),​ NULL); 
- 
-    pthread_create(w->​thread,​ NULL, handler, (void*)w); 
-    pthread_detach(*(w->​thread));​ 
- 
-    return w; 
-} 
- 
-typedef struct wrkpool { 
-    int size; 
-    worker_t** workers; 
-    queue_t* queue; 
-    pthread_cond_t cond; 
-} wrkpool_t; 
- 
- 
-wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) { 
-    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));​ 
-    wp->​queue = queue; 
- 
-    wp->​workers = (worker_t**)malloc(sizeof(worker_t*) * (size + 1)); 
- 
-    pthread_cond_init(&​(wp->​cond),​ NULL); 
- 
-    for (int i = 0; i < size; i++) { 
-        wp->​workers[i] = worker_create(queue,​ &​(wp->​cond),​ handler); 
-    } 
-    wp->size = size; 
-    return wp; 
-} 
- 
-int wrkpool_enqueue(wrkpool_t* wp, int newsock) { 
-    queue_back(wp->​queue,​ newsock); 
-    for (int i = 0; i < wp->​size;​ i++) { 
-        worker_t* w = wp->​workers[i];​ 
-        pthread_mutex_unlock(&​(w->​mutex));​ 
-    } 
-    pthread_cond_broadcast(&​(wp->​cond));​ 
-    return 1; 
-} 
- 
-void socket_rw(int sock) { 
- 
-    const int reqcap = 1024; 
-    char reqbuf[reqcap] = "";​ 
-    memset(reqbuf,​ 0, reqcap); 
-    int rv = 0; 
-    int trv = 0; 
-    int rep = 0; 
- 
-    while(1) { 
-        const int tmpsize = 1024 * 2; 
-        char tmpbuf[tmpsize];​ 
-        memset(tmpbuf,​ 0, tmpsize); 
- 
-        rv = read(sock, tmpbuf, tmpsize); 
- 
-        strncat(reqbuf,​ tmpbuf, rv); 
-        trv += rv; 
-        if (strnstr(reqbuf,​ "​\r\n\r\n",​ trv) != NULL) break; 
-        if (++rep > 4) { 
-            close(sock);​ 
-            return; 
-        } 
-    } 
-    if (trv > 0) { 
- 
-        char content[] = "<​html>​\r\n"​ 
-                         "<​head><​title>​Hello</​title></​head>​\r\n"​ 
-                         "<​body>​\r\n"​ 
-                         "<​center><​h1>​Hello,​ World!</​h1></​center>​\r\n"​ 
-                         "<​hr><​center>​Srv11/​0.1</​center>​\r\n"​ 
-                         "</​body>​\r\n"​ 
-                         "</​html>​\r\n";​ 
-        char header[] = "​HTTP/​1.1 200 OK\r\n"​ 
-                        "​Accept-Ranges:​ bytes\r\n"​ 
-                        "​Content-Type:​ text/​html\r\n"​ 
-                        "​Content-Length:​ 142\r\n"​ 
-                        "​Conection:​ close\r\n"​ 
-                        "​Server:​ Srv11/​0.1\r\n"​ 
-                        "Date: Wed, 16 May 2019 20:05:07 UTC\r\n"​ 
-                        "​\r\n";​ 
-        const int rescap = 1024; 
-        char res[rescap] = "";​ 
-        strcat(res, header); 
-        strcat(res, content); 
- 
-        write(sock, res, strlen(res));​ 
- 
-        shutdown(sock,​ SHUT_RDWR); 
-    } 
-    close(sock);​ 
-} 
- 
-void* handler(void* argp) { 
- 
-    worker_t* w = (worker_t*)argp;​ 
-    queue_t* queue = w->​queue;​ 
- 
-    while(true) { 
-        pthread_mutex_lock(&​(w->​mutex));​ 
-        while(queue_size(queue) == 0) { 
-            pthread_cond_wait(w->​cond,​ &​(w->​mutex));​ 
-        } 
- 
-        queue_lock(queue);​ 
-        if (queue_size(queue) == 0) { 
-            continue; 
-            queue_unlock(queue);​ 
-        } 
-        int sock = queue_front(queue);​ 
-        queue_unlock(queue);​ 
- 
-        socket_rw(sock);​ 
- 
-        pthread_mutex_unlock(&​(w->​mutex));​ 
-    } 
-    return NULL; 
-} 
- 
-int socket_create(int port, int backlog) { 
-    int sock; 
-    if((sock = socket(PF_INET,​ SOCK_STREAM,​ 0)) < 0) { 
-        fprintf(stderr,​ "​cannot create socket, exit\n"​);​ 
-        exit(1); 
-    } 
-    int optval; 
-    optval = 1; 
-    if (setsockopt(sock,​ SOL_SOCKET, SO_REUSEADDR,​ &​optval,​ sizeof(optval)) < 0) { 
-        fprintf(stderr,​ "​cannot set socket option, exit\n"​);​ 
-        exit(1); 
-    } 
- 
- 
- 
-    struct sockaddr addr; 
-    struct sockaddr_in* paddr = (struct sockaddr_in*)&​addr;​ 
-    paddr->​sin_family = AF_INET; 
-    paddr->​sin_addr.s_addr = INADDR_ANY; 
-    paddr->​sin_port = htons(port);​ 
-    paddr->​sin_len = sizeof(struct sockaddr_in);​ 
- 
-    if (bind(sock, (struct sockaddr*)paddr,​ paddr->​sin_len) < 0) { 
-        fprintf(stderr,​ "​cannot bind socket, exit\n"​);​ 
-        exit(1); 
-    } 
-    if (listen(sock,​ backlog) < 0) { 
-        fprintf(stderr,​ "​cannot listen socket, exit\n"​);​ 
-        exit(1); 
-    } 
-    return sock; 
-} 
- 
- 
-int main(int argc, char **argv) { 
- 
-    queue_t* queue = queue_create(1024 * 4); 
-    wrkpool_t* wrkpool = wrkpool_create(50,​ queue, handler); 
- 
-    int sock = socket_create(1024,​ 4096); 
- 
-    while (1) { 
-        int newsock = 0; 
-        if ((newsock = accept(sock,​ NULL, 0)) > 3) { 
- 
-            struct timeval tv; 
-            tv.tv_sec = 3; 
-            tv.tv_usec = 0; 
-            if (setsockopt(sock,​ SOL_SOCKET, SO_RCVTIMEO,​ &tv, sizeof(struct timeval)) < 0) { 
-                fprintf(stderr,​ "​cannot set socket option, exit\n"​);​ 
-                exit(1); 
-            } 
- 
-            if (wrkpool_enqueue(wrkpool,​ newsock) < 0) { 
-                close(newsock);​ 
-            } 
-        } 
-    } 
-    close(sock);​ 
-    return 0; 
-} 
-</​code>​ 
- 
----- 
-[<>]