User Tools

Site Tools


Callback worker pool

wrkpool-req.c
/*
 * Copyright 2004-2019 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
 
 
typedef struct req {
    void (*cb)(void*);
    void* data;
    size_t size;
} req_t;
 
req_t* req_create(void(*cb)(void*), void* data, size_t size) {
    req_t* req = (req_t*)malloc(sizeof(req_t));
    req->cb = cb;
    req->data = data;
    req->size = size;
    return req;
}
 
void req_free(req_t* req) {
    free(req);
}
 
typedef struct queue {
    pthread_mutex_t mutex;
    req_t** items;
    int front;
    int rear;
    int size;
    int capacity;
} queue_t;
 
queue_t* queue_create(int size) {
    queue_t* q = NULL;
    if ((q = (queue_t*)malloc(sizeof(queue_t))) == NULL) {
        return NULL;
    };
 
    q->items = (req_t**)malloc(size * sizeof(req_t*));
    q->capacity = size;
    q->front = 0;
    q->rear = -1;
    q->size = 0;
 
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&(q->mutex), NULL);
    pthread_mutexattr_destroy(&attr);
 
    return q;
}
 
int queue_size(queue_t* q) {
    if (q == NULL) return -1;
    pthread_mutex_lock(&q->mutex);
    int res = q->size;
    pthread_mutex_unlock(&q->mutex);
    return res;
}
 
req_t* queue_front(queue_t* q) {
    if (q == NULL) return NULL;
    req_t* res = NULL;
    pthread_mutex_lock(&q->mutex);
    if (q->size > 0) {
        res = q->items[q->front];
        q->front = (q->front + 1) % q->capacity;
        q->size--;
    }
    pthread_mutex_unlock(&q->mutex);
    return res;
}
 
int queue_push(queue_t* q, req_t* x) {
    if (q == NULL) return -1;
    bool res = -1;
    pthread_mutex_lock(&q->mutex);
 
    if ((q->size + 1) > q->capacity) {
        int newcapacity = q->capacity * 2;
        void* items = realloc(q->items, sizeof(req_t*) * newcapacity);
        if (items == NULL) {
            return -1;
        }
        q->items = items;
        q->capacity = newcapacity;
    }
    if ((q->size + 1) > q->capacity) return -1;
    q->rear = (q->rear + 1) % q->capacity;
    q->items[q->rear] = x;
    q->size++;
    res = 1;
    pthread_mutex_unlock(&q->mutex);
    return res;
}
 
void queue_lock(queue_t* q) {
    pthread_mutex_lock(&(q->mutex));
}
 
void queue_unlock(queue_t* q) {
    pthread_mutex_unlock(&(q->mutex));
}
 
void queue_free(queue_t* q) {
    if (q == NULL) return;
    free(q->items);
    free(q);
}
 
typedef struct worker {
    pthread_mutex_t mutex;
    pthread_t* thread;
    pthread_cond_t cond;
    queue_t* queue;
} worker_t;
 
worker_t* worker_create(queue_t* queue, void* handler(void*)) {
    worker_t* w = (worker_t*)malloc(sizeof(worker_t));
 
    pthread_cond_init(&(w->cond), NULL);
    pthread_mutex_init(&(w->mutex), NULL);
 
    w->queue = queue;
    w->thread = (pthread_t*)malloc(sizeof(pthread_t));
    pthread_create(w->thread, NULL, handler, (void*)w);
    pthread_detach(*(w->thread));
 
    return w;
}
 
void worker_notify(worker_t* w) {
    pthread_mutex_unlock(&(w->mutex));
    pthread_cond_broadcast(&(w->cond));
}
 
typedef struct wrkpool {
    int size;
    worker_t** workers;
    queue_t* queue;
} wrkpool_t;
 
 
wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) {
    wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t));
    wp->queue = queue;
 
    wp->workers = (worker_t**)malloc(sizeof(worker_t*) * size);
    for (int i = 0; i < size; i++) {
        wp->workers[i] = worker_create(queue, handler);
    }
    wp->size = size;
    return wp;
}
 
int wrkpool_enqueue(wrkpool_t* wp, req_t* req) {
    queue_push(wp->queue, req);
    for (int i = 0; i < wp->size; i++) {
        worker_t* w = wp->workers[i];
        worker_notify(w);
    }
    return 1;
}
 
void wrkpool_free(wrkpool_t* wp) {
    free(wp);
}
 
void* handler(void* argp) {
 
    worker_t* w = (worker_t*)argp;
    queue_t* queue = w->queue;
 
    while(true) {
        pthread_mutex_lock(&(w->mutex));
        while(queue_size(queue) == 0) {
            pthread_cond_wait(&(w->cond), &(w->mutex));
        }
 
        queue_lock(queue);
        if (queue_size(queue) == 0) {
            continue;
            queue_unlock(queue);
        }
        req_t* req = queue_front(queue);
        queue_unlock(queue);
 
        if (req == NULL) {
            pthread_mutex_unlock(&(w->mutex));
            continue;
        }
 
 
        if (req->cb != NULL) {
            req->cb(req->data);
        }
        req_free(req);
        pthread_mutex_unlock(&(w->mutex));
    }
    return NULL;
}
 
void callback1(void* data) {
    printf("callback1: %s\n", (char*)data);
    /* free((void*)data); */
}
 
void callback2(void* data) {
    printf("callback2: %s\n", (char*)data);
    /* free((void*)data); */
}
 
 
int main(int argc, char **argv) {
 
    queue_t* q = queue_create(10);
    wrkpool_t* wp = wrkpool_create(5, q, handler);
 
    char data1[] = "hello";
    req_t* r1 = req_create(callback1, data1, sizeof(data1));
 
    char data2[] = "world";
    req_t* r2 = req_create(callback2, data2, sizeof(data2));
 
    wrkpool_enqueue(wp, r1);
    wrkpool_enqueue(wp, r2);
 
    sleep(1);
 
    wrkpool_free(wp);
    queue_free(q);
    return 0;
}

out

$ ./wrkpool-req
callback1: hello
callback2: world

First PagePrevious PageBack to overviewNext PageLast Page