package pipeline

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/event"
	"gitea.maximumdirect.net/ejr/feedkit/processors"
)

// procFunc lets a plain function be used wherever a processors.Processor is
// expected, so tests can declare pipeline stages inline.
type procFunc func(context.Context, event.Event) (*event.Event, error)

// Process calls the wrapped function with the supplied context and event and
// returns its results unchanged.
func (fn procFunc) Process(ctx context.Context, ev event.Event) (*event.Event, error) {
	return fn(ctx, ev)
}

// TestPipelineProcessSequentialOrder checks that processors run in slice
// order and that the second stage observes the schema set by the first.
func TestPipelineProcessSequentialOrder(t *testing.T) {
	var calls []string

	// Each stage records its invocation and stamps its own schema on a copy
	// of the incoming event (the parameter is already a copy).
	stageOne := procFunc(func(_ context.Context, ev event.Event) (*event.Event, error) {
		calls = append(calls, "first")
		ev.Schema = "stage.one.v1"
		return &ev, nil
	})
	stageTwo := procFunc(func(_ context.Context, ev event.Event) (*event.Event, error) {
		calls = append(calls, "second")
		if ev.Schema != "stage.one.v1" {
			return nil, fmt.Errorf("expected schema from first stage, got %q", ev.Schema)
		}
		ev.Schema = "stage.two.v1"
		return &ev, nil
	})

	p := &Pipeline{
		Processors: []processors.Processor{stageOne, stageTwo},
	}

	got, err := p.Process(context.Background(), validEvent())
	switch {
	case err != nil:
		t.Fatalf("Process error: %v", err)
	case got == nil:
		t.Fatalf("expected output event, got nil")
	case got.Schema != "stage.two.v1":
		t.Fatalf("unexpected output schema: %q", got.Schema)
	}

	if order := strings.Join(calls, ","); order != "first,second" {
		t.Fatalf("unexpected processor order: %v", calls)
	}
}

// TestPipelineProcessInvalidInput checks that a zero-value event is rejected
// with an error mentioning "invalid input event".
func TestPipelineProcessInvalidInput(t *testing.T) {
	p := &Pipeline{}

	var empty event.Event
	_, err := p.Process(context.Background(), empty)
	if err == nil {
		t.Fatalf("expected input validation error")
	}
	if msg := err.Error(); !strings.Contains(msg, "invalid input event") {
		t.Fatalf("unexpected error: %v", err)
	}
}

// TestPipelineProcessDrop checks that a processor returning (nil, nil) makes
// the pipeline return a nil event and no error.
func TestPipelineProcessDrop(t *testing.T) {
	dropAll := procFunc(func(context.Context, event.Event) (*event.Event, error) {
		return nil, nil
	})
	p := &Pipeline{
		Processors: []processors.Processor{dropAll},
	}

	got, err := p.Process(context.Background(), validEvent())
	if err != nil {
		t.Fatalf("Process error: %v", err)
	}
	if got != nil {
		t.Fatalf("expected nil output for dropped event, got %#v", got)
	}
}

// TestPipelineProcessInvalidOutput checks that an event whose payload was
// cleared by a processor is rejected with an error mentioning
// "invalid output event".
func TestPipelineProcessInvalidOutput(t *testing.T) {
	clearPayload := procFunc(func(_ context.Context, ev event.Event) (*event.Event, error) {
		ev.Payload = nil
		return &ev, nil
	})
	p := &Pipeline{
		Processors: []processors.Processor{clearPayload},
	}

	_, err := p.Process(context.Background(), validEvent())
	if err == nil {
		t.Fatalf("expected output validation error")
	}
	if msg := err.Error(); !strings.Contains(msg, "invalid output event") {
		t.Fatalf("unexpected error: %v", err)
	}
}

// validEvent builds the event the tests above feed into the pipeline as
// well-formed input: ID, Kind, Source, EmittedAt and Payload are populated.
func validEvent() event.Event {
	var ev event.Event
	ev.ID = "evt-1"
	ev.Kind = event.Kind("observation")
	ev.Source = "source-1"
	ev.EmittedAt = time.Now().UTC()
	ev.Payload = map[string]any{"ok": true}
	return ev
}