diff --git a/README.md b/README.md index fcdc9b6..6c51b9f 100644 --- a/README.md +++ b/README.md @@ -15,13 +15,12 @@ lifecycle, domain schemas, and domain-specific validation in your daemon. ## Conceptual pipeline -Collect -> Normalize (optional) -> Policy -> Route -> Emit +Collect -> Process (optional stages, including normalize) -> Route -> Emit | Stage | Package(s) | |---|---| | Collect | `sources`, `scheduler` | -| Normalize | `normalize` (optional in `pipeline`) | -| Policy | `pipeline` | +| Process | `pipeline`, `processors`, `normalize` (optional stage) | | Route | `dispatch` | | Emit | `sinks` | | Configure | `config` | @@ -82,10 +81,15 @@ Runs one goroutine per source job: Optional processing chain between collection and dispatch. Processors can transform, drop, or reject events. +### `processors` + +Defines the generic processor interface and a named-driver registry used by +daemons to build ordered processor chains. + ### `normalize` -Optional normalization package (already implemented). Typical use: sources emit raw -payload events, then normalize to canonical schemas in a pipeline stage. +Concrete normalization processor implementation. Typical use: sources emit raw +payload events, then a normalize stage maps them to canonical schemas. ### `dispatch` diff --git a/doc.go b/doc.go index 940c638..5cb7151 100644 --- a/doc.go +++ b/doc.go @@ -5,16 +5,15 @@ // // Conceptual flow: // -// Collect -> Normalize (optional) -> Policy -> Route -> Emit +// Collect -> Process (optional stages, including normalize) -> Route -> Emit // // In feedkit this maps to: // -// Collect: sources + scheduler -// Normalize: normalize (optional pipeline stage) -// Policy: pipeline -// Route: dispatch -// Emit: sinks -// Config: config +// Collect: sources + scheduler +// Process: pipeline + processors + normalize (optional stage) +// Route: dispatch +// Emit: sinks +// Config: config // // feedkit intentionally does not define domain payload schemas or domain-specific // validation rules. Those belong in each concrete daemon. @@ -62,8 +61,11 @@ // Processor chain between scheduler and dispatch. // Processors can transform, drop, or reject events. // +// - processors +// Generic processor interface and named factory registry for wiring chains. +// // - normalize -// Optional pipeline processor for raw->canonical mapping. +// Concrete pipeline processor for raw->canonical mapping. // If no normalizer matches, the event passes through unchanged by default. // // - dispatch diff --git a/normalize/doc.go b/normalize/doc.go index f6dc773..a93a5d4 100644 --- a/normalize/doc.go +++ b/normalize/doc.go @@ -1,4 +1,4 @@ -// Package normalize provides an OPTIONAL normalization hook for feedkit pipelines. +// Package normalize provides a concrete normalization processor for feedkit pipelines. // // Motivation: // Many daemons have sources that: @@ -9,9 +9,9 @@ // encourages duplication (unit conversions, common mapping helpers, etc.). // // This package lets a source emit a "raw" event (e.g., Schema="raw.openweather.current.v1", -// Payload=json.RawMessage), and then a normalization processor can convert it into a +// Payload=json.RawMessage), and then a normalize.Processor can convert it into a // normalized event (e.g., Schema="weather.observation.v1", Payload=WeatherObservation{}). // // Key property: normalization is optional. -// If no registered Normalizer matches an event, it passes through unchanged. +// If no Normalizer matches an event, Processor passes it through unchanged by default. package normalize diff --git a/normalize/processor.go b/normalize/processor.go new file mode 100644 index 0000000..5cdd3a9 --- /dev/null +++ b/normalize/processor.go @@ -0,0 +1,57 @@ +package normalize + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Processor applies ordered normalization rules to pipeline events. +// +// Selection rule: +// - iterate in Normalizers order +// - the first Normalizer whose Match returns true is applied +// +// If no normalizer matches, the default behavior is pass-through. +type Processor struct { + Normalizers []Normalizer + + // If true, events that do not match any normalizer cause an error. + // Default is false (pass-through). + RequireMatch bool +} + +// NewProcessor constructs a normalization processor from an ordered normalizer list. +func NewProcessor(normalizers []Normalizer, requireMatch bool) Processor { + return Processor{ + Normalizers: append([]Normalizer(nil), normalizers...), + RequireMatch: requireMatch, + } +} + +// Process implements processors.Processor. +func (p Processor) Process(ctx context.Context, in event.Event) (*event.Event, error) { + for _, n := range p.Normalizers { + if n == nil { + continue + } + if !n.Match(in) { + continue + } + + out, err := n.Normalize(ctx, in) + if err != nil { + return nil, fmt.Errorf("normalize: normalizer failed: %w", err) + } + return out, nil + } + + if p.RequireMatch { + return nil, fmt.Errorf("normalize: no normalizer matched event (id=%s kind=%s source=%s schema=%q)", + in.ID, in.Kind, in.Source, in.Schema) + } + + out := in + return &out, nil +} diff --git a/normalize/processor_test.go b/normalize/processor_test.go new file mode 100644 index 0000000..c471bd8 --- /dev/null +++ b/normalize/processor_test.go @@ -0,0 +1,139 @@ +package normalize + +import ( + "context" + "errors" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestProcessorFirstMatchWins(t *testing.T) { + var firstCalls, secondCalls int + + p := NewProcessor([]Normalizer{ + Func{ + MatchFn: func(event.Event) bool { return true }, + NormalizeFn: func(_ context.Context, in event.Event) (*event.Event, error) { + firstCalls++ + out := in + out.Schema = "normalized.first.v1" + return &out, nil + }, + }, + Func{ + MatchFn: func(event.Event) bool { return true }, + NormalizeFn: func(_ context.Context, in event.Event) (*event.Event, error) { + secondCalls++ + out := in + out.Schema = "normalized.second.v1" + return &out, nil + }, + }, + }, false) + + out, err := p.Process(context.Background(), testEvent()) + if err != nil { + t.Fatalf("Process error: %v", err) + } + if out == nil { + t.Fatalf("expected output event, got nil") + } + if out.Schema != "normalized.first.v1" { + t.Fatalf("unexpected schema: %q", out.Schema) + } + if firstCalls != 1 { + t.Fatalf("expected first normalizer called once, got %d", firstCalls) + } + if secondCalls != 0 { + t.Fatalf("expected second normalizer skipped, got %d calls", secondCalls) + } +} + +func TestProcessorNoMatchPassThroughAndRequireMatch(t *testing.T) { + in := testEvent() + in.Schema = "raw.schema.v1" + + passThrough := NewProcessor([]Normalizer{ + Func{ + MatchFn: func(event.Event) bool { return false }, + NormalizeFn: func(_ context.Context, in event.Event) (*event.Event, error) { + out := in + out.Schema = "should.not.run" + return &out, nil + }, + }, + }, false) + + out, err := passThrough.Process(context.Background(), in) + if err != nil { + t.Fatalf("pass-through Process error: %v", err) + } + if out == nil { + t.Fatalf("expected pass-through output event, got nil") + } + if out.Schema != "raw.schema.v1" { + t.Fatalf("expected unchanged schema, got %q", out.Schema) + } + + required := NewProcessor(nil, true) + _, err = required.Process(context.Background(), in) + if err == nil { + t.Fatalf("expected require-match error") + } + if !strings.Contains(err.Error(), "no normalizer matched") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestProcessorDropAndErrorPropagation(t *testing.T) { + t.Run("drop", func(t *testing.T) { + p := NewProcessor([]Normalizer{ + Func{ + MatchFn: func(event.Event) bool { return true }, + NormalizeFn: func(context.Context, event.Event) (*event.Event, error) { + return nil, nil + }, + }, + }, false) + + out, err := p.Process(context.Background(), testEvent()) + if err != nil { + t.Fatalf("Process error: %v", err) + } + if out != nil { + t.Fatalf("expected nil output for dropped event, got %#v", out) + } + }) + + t.Run("error", func(t *testing.T) { + p := NewProcessor([]Normalizer{ + Func{ + MatchFn: func(event.Event) bool { return true }, + NormalizeFn: func(context.Context, event.Event) (*event.Event, error) { + return nil, errors.New("map failed") + }, + }, + }, false) + + _, err := p.Process(context.Background(), testEvent()) + if err == nil { + t.Fatalf("expected error") + } + if !strings.Contains(err.Error(), "normalizer failed") { + t.Fatalf("unexpected error: %v", err) + } + }) +} + +func testEvent() event.Event { + return event.Event{ + ID: "evt-normalize-1", + Kind: event.Kind("observation"), + Source: "source-1", + EmittedAt: time.Now().UTC(), + Payload: map[string]any{"x": 1}, + } +} diff --git a/normalize/registry.go b/normalize/registry.go deleted file mode 100644 index 054159b..0000000 --- a/normalize/registry.go +++ /dev/null @@ -1,140 +0,0 @@ -package normalize - -import ( - "context" - "fmt" - "sync" - - "gitea.maximumdirect.net/ejr/feedkit/event" -) - -// Registry holds a set of Normalizers and selects one for a given event. -// -// Selection rule (simple + predictable): -// - iterate in registration order -// - the FIRST Normalizer whose Match(e) returns true is used -// -// If none match, the event passes through unchanged. -// -// Why "first match wins"? -// Normalization is usually a single mapping step from a raw schema/version into -// a normalized schema/version. If you want multiple transformation steps, -// model them as multiple pipeline processors (which feedkit already supports). -type Registry struct { - mu sync.RWMutex - ns []Normalizer -} - -// Register adds a normalizer to the registry. -// -// Register panics if n is nil; this is a programmer error and should fail fast. -func (r *Registry) Register(n Normalizer) { - if n == nil { - panic("normalize.Registry.Register: normalizer cannot be nil") - } - r.mu.Lock() - defer r.mu.Unlock() - r.ns = append(r.ns, n) -} - -// Normalize finds the first matching Normalizer and applies it. -// -// If no normalizer matches, it returns the input event unchanged. -// -// If a normalizer returns (nil, nil), the event is dropped. -func (r *Registry) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { - if r == nil { - // Nil registry is a valid "feature off" state. - out := in - return &out, nil - } - - r.mu.RLock() - ns := append([]Normalizer(nil), r.ns...) // copy for safe iteration outside lock - r.mu.RUnlock() - - for _, n := range ns { - if n == nil { - // Shouldn't happen (Register panics), but guard anyway. - continue - } - if !n.Match(in) { - continue - } - - out, err := n.Normalize(ctx, in) - if err != nil { - return nil, fmt.Errorf("normalize: normalizer failed: %w", err) - } - // out may be nil to signal "drop". - return out, nil - } - - // No match: pass through unchanged. - out := in - return &out, nil -} - -// Processor adapts a Registry into a pipeline Processor. -// -// It implements: -// -// Process(ctx context.Context, in event.Event) (*event.Event, error) -// -// which matches feedkit/pipeline.Processor. -// -// Optionality: -// - If Registry is nil, Processor becomes a no-op pass-through. -// - If Registry has no matching normalizer for an event, that event passes through unchanged. -type Processor struct { - Registry *Registry - - // If true, events that do not match any normalizer cause an error. - // Default is false (pass-through), which is the behavior you asked for. - RequireMatch bool -} - -// Process implements the pipeline.Processor interface. -func (p Processor) Process(ctx context.Context, in event.Event) (*event.Event, error) { - // "Feature off": no registry means no normalization. - if p.Registry == nil { - out := in - return &out, nil - } - - out, err := p.Registry.Normalize(ctx, in) - if err != nil { - return nil, err - } - - if out == nil { - // Dropped by normalization policy. - return nil, nil - } - - if p.RequireMatch { - // Detect "no-op pass-through due to no match" by checking whether a match existed. - // We do this with a cheap second pass to avoid changing Normalize()'s signature. - // (This is rare to enable; correctness/clarity > micro-optimization.) - if !p.Registry.hasMatch(in) { - return nil, fmt.Errorf("normalize: no normalizer matched event (id=%s kind=%s source=%s schema=%q)", - in.ID, in.Kind, in.Source, in.Schema) - } - } - - return out, nil -} - -func (r *Registry) hasMatch(in event.Event) bool { - if r == nil { - return false - } - r.mu.RLock() - defer r.mu.RUnlock() - for _, n := range r.ns { - if n != nil && n.Match(in) { - return true - } - } - return false -} diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index a35fc51..7684899 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -5,15 +5,11 @@ import ( "fmt" "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/processors" ) -// Processor can mutate/drop events (dedupe, rate-limit, normalization tweaks). -type Processor interface { - Process(ctx context.Context, in event.Event) (out *event.Event, err error) -} - type Pipeline struct { - Processors []Processor + Processors []processors.Processor } func (p *Pipeline) Process(ctx context.Context, e event.Event) (*event.Event, error) { diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go new file mode 100644 index 0000000..d40241c --- /dev/null +++ b/pipeline/pipeline_test.go @@ -0,0 +1,115 @@ +package pipeline + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/processors" +) + +type procFunc func(context.Context, event.Event) (*event.Event, error) + +func (f procFunc) Process(ctx context.Context, in event.Event) (*event.Event, error) { + return f(ctx, in) +} + +func TestPipelineProcessSequentialOrder(t *testing.T) { + var gotOrder []string + + p := &Pipeline{ + Processors: []processors.Processor{ + procFunc(func(_ context.Context, in event.Event) (*event.Event, error) { + gotOrder = append(gotOrder, "first") + out := in + out.Schema = "stage.one.v1" + return &out, nil + }), + procFunc(func(_ context.Context, in event.Event) (*event.Event, error) { + gotOrder = append(gotOrder, "second") + if in.Schema != "stage.one.v1" { + return nil, fmt.Errorf("expected schema from first stage, got %q", in.Schema) + } + out := in + out.Schema = "stage.two.v1" + return &out, nil + }), + }, + } + + out, err := p.Process(context.Background(), validEvent()) + if err != nil { + t.Fatalf("Process error: %v", err) + } + if out == nil { + t.Fatalf("expected output event, got nil") + } + if out.Schema != "stage.two.v1" { + t.Fatalf("unexpected output schema: %q", out.Schema) + } + if strings.Join(gotOrder, ",") != "first,second" { + t.Fatalf("unexpected processor order: %v", gotOrder) + } +} + +func TestPipelineProcessInvalidInput(t *testing.T) { + p := &Pipeline{} + _, err := p.Process(context.Background(), event.Event{}) + if err == nil { + t.Fatalf("expected input validation error") + } + if !strings.Contains(err.Error(), "invalid input event") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestPipelineProcessDrop(t *testing.T) { + p := &Pipeline{ + Processors: []processors.Processor{ + procFunc(func(context.Context, event.Event) (*event.Event, error) { + return nil, nil + }), + }, + } + + out, err := p.Process(context.Background(), validEvent()) + if err != nil { + t.Fatalf("Process error: %v", err) + } + if out != nil { + t.Fatalf("expected nil output for dropped event, got %#v", out) + } +} + +func TestPipelineProcessInvalidOutput(t *testing.T) { + p := &Pipeline{ + Processors: []processors.Processor{ + procFunc(func(_ context.Context, in event.Event) (*event.Event, error) { + out := in + out.Payload = nil + return &out, nil + }), + }, + } + + _, err := p.Process(context.Background(), validEvent()) + if err == nil { + t.Fatalf("expected output validation error") + } + if !strings.Contains(err.Error(), "invalid output event") { + t.Fatalf("unexpected error: %v", err) + } +} + +func validEvent() event.Event { + return event.Event{ + ID: "evt-1", + Kind: event.Kind("observation"), + Source: "source-1", + EmittedAt: time.Now().UTC(), + Payload: map[string]any{"ok": true}, + } +} diff --git a/processors/doc.go b/processors/doc.go new file mode 100644 index 0000000..473a5e2 --- /dev/null +++ b/processors/doc.go @@ -0,0 +1,22 @@ +// Package processors defines feedkit's generic processor abstraction and registry. +// +// Processors are optional pipeline stages that can transform, drop, or reject +// events before dispatch to sinks. +// +// Registry provides name-based construction so daemons can assemble processor +// chains without embedding switch statements in wiring code. +// +// Example: +// +// reg := processors.NewRegistry() +// reg.Register("normalize", func() (processors.Processor, error) { +// return normalize.NewProcessor(myNormalizers, false), nil +// }) +// +// chain, err := reg.BuildChain([]string{"normalize"}) +// if err != nil { +// // handle wiring error +// } +// +// p := &pipeline.Pipeline{Processors: chain} +package processors diff --git a/processors/processor.go b/processors/processor.go new file mode 100644 index 0000000..4ca9d89 --- /dev/null +++ b/processors/processor.go @@ -0,0 +1,15 @@ +package processors + +import ( + "context" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// Processor can mutate/drop events (dedupe, rate-limit, normalization tweaks). +type Processor interface { + Process(ctx context.Context, in event.Event) (out *event.Event, err error) +} + +// Factory constructs a configured Processor instance. +type Factory func() (Processor, error) diff --git a/processors/registry.go b/processors/registry.go new file mode 100644 index 0000000..75f9769 --- /dev/null +++ b/processors/registry.go @@ -0,0 +1,71 @@ +package processors + +import ( + "fmt" + "strings" +) + +type Registry struct { + byDriver map[string]Factory +} + +func NewRegistry() *Registry { + return &Registry{byDriver: map[string]Factory{}} +} + +// Register associates a processor driver name with a factory. +// +// Register panics for empty driver names, nil factories, and duplicates. +func (r *Registry) Register(driver string, f Factory) { + if r == nil { + panic("processors.Registry.Register: registry cannot be nil") + } + driver = strings.TrimSpace(driver) + if driver == "" { + panic("processors.Registry.Register: driver cannot be empty") + } + if f == nil { + panic(fmt.Sprintf("processors.Registry.Register: factory cannot be nil (driver=%q)", driver)) + } + if r.byDriver == nil { + r.byDriver = map[string]Factory{} + } + if _, exists := r.byDriver[driver]; exists { + panic(fmt.Sprintf("processors.Registry.Register: driver %q already registered", driver)) + } + r.byDriver[driver] = f +} + +// Build constructs a Processor by driver name. +func (r *Registry) Build(driver string) (Processor, error) { + if r == nil { + return nil, fmt.Errorf("processors registry is nil") + } + driver = strings.TrimSpace(driver) + f, ok := r.byDriver[driver] + if !ok { + return nil, fmt.Errorf("unknown processor driver: %q", driver) + } + + p, err := f() + if err != nil { + return nil, fmt.Errorf("build processor %q: %w", driver, err) + } + if p == nil { + return nil, fmt.Errorf("build processor %q: factory returned nil processor", driver) + } + return p, nil +} + +// BuildChain constructs an ordered processor chain from a driver list. +func (r *Registry) BuildChain(drivers []string) ([]Processor, error) { + out := make([]Processor, 0, len(drivers)) + for i, driver := range drivers { + p, err := r.Build(driver) + if err != nil { + return nil, fmt.Errorf("build processor chain[%d] (%q): %w", i, strings.TrimSpace(driver), err) + } + out = append(out, p) + } + return out, nil +} diff --git a/processors/registry_test.go b/processors/registry_test.go new file mode 100644 index 0000000..2639695 --- /dev/null +++ b/processors/registry_test.go @@ -0,0 +1,100 @@ +package processors + +import ( + "context" + "errors" + "strings" + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type testProcessor struct { + name string +} + +func (p testProcessor) Process(context.Context, event.Event) (*event.Event, error) { + return nil, nil +} + +func TestRegistryRegisterValidation(t *testing.T) { + t.Run("empty driver panics", func(t *testing.T) { + r := NewRegistry() + assertPanics(t, func() { + r.Register(" ", func() (Processor, error) { return testProcessor{name: "x"}, nil }) + }) + }) + + t.Run("nil factory panics", func(t *testing.T) { + r := NewRegistry() + assertPanics(t, func() { + r.Register("normalize", nil) + }) + }) + + t.Run("duplicate driver panics", func(t *testing.T) { + r := NewRegistry() + r.Register("normalize", func() (Processor, error) { return testProcessor{name: "a"}, nil }) + assertPanics(t, func() { + r.Register("normalize", func() (Processor, error) { return testProcessor{name: "b"}, nil }) + }) + }) +} + +func TestRegistryBuildUnknownDriver(t *testing.T) { + r := NewRegistry() + _, err := r.Build("does_not_exist") + if err == nil { + t.Fatalf("expected error for unknown driver") + } + if !strings.Contains(err.Error(), "unknown processor driver") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestRegistryBuildChainPreservesOrder(t *testing.T) { + r := NewRegistry() + r.Register("first", func() (Processor, error) { return testProcessor{name: "first"}, nil }) + r.Register("second", func() (Processor, error) { return testProcessor{name: "second"}, nil }) + + chain, err := r.BuildChain([]string{"first", "second"}) + if err != nil { + t.Fatalf("BuildChain error: %v", err) + } + if len(chain) != 2 { + t.Fatalf("expected 2 processors, got %d", len(chain)) + } + + p0, ok := chain[0].(testProcessor) + if !ok || p0.name != "first" { + t.Fatalf("unexpected chain[0]: %#v", chain[0]) + } + p1, ok := chain[1].(testProcessor) + if !ok || p1.name != "second" { + t.Fatalf("unexpected chain[1]: %#v", chain[1]) + } +} + +func TestRegistryBuildChainIndexedFailure(t *testing.T) { + r := NewRegistry() + r.Register("ok", func() (Processor, error) { return testProcessor{name: "ok"}, nil }) + r.Register("broken", func() (Processor, error) { return nil, errors.New("boom") }) + + _, err := r.BuildChain([]string{"ok", "broken"}) + if err == nil { + t.Fatalf("expected error") + } + if !strings.Contains(err.Error(), "chain[1]") { + t.Fatalf("expected indexed error, got: %v", err) + } +} + +func assertPanics(t *testing.T, fn func()) { + t.Helper() + defer func() { + if recover() == nil { + t.Fatalf("expected panic") + } + }() + fn() +}