Golang variant of “Thread pool with unlimited channel”.
Пример рабочий, но его применимость ограничена из-за издержек CGO. Возможно, при необходимости использования семафоров целесообразнее «завернуть» в Go-вызовы целую конструкцию.
/* * Copyright 2023 Oleg Borodin <borodin@unix7.org> */ package main //#cgo CFLAGS: -O -Wall -pthread //#cgo LDFLAGS: -pthread // //#include <semaphore.h> //#include <stdlib.h> //#include <unistd.h> //#include <time.h> //void ts_addsec(struct timespec* ts, int sec) { // clock_gettime(CLOCK_REALTIME, ts); // ts->tv_sec += (time_t)sec; //} import "C" import ( "sync/atomic" "errors" "fmt" "sync" "time" ) type Sema struct { csem C.sem_t } func NewSema(val uint) (*Sema, int) { var sem Sema res := C.sem_init(&(sem.csem), C.int(0), C.uint(val)) return &sem, int(res) } func (sem *Sema) Init(val uint) int { res := C.sem_init(&(sem.csem), C.int(0), C.uint(val)) return int(res) } func (sem *Sema) Wait() int { res := C.sem_wait(&(sem.csem)) return int(res) } func (sem *Sema) TimedWait(sec int) int { var ts C.struct_timespec C.ts_addsec(&ts, C.int(sec)) res := C.sem_timedwait(&(sem.csem), &ts) return int(res) } func (sem *Sema) Post() { C.sem_post(&(sem.csem)) } type Syncer struct { mtx sync.Mutex sem *Sema num int64 } func NewSyncer() *Syncer { var syncer Syncer syncer.sem, _ = NewSema(0) syncer.num = 0 return &syncer } func (syncer *Syncer) Add() { syncer.num = atomic.AddInt64(&(syncer.num), 1) } func (syncer *Syncer) Done() { syncer.mtx.Lock() syncer.num = atomic.AddInt64(&(syncer.num), -1) if (syncer.num < 1) { syncer.sem.Post() } syncer.mtx.Unlock() } func (syncer *Syncer) Wait() { syncer.sem.Wait() } type Cell struct { next *Cell elem int } func NewCell(elem int) *Cell { var cell Cell cell.elem = elem cell.next = nil return &cell } type Channel struct { head *Cell tail *Cell sema *Sema mtx sync.RWMutex size int } func NewChannel() *Channel { var channel Channel channel.head = nil channel.tail = nil channel.size = 0 channel.sema, _ = NewSema(0) return &channel } func (channel *Channel) Write(elem int) { channel.mtx.Lock() defer channel.mtx.Unlock() newCell := NewCell(elem) if (channel.head == nil) { channel.head = newCell channel.tail = newCell channel.size++ 
channel.sema.Post() return } channel.tail.next = newCell channel.tail = newCell channel.size++ channel.sema.Post() return } func (channel *Channel) Read() (int, error) { channel.sema.Wait() channel.mtx.Lock() defer channel.mtx.Unlock() var err error var res int if (channel.head == nil) { err = errors.New("empty channel") return res, err } res = channel.head.elem channel.head = channel.head.next channel.size-- return res, err } func (channel *Channel) TimedRead(sec int ) (int, int, error) { timed := channel.sema.TimedWait(sec) channel.mtx.Lock() defer channel.mtx.Unlock() var err error var res int if (channel.head == nil) { err = errors.New("empty channel") return res, int(timed), err } res = channel.head.elem channel.head = channel.head.next channel.size-- return res, int(timed), err } func (channel *Channel) Size() int { channel.mtx.Lock() defer channel.mtx.Unlock() return channel.size } type Worker struct { syncer *Syncer channel *Channel exit int64 } func NewWorker(syncer *Syncer, channel *Channel) *Worker { var worker Worker worker.syncer = syncer worker.channel = channel worker.exit = 0 return &worker } func (worker *Worker) isExit() bool { var exit bool if (atomic.LoadInt64(&(worker.exit)) > 0) { exit = true } return exit } func (worker *Worker) Run() { defer worker.syncer.Done() for { elem, timed, _ := worker.channel.TimedRead(1) if (timed < 0) { fmt.Printf("channel timed\n") if worker.isExit() { fmt.Printf("worker exit %d\n", worker.exit) break } continue } fmt.Printf("val = %d\n", elem) if worker.isExit() { fmt.Printf("worker exit %d\n", worker.exit) break } } return } func (worker *Worker) Cancel() { worker.exit = atomic.AddInt64(&(worker.exit), 1) } func main() { channel := NewChannel() var count int = 12 var i int for i = 0; i < count; i++ { channel.Write(i) } fmt.Printf("q size = %d\n", channel.Size()) syncer := NewSyncer() worker := NewWorker(syncer, channel) syncer.Add() go worker.Run() time.Sleep(3 * time.Second); worker.Cancel() syncer.Wait() }
$ go run main.go q size = 12 val = 0 val = 1 val = 2 val = 3 val = 4 val = 5 val = 6 val = 7 val = 8 val = 9 val = 10 val = 11 channel timed channel timed channel timed worker exit 1
/* * Copyright 2023 Oleg Borodin <borodin@unix7.org> */ package main import ( "testing" ) func BenchmarkChannel(b *testing.B) { channel := NewChannel() for n := 0; n < b.N; n++ { channel.Write(n); } for n := 0; n < b.N; n++ { channel.Read(); } }
# go test -bench=. -benchtime=2s -benchmem goos: freebsd goarch: amd64 cpu: Intel(R) Core(TM) i5-4300U CPU @ 1.90GHz BenchmarkChannel-4 7233471 349.9 ns/op 16 B/op 1 allocs/op PASS
# go test -bench=. -benchtime=2s -benchmem goos: linux goarch: amd64 cpu: Intel(R) Core(TM) i5-4300U CPU @ 1.90GHz BenchmarkChannel-4 7756735 314.1 ns/op 16 B/op 1 allocs/op PASS