Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 215afe1acf |
12
README.md
12
README.md
@@ -15,12 +15,12 @@ lifecycle, domain schemas, and domain-specific validation in your daemon.
|
|||||||
|
|
||||||
## Conceptual pipeline
|
## Conceptual pipeline
|
||||||
|
|
||||||
Collect -> Process (optional stages, including normalize) -> Route -> Emit
|
Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit
|
||||||
|
|
||||||
| Stage | Package(s) |
|
| Stage | Package(s) |
|
||||||
|---|---|
|
|---|---|
|
||||||
| Collect | `sources`, `scheduler` |
|
| Collect | `sources`, `scheduler` |
|
||||||
| Process | `pipeline`, `processors`, `normalize` (optional stage) |
|
| Process | `pipeline`, `processors`, `processors/dedupe`, `processors/normalize` (optional stages) |
|
||||||
| Route | `dispatch` |
|
| Route | `dispatch` |
|
||||||
| Emit | `sinks` |
|
| Emit | `sinks` |
|
||||||
| Configure | `config` |
|
| Configure | `config` |
|
||||||
@@ -86,7 +86,11 @@ Processors can transform, drop, or reject events.
|
|||||||
Defines the generic processor interface and a named-driver registry used by
|
Defines the generic processor interface and a named-driver registry used by
|
||||||
daemons to build ordered processor chains.
|
daemons to build ordered processor chains.
|
||||||
|
|
||||||
### `normalize`
|
### `processors/dedupe`
|
||||||
|
|
||||||
|
Built-in in-memory LRU dedupe processor that drops repeated events by `Event.ID`.
|
||||||
|
|
||||||
|
### `processors/normalize`
|
||||||
|
|
||||||
Concrete normalization processor implementation. Typical use: sources emit raw
|
Concrete normalization processor implementation. Typical use: sources emit raw
|
||||||
payload events, then a normalize stage maps them to canonical schemas.
|
payload events, then a normalize stage maps them to canonical schemas.
|
||||||
@@ -100,7 +104,7 @@ Compiles routes and fans out events to sinks with per-sink queue/worker isolatio
|
|||||||
Defines sink interface and sink registry. Built-ins include:
|
Defines sink interface and sink registry. Built-ins include:
|
||||||
- `stdout`
|
- `stdout`
|
||||||
- `nats`
|
- `nats`
|
||||||
- `postgres` (downstream registers table schema + event mapper; feedkit handles create-if-missing DDL, transactional inserts, and optional prune APIs)
|
- `postgres`
|
||||||
|
|
||||||
Detailed Postgres configuration and wiring examples live in package docs:
|
Detailed Postgres configuration and wiring examples live in package docs:
|
||||||
`sinks/doc.go`.
|
`sinks/doc.go`.
|
||||||
|
|||||||
9
doc.go
9
doc.go
@@ -5,12 +5,12 @@
|
|||||||
//
|
//
|
||||||
// Conceptual flow:
|
// Conceptual flow:
|
||||||
//
|
//
|
||||||
// Collect -> Process (optional stages, including normalize) -> Route -> Emit
|
// Collect -> Process (optional stages, including dedupe + normalize) -> Route -> Emit
|
||||||
//
|
//
|
||||||
// In feedkit this maps to:
|
// In feedkit this maps to:
|
||||||
//
|
//
|
||||||
// Collect: sources + scheduler
|
// Collect: sources + scheduler
|
||||||
// Process: pipeline + processors + normalize (optional stage)
|
// Process: pipeline + processors + processors/dedupe + processors/normalize (optional stages)
|
||||||
// Route: dispatch
|
// Route: dispatch
|
||||||
// Emit: sinks
|
// Emit: sinks
|
||||||
// Config: config
|
// Config: config
|
||||||
@@ -64,7 +64,10 @@
|
|||||||
// - processors
|
// - processors
|
||||||
// Generic processor interface and named factory registry for wiring chains.
|
// Generic processor interface and named factory registry for wiring chains.
|
||||||
//
|
//
|
||||||
// - normalize
|
// - processors/dedupe
|
||||||
|
// Built-in in-memory LRU dedupe processor keyed by Event.ID.
|
||||||
|
//
|
||||||
|
// - processors/normalize
|
||||||
// Concrete pipeline processor for raw->canonical mapping.
|
// Concrete pipeline processor for raw->canonical mapping.
|
||||||
// If no normalizer matches, the event passes through unchanged by default.
|
// If no normalizer matches, the event passes through unchanged by default.
|
||||||
//
|
//
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
package pipeline
|
|
||||||
|
|
||||||
// Placeholder for dedupe processor:
|
|
||||||
// - key by Event.ID or computed key
|
|
||||||
// - in-memory store first; later optional Postgres-backed
|
|
||||||
28
processors/dedupe/doc.go
Normal file
28
processors/dedupe/doc.go
Normal file
@@ -0,0 +1,28 @@
|
|||||||
|
// Package dedupe provides a default in-memory LRU deduplication processor.
|
||||||
|
//
|
||||||
|
// The processor keys strictly by event.Event.ID:
|
||||||
|
// - first-seen IDs pass through
|
||||||
|
// - repeated IDs are dropped
|
||||||
|
//
|
||||||
|
// The in-memory seen-ID set is bounded by a required maxEntries capacity.
|
||||||
|
// When capacity is exceeded, the least recently used ID is evicted.
|
||||||
|
//
|
||||||
|
// Typical registry wiring:
|
||||||
|
//
|
||||||
|
// ```go
|
||||||
|
// reg := processors.NewRegistry()
|
||||||
|
// reg.Register("dedupe", dedupe.Factory(10_000))
|
||||||
|
//
|
||||||
|
// reg.Register("normalize", func() (processors.Processor, error) {
|
||||||
|
// return normalize.NewProcessor(myNormalizers, false), nil
|
||||||
|
// })
|
||||||
|
//
|
||||||
|
// chain, err := reg.BuildChain([]string{"dedupe", "normalize"})
|
||||||
|
//
|
||||||
|
// if err != nil {
|
||||||
|
// // handle wiring error
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// p := &pipeline.Pipeline{Processors: chain}
|
||||||
|
// ```
|
||||||
|
package dedupe
|
||||||
89
processors/dedupe/processor.go
Normal file
89
processors/dedupe/processor.go
Normal file
@@ -0,0 +1,89 @@
|
|||||||
|
package dedupe
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Processor drops duplicate events by Event.ID using an in-memory LRU.
|
||||||
|
type Processor struct {
|
||||||
|
maxEntries int
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
order *list.List // most-recent at front, least-recent at back
|
||||||
|
byID map[string]*list.Element // id -> list element (element.Value is string id)
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ processors.Processor = (*Processor)(nil)
|
||||||
|
|
||||||
|
// NewProcessor constructs a dedupe processor with a required max entry count.
|
||||||
|
func NewProcessor(maxEntries int) (*Processor, error) {
|
||||||
|
if maxEntries <= 0 {
|
||||||
|
return nil, fmt.Errorf("dedupe: maxEntries must be > 0, got %d", maxEntries)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Processor{
|
||||||
|
maxEntries: maxEntries,
|
||||||
|
order: list.New(),
|
||||||
|
byID: make(map[string]*list.Element, maxEntries),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Factory returns a processors.Factory that constructs Processor instances.
|
||||||
|
func Factory(maxEntries int) processors.Factory {
|
||||||
|
return func() (processors.Processor, error) {
|
||||||
|
return NewProcessor(maxEntries)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process implements processors.Processor.
|
||||||
|
func (p *Processor) Process(_ context.Context, in event.Event) (*event.Event, error) {
|
||||||
|
if p == nil {
|
||||||
|
return nil, fmt.Errorf("dedupe: processor is nil")
|
||||||
|
}
|
||||||
|
if p.maxEntries <= 0 {
|
||||||
|
return nil, fmt.Errorf("dedupe: processor maxEntries must be > 0")
|
||||||
|
}
|
||||||
|
|
||||||
|
id := strings.TrimSpace(in.ID)
|
||||||
|
if id == "" {
|
||||||
|
return nil, fmt.Errorf("dedupe: event ID is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
p.mu.Lock()
|
||||||
|
|
||||||
|
if p.order == nil || p.byID == nil {
|
||||||
|
p.mu.Unlock()
|
||||||
|
return nil, fmt.Errorf("dedupe: processor is not initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
if elem, exists := p.byID[id]; exists {
|
||||||
|
p.order.MoveToFront(elem)
|
||||||
|
p.mu.Unlock()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
elem := p.order.PushFront(id)
|
||||||
|
p.byID[id] = elem
|
||||||
|
|
||||||
|
if p.order.Len() > p.maxEntries {
|
||||||
|
oldest := p.order.Back()
|
||||||
|
if oldest != nil {
|
||||||
|
p.order.Remove(oldest)
|
||||||
|
if oldestID, ok := oldest.Value.(string); ok {
|
||||||
|
delete(p.byID, oldestID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.mu.Unlock()
|
||||||
|
|
||||||
|
out := in
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
163
processors/dedupe/processor_test.go
Normal file
163
processors/dedupe/processor_test.go
Normal file
@@ -0,0 +1,163 @@
|
|||||||
|
package dedupe
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestNewProcessorValidation(t *testing.T) {
|
||||||
|
t.Run("rejects non-positive maxEntries", func(t *testing.T) {
|
||||||
|
for _, maxEntries := range []int{0, -1} {
|
||||||
|
p, err := NewProcessor(maxEntries)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error for maxEntries=%d, got nil", maxEntries)
|
||||||
|
}
|
||||||
|
if p != nil {
|
||||||
|
t.Fatalf("expected nil processor for maxEntries=%d", maxEntries)
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "maxEntries") {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("accepts positive maxEntries", func(t *testing.T) {
|
||||||
|
p, err := NewProcessor(1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewProcessor error: %v", err)
|
||||||
|
}
|
||||||
|
if p == nil {
|
||||||
|
t.Fatalf("expected processor, got nil")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessorFirstSeenAndDuplicate(t *testing.T) {
|
||||||
|
p, err := NewProcessor(8)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewProcessor error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
first := testEvent("evt-1")
|
||||||
|
|
||||||
|
out, err := p.Process(ctx, first)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Process first error: %v", err)
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
t.Fatalf("expected first event to pass through")
|
||||||
|
}
|
||||||
|
if out.ID != first.ID {
|
||||||
|
t.Fatalf("expected unchanged ID %q, got %q", first.ID, out.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err = p.Process(ctx, first)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Process duplicate error: %v", err)
|
||||||
|
}
|
||||||
|
if out != nil {
|
||||||
|
t.Fatalf("expected duplicate to be dropped, got %#v", out)
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err = p.Process(ctx, testEvent("evt-2"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Process second unique error: %v", err)
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
t.Fatalf("expected second unique event to pass through")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessorLRUEvictionAndPromotion(t *testing.T) {
|
||||||
|
p, err := NewProcessor(2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewProcessor error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
mustPass(t, p, ctx, "a")
|
||||||
|
mustPass(t, p, ctx, "b")
|
||||||
|
mustDrop(t, p, ctx, "a") // promote "a" so "b" becomes least-recently-used
|
||||||
|
mustPass(t, p, ctx, "c") // evicts "b"
|
||||||
|
mustDrop(t, p, ctx, "a") // "a" should still be tracked after promotion
|
||||||
|
mustPass(t, p, ctx, "b") // "b" was evicted, so now it passes again
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProcessorRejectsBlankID(t *testing.T) {
|
||||||
|
p, err := NewProcessor(4)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("NewProcessor error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
in := testEvent(" ")
|
||||||
|
out, err := p.Process(context.Background(), in)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("expected error for blank ID")
|
||||||
|
}
|
||||||
|
if out != nil {
|
||||||
|
t.Fatalf("expected nil output on error, got %#v", out)
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "event ID is required") {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFactoryWithRegistry(t *testing.T) {
|
||||||
|
r := processors.NewRegistry()
|
||||||
|
r.Register("dedupe", Factory(3))
|
||||||
|
|
||||||
|
p, err := r.Build("dedupe")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Build error: %v", err)
|
||||||
|
}
|
||||||
|
if p == nil {
|
||||||
|
t.Fatalf("expected processor, got nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
out, err := p.Process(context.Background(), testEvent("evt-factory-1"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Process error: %v", err)
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
t.Fatalf("expected first event to pass through")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustPass(t *testing.T, p *Processor, ctx context.Context, id string) {
|
||||||
|
t.Helper()
|
||||||
|
out, err := p.Process(ctx, testEvent(id))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected pass for id=%q, got error: %v", id, err)
|
||||||
|
}
|
||||||
|
if out == nil {
|
||||||
|
t.Fatalf("expected pass for id=%q, got drop", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func mustDrop(t *testing.T, p *Processor, ctx context.Context, id string) {
|
||||||
|
t.Helper()
|
||||||
|
out, err := p.Process(ctx, testEvent(id))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("expected drop for id=%q, got error: %v", id, err)
|
||||||
|
}
|
||||||
|
if out != nil {
|
||||||
|
t.Fatalf("expected drop for id=%q, got output", id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testEvent(id string) event.Event {
|
||||||
|
return event.Event{
|
||||||
|
ID: id,
|
||||||
|
Kind: event.Kind("observation"),
|
||||||
|
Source: "source-1",
|
||||||
|
EmittedAt: time.Now().UTC(),
|
||||||
|
Payload: map[string]any{"ok": true},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -9,11 +9,13 @@
|
|||||||
// Example:
|
// Example:
|
||||||
//
|
//
|
||||||
// reg := processors.NewRegistry()
|
// reg := processors.NewRegistry()
|
||||||
|
// reg.Register("dedupe", dedupe.Factory(10_000))
|
||||||
// reg.Register("normalize", func() (processors.Processor, error) {
|
// reg.Register("normalize", func() (processors.Processor, error) {
|
||||||
|
// // import "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||||
// return normalize.NewProcessor(myNormalizers, false), nil
|
// return normalize.NewProcessor(myNormalizers, false), nil
|
||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
// chain, err := reg.BuildChain([]string{"normalize"})
|
// chain, err := reg.BuildChain([]string{"dedupe", "normalize"})
|
||||||
// if err != nil {
|
// if err != nil {
|
||||||
// // handle wiring error
|
// // handle wiring error
|
||||||
// }
|
// }
|
||||||
|
|||||||
Reference in New Issue
Block a user