package sinks import ( "context" "encoding/json" "fmt" "sync" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" "github.com/nats-io/nats.go" ) type NATSSink struct { name string url string exchange string mu sync.Mutex conn *nats.Conn } func NewNATSSinkFromConfig(cfg config.SinkConfig) (Sink, error) { url, err := requireStringParam(cfg, "url") if err != nil { return nil, err } ex, err := requireStringParam(cfg, "exchange") if err != nil { return nil, err } return &NATSSink{name: cfg.Name, url: url, exchange: ex}, nil } func (r *NATSSink) Name() string { return r.name } func (r *NATSSink) Consume(ctx context.Context, e event.Event) error { // Boundary validation: if something upstream violated invariants, // surface it loudly rather than printing partial nonsense. if err := e.Validate(); err != nil { return fmt.Errorf("NATS sink: invalid event: %w", err) } if err := ctx.Err(); err != nil { return err } conn, err := r.connect(ctx) if err != nil { return fmt.Errorf("NATS sink: connect: %w", err) } b, err := json.Marshal(e) if err != nil { return fmt.Errorf("NATS sink: marshal event: %w", err) } if err := ctx.Err(); err != nil { return err } if err := conn.Publish(r.exchange, b); err != nil { return fmt.Errorf("NATS sink: publish: %w", err) } return nil } func (r *NATSSink) connect(ctx context.Context) (*nats.Conn, error) { if err := ctx.Err(); err != nil { return nil, err } r.mu.Lock() defer r.mu.Unlock() if r.conn != nil && r.conn.Status() != nats.CLOSED { return r.conn, nil } opts := []nats.Option{ nats.Name(fmt.Sprintf("feedkit sink %s", r.name)), } if deadline, ok := ctx.Deadline(); ok { timeout := time.Until(deadline) if timeout <= 0 { return nil, ctx.Err() } opts = append(opts, nats.Timeout(timeout)) } conn, err := nats.Connect(r.url, opts...) if err != nil { return nil, err } r.conn = conn return conn, nil }