User Tools

Site Tools


Thread pool with unlimited channel

Пул работников с одной общей “неограниченной” очередью, мягкой остановкой, ожиданием остановки всей группы.

Note: the code w/o some check of return values.

Also in https://github.com/kindsoldier/thrpool

Channel benchmark

~120ns per write / read.

$ gmake clean test
rm -f *_test
rm -f *.o *~
cc -c -O2 -Wall -I. -std=c99 -pthread -o channel_test.o channel_test.c
cc -c -O2 -Wall -I. -std=c99 -pthread -o tools.o tools.c
cc -c -O2 -Wall -I. -std=c99 -pthread -o worker.o worker.c
cc -c -O2 -Wall -I. -std=c99 -pthread -o channel.o channel.c
cc -c -O2 -Wall -I. -std=c99 -pthread -o syncer.o syncer.c
cc -pthread -o channel_test channel_test.o tools.o worker.o channel.o syncer.o

./channel_test
time per rw: 118 ns

Run example

$ ./main
main started
worker 1 start
worker 2 start
worker 4 start
worker 3 start
worker 4 run req 0
worker 4 run req 3
worker 4 run req 4
worker 2 run req 5
worker 2 run req 7
worker 2 run req 8
worker 4 run req 6
worker 4 run req 10
worker 4 run req 11
worker 3 run req 2
worker 1 run req 1
worker 2 run req 9
worker 1 is canceled
worker 2 is canceled
worker 4 is canceled
worker 3 is canceled
main done

makefile

Makefile
all: main
 
CC = cc
CFLAGS = -O -Wall -I. -std=c99 -pthread
LDFLAGS = -pthread
 
.c.o:
	$(CC) -c $(CFLAGS) -o $@ $<
 
main.c: main.h
main.o: main.c
 
syncer.c: syncer.h
syncer.o: syncer.c
 
channel.c: channel.h
channel.o: channel.c
 
worker.c: worker.h syncer.h
worker.o: worker.c
 
tools.c: tools.h
tools.o: tools.c
 
OBJS += tools.o
OBJS += worker.o
OBJS += channel.o
OBJS += syncer.o
OBJS += main.o
 
main: $(OBJS)
	$(CC) $(LDFLAGS) -o $@ $(OBJS)
 
test: main
	./main
 
clean:
	rm -f main
	rm -f *.o *~

channel

channel.h
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <semaphore.h>
 
#include <channel.h>
 
struct cell {
    cell_t* next;
    int     data;
};
 
 
static cell_t* new_cell(int data) {
    cell_t* cell = malloc(sizeof(cell_t));
    cell->data = data;
    cell->next = NULL;
    return cell;
}
 
static void cell_free(cell_t* cell) {
    free(cell);
}
 
void channel_init(channel_t* channel) {
    sem_init(&(channel->lock), 0, 1);
    sem_init(&(channel->sem), 0, 0);
 
    channel->head = NULL;
    channel->tail = NULL;
    channel->size = 0;
    return;
}
 
void channel_destroy(channel_t* channel) {
    sem_wait(&(channel->lock));
    int qsize = channel->size;
    for (int i = 0; i < qsize; i++) {
        switch(channel->size) {
            case 0: {
                break;
            }
            case 1: {
                cell_free(channel->head);
                channel->head = NULL;
                channel->tail = NULL;
                channel->size = 0;
                break;
            }
            default: {
                cell_t* oldcell = channel->head;
                channel->head = channel->head->next;
                channel->size--;
                cell_free(oldcell);
                break;
            }
        }
    }
    sem_post(&(channel->lock));
 
    sem_destroy(&(channel->sem));
    sem_destroy(&(channel->lock));
}
 
int channel_write(channel_t* channel, int elem) {
    sem_wait(&(channel->lock));
    int res = 0;
    cell_t* newcell = new_cell(elem);
    uintptr_t headptr = (uintptr_t)channel->head;
    switch(headptr) {
        case (uintptr_t)NULL: {
            channel->head = newcell;
            channel->tail = newcell;
            channel->size++;
            res = 1;
            break;
        }
        default: {
            channel->tail->next = newcell;
            channel->tail = newcell;
            channel->size++;
            res = 1;
            break;
        }
    }
    sem_post(&(channel->lock));
    sem_post(&(channel->sem));
    return res;
}
 
int channel_read(channel_t* channel, int* elem) {
    sem_wait(&(channel->sem));
    sem_wait(&(channel->lock));
    int res = 0;
    switch(channel->size) {
        case 0: {
            res = -1;
            break;
        }
        case 1: {
            *elem = channel->head->data;
            cell_free(channel->head);
            channel->head = NULL;
            channel->tail = NULL;
            channel->size = 0;
            res = 1;
            break;
        }
        default: {
            *elem = channel->head->data;
            cell_t* oldcell = channel->head;
            channel->head = channel->head->next;
            channel->size--;
            cell_free(oldcell);
            res = 1;
            break;
        }
    }
    sem_post(&(channel->lock));
    return res;
}
 
int channel_timedread(channel_t* channel, int* elem, struct timespec* ts) {
    sem_timedwait(&(channel->sem), ts);
    sem_wait(&(channel->lock));
    int res = 0;
    switch(channel->size) {
        case 0: {
            res = -1;
            break;
        }
        case 1: {
            *elem = channel->head->data;
            cell_free(channel->head);
            channel->head = NULL;
            channel->tail = NULL;
            channel->size = 0;
            res = 1;
            break;
        }
        default: {
            *elem = channel->head->data;
            cell_t* oldcell = channel->head;
            channel->head = channel->head->next;
            channel->size--;
            cell_free(oldcell);
            res = 1;
            break;
        }
    }
    sem_post(&(channel->lock));
    return res;
}

syncer

syncer.h
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#ifndef SYNCER_H_QWERTY
#define SYNCER_H_QWERTY
 
#include <semaphore.h>
#include <stdatomic.h>
 
typedef struct {
    sem_t       sem;
    atomic_int  num;
} syncer_t;
 
syncer_t* new_syncer();
void syncer_add(syncer_t* syncer);
void syncer_done(syncer_t* syncer);
void syncer_wait(syncer_t* syncer);
void syncer_init(syncer_t* syncer);
void syncer_destroy(syncer_t* syncer);
void syncer_free(syncer_t* syncer);
 
#endif
syncer.c
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <syncer.h>
 
syncer_t* new_syncer() {
    syncer_t* syncer = malloc(sizeof(syncer_t));
    syncer->num = 0;
    sem_init(&(syncer->sem), 1, 0);
    return syncer;
}
 
void syncer_add(syncer_t* syncer) {
    syncer->num++;
}
 
void syncer_done(syncer_t* syncer) {
    if ((--syncer->num) == 0) sem_post(&(syncer->sem));
}
 
void syncer_wait(syncer_t* syncer) {
    if ((syncer->num) < 1) sem_post(&(syncer->sem));
    sem_wait(&(syncer->sem));
}
 
void syncer_init(syncer_t* syncer) {
    syncer->num = 0;
    sem_init(&(syncer->sem), 1, 0);
}
 
void syncer_destroy(syncer_t* syncer) {
    sem_destroy(&(syncer->sem));
}
 
void syncer_free(syncer_t* syncer) {
    sem_destroy(&(syncer->sem));
    free(syncer);
}

worker

worker.h
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
 
#ifndef WORKER_H_QWERTY
#define WORKER_H_QWERTY
 
#include <stdatomic.h>
#include <semaphore.h>
#include <pthread.h>
 
#include <syncer.h>
#include <channel.h>
 
typedef struct {
    int         ident;
    channel_t*  channel;
    syncer_t*   syncer;
    pthread_t   thread;
    atomic_int  doexit;
} worker_t;
 
 
worker_t* new_worker(channel_t* channel, syncer_t* syncer, int ident);
void worker_init(worker_t* worker, channel_t* channel, syncer_t* syncer, int ident);
 
void worker_run(worker_t* worker);
void worker_cancel(worker_t* worker);
void worker_destroy(worker_t* worker);
void worker_free(worker_t* worker);
 
#endif
worker.c
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdatomic.h>
#include <semaphore.h>
#include <pthread.h>
#include <stdbool.h>
 
#include <syncer.h>
#include <channel.h>
#include <worker.h>
#include <tools.h>
 
worker_t* new_worker(channel_t* channel, syncer_t* syncer, int ident) {
    worker_t* worker = malloc(sizeof(worker_t));
    worker->ident   = ident;
    worker->channel = channel;
    worker->syncer  = syncer;
    worker->doexit = 0;
    return worker;
}
 
void worker_init(worker_t* worker, channel_t* channel, syncer_t* syncer, int ident) {
    worker->ident   = ident;
    worker->channel = channel;
    worker->syncer  = syncer;
    worker->doexit = 0;
}
 
static void worker_doexit(worker_t* worker) {
    if (worker->doexit > 0) {
        printf("worker %d is canceled\n", worker->ident);
        syncer_done(worker->syncer);
        pthread_exit(NULL);
    }
}
 
static void* worker_start(void* argp) {
    worker_t* worker = (worker_t*)argp;
    printf("worker %d start\n", worker->ident);
 
    while(true) {
        struct timespec ts;
        ts_addsec(&ts, 1);
 
        int req = -1;
        int res = channel_timedread(worker->channel, &req, &ts);
        if (res < 0) {
            worker_doexit(worker);
            continue;
        }
        printf("worker %d run req %d\n", worker->ident, req);
        worker_doexit(worker);
    }
 
    printf("worker %d done\n", worker->ident);
    syncer_done(worker->syncer);
    pthread_exit(NULL);
}
 
void worker_run(worker_t* worker) {
    pthread_create(&(worker->thread), NULL, worker_start, (void*)worker);
    pthread_detach(worker->thread);
}
 
void worker_cancel(worker_t* worker) {
    worker->doexit++;
}
 
void worker_destroy(worker_t* worker) {
    // NOP
}
 
void worker_free(worker_t* worker) {
    free(worker);
}

main

main.c
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
 
#include <channel.h>
#include <syncer.h>
#include <worker.h>
 
#include <main.h>
 
int main(int argc, char **argv) {
 
    srand(1);
    printf("main started\n");
 
    syncer_t syncer;
    syncer_init(&syncer);
 
    channel_t channel;
    channel_init(&channel);
 
    int wcount = 4;
    worker_t workers[wcount];
    for (int i = 0; i < wcount; i++) {
        worker_t* worker = &(workers[i]);
        int ident = i + 1;
        worker_init(worker, &channel, &syncer, ident);
        syncer_add(&syncer);
        worker_run(worker);
    }
 
    sleep(1);
 
    int jobs = 12;
    for (int i = 0; i < jobs; i++) {
        channel_write(&channel, i);
    }
 
    sleep(1);
 
    for (int i = 0; i < wcount; i++) {
        worker_cancel(&(workers[i]));
    }
    syncer_wait(&syncer);
 
    for (int i = 0; i < wcount; i++) {
        worker_destroy(&(workers[i]));
    }
 
    syncer_destroy(&syncer);
    channel_destroy(&channel);
 
    printf("main done\n");
    return 0;
}
main.h
/*
 *
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 *
 */
 
#ifndef MAIN_H_QWERTY
#define MAIN_H_QWERTY
 
 
#endif

tools

tools.c
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#include <unistd.h>
#include <time.h>
 
int msleep(uint tms) {
  return usleep(tms * 1000);
}
 
void ts_addsec(struct timespec* ts, int sec) {
    clock_gettime(CLOCK_REALTIME, ts);
    ts->tv_sec += (time_t)sec;
}
 
void ts_addmsec(struct timespec* ts, long msec) {
    clock_gettime(CLOCK_REALTIME, ts);
    ts->tv_nsec += (time_t)msec * 1000 * 1000;
}
 
void ts_addusec(struct timespec* ts, long usec) {
    clock_gettime(CLOCK_REALTIME, ts);
    ts->tv_nsec += (time_t)usec * 1000;
}
 
long getnanotime() {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return ts.tv_sec * 1000 * 1000 * 1000 + ts.tv_nsec;
}
tools.h
/*
 * Copyright 2022 Oleg Borodin  <borodin@unix7.org>
 */
 
#ifndef TOOLS_H_QWERTY
#define TOOLS_H_QWERTY
 
int msleep(uint tms);
 
void ts_addsec(struct timespec* ts, int sec);
void ts_addmsec(struct timespec* ts, long msec);
void ts_addusec(struct timespec* ts, long usec);
 
#endif