diff --git a/go.mod b/go.mod index fc41095..766877a 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,15 @@ module gitea.maximumdirect.net/ejr/feedkit go 1.22 -require gopkg.in/yaml.v3 v3.0.1 +require ( + github.com/nats-io/nats.go v1.34.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/klauspost/compress v1.17.2 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.18.0 // indirect + golang.org/x/sys v0.16.0 // indirect +) diff --git a/go.sum b/go.sum index 4bc0337..91387b2 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,16 @@ +github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= +github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk= +github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.18.0 h1:PGVlW0xEltQnzFZ55hkuX5+KLyrMYhHld1YHO4AKcdc= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/sinks/builtins.go b/sinks/builtins.go index 7313b98..59974b2 100644 --- a/sinks/builtins.go +++ b/sinks/builtins.go @@ -27,9 +27,9 @@ func RegisterBuiltins(r *Registry) { return NewPostgresSinkFromConfig(cfg) }) - // RabbitMQ sink: publishes events to a broker for downstream consumers. - r.Register("rabbitmq", func(cfg config.SinkConfig) (Sink, error) { - return NewRabbitMQSinkFromConfig(cfg) + // NATS sink: publishes events to a broker for downstream consumers. + r.Register("nats", func(cfg config.SinkConfig) (Sink, error) { + return NewNATSSinkFromConfig(cfg) }) } diff --git a/sinks/nats.go b/sinks/nats.go new file mode 100644 index 0000000..20cbea7 --- /dev/null +++ b/sinks/nats.go @@ -0,0 +1,97 @@ +package sinks + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "github.com/nats-io/nats.go" +) + +type NATSSink struct { + name string + url string + exchange string + + mu sync.Mutex + conn *nats.Conn +} + +func NewNATSSinkFromConfig(cfg config.SinkConfig) (Sink, error) { + url, err := requireStringParam(cfg, "url") + if err != nil { + return nil, err + } + ex, err := requireStringParam(cfg, "exchange") + if err != nil { + return nil, err + } + return &NATSSink{name: cfg.Name, url: url, exchange: ex}, nil +} + +func (r *NATSSink) Name() string { return r.name } + +func (r *NATSSink) Consume(ctx context.Context, e event.Event) error { + // Boundary validation: if something upstream violated invariants, + // surface it loudly rather than printing partial nonsense. + if err := e.Validate(); err != nil { + return fmt.Errorf("NATS sink: invalid event: %w", err) + } + + if err := ctx.Err(); err != nil { + return err + } + + conn, err := r.connect(ctx) + if err != nil { + return fmt.Errorf("NATS sink: connect: %w", err) + } + + b, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("NATS sink: marshal event: %w", err) + } + + if err := ctx.Err(); err != nil { + return err + } + if err := conn.Publish(r.exchange, b); err != nil { + return fmt.Errorf("NATS sink: publish: %w", err) + } + return nil +} + +func (r *NATSSink) connect(ctx context.Context) (*nats.Conn, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + r.mu.Lock() + defer r.mu.Unlock() + + if r.conn != nil && r.conn.Status() != nats.CLOSED { + return r.conn, nil + } + + opts := []nats.Option{ + nats.Name(fmt.Sprintf("feedkit sink %s", r.name)), + } + if deadline, ok := ctx.Deadline(); ok { + timeout := time.Until(deadline) + if timeout <= 0 { + return nil, ctx.Err() + } + opts = append(opts, nats.Timeout(timeout)) + } + + conn, err := nats.Connect(r.url, opts...) + if err != nil { + return nil, err + } + r.conn = conn + return conn, nil +} diff --git a/sinks/rabbitmq.go b/sinks/rabbitmq.go deleted file mode 100644 index 15d81c5..0000000 --- a/sinks/rabbitmq.go +++ /dev/null @@ -1,42 +0,0 @@ -package sinks - -import ( - "context" - "fmt" - - "gitea.maximumdirect.net/ejr/feedkit/config" - "gitea.maximumdirect.net/ejr/feedkit/event" -) - -type RabbitMQSink struct { - name string - url string - exchange string -} - -func NewRabbitMQSinkFromConfig(cfg config.SinkConfig) (Sink, error) { - url, err := requireStringParam(cfg, "url") - if err != nil { - return nil, err - } - ex, err := requireStringParam(cfg, "exchange") - if err != nil { - return nil, err - } - return &RabbitMQSink{name: cfg.Name, url: url, exchange: ex}, nil -} - -func (r *RabbitMQSink) Name() string { return r.name } - -func (r *RabbitMQSink) Consume(ctx context.Context, e event.Event) error { - _ = ctx - - // Boundary validation: if something upstream violated invariants, - // surface it loudly rather than printing partial nonsense. - if err := e.Validate(); err != nil { - return fmt.Errorf("rabbitmq sink: invalid event: %w", err) - } - - // TODO implement RabbitMQ publishing - return nil -}