diff --git a/doc.go b/doc.go index 392838b..e34fb70 100644 --- a/doc.go +++ b/doc.go @@ -16,8 +16,7 @@ // In feedkit today, that maps to: // // Collect: sources.Source + scheduler.Scheduler -// Normalize: (today: domain code typically does this inside Source.Poll; -// future: a normalization Processor is a good fit) +// Normalize: (optional) normalize.Processor (or domain code inside Source.Poll) // Policy: pipeline.Pipeline (Processor chain; dedupe/ratelimit are planned) // Emit: dispatch.Dispatcher + dispatch.Fanout // Sinks: sinks.Sink (+ sinks.Registry to build from config) @@ -76,6 +75,147 @@ // // - dedupe/ratelimit processors are placeholders (planned). // +// - normalize +// Optional normalization hook for splitting "fetch" from "transform". +// +// Many domains (like weather) ingest multiple upstream providers whose payloads +// differ. A common evolution is to keep sources small and focused on polling, +// and move mapping/normalization into a dedicated stage. +// +// feedkit provides this as an OPTIONAL pipeline processor: +// +// - normalize.Normalizer: domain-implemented mapping logic +// +// - normalize.Registry: holds normalizers and selects one by Match() +// +// - normalize.Processor: adapts Registry into a pipeline.Processor +// +// Normalization is NOT required: +// +// - If you do all normalization inside Source.Poll, you can ignore this package. +// +// - If normalize.Processor is not installed in your pipeline, nothing changes. +// +// - If normalize.Processor is installed but no Normalizer matches an event, +// the event passes through unchanged. +// +// The key types: +// +// type Normalizer interface { +// // Match returns true if this normalizer should handle the event. +// // Matching is intentionally flexible: match on Schema, Kind, Source, +// // or any combination. +// Match(e event.Event) bool +// +// // Normalize converts the incoming event into a new (or modified) event. +// // +// // Return values: +// // - (out, nil) where out != nil: emit the normalized event +// // - (nil, nil): drop the event (policy drop) +// // - (nil, err): fail the pipeline +// Normalize(ctx context.Context, in event.Event) (*event.Event, error) +// } +// +// type Registry struct { ... } +// +// func (r *Registry) Register(n Normalizer) +// +// // Normalize finds the first matching normalizer (in registration order) and applies it. +// // If none match, it returns the input event unchanged. +// func (r *Registry) Normalize(ctx context.Context, in event.Event) (*event.Event, error) +// +// // Processor implements pipeline.Processor and calls into the Registry. +// // Optional behavior: +// // - If Registry is nil, Processor is a no-op pass-through. +// // - If RequireMatch is false (default), non-matching events pass through. +// // - If RequireMatch is true, non-matching events are treated as errors. +// type Processor struct { +// Registry *Registry +// RequireMatch bool +// } +// +// "First match wins": +// Registry applies the first Normalizer whose Match() returns true. +// This is intentional: normalization is usually a single mapping step from a +// raw schema into a canonical schema. If you want multiple sequential transforms, +// model them as multiple pipeline processors. +// +// Recommended convention: match by Event.Schema +// ------------------------------------------------ +// Schema gives you a versionable selector that doesn't depend on source names. +// +// A common pattern is: +// +// - sources emit "raw" events with Schema like: +// "raw.openweather.current.v1" +// "raw.openmeteo.current.v1" +// "raw.nws.observation.v1" +// +// - normalizers transform them into canonical domain schemas like: +// "weather.observation.v1" +// "weather.forecast.v1" +// "weather.alert.v1" +// +// What is a "raw event"? +// ------------------------------------------------ +// feedkit does not prescribe the raw payload representation. +// A raw payload is typically one of: +// +// - json.RawMessage (recommended for JSON APIs) +// +// - []byte (raw bytes) +// +// - map[string]any (already-decoded but untyped JSON) +// +// The only hard requirement enforced by feedkit is Event.Validate(): +// +// - ID, Kind, Source, EmittedAt must be set +// +// - Payload must be non-nil +// +// If you use raw events, you still must provide Event.Kind. +// Typical approaches: +// +// - set Kind to the intended canonical kind (e.g. "observation") even before normalization +// +// - or set Kind to a domain-defined "raw_*" kind and normalize it later +// +// The simplest approach is: set Kind to the final kind early, and use Schema +// to describe the raw-vs-normalized payload shape. +// +// Wiring example (daemon main.go) +// ------------------------------------------------ +// Install normalize.Processor at the front of your pipeline: +// +// normReg := &normalize.Registry{} +// +// normReg.Register(normalize.Func{ +// Name: "openweather current -> weather.observation.v1", +// MatchFn: func(e event.Event) bool { +// return e.Schema == "raw.openweather.current.v1" +// }, +// NormalizeFn: func(ctx context.Context, in event.Event) (*event.Event, error) { +// // 1) interpret in.Payload (json.RawMessage / []byte / map) +// // 2) build canonical domain payload +// // 3) return updated event +// +// out := in +// out.Schema = "weather.observation.v1" +// // Optionally adjust Kind, EffectiveAt, etc. +// out.Payload = /* canonical weather observation struct */ +// return &out, nil +// }, +// }) +// +// p := &pipeline.Pipeline{ +// Processors: []pipeline.Processor{ +// normalize.Processor{Registry: normReg}, // optional stage +// // dedupe.New(...), ratelimit.New(...), ... +// }, +// } +// +// If the event does not match any normalizer, it passes through unmodified. +// // - sinks // Extension point for output adapters. // @@ -141,13 +281,24 @@ // // Event bus. // bus := make(chan event.Event, 256) // +// // Optional normalization registry + pipeline. +// normReg := &normalize.Registry{} +// // domain registers normalizers into normReg... +// +// p := &pipeline.Pipeline{ +// Processors: []pipeline.Processor{ +// normalize.Processor{Registry: normReg}, // optional +// // dedupe/ratelimit/etc... +// }, +// } +// // // Scheduler. // s := &scheduler.Scheduler{Jobs: jobs, Out: bus, Logf: logf} // // // Dispatcher. // d := &dispatch.Dispatcher{ // In: bus, -// Pipeline: &pipeline.Pipeline{Processors: nil}, +// Pipeline: p, // Sinks: builtSinks, // Routes: routes, // } @@ -167,13 +318,12 @@ // All blocking or I/O work should honor ctx.Done(): // - sources.Source.Poll should pass ctx to HTTP calls, etc. // - sinks.Sink.Consume should honor ctx (Fanout timeouts only help if sinks cooperate). +// - normalizers should honor ctx if they do expensive work (rare; usually pure transforms). // // Future additions (likely) // // - A small Runner helper that performs the standard wiring (load config, // build sources/sinks/routes, run scheduler+dispatcher, handle shutdown). -// - A normalization hook (a Pipeline Processor + registry) that allows sources -// to emit "raw" payloads and defer normalization to a dedicated stage. // // # Non-goals // diff --git a/normalize/doc.go b/normalize/doc.go new file mode 100644 index 0000000..f6dc773 --- /dev/null +++ b/normalize/doc.go @@ -0,0 +1,17 @@ +// Package normalize provides an OPTIONAL normalization hook for feedkit pipelines. +// +// Motivation: +// Many daemons have sources that: +// 1. fetch raw upstream data (often JSON), and +// 2. transform it into a domain's normalized payload format. +// +// Doing both steps inside Source.Poll works, but tends to make sources large and +// encourages duplication (unit conversions, common mapping helpers, etc.). +// +// This package lets a source emit a "raw" event (e.g., Schema="raw.openweather.current.v1", +// Payload=json.RawMessage), and then a normalization processor can convert it into a +// normalized event (e.g., Schema="weather.observation.v1", Payload=WeatherObservation{}). +// +// Key property: normalization is optional. +// If no registered Normalizer matches an event, it passes through unchanged. +package normalize diff --git a/normalize/normalize.go b/normalize/normalize.go new file mode 100644 index 0000000..2f75b0e --- /dev/null +++ b/normalize/normalize.go @@ -0,0 +1,76 @@ +package normalize + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Normalizer converts one event shape into another. +// +// A Normalizer is typically domain-owned code (weatherfeeder/newsfeeder/...) +// that knows how to interpret a specific upstream payload and produce a +// normalized payload. +// +// Normalizers are selected via Match(). The matching strategy is intentionally +// flexible: implementations may match on Schema, Kind, Source, or any other +// Event fields. +type Normalizer interface { + // Match reports whether this normalizer applies to the given event. + // + // Common patterns: + // - match on e.Schema (recommended for versioning) + // - match on e.Source (useful if Schema is empty) + // - match on (e.Kind + e.Source), etc. + Match(e event.Event) bool + + // Normalize transforms the incoming event into a new (or modified) event. + // + // Return values: + // - (out, nil) where out != nil: emit the normalized event + // - (nil, nil): drop the event (treat as policy drop) + // - (nil, err): fail the pipeline + // + // Note: If you simply want to pass the event through unchanged, return &in. + Normalize(ctx context.Context, in event.Event) (*event.Event, error) +} + +// Func is an ergonomic adapter that lets you define a Normalizer with functions. +// +// Example: +// +// n := normalize.Func{ +// MatchFn: func(e event.Event) bool { return e.Schema == "raw.openweather.current.v1" }, +// NormalizeFn: func(ctx context.Context, in event.Event) (*event.Event, error) { +// // ... map in.Payload -> normalized payload ... +// }, +// } +type Func struct { + MatchFn func(e event.Event) bool + NormalizeFn func(ctx context.Context, in event.Event) (*event.Event, error) + + // Optional: helps produce nicer panic/error messages if something goes wrong. + Name string +} + +func (f Func) Match(e event.Event) bool { + if f.MatchFn == nil { + return false + } + return f.MatchFn(e) +} + +func (f Func) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + if f.NormalizeFn == nil { + return nil, fmt.Errorf("normalize.Func(%s): NormalizeFn is nil", f.safeName()) + } + return f.NormalizeFn(ctx, in) +} + +func (f Func) safeName() string { + if f.Name == "" { + return "" + } + return f.Name +} diff --git a/normalize/registry.go b/normalize/registry.go new file mode 100644 index 0000000..054159b --- /dev/null +++ b/normalize/registry.go @@ -0,0 +1,140 @@ +package normalize + +import ( + "context" + "fmt" + "sync" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Registry holds a set of Normalizers and selects one for a given event. +// +// Selection rule (simple + predictable): +// - iterate in registration order +// - the FIRST Normalizer whose Match(e) returns true is used +// +// If none match, the event passes through unchanged. +// +// Why "first match wins"? +// Normalization is usually a single mapping step from a raw schema/version into +// a normalized schema/version. If you want multiple transformation steps, +// model them as multiple pipeline processors (which feedkit already supports). +type Registry struct { + mu sync.RWMutex + ns []Normalizer +} + +// Register adds a normalizer to the registry. +// +// Register panics if n is nil; this is a programmer error and should fail fast. +func (r *Registry) Register(n Normalizer) { + if n == nil { + panic("normalize.Registry.Register: normalizer cannot be nil") + } + r.mu.Lock() + defer r.mu.Unlock() + r.ns = append(r.ns, n) +} + +// Normalize finds the first matching Normalizer and applies it. +// +// If no normalizer matches, it returns the input event unchanged. +// +// If a normalizer returns (nil, nil), the event is dropped. +func (r *Registry) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + if r == nil { + // Nil registry is a valid "feature off" state. + out := in + return &out, nil + } + + r.mu.RLock() + ns := append([]Normalizer(nil), r.ns...) // copy for safe iteration outside lock + r.mu.RUnlock() + + for _, n := range ns { + if n == nil { + // Shouldn't happen (Register panics), but guard anyway. + continue + } + if !n.Match(in) { + continue + } + + out, err := n.Normalize(ctx, in) + if err != nil { + return nil, fmt.Errorf("normalize: normalizer failed: %w", err) + } + // out may be nil to signal "drop". + return out, nil + } + + // No match: pass through unchanged. + out := in + return &out, nil +} + +// Processor adapts a Registry into a pipeline Processor. +// +// It implements: +// +// Process(ctx context.Context, in event.Event) (*event.Event, error) +// +// which matches feedkit/pipeline.Processor. +// +// Optionality: +// - If Registry is nil, Processor becomes a no-op pass-through. +// - If Registry has no matching normalizer for an event, that event passes through unchanged. +type Processor struct { + Registry *Registry + + // If true, events that do not match any normalizer cause an error. + // Default is false (pass-through), which is the behavior you asked for. + RequireMatch bool +} + +// Process implements the pipeline.Processor interface. +func (p Processor) Process(ctx context.Context, in event.Event) (*event.Event, error) { + // "Feature off": no registry means no normalization. + if p.Registry == nil { + out := in + return &out, nil + } + + out, err := p.Registry.Normalize(ctx, in) + if err != nil { + return nil, err + } + + if out == nil { + // Dropped by normalization policy. + return nil, nil + } + + if p.RequireMatch { + // Detect "no-op pass-through due to no match" by checking whether a match existed. + // We do this with a cheap second pass to avoid changing Normalize()'s signature. + // (This is rare to enable; correctness/clarity > micro-optimization.) + if !p.Registry.hasMatch(in) { + return nil, fmt.Errorf("normalize: no normalizer matched event (id=%s kind=%s source=%s schema=%q)", + in.ID, in.Kind, in.Source, in.Schema) + } + } + + return out, nil +} + +func (r *Registry) hasMatch(in event.Event) bool { + if r == nil { + return false + } + r.mu.RLock() + defer r.mu.RUnlock() + for _, n := range r.ns { + if n != nil && n.Match(in) { + return true + } + } + return false +}