Пул работников с одной общей “неограниченной” очередью, мягкой остановкой, ожиданием остановки всей группы.
Note: the code w/o some check of return values.
~120ns per write / read.
$ gmake clean test rm -f *_test rm -f *.o *~ cc -c -O2 -Wall -I. -std=c99 -pthread -o channel_test.o channel_test.c cc -c -O2 -Wall -I. -std=c99 -pthread -o tools.o tools.c cc -c -O2 -Wall -I. -std=c99 -pthread -o worker.o worker.c cc -c -O2 -Wall -I. -std=c99 -pthread -o channel.o channel.c cc -c -O2 -Wall -I. -std=c99 -pthread -o syncer.o syncer.c cc -pthread -o channel_test channel_test.o tools.o worker.o channel.o syncer.o ./channel_test time per rw: 118 ns
$ ./main main started worker 1 start worker 2 start worker 4 start worker 3 start worker 4 run req 0 worker 4 run req 3 worker 4 run req 4 worker 2 run req 5 worker 2 run req 7 worker 2 run req 8 worker 4 run req 6 worker 4 run req 10 worker 4 run req 11 worker 3 run req 2 worker 1 run req 1 worker 2 run req 9 worker 1 is canceled worker 2 is canceled worker 4 is canceled worker 3 is canceled main done
all: main CC = cc CFLAGS = -O -Wall -I. -std=c99 -pthread LDFLAGS = -pthread .c.o: $(CC) -c $(CFLAGS) -o $@ $< main.c: main.h main.o: main.c syncer.c: syncer.h syncer.o: syncer.c channel.c: channel.h channel.o: channel.c worker.c: worker.h syncer.h worker.o: worker.c tools.c: tools.h tools.o: tools.c OBJS += tools.o OBJS += worker.o OBJS += channel.o OBJS += syncer.o OBJS += main.o main: $(OBJS) $(CC) $(LDFLAGS) -o $@ $(OBJS) test: main ./main clean: rm -f main rm -f *.o *~
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #include <stdlib.h> #include <stdio.h> #include <stdint.h> #include <semaphore.h> #include <channel.h> struct cell { cell_t* next; int data; }; static cell_t* new_cell(int data) { cell_t* cell = malloc(sizeof(cell_t)); cell->data = data; cell->next = NULL; return cell; } static void cell_free(cell_t* cell) { free(cell); } void channel_init(channel_t* channel) { sem_init(&(channel->lock), 0, 1); sem_init(&(channel->sem), 0, 0); channel->head = NULL; channel->tail = NULL; channel->size = 0; return; } void channel_destroy(channel_t* channel) { sem_wait(&(channel->lock)); int qsize = channel->size; for (int i = 0; i < qsize; i++) { switch(channel->size) { case 0: { break; } case 1: { cell_free(channel->head); channel->head = NULL; channel->tail = NULL; channel->size = 0; break; } default: { cell_t* oldcell = channel->head; channel->head = channel->head->next; channel->size--; cell_free(oldcell); break; } } } sem_post(&(channel->lock)); sem_destroy(&(channel->sem)); sem_destroy(&(channel->lock)); } int channel_write(channel_t* channel, int elem) { sem_wait(&(channel->lock)); int res = 0; cell_t* newcell = new_cell(elem); uintptr_t headptr = (uintptr_t)channel->head; switch(headptr) { case (uintptr_t)NULL: { channel->head = newcell; channel->tail = newcell; channel->size++; res = 1; break; } default: { channel->tail->next = newcell; channel->tail = newcell; channel->size++; res = 1; break; } } sem_post(&(channel->lock)); sem_post(&(channel->sem)); return res; } int channel_read(channel_t* channel, int* elem) { sem_wait(&(channel->sem)); sem_wait(&(channel->lock)); int res = 0; switch(channel->size) { case 0: { res = -1; break; } case 1: { *elem = channel->head->data; cell_free(channel->head); channel->head = NULL; channel->tail = NULL; channel->size = 0; res = 1; break; } default: { *elem = channel->head->data; cell_t* oldcell = channel->head; channel->head = channel->head->next; channel->size--; cell_free(oldcell); res = 1; break; } } sem_post(&(channel->lock)); return res; } int channel_timedread(channel_t* channel, int* elem, struct timespec* ts) { sem_timedwait(&(channel->sem), ts); sem_wait(&(channel->lock)); int res = 0; switch(channel->size) { case 0: { res = -1; break; } case 1: { *elem = channel->head->data; cell_free(channel->head); channel->head = NULL; channel->tail = NULL; channel->size = 0; res = 1; break; } default: { *elem = channel->head->data; cell_t* oldcell = channel->head; channel->head = channel->head->next; channel->size--; cell_free(oldcell); res = 1; break; } } sem_post(&(channel->lock)); return res; }
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #ifndef SYNCER_H_QWERTY #define SYNCER_H_QWERTY #include <semaphore.h> #include <stdatomic.h> typedef struct { sem_t sem; atomic_int num; } syncer_t; syncer_t* new_syncer(); void syncer_add(syncer_t* syncer); void syncer_done(syncer_t* syncer); void syncer_wait(syncer_t* syncer); void syncer_init(syncer_t* syncer); void syncer_destroy(syncer_t* syncer); void syncer_free(syncer_t* syncer); #endif
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #include <stdlib.h> #include <syncer.h> syncer_t* new_syncer() { syncer_t* syncer = malloc(sizeof(syncer_t)); syncer->num = 0; sem_init(&(syncer->sem), 1, 0); return syncer; } void syncer_add(syncer_t* syncer) { syncer->num++; } void syncer_done(syncer_t* syncer) { if ((--syncer->num) == 0) sem_post(&(syncer->sem)); } void syncer_wait(syncer_t* syncer) { if ((syncer->num) < 1) sem_post(&(syncer->sem)); sem_wait(&(syncer->sem)); } void syncer_init(syncer_t* syncer) { syncer->num = 0; sem_init(&(syncer->sem), 1, 0); } void syncer_destroy(syncer_t* syncer) { sem_destroy(&(syncer->sem)); } void syncer_free(syncer_t* syncer) { sem_destroy(&(syncer->sem)); free(syncer); }
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #ifndef WORKER_H_QWERTY #define WORKER_H_QWERTY #include <stdatomic.h> #include <semaphore.h> #include <pthread.h> #include <syncer.h> #include <channel.h> typedef struct { int ident; channel_t* channel; syncer_t* syncer; pthread_t thread; atomic_int doexit; } worker_t; worker_t* new_worker(channel_t* channel, syncer_t* syncer, int ident); void worker_init(worker_t* worker, channel_t* channel, syncer_t* syncer, int ident); void worker_run(worker_t* worker); void worker_cancel(worker_t* worker); void worker_destroy(worker_t* worker); void worker_free(worker_t* worker); #endif
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <stdatomic.h> #include <semaphore.h> #include <pthread.h> #include <stdbool.h> #include <syncer.h> #include <channel.h> #include <worker.h> #include <tools.h> worker_t* new_worker(channel_t* channel, syncer_t* syncer, int ident) { worker_t* worker = malloc(sizeof(worker_t)); worker->ident = ident; worker->channel = channel; worker->syncer = syncer; worker->doexit = 0; return worker; } void worker_init(worker_t* worker, channel_t* channel, syncer_t* syncer, int ident) { worker->ident = ident; worker->channel = channel; worker->syncer = syncer; worker->doexit = 0; } static void worker_doexit(worker_t* worker) { if (worker->doexit > 0) { printf("worker %d is canceled\n", worker->ident); syncer_done(worker->syncer); pthread_exit(NULL); } } static void* worker_start(void* argp) { worker_t* worker = (worker_t*)argp; printf("worker %d start\n", worker->ident); while(true) { struct timespec ts; ts_addsec(&ts, 1); int req = -1; int res = channel_timedread(worker->channel, &req, &ts); if (res < 0) { worker_doexit(worker); continue; } printf("worker %d run req %d\n", worker->ident, req); worker_doexit(worker); } printf("worker %d done\n", worker->ident); syncer_done(worker->syncer); pthread_exit(NULL); } void worker_run(worker_t* worker) { pthread_create(&(worker->thread), NULL, worker_start, (void*)worker); pthread_detach(worker->thread); } void worker_cancel(worker_t* worker) { worker->doexit++; } void worker_destroy(worker_t* worker) { // NOP } void worker_free(worker_t* worker) { free(worker); }
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <channel.h> #include <syncer.h> #include <worker.h> #include <main.h> int main(int argc, char **argv) { srand(1); printf("main started\n"); syncer_t syncer; syncer_init(&syncer); channel_t channel; channel_init(&channel); int wcount = 4; worker_t workers[wcount]; for (int i = 0; i < wcount; i++) { worker_t* worker = &(workers[i]); int ident = i + 1; worker_init(worker, &channel, &syncer, ident); syncer_add(&syncer); worker_run(worker); } sleep(1); int jobs = 12; for (int i = 0; i < jobs; i++) { channel_write(&channel, i); } sleep(1); for (int i = 0; i < wcount; i++) { worker_cancel(&(workers[i])); } syncer_wait(&syncer); for (int i = 0; i < wcount; i++) { worker_destroy(&(workers[i])); } syncer_destroy(&syncer); channel_destroy(&channel); printf("main done\n"); return 0; }
/* * * Copyright 2022 Oleg Borodin <borodin@unix7.org> * */ #ifndef MAIN_H_QWERTY #define MAIN_H_QWERTY #endif
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #include <unistd.h> #include <time.h> int msleep(uint tms) { return usleep(tms * 1000); } void ts_addsec(struct timespec* ts, int sec) { clock_gettime(CLOCK_REALTIME, ts); ts->tv_sec += (time_t)sec; } void ts_addmsec(struct timespec* ts, long msec) { clock_gettime(CLOCK_REALTIME, ts); ts->tv_nsec += (time_t)msec * 1000 * 1000; } void ts_addusec(struct timespec* ts, long usec) { clock_gettime(CLOCK_REALTIME, ts); ts->tv_nsec += (time_t)usec * 1000; } long getnanotime() { struct timespec ts; clock_gettime(CLOCK_REALTIME, &ts); return ts.tv_sec * 1000 * 1000 * 1000 + ts.tv_nsec; }
/* * Copyright 2022 Oleg Borodin <borodin@unix7.org> */ #ifndef TOOLS_H_QWERTY #define TOOLS_H_QWERTY int msleep(uint tms); void ts_addsec(struct timespec* ts, int sec); void ts_addmsec(struct timespec* ts, long msec); void ts_addusec(struct timespec* ts, long usec); #endif