Added a dedupe processor, and moved processor packages under processors/*

This commit is contained in:
2026-03-16 18:17:53 -05:00
parent 4572c53580
commit 215afe1acf
11 changed files with 297 additions and 13 deletions

28
processors/dedupe/doc.go Normal file
View File

@@ -0,0 +1,28 @@
// Package dedupe provides a default in-memory LRU deduplication processor.
//
// The processor keys strictly by event.Event.ID:
// - first-seen IDs pass through
// - repeated IDs are dropped
//
// The in-memory seen-ID set is bounded by a required maxEntries capacity.
// When capacity is exceeded, the least recently used ID is evicted.
//
// Typical registry wiring:
//
// ```go
// reg := processors.NewRegistry()
// reg.Register("dedupe", dedupe.Factory(10_000))
//
// reg.Register("normalize", func() (processors.Processor, error) {
// return normalize.NewProcessor(myNormalizers, false), nil
// })
//
// chain, err := reg.BuildChain([]string{"dedupe", "normalize"})
//
// if err != nil {
// // handle wiring error
// }
//
// p := &pipeline.Pipeline{Processors: chain}
// ```
package dedupe

View File

@@ -0,0 +1,89 @@
package dedupe
import (
"container/list"
"context"
"fmt"
"strings"
"sync"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/feedkit/processors"
)
// Processor drops duplicate events by Event.ID using an in-memory LRU.
type Processor struct {
maxEntries int
mu sync.Mutex
order *list.List // most-recent at front, least-recent at back
byID map[string]*list.Element // id -> list element (element.Value is string id)
}
var _ processors.Processor = (*Processor)(nil)
// NewProcessor constructs a dedupe processor with a required max entry count.
func NewProcessor(maxEntries int) (*Processor, error) {
if maxEntries <= 0 {
return nil, fmt.Errorf("dedupe: maxEntries must be > 0, got %d", maxEntries)
}
return &Processor{
maxEntries: maxEntries,
order: list.New(),
byID: make(map[string]*list.Element, maxEntries),
}, nil
}
// Factory returns a processors.Factory that constructs Processor instances.
func Factory(maxEntries int) processors.Factory {
return func() (processors.Processor, error) {
return NewProcessor(maxEntries)
}
}
// Process implements processors.Processor.
func (p *Processor) Process(_ context.Context, in event.Event) (*event.Event, error) {
if p == nil {
return nil, fmt.Errorf("dedupe: processor is nil")
}
if p.maxEntries <= 0 {
return nil, fmt.Errorf("dedupe: processor maxEntries must be > 0")
}
id := strings.TrimSpace(in.ID)
if id == "" {
return nil, fmt.Errorf("dedupe: event ID is required")
}
p.mu.Lock()
if p.order == nil || p.byID == nil {
p.mu.Unlock()
return nil, fmt.Errorf("dedupe: processor is not initialized")
}
if elem, exists := p.byID[id]; exists {
p.order.MoveToFront(elem)
p.mu.Unlock()
return nil, nil
}
elem := p.order.PushFront(id)
p.byID[id] = elem
if p.order.Len() > p.maxEntries {
oldest := p.order.Back()
if oldest != nil {
p.order.Remove(oldest)
if oldestID, ok := oldest.Value.(string); ok {
delete(p.byID, oldestID)
}
}
}
p.mu.Unlock()
out := in
return &out, nil
}

View File

@@ -0,0 +1,163 @@
package dedupe
import (
"context"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/feedkit/processors"
)
func TestNewProcessorValidation(t *testing.T) {
t.Run("rejects non-positive maxEntries", func(t *testing.T) {
for _, maxEntries := range []int{0, -1} {
p, err := NewProcessor(maxEntries)
if err == nil {
t.Fatalf("expected error for maxEntries=%d, got nil", maxEntries)
}
if p != nil {
t.Fatalf("expected nil processor for maxEntries=%d", maxEntries)
}
if !strings.Contains(err.Error(), "maxEntries") {
t.Fatalf("unexpected error: %v", err)
}
}
})
t.Run("accepts positive maxEntries", func(t *testing.T) {
p, err := NewProcessor(1)
if err != nil {
t.Fatalf("NewProcessor error: %v", err)
}
if p == nil {
t.Fatalf("expected processor, got nil")
}
})
}
func TestProcessorFirstSeenAndDuplicate(t *testing.T) {
p, err := NewProcessor(8)
if err != nil {
t.Fatalf("NewProcessor error: %v", err)
}
ctx := context.Background()
first := testEvent("evt-1")
out, err := p.Process(ctx, first)
if err != nil {
t.Fatalf("Process first error: %v", err)
}
if out == nil {
t.Fatalf("expected first event to pass through")
}
if out.ID != first.ID {
t.Fatalf("expected unchanged ID %q, got %q", first.ID, out.ID)
}
out, err = p.Process(ctx, first)
if err != nil {
t.Fatalf("Process duplicate error: %v", err)
}
if out != nil {
t.Fatalf("expected duplicate to be dropped, got %#v", out)
}
out, err = p.Process(ctx, testEvent("evt-2"))
if err != nil {
t.Fatalf("Process second unique error: %v", err)
}
if out == nil {
t.Fatalf("expected second unique event to pass through")
}
}
func TestProcessorLRUEvictionAndPromotion(t *testing.T) {
p, err := NewProcessor(2)
if err != nil {
t.Fatalf("NewProcessor error: %v", err)
}
ctx := context.Background()
mustPass(t, p, ctx, "a")
mustPass(t, p, ctx, "b")
mustDrop(t, p, ctx, "a") // promote "a" so "b" becomes least-recently-used
mustPass(t, p, ctx, "c") // evicts "b"
mustDrop(t, p, ctx, "a") // "a" should still be tracked after promotion
mustPass(t, p, ctx, "b") // "b" was evicted, so now it passes again
}
func TestProcessorRejectsBlankID(t *testing.T) {
p, err := NewProcessor(4)
if err != nil {
t.Fatalf("NewProcessor error: %v", err)
}
in := testEvent(" ")
out, err := p.Process(context.Background(), in)
if err == nil {
t.Fatalf("expected error for blank ID")
}
if out != nil {
t.Fatalf("expected nil output on error, got %#v", out)
}
if !strings.Contains(err.Error(), "event ID is required") {
t.Fatalf("unexpected error: %v", err)
}
}
func TestFactoryWithRegistry(t *testing.T) {
r := processors.NewRegistry()
r.Register("dedupe", Factory(3))
p, err := r.Build("dedupe")
if err != nil {
t.Fatalf("Build error: %v", err)
}
if p == nil {
t.Fatalf("expected processor, got nil")
}
out, err := p.Process(context.Background(), testEvent("evt-factory-1"))
if err != nil {
t.Fatalf("Process error: %v", err)
}
if out == nil {
t.Fatalf("expected first event to pass through")
}
}
func mustPass(t *testing.T, p *Processor, ctx context.Context, id string) {
t.Helper()
out, err := p.Process(ctx, testEvent(id))
if err != nil {
t.Fatalf("expected pass for id=%q, got error: %v", id, err)
}
if out == nil {
t.Fatalf("expected pass for id=%q, got drop", id)
}
}
func mustDrop(t *testing.T, p *Processor, ctx context.Context, id string) {
t.Helper()
out, err := p.Process(ctx, testEvent(id))
if err != nil {
t.Fatalf("expected drop for id=%q, got error: %v", id, err)
}
if out != nil {
t.Fatalf("expected drop for id=%q, got output", id)
}
}
func testEvent(id string) event.Event {
return event.Event{
ID: id,
Kind: event.Kind("observation"),
Source: "source-1",
EmittedAt: time.Now().UTC(),
Payload: map[string]any{"ok": true},
}
}