Implemented a builtin NATS sink.
This commit is contained in:
97
sinks/nats.go
Normal file
97
sinks/nats.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package sinks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
type NATSSink struct {
|
||||
name string
|
||||
url string
|
||||
exchange string
|
||||
|
||||
mu sync.Mutex
|
||||
conn *nats.Conn
|
||||
}
|
||||
|
||||
func NewNATSSinkFromConfig(cfg config.SinkConfig) (Sink, error) {
|
||||
url, err := requireStringParam(cfg, "url")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ex, err := requireStringParam(cfg, "exchange")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &NATSSink{name: cfg.Name, url: url, exchange: ex}, nil
|
||||
}
|
||||
|
||||
func (r *NATSSink) Name() string { return r.name }
|
||||
|
||||
func (r *NATSSink) Consume(ctx context.Context, e event.Event) error {
|
||||
// Boundary validation: if something upstream violated invariants,
|
||||
// surface it loudly rather than printing partial nonsense.
|
||||
if err := e.Validate(); err != nil {
|
||||
return fmt.Errorf("NATS sink: invalid event: %w", err)
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := r.connect(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("NATS sink: connect: %w", err)
|
||||
}
|
||||
|
||||
b, err := json.Marshal(e)
|
||||
if err != nil {
|
||||
return fmt.Errorf("NATS sink: marshal event: %w", err)
|
||||
}
|
||||
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := conn.Publish(r.exchange, b); err != nil {
|
||||
return fmt.Errorf("NATS sink: publish: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *NATSSink) connect(ctx context.Context) (*nats.Conn, error) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if r.conn != nil && r.conn.Status() != nats.CLOSED {
|
||||
return r.conn, nil
|
||||
}
|
||||
|
||||
opts := []nats.Option{
|
||||
nats.Name(fmt.Sprintf("feedkit sink %s", r.name)),
|
||||
}
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
timeout := time.Until(deadline)
|
||||
if timeout <= 0 {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
opts = append(opts, nats.Timeout(timeout))
|
||||
}
|
||||
|
||||
conn, err := nats.Connect(r.url, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.conn = conn
|
||||
return conn, nil
|
||||
}
|
||||
Reference in New Issue
Block a user