User Tools

Site Tools


Differences

This shows you the differences between two versions of the page.

Link to this comparison view

c:wrkpoll-reqcb [2019-06-04 13:16] (current)
ziggi created
Line 1: Line 1:
 +
 +=====Callback worker pool=====
 +
 +<code c wrkpool-req.c>​
 +/*
 + * Copyright 2004-2019 Oleg Borodin ​ <​borodin@unix7.org>​
 + */
 +
 +#include <​stdlib.h>​
 +#include <​stdio.h>​
 +#include <​stdbool.h>​
 +#include <​unistd.h>​
 +#include <​string.h>​
 +#include <​pthread.h>​
 +
 +
 +typedef struct req {
 +    void (*cb)(void*);​
 +    void* data;
 +    size_t size;
 +} req_t;
 +
 +req_t* req_create(void(*cb)(void*),​ void* data, size_t size) {
 +    req_t* req = (req_t*)malloc(sizeof(req_t));​
 +    req->cb = cb;
 +    req->​data = data;
 +    req->​size = size;
 +    return req;
 +}
 +
 +void req_free(req_t* req) {
 +    free(req);
 +}
 +
 +typedef struct queue {
 +    pthread_mutex_t mutex;
 +    req_t** items;
 +    int front;
 +    int rear;
 +    int size;
 +    int capacity;
 +} queue_t;
 +
 +queue_t* queue_create(int size) {
 +    queue_t* q = NULL;
 +    if ((q = (queue_t*)malloc(sizeof(queue_t))) == NULL) {
 +        return NULL;
 +    };
 +
 +    q->items = (req_t**)malloc(size * sizeof(req_t*));​
 +    q->​capacity = size;
 +    q->front = 0;
 +    q->rear = -1;
 +    q->size = 0;
 +
 +    pthread_mutexattr_t attr;
 +    pthread_mutexattr_init(&​attr);​
 +    pthread_mutexattr_settype(&​attr,​ PTHREAD_MUTEX_RECURSIVE);​
 +    pthread_mutex_init(&​(q->​mutex),​ NULL);
 +    pthread_mutexattr_destroy(&​attr);​
 +
 +    return q;
 +}
 +
 +int queue_size(queue_t* q) {
 +    if (q == NULL) return -1;
 +    pthread_mutex_lock(&​q->​mutex);​
 +    int res = q->size;
 +    pthread_mutex_unlock(&​q->​mutex);​
 +    return res;
 +}
 +
 +req_t* queue_front(queue_t* q) {
 +    if (q == NULL) return NULL;
 +    req_t* res = NULL;
 +    pthread_mutex_lock(&​q->​mutex);​
 +    if (q->size > 0) {
 +        res = q->​items[q->​front];​
 +        q->front = (q->​front + 1) % q->​capacity;​
 +        q->​size--;​
 +    }
 +    pthread_mutex_unlock(&​q->​mutex);​
 +    return res;
 +}
 +
 +int queue_push(queue_t* q, req_t* x) {
 +    if (q == NULL) return -1;
 +    bool res = -1;
 +    pthread_mutex_lock(&​q->​mutex);​
 +
 +    if ((q->​size + 1) > q->​capacity) {
 +        int newcapacity = q->​capacity * 2;
 +        void* items = realloc(q->​items,​ sizeof(req_t*) * newcapacity);​
 +        if (items == NULL) {
 +            return -1;
 +        }
 +        q->items = items;
 +        q->​capacity = newcapacity;​
 +    }
 +    if ((q->​size + 1) > q->​capacity) return -1;
 +    q->rear = (q->rear + 1) % q->​capacity;​
 +    q->​items[q->​rear] = x;
 +    q->​size++;​
 +    res = 1;
 +    pthread_mutex_unlock(&​q->​mutex);​
 +    return res;
 +}
 +
 +void queue_lock(queue_t* q) {
 +    pthread_mutex_lock(&​(q->​mutex));​
 +}
 +
 +void queue_unlock(queue_t* q) {
 +    pthread_mutex_unlock(&​(q->​mutex));​
 +}
 +
 +void queue_free(queue_t* q) {
 +    if (q == NULL) return;
 +    free(q->​items);​
 +    free(q);
 +}
 +
 +typedef struct worker {
 +    pthread_mutex_t mutex;
 +    pthread_t* thread;
 +    pthread_cond_t cond;
 +    queue_t* queue;
 +} worker_t;
 +
 +worker_t* worker_create(queue_t* queue, void* handler(void*)) {
 +    worker_t* w = (worker_t*)malloc(sizeof(worker_t));​
 +
 +    pthread_cond_init(&​(w->​cond),​ NULL);
 +    pthread_mutex_init(&​(w->​mutex),​ NULL);
 +
 +    w->queue = queue;
 +    w->​thread = (pthread_t*)malloc(sizeof(pthread_t));​
 +    pthread_create(w->​thread,​ NULL, handler, (void*)w);
 +    pthread_detach(*(w->​thread));​
 +
 +    return w;
 +}
 +
 +void worker_notify(worker_t* w) {
 +    pthread_mutex_unlock(&​(w->​mutex));​
 +    pthread_cond_broadcast(&​(w->​cond));​
 +}
 +
 +typedef struct wrkpool {
 +    int size;
 +    worker_t** workers;
 +    queue_t* queue;
 +} wrkpool_t;
 +
 +
 +wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
 +    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));​
 +    wp->​queue = queue;
 +
 +    wp->​workers = (worker_t**)malloc(sizeof(worker_t*) * size);
 +    for (int i = 0; i < size; i++) {
 +        wp->​workers[i] = worker_create(queue,​ handler);
 +    }
 +    wp->size = size;
 +    return wp;
 +}
 +
 +int wrkpool_enqueue(wrkpool_t* wp, req_t* req) {
 +    queue_push(wp->​queue,​ req);
 +    for (int i = 0; i < wp->​size;​ i++) {
 +        worker_t* w = wp->​workers[i];​
 +        worker_notify(w);​
 +    }
 +    return 1;
 +}
 +
 +void wrkpool_free(wrkpool_t* wp) {
 +    free(wp);
 +}
 +
 +void* handler(void* argp) {
 +
 +    worker_t* w = (worker_t*)argp;​
 +    queue_t* queue = w->​queue;​
 +
 +    while(true) {
 +        pthread_mutex_lock(&​(w->​mutex));​
 +        while(queue_size(queue) == 0) {
 +            pthread_cond_wait(&​(w->​cond),​ &​(w->​mutex));​
 +        }
 +
 +        queue_lock(queue);​
 +        if (queue_size(queue) == 0) {
 +            continue;
 +            queue_unlock(queue);​
 +        }
 +        req_t* req = queue_front(queue);​
 +        queue_unlock(queue);​
 +
 +        if (req == NULL) {
 +            pthread_mutex_unlock(&​(w->​mutex));​
 +            continue;
 +        }
 +
 +
 +        if (req->cb != NULL) {
 +            req->​cb(req->​data);​
 +        }
 +        req_free(req);​
 +        pthread_mutex_unlock(&​(w->​mutex));​
 +    }
 +    return NULL;
 +}
 +
 +void callback1(void* data) {
 +    printf("​callback1:​ %s\n", (char*)data);​
 +    /* free((void*)data);​ */
 +}
 +
 +void callback2(void* data) {
 +    printf("​callback2:​ %s\n", (char*)data);​
 +    /* free((void*)data);​ */
 +}
 +
 +
 +int main(int argc, char **argv) {
 +
 +    queue_t* q = queue_create(10);​
 +    wrkpool_t* wp = wrkpool_create(5,​ q, handler);
 +
 +    char data1[] = "​hello";​
 +    req_t* r1 = req_create(callback1,​ data1, sizeof(data1));​
 +
 +    char data2[] = "​world";​
 +    req_t* r2 = req_create(callback2,​ data2, sizeof(data2));​
 +
 +    wrkpool_enqueue(wp,​ r1);
 +    wrkpool_enqueue(wp,​ r2);
 +
 +    sleep(1);
 +
 +    wrkpool_free(wp);​
 +    queue_free(q);​
 +    return 0;
 +}
 +</​code>​
 +
 +===out===
 +
 +<​file>​
 +$ ./​wrkpool-req
 +callback1: hello
 +callback2: world
 +</​file>​
 +
 +----
 +[<>]