User Tools

Site Tools


PostgreSQL LISTEN/NOTIFY + Golang sample

notes.go
/*
 * Author, Copyright: Oleg Borodin <onborodin@gmail.com>
 */
 
package main
 
import (
    "context"
    "fmt"
    "log"
    "time"
    "os"
    "errors"
 
    "github.com/jmoiron/sqlx"
    "github.com/jackc/pgx/v4/pgxpool"
    _ "github.com/jackc/pgx/v4/stdlib"
)
 
// Server bundles the two PostgreSQL handles used by this sample.
type Server struct {
    // Dbp is the sqlx handle assigned by OpenDB.
    Dbp *sqlx.DB

    // Pool is the pgx connection pool assigned by OpenPool; Listen and
    // SendLoop both run their statements through it.
    Pool *pgxpool.Pool
}
 
// Connection settings for the sample database. Every value is hard-coded;
// OpenDB and OpenPool combine them into a postgres:// connection URL.
const (
    // Credentials placed in the user-info part of the URL.
    DbUsername string = "e2api"
    DbPassword string = "e2api"

    // Network location of the server.
    DbHostname string = "localhost"
    DbPort int = 5432

    // Database name, used as the URL path.
    DbName string = "e2api"
)
 
func (this *Server) OpenDB() error {
    var err error
    dbUrl := fmt.Sprintf("postgres://%s:%s@%s:%d/%s",
                            DbUsername,
                            DbPassword,
                            DbHostname,
                            DbPort,
                            DbName)
    this.Dbp, err = sqlx.Open("pgx", dbUrl)
    if err != nil {
        return err
    }
 
    err = this.Dbp.Ping()
    if err != nil {
        return err
    }
    return err
}
 
// OpenPool connects a pgx pool to the configured database and stores it in
// Pool.
//
// Whatever pgxpool.Connect returns is assigned to Pool, including on
// failure, and the error from Connect is returned unchanged.
func (s *Server) OpenPool() error {
    target := fmt.Sprintf("postgres://%s:%s@%s:%d/%s",
        DbUsername, DbPassword, DbHostname, DbPort, DbName)

    pool, connectErr := pgxpool.Connect(context.Background(), target)
    s.Pool = pool
    return connectErr
}
 
func (this *Server) Listen() error {
 
    var err error
 
    if this.Pool == nil {
        return errors.New("empty pool reference")
    }
 
    conn, err := this.Pool.Acquire(context.Background())
    if err != nil {
        return err
    }
    defer conn.Release()
 
    _, err = conn.Exec(context.Background(), "LISTEN notes")
    if err != nil {
        return err
    }
 
    for {
        notification, err := conn.Conn().WaitForNotification(context.Background())
        if err != nil {
            log.Println("error waiting for notification:", err)
            time.Sleep(10 * time.Millisecond)
        }
        log.Println("accepted message from pid:", notification.PID,
                        "channel:", notification.Channel,
                        "payload:", notification.Payload)
    }
}
 
func (this *Server) SendLoop() {
    var err error
    var message string = "foobar"
 
    for {
        time.Sleep(1 * time.Second)
        _, err = this.Pool.Exec(context.Background(), "SELECT pg_notify('notes', $1)", message)
        if err != nil {
            log.Println("error sending notification:", err)
        }
    }
}
 
// NewServer returns a pointer to a zero-valued Server. Both Dbp and Pool are
// nil until OpenDB and OpenPool are called.
func NewServer() *Server {
    return new(Server)
}
 
 
func main() {
    var err error
 
    server := NewServer()
    err = server.OpenDB()
    if err != nil {
        log.Println("unable open database:", err)
        os.Exit(1)
    }
    err = server.OpenPool()
    if err != nil {
        log.Println("unable open pool:", err)
        os.Exit(1)
    }
    go func() {
        err = server.Listen()
        if err != nil {
            log.Println("unable start listener:", err)
            os.Exit(1)
        }
    }()
    if err != nil {
        log.Println("unable start listener:", err)
        os.Exit(1)
    }
    server.SendLoop()
}

Output

$ go run notes.go
2021/02/18 15:32:35 accepted message from pid: 5069 channel: notes payload: foobar
2021/02/18 15:32:36 accepted message from pid: 5069 channel: notes payload: foobar
2021/02/18 15:32:37 accepted message from pid: 5069 channel: notes payload: foobar
2021/02/18 15:32:38 accepted message from pid: 5069 channel: notes payload: foobar
2021/02/18 15:32:39 accepted message from pid: 5069 channel: notes payload: foobar
^Csignal: interrupt