From 0cc2862170149cdee8d21409b0d6627c6b54a85d Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Tue, 13 Jan 2026 10:40:01 -0600 Subject: [PATCH] feedkit: split the former maximumdirect.net/weatherd project in two. feedkit now contains a reusable core, while weatherfeeder is a concrete implementation that includes weather-specific functions. --- .gitignore | 27 +++++ NAMING.md | 87 ++++++++++++++++ config/config.go | 139 ++++++++++++++++++++++++++ config/load.go | 193 +++++++++++++++++++++++++++++++++++ config/params.go | 72 +++++++++++++ dispatch/dispatch.go | 104 +++++++++++++++++++ dispatch/fanout.go | 222 +++++++++++++++++++++++++++++++++++++++++ event/doc.go | 8 ++ event/event.go | 101 +++++++++++++++++++ event/kind.go | 30 ++++++ go.mod | 5 + go.sum | 3 + pipeline/dedupe.go | 5 + pipeline/pipeline.go | 43 ++++++++ pipeline/ratelimit.go | 5 + scheduler/scheduler.go | 174 ++++++++++++++++++++++++++++++++ scheduler/worker.go | 7 ++ sinks/builtins.go | 55 ++++++++++ sinks/file.go | 30 ++++++ sinks/postgres.go | 37 +++++++ sinks/rabbitmq.go | 42 ++++++++ sinks/registry.go | 33 ++++++ sinks/sink.go | 16 +++ sinks/stdout.go | 36 +++++++ sources/registry.go | 49 +++++++++ sources/source.go | 30 ++++++ 26 files changed, 1553 insertions(+) create mode 100644 .gitignore create mode 100644 NAMING.md create mode 100644 config/config.go create mode 100644 config/load.go create mode 100644 config/params.go create mode 100644 dispatch/dispatch.go create mode 100644 dispatch/fanout.go create mode 100644 event/doc.go create mode 100644 event/event.go create mode 100644 event/kind.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pipeline/dedupe.go create mode 100644 pipeline/pipeline.go create mode 100644 pipeline/ratelimit.go create mode 100644 scheduler/scheduler.go create mode 100644 scheduler/worker.go create mode 100644 sinks/builtins.go create mode 100644 sinks/file.go create mode 100644 sinks/postgres.go create mode 100644 sinks/rabbitmq.go create mode 100644 sinks/registry.go create mode 100644 sinks/sink.go create mode 100644 sinks/stdout.go create mode 100644 sources/registry.go create mode 100644 sources/source.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5b90e79 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# ---> Go +# If you prefer the allow list template instead of the deny list, see community template: +# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore +# +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ + +# Go workspace file +go.work +go.work.sum + +# env file +.env + diff --git a/NAMING.md b/NAMING.md new file mode 100644 index 0000000..e67dda7 --- /dev/null +++ b/NAMING.md @@ -0,0 +1,87 @@ +# Naming Conventions + +This document defines naming conventions for the Feed-based ingestion daemons and shared libraries in the HomeOps ecosystem. + +These conventions are intentionally simple and consistent, and are designed to make system behavior obvious from names alone. + +--- + +## Core Shared Module + +**Module / repository name** +maximumdirect.net/feedkit + + +**Purpose** + +`feedkit` provides the domain-agnostic core used by all feed-processing daemons: +polling, scheduling, filtering, routing, persistence fanout, and signaling. + +It contains *infrastructure primitives*, not domain logic. + +--- + +## Feed Daemons + +### Naming Format +feeder + + +This name is used for **both**: +- the repository / Go module name +- the compiled binary name + +### Rationale + +A *feeder* is an active agent that: +- consumes one or more upstream feeds +- normalizes new items into events +- emits facts and/or signals into the system + +### Examples + +| Domain | Repository / Binary | +|---------------|---------------------| +| Weather | `weatherfeeder` | +| RSS | `rssfeeder` | +| News | `newsfeeder` | +| NBA | `nbafeeder` | +| Earthquakes | `earthquakefeeder` | +| Patreon | `patreonfeeder` | + +--- + +## Non-Feed Daemons + +Daemons that perform downstream work (downloads, archiving, transformation, etc.) +**should not** use the `feeder` suffix. + +Preferred naming reflects the primary action: + +- `mediafetcher` +- `videodownloader` +- `archiver` +- `syncd` + +This preserves a clear semantic boundary between: +- **feed detection** (feeders) +- **work execution** (workers) + +--- + +## Go Naming Conventions + +- All repository, module, package, and binary names are **lowercase** +- No dashes (`-`) or underscores (`_`) in Go package names +- Repository name and binary name should match unless there is a compelling reason not to + +--- + +## Summary + +- **Core library:** `feedkit` +- **Feed daemons:** `feeder` +- **Downstream workers:** action-based names +- Names should communicate *role*, not implementation details + + diff --git a/config/config.go b/config/config.go new file mode 100644 index 0000000..f41b979 --- /dev/null +++ b/config/config.go @@ -0,0 +1,139 @@ +package config + +import ( + "fmt" + "strconv" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +// Config is the top-level YAML configuration used by feedkit-style daemons. +// +// IMPORTANT: This package is intentionally domain-agnostic. +// - We validate *shape* and internal consistency (required fields, uniqueness, etc.). +// - We do NOT enforce domain rules (e.g., "observation|forecast|alert"). +// Domain modules (weatherfeeder/newsfeeder/...) can add their own validation layer. +type Config struct { + Sources []SourceConfig `yaml:"sources"` + Sinks []SinkConfig `yaml:"sinks"` + Routes []RouteConfig `yaml:"routes"` +} + +// SourceConfig describes one polling job. +// +// This is intentionally generic: +// - driver-specific knobs belong in Params. +// - "kind" is allowed (useful for safety checks / routing), but feedkit does not +// restrict the allowed values. +type SourceConfig struct { + Name string `yaml:"name"` + Driver string `yaml:"driver"` // e.g. "openmeteo_observation", "rss_feed", etc. + + Every Duration `yaml:"every"` // "15m", "1m", etc. + + // Kind is optional and domain-defined. If set, it should be a non-empty string. + // Domains commonly use it to enforce "this source should only emit kind X". + Kind string `yaml:"kind"` + + // Params are driver-specific settings (URL, headers, station IDs, API keys, etc.). + // The driver implementation is responsible for reading/validating these. + Params map[string]any `yaml:"params"` +} + +// SinkConfig describes one output sink adapter. +type SinkConfig struct { + Name string `yaml:"name"` + Driver string `yaml:"driver"` // "stdout", "file", "postgres", "rabbitmq", ... + Params map[string]any `yaml:"params"` // sink-specific settings +} + +// RouteConfig describes a routing rule: which sink receives which kinds. +type RouteConfig struct { + Sink string `yaml:"sink"` // sink name + + // Kinds is domain-defined. feedkit only enforces that each entry is non-empty. + // Whether a given daemon "recognizes" a kind is domain-specific validation. + Kinds []string `yaml:"kinds"` +} + +// Duration is a YAML-friendly duration wrapper. +// +// Supported YAML formats: +// +// every: 15m # string duration (recommended) +// every: 30s +// every: 1h +// +// Also supported for convenience (interpreted as minutes): +// +// every: 15 # integer minutes +// every: "15" # string minutes +type Duration struct { + time.Duration +} + +func (d *Duration) UnmarshalYAML(value *yaml.Node) error { + // We expect a scalar (string or int). + if value.Kind != yaml.ScalarNode { + return fmt.Errorf("duration must be a scalar (e.g. 15m), got %v", value.Kind) + } + + // Case 1: YAML integer -> interpret as minutes. + if value.Tag == "!!int" { + n, err := strconv.Atoi(value.Value) + if err != nil { + return fmt.Errorf("invalid integer duration %q: %w", value.Value, err) + } + if n <= 0 { + return fmt.Errorf("duration must be > 0, got %d", n) + } + d.Duration = time.Duration(n) * time.Minute + return nil + } + + // Case 2: YAML string. + if value.Tag == "!!str" { + s := strings.TrimSpace(value.Value) + if s == "" { + return fmt.Errorf("duration cannot be empty") + } + + // If it’s all digits, interpret as minutes (e.g. "15"). + if isAllDigits(s) { + n, err := strconv.Atoi(s) + if err != nil { + return fmt.Errorf("invalid numeric duration %q: %w", s, err) + } + if n <= 0 { + return fmt.Errorf("duration must be > 0, got %d", n) + } + d.Duration = time.Duration(n) * time.Minute + return nil + } + + // Otherwise parse as Go duration string (15m, 30s, 1h, etc.). + parsed, err := time.ParseDuration(s) + if err != nil { + return fmt.Errorf("invalid duration %q (expected e.g. 15m, 30s, 1h): %w", s, err) + } + if parsed <= 0 { + return fmt.Errorf("duration must be > 0, got %s", parsed) + } + d.Duration = parsed + return nil + } + + // Anything else: reject. + return fmt.Errorf("duration must be a string like 15m or an integer minutes, got tag %s", value.Tag) +} + +func isAllDigits(s string) bool { + for _, r := range s { + if r < '0' || r > '9' { + return false + } + } + return len(s) > 0 +} diff --git a/config/load.go b/config/load.go new file mode 100644 index 0000000..28eabb1 --- /dev/null +++ b/config/load.go @@ -0,0 +1,193 @@ +package config + +import ( + "fmt" + "os" + "sort" + "strings" + + "gopkg.in/yaml.v3" +) + +// Load reads the YAML config file from disk, decodes it into Config, +// and then validates it. +// +// Important behaviors: +// - Uses yaml.Decoder with KnownFields(true) to catch typos like "srouces:". +// - Returns a validation error that (usually) contains multiple issues at once. +func Load(path string) (*Config, error) { + if strings.TrimSpace(path) == "" { + path = "config.yml" + } + + raw, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("config.Load: read %q: %w", path, err) + } + + var cfg Config + + dec := yaml.NewDecoder(strings.NewReader(string(raw))) + dec.KnownFields(true) // strict mode for struct fields + + if err := dec.Decode(&cfg); err != nil { + return nil, fmt.Errorf("config.Load: parse YAML %q: %w", path, err) + } + + // Optional: ensure there isn't a second YAML document accidentally appended. + var extra any + if err := dec.Decode(&extra); err == nil { + return nil, fmt.Errorf("config.Load: %q contains multiple YAML documents; expected exactly one", path) + } + + if err := cfg.Validate(); err != nil { + return nil, err + } + + return &cfg, nil +} + +// Validate checks whether the config is internally consistent and safe to run. +// +// This is intentionally DOMAIN-AGNOSTIC validation: +// - required fields are present +// - names are unique +// - durations are > 0 +// - routes reference defined sinks +// +// We DO NOT enforce domain-specific constraints like "allowed kinds" or +// "NWS requires a user-agent". Those belong in the domain module (weatherfeeder). +func (c *Config) Validate() error { + var m multiError + + // ---------- sources ---------- + if len(c.Sources) == 0 { + m.Add(fieldErr("sources", "must contain at least one source")) + } else { + seen := map[string]bool{} + for i, s := range c.Sources { + path := fmt.Sprintf("sources[%d]", i) + + // Name + if strings.TrimSpace(s.Name) == "" { + m.Add(fieldErr(path+".name", "is required")) + } else { + if seen[s.Name] { + m.Add(fieldErr(path+".name", fmt.Sprintf("duplicate source name %q (source names must be unique)", s.Name))) + } + seen[s.Name] = true + } + + // Driver + if strings.TrimSpace(s.Driver) == "" { + m.Add(fieldErr(path+".driver", "is required (e.g. openmeteo_observation, rss_feed, ...)")) + } + + // Every + if s.Every.Duration <= 0 { + m.Add(fieldErr(path+".every", "must be a positive duration (e.g. 15m, 1m, 30s)")) + } + + // Kind (optional but if present must be non-empty after trimming) + if s.Kind != "" && strings.TrimSpace(s.Kind) == "" { + m.Add(fieldErr(path+".kind", "cannot be blank (omit it entirely, or provide a non-empty string)")) + } + + // Params can be nil; that's fine. + } + } + + // ---------- sinks ---------- + sinkNames := map[string]bool{} + if len(c.Sinks) == 0 { + m.Add(fieldErr("sinks", "must contain at least one sink")) + } else { + for i, s := range c.Sinks { + path := fmt.Sprintf("sinks[%d]", i) + + if strings.TrimSpace(s.Name) == "" { + m.Add(fieldErr(path+".name", "is required")) + } else { + if sinkNames[s.Name] { + m.Add(fieldErr(path+".name", fmt.Sprintf("duplicate sink name %q (sink names must be unique)", s.Name))) + } + sinkNames[s.Name] = true + } + + if strings.TrimSpace(s.Driver) == "" { + m.Add(fieldErr(path+".driver", "is required (stdout|file|postgres|rabbitmq|...)")) + } + + // Params can be nil; that's fine. + } + } + + // ---------- routes ---------- + // Routes are optional. If provided, validate shape + references. + for i, r := range c.Routes { + path := fmt.Sprintf("routes[%d]", i) + + if strings.TrimSpace(r.Sink) == "" { + m.Add(fieldErr(path+".sink", "is required")) + } else if !sinkNames[r.Sink] { + m.Add(fieldErr(path+".sink", fmt.Sprintf("references unknown sink %q (define it under sinks:)", r.Sink))) + } + + if len(r.Kinds) == 0 { + // You could relax this later (e.g. empty == "all kinds"), but for now + // keeping it strict prevents accidental "route does nothing". + m.Add(fieldErr(path+".kinds", "must contain at least one kind")) + } else { + for j, k := range r.Kinds { + kpath := fmt.Sprintf("%s.kinds[%d]", path, j) + if strings.TrimSpace(k) == "" { + m.Add(fieldErr(kpath, "kind cannot be empty")) + } + } + } + } + + return m.Err() +} + +// ---- error helpers ---- + +// fieldErr produces consistent "path: message" errors. +func fieldErr(path, msg string) error { + return fmt.Errorf("%s: %s", path, msg) +} + +// multiError collects many errors and returns them as one error. +// This makes config iteration much nicer: you fix several things per run. +type multiError struct { + errs []error +} + +func (m *multiError) Add(err error) { + if err == nil { + return + } + m.errs = append(m.errs, err) +} + +func (m *multiError) Err() error { + if len(m.errs) == 0 { + return nil + } + // Sort for stable output (useful in tests and when iterating). + sort.Slice(m.errs, func(i, j int) bool { + return m.errs[i].Error() < m.errs[j].Error() + }) + return m +} + +func (m *multiError) Error() string { + var b strings.Builder + b.WriteString("config validation failed:\n") + for _, e := range m.errs { + b.WriteString(" - ") + b.WriteString(e.Error()) + b.WriteString("\n") + } + return strings.TrimRight(b.String(), "\n") +} diff --git a/config/params.go b/config/params.go new file mode 100644 index 0000000..92c7ad0 --- /dev/null +++ b/config/params.go @@ -0,0 +1,72 @@ +// feedkit/config/params.go +package config + +import "strings" + +// ParamString returns the first non-empty string found for any of the provided keys. +// Values must actually be strings in the decoded config; other types are ignored. +// +// This keeps cfg.Params flexible (map[string]any) while letting callers stay type-safe. +func (cfg SourceConfig) ParamString(keys ...string) (string, bool) { + if cfg.Params == nil { + return "", false + } + for _, k := range keys { + v, ok := cfg.Params[k] + if !ok || v == nil { + continue + } + s, ok := v.(string) + if !ok { + continue + } + s = strings.TrimSpace(s) + if s == "" { + continue + } + return s, true + } + return "", false +} + +// ParamStringDefault returns ParamString(keys...) if present; otherwise it returns def. +// This is the “polite default” helper used by drivers for optional fields like user-agent. +func (cfg SourceConfig) ParamStringDefault(def string, keys ...string) string { + if s, ok := cfg.ParamString(keys...); ok { + return s + } + return strings.TrimSpace(def) +} + +// ParamString returns the first non-empty string found for any of the provided keys +// in SinkConfig.Params. (Same rationale as SourceConfig.ParamString.) +func (cfg SinkConfig) ParamString(keys ...string) (string, bool) { + if cfg.Params == nil { + return "", false + } + for _, k := range keys { + v, ok := cfg.Params[k] + if !ok || v == nil { + continue + } + s, ok := v.(string) + if !ok { + continue + } + s = strings.TrimSpace(s) + if s == "" { + continue + } + return s, true + } + return "", false +} + +// ParamStringDefault returns ParamString(keys...) if present; otherwise it returns def. +// Symmetric helper for sink implementations. +func (cfg SinkConfig) ParamStringDefault(def string, keys ...string) string { + if s, ok := cfg.ParamString(keys...); ok { + return s + } + return strings.TrimSpace(def) +} diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go new file mode 100644 index 0000000..7c7c8b5 --- /dev/null +++ b/dispatch/dispatch.go @@ -0,0 +1,104 @@ +package dispatch + +import ( + "context" + "fmt" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/pipeline" + "gitea.maximumdirect.net/ejr/feedkit/sinks" +) + +type Dispatcher struct { + In <-chan event.Event + + Pipeline *pipeline.Pipeline + + // Sinks by name + Sinks map[string]sinks.Sink + + // Routing rules (compiled from config) + Routes []Route + + // Fanout knobs (global defaults for now; we can wire these from config later). + // + // These are intentionally simple: one queue size, one enqueue timeout, + // one consume timeout for all sinks. + SinkQueueSize int + SinkEnqueueTimeout time.Duration + SinkConsumeTimeout time.Duration +} + +type Route struct { + SinkName string + Kinds map[event.Kind]bool +} + +type Logger func(format string, args ...any) + +func (d *Dispatcher) Run(ctx context.Context, logf Logger) error { + if d.In == nil { + return fmt.Errorf("dispatcher.Run: In channel is nil") + } + if d.Sinks == nil { + return fmt.Errorf("dispatcher.Run: Sinks map is nil") + } + if d.Pipeline == nil { + d.Pipeline = &pipeline.Pipeline{} + } + + // Build and start sink workers. + fanout, err := NewFanout(ctx, d.Sinks, FanoutOptions{ + QueueSize: d.SinkQueueSize, + EnqueueTimeout: d.SinkEnqueueTimeout, + ConsumeTimeout: d.SinkConsumeTimeout, + Logf: logf, + }) + if err != nil { + return err + } + defer fanout.Close() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + + case e, ok := <-d.In: + if !ok { + // If someone closes the event bus, treat as clean shutdown. + return nil + } + + out, err := d.Pipeline.Process(ctx, e) + if err != nil { + if logf != nil { + logf("dispatcher: pipeline error: %v", err) + } + continue + } + if out == nil { + // Dropped by policy. + continue + } + + d.routeToSinks(ctx, fanout, *out, logf) + } + } +} + +func (d *Dispatcher) routeToSinks(ctx context.Context, fanout *Fanout, e event.Event, logf Logger) { + for _, r := range d.Routes { + if len(r.Kinds) > 0 && !r.Kinds[e.Kind] { + continue + } + + // Publish is now the ONLY thing we do here. + // It is bounded (if configured) and does not call sink adapters directly. + if err := fanout.Publish(ctx, r.SinkName, e); err != nil && logf != nil { + logf("dispatcher: failed to enqueue event for sink %q (id=%s kind=%s source=%s): %v", + r.SinkName, e.ID, e.Kind, e.Source, err) + } + } +} diff --git a/dispatch/fanout.go b/dispatch/fanout.go new file mode 100644 index 0000000..9a4e6dc --- /dev/null +++ b/dispatch/fanout.go @@ -0,0 +1,222 @@ +package dispatch + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/sinks" +) + +// Fanout owns one buffered queue + one worker goroutine per sink. +// +// The goal is to decouple the dispatcher from slow sinks. +// The dispatcher enqueues quickly; each sink consumes at its own pace. +// +// Design notes: +// - We use a global enqueue timeout to prevent unbounded blocking if a sink queue fills. +// - We use a per-consume timeout (context.WithTimeout) to bound sink work. +// - This timeout only works if the sink honors ctx.Done(). Sinks MUST be written to respect context. +// (If a sink ignores context and blocks forever, no safe mechanism exists to "kill" that goroutine.) +type Fanout struct { + workers map[string]*sinkWorker + + enqueueTimeout time.Duration + logger Logger + + closeOnce sync.Once + wg sync.WaitGroup +} + +type FanoutOptions struct { + // QueueSize is the per-sink channel buffer size. + // Larger buffers absorb bursts but can also hide a stuck sink longer. + QueueSize int + + // EnqueueTimeout bounds how long Publish() will wait if a sink queue is full. + // If this fires, the event is dropped for that sink and an error is returned. + EnqueueTimeout time.Duration + + // ConsumeTimeout bounds how long we allow a single sink Consume() call to run. + // If your sink respects context, it should return early with context deadline exceeded. + ConsumeTimeout time.Duration + + Logf Logger +} + +type sinkWorker struct { + name string + sink sinks.Sink + ch chan event.Event + consumeTimeout time.Duration +} + +var ( + // ErrSinkQueueFull indicates that a sink queue could not accept the event within EnqueueTimeout. + ErrSinkQueueFull = errors.New("sink queue full (enqueue timeout)") +) + +// NewFanout builds workers for all provided sinks and starts their goroutines. +func NewFanout(ctx context.Context, sinkMap map[string]sinks.Sink, opts FanoutOptions) (*Fanout, error) { + if sinkMap == nil { + return nil, fmt.Errorf("dispatch.NewFanout: sink map is nil") + } + + queueSize := opts.QueueSize + if queueSize <= 0 { + queueSize = 64 // conservative default; adjust later as needed + } + + f := &Fanout{ + workers: make(map[string]*sinkWorker, len(sinkMap)), + enqueueTimeout: opts.EnqueueTimeout, + logger: opts.Logf, + } + + // Create + start one worker per sink. + for name, s := range sinkMap { + if s == nil { + return nil, fmt.Errorf("dispatch.NewFanout: sink %q is nil", name) + } + + w := &sinkWorker{ + name: name, + sink: s, + ch: make(chan event.Event, queueSize), + consumeTimeout: opts.ConsumeTimeout, + } + + f.workers[name] = w + + f.wg.Add(1) + go func(w *sinkWorker) { + defer f.wg.Done() + f.runWorker(ctx, w) + }(w) + } + + return f, nil +} + +// Close stops all workers by closing their channels and waits for them to exit. +// It is safe to call multiple times. +func (f *Fanout) Close() { + if f == nil { + return + } + + f.closeOnce.Do(func() { + // Closing the per-sink channels tells workers "no more events". + for _, w := range f.workers { + close(w.ch) + } + f.wg.Wait() + }) +} + +// Publish enqueues an event to a named sink's queue. +// +// If the sink queue is full, Publish will wait up to f.enqueueTimeout. +// If enqueueTimeout is <= 0, Publish will block until it can enqueue or ctx cancels. +// +// If Publish returns an error, the event has NOT been enqueued for that sink. +func (f *Fanout) Publish(ctx context.Context, sinkName string, e event.Event) error { + w, ok := f.workers[sinkName] + if !ok { + return fmt.Errorf("dispatch.Fanout.Publish: unknown sink %q", sinkName) + } + + // Fast path: no timeout configured; block until enqueue or ctx cancels. + if f.enqueueTimeout <= 0 { + select { + case w.ch <- e: + return nil + case <-ctx.Done(): + return ctx.Err() + } + } + + // Bounded enqueue: wait up to enqueueTimeout for space in the buffer. + timer := time.NewTimer(f.enqueueTimeout) + defer timer.Stop() + + select { + case w.ch <- e: + return nil + + case <-timer.C: + return fmt.Errorf("%w: sink=%q id=%s kind=%s source=%s", + ErrSinkQueueFull, sinkName, e.ID, e.Kind, e.Source) + + case <-ctx.Done(): + return ctx.Err() + } +} + +func (f *Fanout) runWorker(ctx context.Context, w *sinkWorker) { + for { + select { + case <-ctx.Done(): + // Context cancellation means we're shutting down; drop queued work. + return + + case e, ok := <-w.ch: + if !ok { + // Channel closed: dispatcher is done publishing to this sink. + return + } + + f.consumeOne(ctx, w, e) + } + } +} + +func (f *Fanout) consumeOne(parent context.Context, w *sinkWorker, e event.Event) { + // Always give sinks a context they can observe. + consumeCtx := parent + cancel := func() {} + if w.consumeTimeout > 0 { + consumeCtx, cancel = context.WithTimeout(parent, w.consumeTimeout) + } + defer cancel() + + start := time.Now() + + // Sinks are adapters. We keep the worker alive even if a sink panics: + // it's better to log loudly and continue than to silently lose a sink forever. + defer func() { + if r := recover(); r != nil { + f.logf("dispatch: sink %q PANIC while consuming event (id=%s kind=%s source=%s): %v", + w.name, e.ID, e.Kind, e.Source, r) + } + }() + + err := w.sink.Consume(consumeCtx, e) + elapsed := time.Since(start) + + if err == nil { + return + } + + // If the sink respects context, timeouts should surface as context deadline exceeded. + // We log a distinct message because it's operationally useful. + if errors.Is(err, context.DeadlineExceeded) { + f.logf("dispatch: sink %q timed out after %s consuming event (id=%s kind=%s source=%s)", + w.name, elapsed, e.ID, e.Kind, e.Source) + return + } + + // Normal errors. + f.logf("dispatch: sink %q failed consuming event (id=%s kind=%s source=%s): %v", + w.name, e.ID, e.Kind, e.Source, err) +} + +func (f *Fanout) logf(format string, args ...any) { + if f == nil || f.logger == nil { + return + } + f.logger(format, args...) +} diff --git a/event/doc.go b/event/doc.go new file mode 100644 index 0000000..5cceea8 --- /dev/null +++ b/event/doc.go @@ -0,0 +1,8 @@ +// Package event defines the domain-agnostic event envelope used by feedkit-style daemons. +// +// The core idea: +// - feedkit infrastructure moves "events" around without knowing anything about the domain. +// - domain-specific code (weatherfeeder, newsfeeder, etc.) provides a concrete Payload. +// +// This package should NOT import any domain packages (weather model types, etc.). +package event diff --git a/event/event.go b/event/event.go new file mode 100644 index 0000000..6ed4e79 --- /dev/null +++ b/event/event.go @@ -0,0 +1,101 @@ +package event + +import ( + "errors" + "strings" + "time" +) + +// ErrInvalidEvent is a sentinel error so callers can do: +// +// if errors.Is(err, event.ErrInvalidEvent) { ... } +var ErrInvalidEvent = errors.New("invalid event") + +// ValidationError reports one or more problems with an Event. +// +// We keep this structured so you get ALL issues in one pass rather than fixing +// them one-by-one. +type ValidationError struct { + Problems []string +} + +func (e *ValidationError) Error() string { + if e == nil || len(e.Problems) == 0 { + return "invalid event" + } + var b strings.Builder + b.WriteString("invalid event:\n") + for _, p := range e.Problems { + b.WriteString(" - ") + b.WriteString(p) + b.WriteString("\n") + } + return strings.TrimRight(b.String(), "\n") +} + +// Is lets errors.Is(err, ErrInvalidEvent) work. +func (e *ValidationError) Is(target error) bool { + return target == ErrInvalidEvent +} + +// Event is the domain-agnostic envelope moved through scheduler → pipeline → dispatcher → sinks. +// +// Payload is intentionally "any": +// - domain code can set it to a struct (recommended), a map, or some other JSON-marshable type. +// - feedkit infrastructure never type-asserts Payload. +// +// If you plan to persist/emit events as JSON, ensure Payload is JSON-marshable. +type Event struct { + // ID should be stable for dedupe/storage purposes. + // Often: "::". + ID string `json:"id"` + + // Kind is used for routing/policy. + Kind Kind `json:"kind"` + + // Source is the configured source name (e.g. "OpenMeteoObservation", "NWSAlertsSTL"). + Source string `json:"source"` + + // EmittedAt is when *your daemon* emitted this event (typically time.Now().UTC()). + EmittedAt time.Time `json:"emitted_at"` + + // EffectiveAt is optional: the timestamp the payload is "about" (e.g. observation time). + EffectiveAt *time.Time `json:"effective_at,omitempty"` + + // Schema is optional but strongly recommended once multiple domains exist. + // Examples: + // "weather.observation.v1" + // "news.article.v1" + // It helps sinks and downstream consumers interpret Payload. + Schema string `json:"schema,omitempty"` + + // Payload is domain-defined and must be non-nil. + Payload any `json:"payload"` +} + +// Validate enforces basic invariants that infrastructure depends on. +// Domain-specific validation belongs in domain code (or domain processors). +func (e Event) Validate() error { + var problems []string + + if strings.TrimSpace(e.ID) == "" { + problems = append(problems, "ID is required") + } + if strings.TrimSpace(string(e.Kind)) == "" { + problems = append(problems, "Kind is required") + } + if strings.TrimSpace(e.Source) == "" { + problems = append(problems, "Source is required") + } + if e.EmittedAt.IsZero() { + problems = append(problems, "EmittedAt must be set (non-zero)") + } + if e.Payload == nil { + problems = append(problems, "Payload must be non-nil") + } + + if len(problems) > 0 { + return &ValidationError{Problems: problems} + } + return nil +} diff --git a/event/kind.go b/event/kind.go new file mode 100644 index 0000000..a89bf1d --- /dev/null +++ b/event/kind.go @@ -0,0 +1,30 @@ +package event + +import ( + "fmt" + "strings" +) + +// Kind identifies the "type/category" of an event for routing and policy decisions. +// +// Kind is intentionally open-ended (stringly-typed), because different daemons will +// have different kinds: +// +// weatherfeeder: "observation", "forecast", "alert" +// newsfeeder: "article", "breaking", ... +// stockfeeder: "quote", "bar", "news", ... +// +// Conventions (recommended, not required): +// - lowercase +// - words separated by underscores if needed +type Kind string + +// ParseKind normalizes and validates a kind string. +// It lowercases and trims whitespace, and rejects empty values. +func ParseKind(s string) (Kind, error) { + k := strings.ToLower(strings.TrimSpace(s)) + if k == "" { + return "", fmt.Errorf("kind cannot be empty") + } + return Kind(k), nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fc41095 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module gitea.maximumdirect.net/ejr/feedkit + +go 1.22 + +require gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..4bc0337 --- /dev/null +++ b/go.sum @@ -0,0 +1,3 @@ +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pipeline/dedupe.go b/pipeline/dedupe.go new file mode 100644 index 0000000..219a56a --- /dev/null +++ b/pipeline/dedupe.go @@ -0,0 +1,5 @@ +package pipeline + +// Placeholder for dedupe processor: +// - key by Event.ID or computed key +// - in-memory store first; later optional Postgres-backed diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go new file mode 100644 index 0000000..a35fc51 --- /dev/null +++ b/pipeline/pipeline.go @@ -0,0 +1,43 @@ +package pipeline + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Processor can mutate/drop events (dedupe, rate-limit, normalization tweaks). +type Processor interface { + Process(ctx context.Context, in event.Event) (out *event.Event, err error) +} + +type Pipeline struct { + Processors []Processor +} + +func (p *Pipeline) Process(ctx context.Context, e event.Event) (*event.Event, error) { + if err := e.Validate(); err != nil { + return nil, fmt.Errorf("pipeline: invalid input event: %w", err) + } + + cur := &e + + for _, proc := range p.Processors { + out, err := proc.Process(ctx, *cur) + if err != nil { + return nil, err + } + if out == nil { + // Dropped by policy. + return nil, nil + } + cur = out + } + + if err := cur.Validate(); err != nil { + return nil, fmt.Errorf("pipeline: invalid output event: %w", err) + } + + return cur, nil +} diff --git a/pipeline/ratelimit.go b/pipeline/ratelimit.go new file mode 100644 index 0000000..1235aa5 --- /dev/null +++ b/pipeline/ratelimit.go @@ -0,0 +1,5 @@ +package pipeline + +// Placeholder for rate limit processor: +// - per source/kind sink routing limits +// - cooldown windows diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go new file mode 100644 index 0000000..2557bd6 --- /dev/null +++ b/scheduler/scheduler.go @@ -0,0 +1,174 @@ +package scheduler + +import ( + "context" + "fmt" + "hash/fnv" + "math/rand" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/sources" +) + +type Job struct { + Source sources.Source + Every time.Duration + + // Jitter is the maximum additional delay added before each poll. + // Example: if Every=15m and Jitter=30s, each poll will occur at: + // tick time + random(0..30s) + // + // If Jitter == 0, we compute a default jitter based on Every. + Jitter time.Duration +} + +type Logger func(format string, args ...any) + +type Scheduler struct { + Jobs []Job + Out chan<- event.Event + Logf Logger +} + +// Run starts one polling goroutine per job. +// Each job runs on its own interval and emits 0..N events per poll. +func (s *Scheduler) Run(ctx context.Context) error { + if s.Out == nil { + return fmt.Errorf("scheduler.Run: Out channel is nil") + } + if len(s.Jobs) == 0 { + return fmt.Errorf("scheduler.Run: no jobs configured") + } + + for _, job := range s.Jobs { + job := job // capture loop variable + go s.runJob(ctx, job) + } + + <-ctx.Done() + return ctx.Err() +} + +func (s *Scheduler) runJob(ctx context.Context, job Job) { + if job.Source == nil { + s.logf("scheduler: job has nil source") + return + } + if job.Every <= 0 { + s.logf("scheduler: job %s has invalid interval", job.Source.Name()) + return + } + + // Compute jitter: either configured per job, or a sensible default. + jitter := effectiveJitter(job.Every, job.Jitter) + + // Each worker gets its own RNG (safe + no lock contention). + seed := time.Now().UnixNano() ^ int64(hashStringFNV32a(job.Source.Name())) + rng := rand.New(rand.NewSource(seed)) + + // Optional startup jitter: avoids all jobs firing at the exact moment the daemon starts. + if !sleepJitter(ctx, rng, jitter) { + return + } + + // Immediate poll at startup (after startup jitter). + s.pollOnce(ctx, job) + + t := time.NewTicker(job.Every) + defer t.Stop() + + for { + select { + case <-t.C: + // Per-tick jitter: spreads calls out within the interval. + if !sleepJitter(ctx, rng, jitter) { + return + } + s.pollOnce(ctx, job) + + case <-ctx.Done(): + return + } + } +} + +func (s *Scheduler) pollOnce(ctx context.Context, job Job) { + events, err := job.Source.Poll(ctx) + if err != nil { + s.logf("scheduler: poll failed (%s): %v", job.Source.Name(), err) + return + } + + for _, e := range events { + select { + case s.Out <- e: + case <-ctx.Done(): + return + } + } +} + +func (s *Scheduler) logf(format string, args ...any) { + if s.Logf == nil { + return + } + s.Logf(format, args...) +} + +// effectiveJitter chooses a jitter value. +// - If configuredMax > 0, use it (but clamp). +// - Else default to min(every/10, 30s). +// - Clamp to at most every/2 (so jitter can’t delay more than half the interval). +func effectiveJitter(every time.Duration, configuredMax time.Duration) time.Duration { + if every <= 0 { + return 0 + } + + j := configuredMax + if j <= 0 { + j = every / 10 + if j > 30*time.Second { + j = 30 * time.Second + } + } + + // Clamp jitter so it doesn’t dominate the schedule. + maxAllowed := every / 2 + if j > maxAllowed { + j = maxAllowed + } + if j < 0 { + j = 0 + } + return j +} + +// sleepJitter sleeps for a random duration in [0, max]. +// Returns false if the context is cancelled while waiting. +func sleepJitter(ctx context.Context, rng *rand.Rand, max time.Duration) bool { + if max <= 0 { + return true + } + + // Int63n requires a positive argument. + // We add 1 so max itself is attainable. + n := rng.Int63n(int64(max) + 1) + d := time.Duration(n) + + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case <-timer.C: + return true + case <-ctx.Done(): + return false + } +} + +func hashStringFNV32a(s string) uint32 { + h := fnv.New32a() + _, _ = h.Write([]byte(s)) + return h.Sum32() +} diff --git a/scheduler/worker.go b/scheduler/worker.go new file mode 100644 index 0000000..8830daa --- /dev/null +++ b/scheduler/worker.go @@ -0,0 +1,7 @@ +package scheduler + +// Placeholder for per-source worker logic: +// - ticker loop +// - jitter +// - backoff on errors +// - emits events into scheduler.Out diff --git a/sinks/builtins.go b/sinks/builtins.go new file mode 100644 index 0000000..7313b98 --- /dev/null +++ b/sinks/builtins.go @@ -0,0 +1,55 @@ +package sinks + +import ( + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +// RegisterBuiltins registers sink drivers included in this binary. +// +// In feedkit, these are "infrastructure primitives" — they are not domain-specific. +// Individual daemons can choose to call this (or register their own custom sinks). +func RegisterBuiltins(r *Registry) { + // Stdout sink: great for debugging, piping to jq, etc. + r.Register("stdout", func(cfg config.SinkConfig) (Sink, error) { + return NewStdoutSink(cfg.Name), nil + }) + + // File sink: writes/archives events somewhere on disk. + r.Register("file", func(cfg config.SinkConfig) (Sink, error) { + return NewFileSinkFromConfig(cfg) + }) + + // Postgres sink: persists events durably. + r.Register("postgres", func(cfg config.SinkConfig) (Sink, error) { + return NewPostgresSinkFromConfig(cfg) + }) + + // RabbitMQ sink: publishes events to a broker for downstream consumers. + r.Register("rabbitmq", func(cfg config.SinkConfig) (Sink, error) { + return NewRabbitMQSinkFromConfig(cfg) + }) +} + +// ---- helpers for validating sink params ---- +// +// These helpers live in sinks (not config) on purpose: +// - config is domain-agnostic and should not embed driver-specific validation helpers. +// - sinks are adapters; validating their own params here keeps the logic near the driver. + +func requireStringParam(cfg config.SinkConfig, key string) (string, error) { + v, ok := cfg.Params[key] + if !ok { + return "", fmt.Errorf("sink %q: params.%s is required", cfg.Name, key) + } + s, ok := v.(string) + if !ok { + return "", fmt.Errorf("sink %q: params.%s must be a string", cfg.Name, key) + } + if strings.TrimSpace(s) == "" { + return "", fmt.Errorf("sink %q: params.%s cannot be empty", cfg.Name, key) + } + return s, nil +} diff --git a/sinks/file.go b/sinks/file.go new file mode 100644 index 0000000..ce582e8 --- /dev/null +++ b/sinks/file.go @@ -0,0 +1,30 @@ +package sinks + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type FileSink struct { + name string + path string +} + +func NewFileSinkFromConfig(cfg config.SinkConfig) (Sink, error) { + path, err := requireStringParam(cfg, "path") + if err != nil { + return nil, err + } + return &FileSink{name: cfg.Name, path: path}, nil +} + +func (s *FileSink) Name() string { return s.name } + +func (s *FileSink) Consume(ctx context.Context, e event.Event) error { + _ = ctx + _ = e + return fmt.Errorf("file sink: TODO implement (path=%s)", s.path) +} diff --git a/sinks/postgres.go b/sinks/postgres.go new file mode 100644 index 0000000..9567a3a --- /dev/null +++ b/sinks/postgres.go @@ -0,0 +1,37 @@ +package sinks + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type PostgresSink struct { + name string + dsn string +} + +func NewPostgresSinkFromConfig(cfg config.SinkConfig) (Sink, error) { + dsn, err := requireStringParam(cfg, "dsn") + if err != nil { + return nil, err + } + return &PostgresSink{name: cfg.Name, dsn: dsn}, nil +} + +func (p *PostgresSink) Name() string { return p.name } + +func (p *PostgresSink) Consume(ctx context.Context, e event.Event) error { + _ = ctx + + // Boundary validation: if something upstream violated invariants, + // surface it loudly rather than printing partial nonsense. + if err := e.Validate(); err != nil { + return fmt.Errorf("rabbitmq sink: invalid event: %w", err) + } + + // TODO implement Postgres transaction + return nil +} diff --git a/sinks/rabbitmq.go b/sinks/rabbitmq.go new file mode 100644 index 0000000..15d81c5 --- /dev/null +++ b/sinks/rabbitmq.go @@ -0,0 +1,42 @@ +package sinks + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type RabbitMQSink struct { + name string + url string + exchange string +} + +func NewRabbitMQSinkFromConfig(cfg config.SinkConfig) (Sink, error) { + url, err := requireStringParam(cfg, "url") + if err != nil { + return nil, err + } + ex, err := requireStringParam(cfg, "exchange") + if err != nil { + return nil, err + } + return &RabbitMQSink{name: cfg.Name, url: url, exchange: ex}, nil +} + +func (r *RabbitMQSink) Name() string { return r.name } + +func (r *RabbitMQSink) Consume(ctx context.Context, e event.Event) error { + _ = ctx + + // Boundary validation: if something upstream violated invariants, + // surface it loudly rather than printing partial nonsense. + if err := e.Validate(); err != nil { + return fmt.Errorf("rabbitmq sink: invalid event: %w", err) + } + + // TODO implement RabbitMQ publishing + return nil +} diff --git a/sinks/registry.go b/sinks/registry.go new file mode 100644 index 0000000..c3e8b9e --- /dev/null +++ b/sinks/registry.go @@ -0,0 +1,33 @@ +package sinks + +import ( + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +// Factory constructs a sink instance from config. +// +// This is the mechanism that lets concrete daemons wire in whatever sinks they +// want without main.go being full of switch statements. +type Factory func(cfg config.SinkConfig) (Sink, error) + +type Registry struct { + byDriver map[string]Factory +} + +func NewRegistry() *Registry { + return &Registry{byDriver: map[string]Factory{}} +} + +func (r *Registry) Register(driver string, f Factory) { + r.byDriver[driver] = f +} + +func (r *Registry) Build(cfg config.SinkConfig) (Sink, error) { + f, ok := r.byDriver[cfg.Driver] + if !ok { + return nil, fmt.Errorf("unknown sink driver: %q", cfg.Driver) + } + return f(cfg) +} diff --git a/sinks/sink.go b/sinks/sink.go new file mode 100644 index 0000000..ae77980 --- /dev/null +++ b/sinks/sink.go @@ -0,0 +1,16 @@ +package sinks + +import ( + "context" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Sink is an adapter that consumes a stream of domain-agnostic events. +// +// Sinks MUST respect ctx.Done() whenever they do I/O or blocking work. +// (Fanout timeouts only help if the sink cooperates with context cancellation.) +type Sink interface { + Name() string + Consume(ctx context.Context, e event.Event) error +} diff --git a/sinks/stdout.go b/sinks/stdout.go new file mode 100644 index 0000000..d3c94b4 --- /dev/null +++ b/sinks/stdout.go @@ -0,0 +1,36 @@ +package sinks + +import ( + "context" + "encoding/json" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type StdoutSink struct{ name string } + +func NewStdoutSink(name string) *StdoutSink { + return &StdoutSink{name: name} +} + +func (s *StdoutSink) Name() string { return s.name } + +func (s *StdoutSink) Consume(ctx context.Context, e event.Event) error { + _ = ctx + + // Boundary validation: if something upstream violated invariants, + // surface it loudly rather than printing partial nonsense. + if err := e.Validate(); err != nil { + return fmt.Errorf("stdout sink: invalid event: %w", err) + } + + // Generic default: one JSON line per event. + // This makes stdout useful across all domains and easy to pipe into jq / logs. + b, err := json.Marshal(e) + if err != nil { + return fmt.Errorf("stdout sink: marshal event: %w", err) + } + fmt.Println(string(b)) + return nil +} diff --git a/sources/registry.go b/sources/registry.go new file mode 100644 index 0000000..3635d41 --- /dev/null +++ b/sources/registry.go @@ -0,0 +1,49 @@ +package sources + +import ( + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +// Factory constructs a configured Source instance from config. +// +// This is how concrete daemons (weatherfeeder/newsfeeder/...) register their +// domain-specific source drivers (Open-Meteo, NWS, RSS, etc.) while feedkit +// remains domain-agnostic. +type Factory func(cfg config.SourceConfig) (Source, error) + +type Registry struct { + byDriver map[string]Factory +} + +func NewRegistry() *Registry { + return &Registry{byDriver: map[string]Factory{}} +} + +// Register associates a driver name (e.g. "openmeteo_observation") with a factory. +// +// The driver string is the "lookup key" used by config.sources[].driver. +func (r *Registry) Register(driver string, f Factory) { + driver = strings.TrimSpace(driver) + if driver == "" { + // Panic is appropriate here: registering an empty driver is always a programmer error, + // and it will lead to extremely confusing runtime behavior if allowed. + panic("sources.Registry.Register: driver cannot be empty") + } + if f == nil { + panic(fmt.Sprintf("sources.Registry.Register: factory cannot be nil (driver=%q)", driver)) + } + + r.byDriver[driver] = f +} + +// Build constructs a Source from a SourceConfig by looking up cfg.Driver. +func (r *Registry) Build(cfg config.SourceConfig) (Source, error) { + f, ok := r.byDriver[cfg.Driver] + if !ok { + return nil, fmt.Errorf("unknown source driver: %q", cfg.Driver) + } + return f(cfg) +} diff --git a/sources/source.go b/sources/source.go new file mode 100644 index 0000000..997d10c --- /dev/null +++ b/sources/source.go @@ -0,0 +1,30 @@ +package sources + +import ( + "context" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Source is a configured polling job that emits 0..N events per poll. +// +// Source implementations live in domain modules (weatherfeeder/newsfeeder/...) +// and are registered into a feedkit sources.Registry. +// +// feedkit infrastructure treats Source as opaque; it just calls Poll() +// on the configured cadence and publishes the resulting events. +type Source interface { + // Name is the configured source name (used for logs and included in emitted events). + Name() string + + // Kind is the "primary kind" emitted by this source. + // + // This is mainly useful as a *safety check* (e.g. config says kind=forecast but + // driver emits observation). Some future sources may emit multiple kinds; if/when + // that happens, we can evolve this interface (e.g., make Kind optional, or remove it). + Kind() event.Kind + + // Poll fetches from upstream and returns 0..N events. + // Implementations should honor ctx.Done() for network calls and other I/O. + Poll(ctx context.Context) ([]event.Event, error) +}