/* * Author, Copyright: Oleg Borodin <onborodin@gmail.com> */ #include <stdlib.h> #include <stdio.h> #include <stdbool.h> #include <unistd.h> #include <string.h> #include <pthread.h> typedef struct req { void (*cb)(void*); void* data; size_t size; } req_t; req_t* req_create(void(*cb)(void*), void* data, size_t size) { req_t* req = (req_t*)malloc(sizeof(req_t)); req->cb = cb; req->data = data; req->size = size; return req; } void req_free(req_t* req) { free(req); } typedef struct queue { pthread_mutex_t mutex; req_t** items; int front; int rear; int size; int capacity; } queue_t; queue_t* queue_create(int size) { queue_t* q = NULL; if ((q = (queue_t*)malloc(sizeof(queue_t))) == NULL) { return NULL; }; q->items = (req_t**)malloc(size * sizeof(req_t*)); q->capacity = size; q->front = 0; q->rear = -1; q->size = 0; pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&(q->mutex), NULL); pthread_mutexattr_destroy(&attr); return q; } int queue_size(queue_t* q) { if (q == NULL) return -1; pthread_mutex_lock(&q->mutex); int res = q->size; pthread_mutex_unlock(&q->mutex); return res; } req_t* queue_front(queue_t* q) { if (q == NULL) return NULL; req_t* res = NULL; pthread_mutex_lock(&q->mutex); if (q->size > 0) { res = q->items[q->front]; q->front = (q->front + 1) % q->capacity; q->size--; } pthread_mutex_unlock(&q->mutex); return res; } int queue_push(queue_t* q, req_t* x) { if (q == NULL) return -1; bool res = -1; pthread_mutex_lock(&q->mutex); if ((q->size + 1) > q->capacity) { int newcapacity = q->capacity * 2; void* items = realloc(q->items, sizeof(req_t*) * newcapacity); if (items == NULL) { return -1; } q->items = items; q->capacity = newcapacity; } if ((q->size + 1) > q->capacity) return -1; q->rear = (q->rear + 1) % q->capacity; q->items[q->rear] = x; q->size++; res = 1; pthread_mutex_unlock(&q->mutex); return res; } void queue_lock(queue_t* q) { pthread_mutex_lock(&(q->mutex)); } void queue_unlock(queue_t* q) { pthread_mutex_unlock(&(q->mutex)); } void queue_free(queue_t* q) { if (q == NULL) return; free(q->items); free(q); } typedef struct worker { pthread_mutex_t mutex; pthread_t* thread; pthread_cond_t cond; queue_t* queue; } worker_t; worker_t* worker_create(queue_t* queue, void* handler(void*)) { worker_t* w = (worker_t*)malloc(sizeof(worker_t)); pthread_cond_init(&(w->cond), NULL); pthread_mutex_init(&(w->mutex), NULL); w->queue = queue; w->thread = (pthread_t*)malloc(sizeof(pthread_t)); pthread_create(w->thread, NULL, handler, (void*)w); pthread_detach(*(w->thread)); return w; } void worker_notify(worker_t* w) { pthread_mutex_unlock(&(w->mutex)); pthread_cond_broadcast(&(w->cond)); } typedef struct wrkpool { int size; worker_t** workers; queue_t* queue; } wrkpool_t; wrkpool_t* wrkpool_create(const int size, queue_t* queue, void* handler) { wrkpool_t* wp = (wrkpool_t*)malloc(sizeof(wrkpool_t)); wp->queue = queue; wp->workers = (worker_t**)malloc(sizeof(worker_t*) * size); for (int i = 0; i < size; i++) { wp->workers[i] = worker_create(queue, handler); } wp->size = size; return wp; } int wrkpool_enqueue(wrkpool_t* wp, req_t* req) { queue_push(wp->queue, req); for (int i = 0; i < wp->size; i++) { worker_t* w = wp->workers[i]; worker_notify(w); } return 1; } void wrkpool_free(wrkpool_t* wp) { free(wp); } void* handler(void* argp) { worker_t* w = (worker_t*)argp; queue_t* queue = w->queue; while(true) { pthread_mutex_lock(&(w->mutex)); while(queue_size(queue) == 0) { pthread_cond_wait(&(w->cond), &(w->mutex)); } queue_lock(queue); if (queue_size(queue) == 0) { continue; queue_unlock(queue); } req_t* req = queue_front(queue); queue_unlock(queue); if (req == NULL) { pthread_mutex_unlock(&(w->mutex)); continue; } if (req->cb != NULL) { req->cb(req->data); } req_free(req); pthread_mutex_unlock(&(w->mutex)); } return NULL; } void callback1(void* data) { printf("callback1: %s\n", (char*)data); /* free((void*)data); */ } void callback2(void* data) { printf("callback2: %s\n", (char*)data); /* free((void*)data); */ } int main(int argc, char **argv) { queue_t* q = queue_create(10); wrkpool_t* wp = wrkpool_create(5, q, handler); char data1[] = "hello"; req_t* r1 = req_create(callback1, data1, sizeof(data1)); char data2[] = "world"; req_t* r2 = req_create(callback2, data2, sizeof(data2)); wrkpool_enqueue(wp, r1); wrkpool_enqueue(wp, r2); sleep(1); wrkpool_free(wp); queue_free(q); return 0; }