From 215afe1acffe693e6be561422e67c017054246cd Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Mon, 16 Mar 2026 18:17:53 -0500 Subject: [PATCH] Added a dedupe processor, and moved processor packages under processors/* --- README.md | 12 +- doc.go | 9 +- pipeline/dedupe.go | 5 - processors/dedupe/doc.go | 28 +++ processors/dedupe/processor.go | 89 ++++++++++ processors/dedupe/processor_test.go | 163 ++++++++++++++++++ processors/doc.go | 4 +- {normalize => processors/normalize}/doc.go | 0 .../normalize}/normalize.go | 0 .../normalize}/processor.go | 0 .../normalize}/processor_test.go | 0 11 files changed, 297 insertions(+), 13 deletions(-) delete mode 100644 pipeline/dedupe.go create mode 100644 processors/dedupe/doc.go create mode 100644 processors/dedupe/processor.go create mode 100644 processors/dedupe/processor_test.go rename {normalize => processors/normalize}/doc.go (100%) rename {normalize => processors/normalize}/normalize.go (100%) rename {normalize => processors/normalize}/processor.go (100%) rename {normalize => processors/normalize}/processor_test.go (100%) diff --git a/README.md b/README.md index 420809b..73a302f 100644 --- a/README.md +++ b/README.md @@ -15,12 +15,12 @@ lifecycle, domain schemas, and domain-specific validation in your daemon. ## Conceptual pipeline -Collect -> Process (optional stages, including normalize) -> Route -> Emit +Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit | Stage | Package(s) | |---|---| | Collect | `sources`, `scheduler` | -| Process | `pipeline`, `processors`, `normalize` (optional stage) | +| Process | `pipeline`, `processors`, `processors/dedupe`, `processors/normalize` (optional stages) | | Route | `dispatch` | | Emit | `sinks` | | Configure | `config` | @@ -86,7 +86,11 @@ Processors can transform, drop, or reject events. Defines the generic processor interface and a named-driver registry used by daemons to build ordered processor chains. -### `normalize` +### `processors/dedupe` + +Built-in in-memory LRU dedupe processor that drops repeated events by `Event.ID`. + +### `processors/normalize` Concrete normalization processor implementation. Typical use: sources emit raw payload events, then a normalize stage maps them to canonical schemas. @@ -100,7 +104,7 @@ Compiles routes and fans out events to sinks with per-sink queue/worker isolatio Defines sink interface and sink registry. Built-ins include: - `stdout` - `nats` -- `postgres` (downstream registers table schema + event mapper; feedkit handles create-if-missing DDL, transactional inserts, and optional prune APIs) +- `postgres` Detailed Postgres configuration and wiring examples live in package docs: `sinks/doc.go`. diff --git a/doc.go b/doc.go index 316fb9f..c2aefac 100644 --- a/doc.go +++ b/doc.go @@ -5,12 +5,12 @@ // // Conceptual flow: // -// Collect -> Process (optional stages, including normalize) -> Route -> Emit +// Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit // // In feedkit this maps to: // // Collect: sources + scheduler -// Process: pipeline + processors + normalize (optional stage) +// Process: pipeline + processors + processors/dedupe + processors/normalize (optional stages) // Route: dispatch // Emit: sinks // Config: config @@ -64,7 +64,10 @@ // - processors // Generic processor interface and named factory registry for wiring chains. // -// - normalize +// - processors/dedupe +// Built-in in-memory LRU dedupe processor keyed by Event.ID. +// +// - processors/normalize // Concrete pipeline processor for raw->canonical mapping. // If no normalizer matches, the event passes through unchanged by default. // diff --git a/pipeline/dedupe.go b/pipeline/dedupe.go deleted file mode 100644 index 219a56a..0000000 --- a/pipeline/dedupe.go +++ /dev/null @@ -1,5 +0,0 @@ -package pipeline - -// Placeholder for dedupe processor: -// - key by Event.ID or computed key -// - in-memory store first; later optional Postgres-backed diff --git a/processors/dedupe/doc.go b/processors/dedupe/doc.go new file mode 100644 index 0000000..25fe473 --- /dev/null +++ b/processors/dedupe/doc.go @@ -0,0 +1,28 @@ +// Package dedupe provides a default in-memory LRU deduplication processor. +// +// The processor keys strictly by event.Event.ID: +// - first-seen IDs pass through +// - repeated IDs are dropped +// +// The in-memory seen-ID set is bounded by a required maxEntries capacity. +// When capacity is exceeded, the least recently used ID is evicted. +// +// Typical registry wiring: +// +// ```go +// reg := processors.NewRegistry() +// reg.Register("dedupe", dedupe.Factory(10_000)) +// +// reg.Register("normalize", func() (processors.Processor, error) { +// return normalize.NewProcessor(myNormalizers, false), nil +// }) +// +// chain, err := reg.BuildChain([]string{"dedupe", "normalize"}) +// +// if err != nil { +// // handle wiring error +// } +// +// p := &pipeline.Pipeline{Processors: chain} +// ``` +package dedupe diff --git a/processors/dedupe/processor.go b/processors/dedupe/processor.go new file mode 100644 index 0000000..a118943 --- /dev/null +++ b/processors/dedupe/processor.go @@ -0,0 +1,89 @@ +package dedupe + +import ( + "container/list" + "context" + "fmt" + "strings" + "sync" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/processors" +) + +// Processor drops duplicate events by Event.ID using an in-memory LRU. +type Processor struct { + maxEntries int + + mu sync.Mutex + order *list.List // most-recent at front, least-recent at back + byID map[string]*list.Element // id -> list element (element.Value is string id) +} + +var _ processors.Processor = (*Processor)(nil) + +// NewProcessor constructs a dedupe processor with a required max entry count. +func NewProcessor(maxEntries int) (*Processor, error) { + if maxEntries <= 0 { + return nil, fmt.Errorf("dedupe: maxEntries must be > 0, got %d", maxEntries) + } + + return &Processor{ + maxEntries: maxEntries, + order: list.New(), + byID: make(map[string]*list.Element, maxEntries), + }, nil +} + +// Factory returns a processors.Factory that constructs Processor instances. +func Factory(maxEntries int) processors.Factory { + return func() (processors.Processor, error) { + return NewProcessor(maxEntries) + } +} + +// Process implements processors.Processor. +func (p *Processor) Process(_ context.Context, in event.Event) (*event.Event, error) { + if p == nil { + return nil, fmt.Errorf("dedupe: processor is nil") + } + if p.maxEntries <= 0 { + return nil, fmt.Errorf("dedupe: processor maxEntries must be > 0") + } + + id := strings.TrimSpace(in.ID) + if id == "" { + return nil, fmt.Errorf("dedupe: event ID is required") + } + + p.mu.Lock() + + if p.order == nil || p.byID == nil { + p.mu.Unlock() + return nil, fmt.Errorf("dedupe: processor is not initialized") + } + + if elem, exists := p.byID[id]; exists { + p.order.MoveToFront(elem) + p.mu.Unlock() + return nil, nil + } + + elem := p.order.PushFront(id) + p.byID[id] = elem + + if p.order.Len() > p.maxEntries { + oldest := p.order.Back() + if oldest != nil { + p.order.Remove(oldest) + if oldestID, ok := oldest.Value.(string); ok { + delete(p.byID, oldestID) + } + } + } + + p.mu.Unlock() + + out := in + return &out, nil +} diff --git a/processors/dedupe/processor_test.go b/processors/dedupe/processor_test.go new file mode 100644 index 0000000..2f792c0 --- /dev/null +++ b/processors/dedupe/processor_test.go @@ -0,0 +1,163 @@ +package dedupe + +import ( + "context" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/feedkit/processors" +) + +func TestNewProcessorValidation(t *testing.T) { + t.Run("rejects non-positive maxEntries", func(t *testing.T) { + for _, maxEntries := range []int{0, -1} { + p, err := NewProcessor(maxEntries) + if err == nil { + t.Fatalf("expected error for maxEntries=%d, got nil", maxEntries) + } + if p != nil { + t.Fatalf("expected nil processor for maxEntries=%d", maxEntries) + } + if !strings.Contains(err.Error(), "maxEntries") { + t.Fatalf("unexpected error: %v", err) + } + } + }) + + t.Run("accepts positive maxEntries", func(t *testing.T) { + p, err := NewProcessor(1) + if err != nil { + t.Fatalf("NewProcessor error: %v", err) + } + if p == nil { + t.Fatalf("expected processor, got nil") + } + }) +} + +func TestProcessorFirstSeenAndDuplicate(t *testing.T) { + p, err := NewProcessor(8) + if err != nil { + t.Fatalf("NewProcessor error: %v", err) + } + + ctx := context.Background() + first := testEvent("evt-1") + + out, err := p.Process(ctx, first) + if err != nil { + t.Fatalf("Process first error: %v", err) + } + if out == nil { + t.Fatalf("expected first event to pass through") + } + if out.ID != first.ID { + t.Fatalf("expected unchanged ID %q, got %q", first.ID, out.ID) + } + + out, err = p.Process(ctx, first) + if err != nil { + t.Fatalf("Process duplicate error: %v", err) + } + if out != nil { + t.Fatalf("expected duplicate to be dropped, got %#v", out) + } + + out, err = p.Process(ctx, testEvent("evt-2")) + if err != nil { + t.Fatalf("Process second unique error: %v", err) + } + if out == nil { + t.Fatalf("expected second unique event to pass through") + } +} + +func TestProcessorLRUEvictionAndPromotion(t *testing.T) { + p, err := NewProcessor(2) + if err != nil { + t.Fatalf("NewProcessor error: %v", err) + } + + ctx := context.Background() + + mustPass(t, p, ctx, "a") + mustPass(t, p, ctx, "b") + mustDrop(t, p, ctx, "a") // promote "a" so "b" becomes least-recently-used + mustPass(t, p, ctx, "c") // evicts "b" + mustDrop(t, p, ctx, "a") // "a" should still be tracked after promotion + mustPass(t, p, ctx, "b") // "b" was evicted, so now it passes again +} + +func TestProcessorRejectsBlankID(t *testing.T) { + p, err := NewProcessor(4) + if err != nil { + t.Fatalf("NewProcessor error: %v", err) + } + + in := testEvent(" ") + out, err := p.Process(context.Background(), in) + if err == nil { + t.Fatalf("expected error for blank ID") + } + if out != nil { + t.Fatalf("expected nil output on error, got %#v", out) + } + if !strings.Contains(err.Error(), "event ID is required") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestFactoryWithRegistry(t *testing.T) { + r := processors.NewRegistry() + r.Register("dedupe", Factory(3)) + + p, err := r.Build("dedupe") + if err != nil { + t.Fatalf("Build error: %v", err) + } + if p == nil { + t.Fatalf("expected processor, got nil") + } + + out, err := p.Process(context.Background(), testEvent("evt-factory-1")) + if err != nil { + t.Fatalf("Process error: %v", err) + } + if out == nil { + t.Fatalf("expected first event to pass through") + } +} + +func mustPass(t *testing.T, p *Processor, ctx context.Context, id string) { + t.Helper() + out, err := p.Process(ctx, testEvent(id)) + if err != nil { + t.Fatalf("expected pass for id=%q, got error: %v", id, err) + } + if out == nil { + t.Fatalf("expected pass for id=%q, got drop", id) + } +} + +func mustDrop(t *testing.T, p *Processor, ctx context.Context, id string) { + t.Helper() + out, err := p.Process(ctx, testEvent(id)) + if err != nil { + t.Fatalf("expected drop for id=%q, got error: %v", id, err) + } + if out != nil { + t.Fatalf("expected drop for id=%q, got output", id) + } +} + +func testEvent(id string) event.Event { + return event.Event{ + ID: id, + Kind: event.Kind("observation"), + Source: "source-1", + EmittedAt: time.Now().UTC(), + Payload: map[string]any{"ok": true}, + } +} diff --git a/processors/doc.go b/processors/doc.go index 473a5e2..f378c4d 100644 --- a/processors/doc.go +++ b/processors/doc.go @@ -9,11 +9,13 @@ // Example: // // reg := processors.NewRegistry() +// reg.Register("dedupe", dedupe.Factory(10_000)) // reg.Register("normalize", func() (processors.Processor, error) { +// // import "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" // return normalize.NewProcessor(myNormalizers, false), nil // }) // -// chain, err := reg.BuildChain([]string{"normalize"}) +// chain, err := reg.BuildChain([]string{"dedupe", "normalize"}) // if err != nil { // // handle wiring error // } diff --git a/normalize/doc.go b/processors/normalize/doc.go similarity index 100% rename from normalize/doc.go rename to processors/normalize/doc.go diff --git a/normalize/normalize.go b/processors/normalize/normalize.go similarity index 100% rename from normalize/normalize.go rename to processors/normalize/normalize.go diff --git a/normalize/processor.go b/processors/normalize/processor.go similarity index 100% rename from normalize/processor.go rename to processors/normalize/processor.go diff --git a/normalize/processor_test.go b/processors/normalize/processor_test.go similarity index 100% rename from normalize/processor_test.go rename to processors/normalize/processor_test.go