User Tools

Site Tools


reactor.go
/*
 * Author, Copyright: Oleg Borodin <onborodin@gmail.com>
 *
 */
 
 
package main
 
import (
    "fmt"
    "time"
    "math/rand"
)
 
 
// Message is the unit of work that is queued on a mailbox channel and handed
// to a worker for processing.
type Message struct {
	// Id identifies the message; it is echoed back in the worker's report.
	Id int64
	// Subject and Body hold the message content.
	Subject string
	Body    string
}
 
// Report is what a worker sends back once it has finished with a message.
type Report struct {
	// WorkerId is the Id of the worker that produced the report.
	WorkerId int64
	// MessageId is the Id of the message the report refers to.
	MessageId int64
	// Err is the processing error, if any; nil when nothing went wrong.
	Err error
}
 
// Worker takes messages from its own mailbox and publishes one Report per
// processed message.
type Worker struct {
	// Id identifies the worker and is copied into every Report it sends.
	Id int64
	// Mailbox is the queue of messages assigned to this worker.
	Mailbox chan Message
	// Reports is the channel on which finished work is announced.
	Reports chan Report
}
 
// NewWorker returns a worker with the given id that reports on the supplied
// channel. Every worker gets a private mailbox buffered for 10 messages.
func NewWorker(id int64, reports chan Report) *Worker {
	w := new(Worker)
	w.Id = id
	w.Mailbox = make(chan Message, 10)
	w.Reports = reports
	return w
}
 
// Reactor receives messages on its mailbox, distributes them among its
// workers and collects the reports the workers send back.
type Reactor struct {
	// Mailbox is the queue of incoming messages awaiting dispatch.
	Mailbox chan Message
	// Reports is the single channel shared by all workers for their results.
	Reports chan Report
	// Workers is the pool the reactor schedules messages onto.
	Workers []*Worker
}
 
func NewReactor() *Reactor {
    mailbox := make(chan Message, 10)
    reports := make(chan Report, 10)
 
    // Create worrker array
    workers := make([]*Worker, 0)
    for i := 0; i < 10; i++ {
        workers = append(workers, NewWorker(int64(i), reports))
    }
 
    return &Reactor {
        Mailbox:    mailbox,
        Reports:    reports,
        Workers:    workers,
    }
}
 
func (this *Reactor) Loop() {
    // Start worker threads
    for _, item := range this.Workers {
        go item.Loop()
    }
 
    for {
        select {
            case message := <- this.Mailbox:
                    fmt.Println("new job", message.Id)
                    // Simple random scheduler
                    workerId := rand.Intn(10)
                    this.Workers[workerId].Mailbox <- message
            case report := <- this.Reports:
                    fmt.Println("new report", report.MessageId, "from worker", report.WorkerId)
            //case <-time.After(time.Second * 1):
            //        fmt.Println("reactor timeout")
        }
    }
}
 
// Loop processes the worker's mailbox one message at a time: it announces
// the message, simulates the work by sleeping for a random time below 400ms,
// and then sends a Report with a nil error on the Reports channel.
//
// Loop blocks while the mailbox is empty and while the report channel has no
// room. It returns only if the mailbox channel is closed.
func (w *Worker) Loop() {
	for message := range w.Mailbox {
		fmt.Println("worker", w.Id, "have new work", message.Id)
		// Hard work. rand.Intn yields an int, which has to be converted to
		// time.Duration before it can be multiplied by time.Millisecond;
		// without the conversion the expression does not compile.
		time.Sleep(time.Duration(rand.Intn(400)) * time.Millisecond)
		// Report result.
		w.Reports <- Report{WorkerId: w.Id, MessageId: message.Id, Err: nil}
	}
}
 
 
func main() {
    reactor := NewReactor()
    // Start reactor
    go reactor.Loop()
 
    // Make incoming messages
    for {
        id := time.Now().UnixNano()
        message := Message{
            Id:         id,
            Subject:    "foo",
            Body:       "bar",
        }
        time.Sleep(100 * time.Millisecond)
        fmt.Println("new message:", message.Id)
        reactor.Mailbox <- message
    }
}

Output

$ go run reactor2.go
new message: 1610667202836643810
  new job 1610667202836643810
    worker 1 have new work 1610667202836643810
new message: 1610667202943911256
  new job 1610667202943911256
    worker 7 have new work 1610667202943911256
        new report 1610667202943911256 from worker 7
new message: 1610667203051048937
  new job 1610667203051048937
    worker 1 have new work 1610667203051048937
        new report 1610667202836643810 from worker 1
new message: 1610667203154040232
  new job 1610667203154040232
    worker 5 have new work 1610667203154040232
new message: 1610667203256222056
        new report 1610667203051048937 from worker 1
  new job 1610667203256222056
    worker 6 have new work 1610667203256222056
        new report 1610667203154040232 from worker 5
new message: 1610667203361862402
        new report 1610667203256222056 from worker 6
  new job 1610667203361862402
    worker 4 have new work 1610667203361862402
new message: 1610667203462710013
  new job 1610667203462710013
    worker 2 have new work 1610667203462710013
        new report 1610667203361862402 from worker 4
new message: 1610667203564491658
  new job 1610667203564491658
    worker 8 have new work 1610667203564491658
        new report 1610667203564491658 from worker 8
new message: 1610667203666838902
  new job 1610667203666838902
    worker 1 have new work 1610667203666838902
        new report 1610667203462710013 from worker 2
new message: 1610667203769782557
  new job 1610667203769782557
    worker 7 have new work 1610667203769782557
new message: 1610667203870944714
  new job 1610667203870944714
    worker 5 have new work 1610667203870944714
        new report 1610667203666838902 from worker 1
new message: 1610667203973248182
  new job 1610667203973248182
    worker 8 have new work 1610667203973248182
new message: 1610667204075115775
  new job 1610667204075115775
    worker 7 have new work 1610667204075115775
        new report 1610667203769782557 from worker 7
        new report 1610667203870944714 from worker 5
new message: 1610667204176946959
  new job 1610667204176946959
        new report 1610667203973248182 from worker 8
new message: 1610667204278434592
  new job 1610667204278434592
    worker 8 have new work 1610667204278434592
new message: 1610667204379519344
  new job 1610667204379519344
    worker 5 have new work 1610667204379519344
    worker 7 have new work 1610667204176946959
        new report 1610667204075115775 from worker 7
        new report 1610667204176946959 from worker 7
new message: 1610667204481507375
  new job 1610667204481507375
    worker 7 have new work 1610667204481507375
        new report 1610667204481507375 from worker 7
new message: 1610667204582071843
  new job 1610667204582071843
    worker 9 have new work 1610667204582071843
        new report 1610667204278434592 from worker 8
new message: 1610667204683128531
  new job 1610667204683128531
    worker 7 have new work 1610667204683128531
        new report 1610667204379519344 from worker 5
        new report 1610667204582071843 from worker 9
new message: 1610667204783957724
  new job 1610667204783957724
    worker 5 have new work 1610667204783957724
new message: 1610667204884887144
  new job 1610667204884887144
    worker 3 have new work 1610667204884887144
        new report 1610667204683128531 from worker 7
new message: 1610667204985404459
  new job 1610667204985404459
    worker 4 have new work 1610667204985404459
        new report 1610667204783957724 from worker 5
new message: 1610667205085962637
  new job 1610667205085962637
        new report 1610667204985404459 from worker 4
    worker 3 have new work 1610667205085962637
        new report 1610667204884887144 from worker 3
new message: 1610667205186408166
  new job 1610667205186408166
    worker 8 have new work 1610667205186408166
new message: 1610667205286996043
  new job 1610667205286996043
    worker 9 have new work 1610667205286996043
        new report 1610667205085962637 from worker 3
new message: 1610667205387453459
  new job 1610667205387453459
    worker 7 have new work 1610667205387453459
        new report 1610667205286996043 from worker 9
new message: 1610667205487725718
  new job 1610667205487725718
    worker 9 have new work 1610667205487725718
        new report 1610667205387453459 from worker 7
        new report 1610667205186408166 from worker 8
new message: 1610667205588102438
  new job 1610667205588102438
    worker 0 have new work 1610667205588102438
        new report 1610667205487725718 from worker 9
new message: 1610667205688668394
  new job 1610667205688668394
    worker 8 have new work 1610667205688668394
new message: 1610667205788911406
  new job 1610667205788911406
    worker 3 have new work 1610667205788911406
        new report 1610667205688668394 from worker 8
^Csignal: interrupt