User Tools

Site Tools


,

Unlimited channel on Unix semaphores

Golang variant of “Thread pool with unlimited channel”.

Пример рабочий, но с ограниченным использованием, из-за издержек CGO. Возможно, при необходимости использования семафоров целесообразнее “завернуть” в Go вызовы целую конструкцию.

semchannel.go
/*
 * Copyright 2023 Oleg Borodin  <borodin@unix7.org>
 */
 
package main
 
//#cgo CFLAGS: -O -Wall -pthread
//#cgo LDFLAGS: -pthread
//
//#include <semaphore.h>
//#include <stdlib.h>
//#include <unistd.h>
//#include <time.h>
//// ts_addsec stores the current CLOCK_REALTIME time plus sec seconds in *ts.
//// The return value of clock_gettime is not checked.
//void ts_addsec(struct timespec* ts, int sec) {
//    clock_gettime(CLOCK_REALTIME, ts);
//    ts->tv_sec += (time_t)sec;
//}
import "C"
 
import (
    "sync/atomic"
    "errors"
    "fmt"
    "sync"
    "time"
)
 
// Sema is a thin Go wrapper around a C sem_t semaphore accessed through cgo.
type Sema struct {
    csem C.sem_t // the underlying C semaphore object
}
 
// NewSema allocates a Sema and initializes it with sem_init, passing 0 for
// the pshared argument and val as the starting value.
//
// It returns the semaphore together with the raw sem_init result. The pointer
// is returned regardless of that result, so callers should inspect the code
// before relying on the semaphore.
func NewSema(val uint) (*Sema, int) {
    sem := new(Sema)
    rc := C.sem_init(&sem.csem, C.int(0), C.uint(val))
    return sem, int(rc)
}
 
// Init (re)initializes the semaphore in place with sem_init, using 0 for the
// pshared argument and val as the starting value. It returns the raw
// sem_init result.
func (sem *Sema) Init(val uint) int {
    return int(C.sem_init(&sem.csem, C.int(0), C.uint(val)))
}
 
// Wait calls sem_wait on the semaphore and returns its result as an int.
func (sem *Sema) Wait() int {
    return int(C.sem_wait(&sem.csem))
}
 
// TimedWait calls sem_timedwait with an absolute deadline that lies sec
// seconds after the current CLOCK_REALTIME time (computed by the C helper
// ts_addsec). It returns the raw sem_timedwait result.
func (sem *Sema) TimedWait(sec int) int {
    var deadline C.struct_timespec
    C.ts_addsec(&deadline, C.int(sec))
    return int(C.sem_timedwait(&sem.csem, &deadline))
}
 
// Post calls sem_post on the semaphore. The sem_post result is discarded.
func (sem *Sema) Post() {
    C.sem_post(&sem.csem)
}
 
 
// Syncer lets one goroutine wait until a counted set of tasks has finished:
// Add increments the counter, Done decrements it and posts the semaphore once
// the counter drops below 1, and Wait blocks on that semaphore.
type Syncer struct {
    // num is the number of outstanding tasks. It is accessed with the
    // sync/atomic functions and is deliberately the first field: on 32-bit
    // platforms only the first word of an allocated struct is guaranteed to
    // be 64-bit aligned, which 64-bit atomic operations require.
    num int64
    mtx sync.Mutex // serializes Done calls
    sem *Sema      // posted by Done, waited on by Wait
}
 
// NewSyncer returns a Syncer with a zero task counter and a semaphore whose
// initial value is 0. The result code reported by NewSema is discarded.
func NewSyncer() *Syncer {
    sem, _ := NewSema(0)
    return &Syncer{sem: sem}
}
 
// Add atomically increments the count of outstanding tasks by one.
//
// The result of atomic.AddInt64 is intentionally not stored back into num:
// a plain store after the atomic add would race with concurrent Add/Done
// calls and could overwrite their updates.
func (syncer *Syncer) Add() {
    atomic.AddInt64(&syncer.num, 1)
}
 
// Done atomically decrements the count of outstanding tasks and posts the
// semaphore when the new count is below 1, releasing a goroutine blocked in
// Wait.
//
// The decision is made on the value returned by atomic.AddInt64 rather than
// on a separate read of num, because Add does not take mtx and may modify the
// counter concurrently.
func (syncer *Syncer) Done() {
    syncer.mtx.Lock()
    defer syncer.mtx.Unlock()
    if atomic.AddInt64(&syncer.num, -1) < 1 {
        syncer.sem.Post()
    }
}
 
// Wait blocks on the Syncer's semaphore via Sema.Wait. The result of the
// wait is discarded.
func (syncer *Syncer) Wait() {
    _ = syncer.sem.Wait()
}
 
 
// Cell is one node of a singly linked list of int values.
type Cell struct {
    next *Cell // following node; nil when there is none
    elem int   // value carried by this node
}
 
// NewCell returns a new, unlinked Cell (next is nil) holding elem.
func NewCell(elem int) *Cell {
    return &Cell{elem: elem}
}
 
// Channel is an unbounded FIFO queue of ints built on a singly linked list.
// Writers append at the tail and post the semaphore; readers wait on the
// semaphore and remove from the head.
type Channel struct {
    head *Cell        // oldest queued element; nil when the queue is empty
    tail *Cell        // most recently appended element
    sema *Sema        // posted once per Write, waited on by the read methods
    mtx  sync.RWMutex // guards head, tail and size
    size int          // number of queued elements
}
 
// NewChannel returns an empty Channel whose semaphore starts at 0. The
// result code reported by NewSema is discarded.
func NewChannel() *Channel {
    sema, _ := NewSema(0)
    return &Channel{sema: sema}
}
 
// Write appends elem to the end of the queue and posts the semaphore once so
// that a reader can pick the element up. The whole operation, including the
// post, runs under the channel's write lock.
func (channel *Channel) Write(elem int) {
    channel.mtx.Lock()
    defer channel.mtx.Unlock()

    cell := NewCell(elem)
    if channel.head != nil {
        // Non-empty queue: link the new cell behind the current tail.
        channel.tail.next = cell
    } else {
        // Empty queue: the new cell is also the head.
        channel.head = cell
    }
    channel.tail = cell
    channel.size++
    channel.sema.Post()
}
 
// Read waits on the channel's semaphore and then removes and returns the
// oldest queued element.
//
// It returns a non-nil error, and a zero value, when the semaphore wait
// reports failure or when the queue turns out to be empty. On a failed wait
// the queue is left untouched: dequeuing without having decremented the
// semaphore would leave the semaphore count ahead of the queue length.
func (channel *Channel) Read() (int, error) {
    if channel.sema.Wait() != 0 {
        return 0, errors.New("semaphore wait failed")
    }

    channel.mtx.Lock()
    defer channel.mtx.Unlock()

    cell := channel.head
    if cell == nil {
        return 0, errors.New("empty channel")
    }
    channel.head = cell.next
    if channel.head == nil {
        // Queue drained: drop the stale tail reference to the removed cell.
        channel.tail = nil
    }
    channel.size--
    return cell.elem, nil
}
 
func (channel *Channel) TimedRead(sec int ) (int, int, error) {
 
    timed := channel.sema.TimedWait(sec)
 
    channel.mtx.Lock()
    defer channel.mtx.Unlock()
 
    var err error
    var res int
    if (channel.head == nil) {
        err = errors.New("empty channel")
        return res, int(timed), err
    }
    res = channel.head.elem
    channel.head = channel.head.next
    channel.size--
    return res, int(timed), err
}
 
// Size returns the number of elements currently queued.
//
// It only reads state, so it takes the shared (read) side of the RWMutex and
// concurrent Size calls do not block one another.
func (channel *Channel) Size() int {
    channel.mtx.RLock()
    defer channel.mtx.RUnlock()
    return channel.size
}
 
// Worker consumes values from a Channel until it is cancelled and reports
// its termination through a Syncer.
type Worker struct {
    syncer  *Syncer  // notified with Done when Run returns
    channel *Channel // source of values read by Run
    exit    int64    // cancellation flag: greater than 0 once Cancel was called
}
 
// NewWorker returns a Worker bound to the given syncer and channel, with its
// exit flag cleared.
func NewWorker(syncer *Syncer, channel *Channel) *Worker {
    return &Worker{
        syncer:  syncer,
        channel: channel,
    }
}
 
 
// isExit reports whether the worker's exit flag, read atomically, is
// greater than zero.
func (worker *Worker) isExit() bool {
    return atomic.LoadInt64(&worker.exit) > 0
}
 
 
// Run is the worker loop. It repeatedly reads from the channel with a
// one-second timeout, prints each value received, and returns once the exit
// flag is set. Done is called on the worker's Syncer when Run returns.
//
// The exit flag is checked after every read attempt, whether it produced a
// value or timed out.
func (worker *Worker) Run() {
    defer worker.syncer.Done()
    for {
        elem, timed, err := worker.channel.TimedRead(1)
        switch {
        case timed < 0:
            fmt.Printf("channel timed\n")
        case err != nil:
            // The wait succeeded but nothing could be dequeued; elem is
            // not a real value, so there is nothing to print.
        default:
            fmt.Printf("val = %d\n", elem)
        }
        if worker.isExit() {
            // Load the flag atomically: Cancel may modify it concurrently.
            fmt.Printf("worker exit %d\n", atomic.LoadInt64(&worker.exit))
            return
        }
    }
}
 
// Cancel asks the worker to stop by atomically incrementing its exit flag.
// Run notices the flag after its next read attempt.
//
// The result of atomic.AddInt64 is intentionally not stored back into exit:
// a plain store would race with the atomic load in isExit and with
// concurrent Cancel calls.
func (worker *Worker) Cancel() {
    atomic.AddInt64(&worker.exit, 1)
}
 
func main() {
    channel := NewChannel()
    var count int = 12
    var i int
    for i = 0; i < count; i++ {
        channel.Write(i)
    }
    fmt.Printf("q size = %d\n", channel.Size())
 
    syncer := NewSyncer()
    worker := NewWorker(syncer, channel)
 
    syncer.Add()
    go worker.Run()
    time.Sleep(3 * time.Second);
    worker.Cancel()
    syncer.Wait()
 
}

Out

$ go run main.go
q size = 12
val = 0
val = 1
val = 2
val = 3
val = 4
val = 5
val = 6
val = 7
val = 8
val = 9
val = 10
val = 11
channel timed
channel timed
channel timed
worker exit 1

Bench

channel_test.go
/*
 * Copyright 2023 Oleg Borodin  <borodin@unix7.org>
 */
 
package main
 
import (
    "testing"
)
 
// BenchmarkChannel writes b.N values into a fresh Channel and then reads
// all of them back; the results of Read are discarded.
func BenchmarkChannel(b *testing.B) {
    channel := NewChannel()
    for i := 0; i < b.N; i++ {
        channel.Write(i)
    }
    for i := 0; i < b.N; i++ {
        channel.Read()
    }
}

Benchmarks

# go test -bench=. -benchtime=2s -benchmem 
goos: freebsd
goarch: amd64
cpu: Intel(R) Core(TM) i5-4300U CPU @ 1.90GHz
BenchmarkChannel-4   	 7233471	       349.9 ns/op	      16 B/op	       1 allocs/op
PASS
# go test -bench=. -benchtime=2s -benchmem 
goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i5-4300U CPU @ 1.90GHz
BenchmarkChannel-4   	 7756735	       314.1 ns/op	      16 B/op	       1 allocs/op
PASS