diff --git a/processors/normalize/helpers.go b/processors/normalize/helpers.go new file mode 100644 index 0000000..3a43204 --- /dev/null +++ b/processors/normalize/helpers.go @@ -0,0 +1,84 @@ +package normalize + +import ( + "encoding/json" + "fmt" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// PayloadJSONBytes extracts a JSON payload into bytes suitable for json.Unmarshal. +// +// Supported payload shapes: +// - json.RawMessage +// - []byte +// - string +// - map[string]any +func PayloadJSONBytes(e event.Event) ([]byte, error) { + if e.Payload == nil { + return nil, fmt.Errorf("payload is nil") + } + + switch v := e.Payload.(type) { + case json.RawMessage: + if len(v) == 0 { + return nil, fmt.Errorf("payload is empty json.RawMessage") + } + return []byte(v), nil + case []byte: + if len(v) == 0 { + return nil, fmt.Errorf("payload is empty []byte") + } + return v, nil + case string: + if v == "" { + return nil, fmt.Errorf("payload is empty string") + } + return []byte(v), nil + case map[string]any: + b, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("marshal map payload: %w", err) + } + return b, nil + default: + return nil, fmt.Errorf("unsupported payload type %T", e.Payload) + } +} + +// DecodeJSONPayload extracts the event payload as bytes and unmarshals it into T. +func DecodeJSONPayload[T any](in event.Event) (T, error) { + var zero T + + b, err := PayloadJSONBytes(in) + if err != nil { + return zero, fmt.Errorf("extract payload: %w", err) + } + + var parsed T + if err := json.Unmarshal(b, &parsed); err != nil { + return zero, fmt.Errorf("decode raw payload: %w", err) + } + + return parsed, nil +} + +// FinalizeEvent builds the output event envelope by copying the input and applying +// the new schema/payload, plus optional EffectiveAt. +func FinalizeEvent(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) { + out := in + out.Schema = outSchema + out.Payload = outPayload + + if !effectiveAt.IsZero() { + t := effectiveAt.UTC() + out.EffectiveAt = &t + } + + if err := out.Validate(); err != nil { + return nil, err + } + + return &out, nil +} diff --git a/processors/normalize/helpers_test.go b/processors/normalize/helpers_test.go new file mode 100644 index 0000000..c01e7c3 --- /dev/null +++ b/processors/normalize/helpers_test.go @@ -0,0 +1,118 @@ +package normalize + +import ( + "encoding/json" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestPayloadJSONBytesSupportedShapes(t *testing.T) { + cases := []struct { + name string + payload any + want string + }{ + {name: "rawmessage", payload: json.RawMessage(`{"a":1}`), want: `{"a":1}`}, + {name: "bytes", payload: []byte(`{"a":2}`), want: `{"a":2}`}, + {name: "string", payload: `{"a":3}`, want: `{"a":3}`}, + {name: "map", payload: map[string]any{"a": 4}, want: `{"a":4}`}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got, err := PayloadJSONBytes(event.Event{Payload: tc.payload}) + if err != nil { + t.Fatalf("PayloadJSONBytes() unexpected error: %v", err) + } + if string(got) != tc.want { + t.Fatalf("PayloadJSONBytes() = %s, want %s", string(got), tc.want) + } + }) + } +} + +func TestPayloadJSONBytesRejectsInvalidPayloads(t *testing.T) { + cases := []struct { + name string + payload any + want string + }{ + {name: "nil", payload: nil, want: "payload is nil"}, + {name: "empty rawmessage", payload: json.RawMessage{}, want: "payload is empty json.RawMessage"}, + {name: "empty bytes", payload: []byte{}, want: "payload is empty []byte"}, + {name: "empty string", payload: "", want: "payload is empty string"}, + {name: "unsupported", payload: 123, want: "unsupported payload type"}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + _, err := PayloadJSONBytes(event.Event{Payload: tc.payload}) + if err == nil { + t.Fatalf("PayloadJSONBytes() expected error") + } + if !strings.Contains(err.Error(), tc.want) { + t.Fatalf("PayloadJSONBytes() error = %q, want substring %q", err, tc.want) + } + }) + } +} + +func TestDecodeJSONPayload(t *testing.T) { + type payload struct { + Name string `json:"name"` + } + + got, err := DecodeJSONPayload[payload](event.Event{ + Payload: json.RawMessage(`{"name":"alice"}`), + }) + if err != nil { + t.Fatalf("DecodeJSONPayload() unexpected error: %v", err) + } + if got.Name != "alice" { + t.Fatalf("DecodeJSONPayload() = %#v, want name alice", got) + } +} + +func TestFinalizeEventPreservesEnvelopeAndEffectiveAtBehavior(t *testing.T) { + existingEffectiveAt := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC) + in := event.Event{ + ID: "evt-1", + Kind: event.Kind("observation"), + Source: "source-a", + EmittedAt: time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC), + EffectiveAt: &existingEffectiveAt, + Schema: "raw.example.v1", + Payload: map[string]any{"old": true}, + } + + out, err := FinalizeEvent(in, "example.v1", map[string]any{"value": 1.234567}, time.Time{}) + if err != nil { + t.Fatalf("FinalizeEvent() unexpected error: %v", err) + } + if out.ID != in.ID || out.Kind != in.Kind || out.Source != in.Source || out.EmittedAt != in.EmittedAt { + t.Fatalf("FinalizeEvent() changed preserved envelope fields: %#v", out) + } + if out.EffectiveAt == nil || !out.EffectiveAt.Equal(existingEffectiveAt) { + t.Fatalf("FinalizeEvent() effectiveAt = %#v, want preserved existing value", out.EffectiveAt) + } + + nextEffectiveAt := time.Date(2026, 3, 28, 13, 0, 0, 0, time.FixedZone("x", -4*3600)) + out, err = FinalizeEvent(in, "example.v1", map[string]any{"value": 1.234567}, nextEffectiveAt) + if err != nil { + t.Fatalf("FinalizeEvent() unexpected overwrite error: %v", err) + } + if out.EffectiveAt == nil || !out.EffectiveAt.Equal(nextEffectiveAt.UTC()) { + t.Fatalf("FinalizeEvent() effectiveAt = %#v, want %s", out.EffectiveAt, nextEffectiveAt.UTC()) + } + + payloadMap, ok := out.Payload.(map[string]any) + if !ok { + t.Fatalf("FinalizeEvent() payload type = %T, want map[string]any", out.Payload) + } + if payloadMap["value"] != 1.234567 { + t.Fatalf("FinalizeEvent() payload value = %#v, want unrounded 1.234567", payloadMap["value"]) + } +} diff --git a/sinks/helpers.go b/sinks/helpers.go new file mode 100644 index 0000000..3188f94 --- /dev/null +++ b/sinks/helpers.go @@ -0,0 +1,27 @@ +package sinks + +import ( + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +// RegisterPostgresSchemaForConfiguredSinks registers one Postgres schema for each +// configured sink using driver=postgres. +func RegisterPostgresSchemaForConfiguredSinks(cfg *config.Config, schema PostgresSchema) error { + if cfg == nil { + return fmt.Errorf("register postgres schemas: config is nil") + } + + for i, sk := range cfg.Sinks { + if !strings.EqualFold(strings.TrimSpace(sk.Driver), "postgres") { + continue + } + if err := RegisterPostgresSchema(sk.Name, schema); err != nil { + return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err) + } + } + + return nil +} diff --git a/sinks/helpers_test.go b/sinks/helpers_test.go new file mode 100644 index 0000000..8e318bd --- /dev/null +++ b/sinks/helpers_test.go @@ -0,0 +1,86 @@ +package sinks + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestRegisterPostgresSchemaForConfiguredSinksNilConfig(t *testing.T) { + err := RegisterPostgresSchemaForConfiguredSinks(nil, testPostgresSchema()) + if err == nil { + t.Fatalf("RegisterPostgresSchemaForConfiguredSinks(nil) expected error") + } + if !strings.Contains(err.Error(), "config is nil") { + t.Fatalf("error = %q, want config is nil", err) + } +} + +func TestRegisterPostgresSchemaForConfiguredSinksNonPostgresNoOp(t *testing.T) { + cfg := &config.Config{ + Sinks: []config.SinkConfig{ + {Name: uniqueSinkName("stdout"), Driver: "stdout"}, + {Name: uniqueSinkName("nats"), Driver: "nats"}, + }, + } + + if err := RegisterPostgresSchemaForConfiguredSinks(cfg, testPostgresSchema()); err != nil { + t.Fatalf("RegisterPostgresSchemaForConfiguredSinks(non-postgres) error = %v", err) + } +} + +func TestRegisterPostgresSchemaForConfiguredSinksDuplicateRegistrationFails(t *testing.T) { + cfg := &config.Config{ + Sinks: []config.SinkConfig{ + {Name: uniqueSinkName("pg"), Driver: "postgres"}, + }, + } + + if err := RegisterPostgresSchemaForConfiguredSinks(cfg, testPostgresSchema()); err != nil { + t.Fatalf("first RegisterPostgresSchemaForConfiguredSinks() error = %v", err) + } + + err := RegisterPostgresSchemaForConfiguredSinks(cfg, testPostgresSchema()) + if err == nil { + t.Fatalf("second RegisterPostgresSchemaForConfiguredSinks() expected duplicate error") + } + if !strings.Contains(err.Error(), "already registered") { + t.Fatalf("error = %q, want already registered", err) + } +} + +func testPostgresSchema() PostgresSchema { + return PostgresSchema{ + Tables: []PostgresTable{ + { + Name: "events", + Columns: []PostgresColumn{ + {Name: "event_id", Type: "TEXT", Nullable: false}, + {Name: "emitted_at", Type: "TIMESTAMPTZ", Nullable: false}, + }, + PrimaryKey: []string{"event_id"}, + PruneColumn: "emitted_at", + }, + }, + MapEvent: func(_ context.Context, e event.Event) ([]PostgresWrite, error) { + return []PostgresWrite{ + { + Table: "events", + Values: map[string]any{ + "event_id": e.ID, + "emitted_at": e.EmittedAt, + }, + }, + }, nil + }, + } +} + +func uniqueSinkName(prefix string) string { + return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano()) +} diff --git a/sources/helpers.go b/sources/helpers.go new file mode 100644 index 0000000..2984ab9 --- /dev/null +++ b/sources/helpers.go @@ -0,0 +1,145 @@ +package sources + +import ( + "fmt" + "sort" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// DefaultEventID applies feedkit's default Event.ID policy: +// +// - If upstream provides an ID, use it (trimmed). +// - Otherwise, ID is ":" when available. +// - If EffectiveAt is unavailable, fall back to ":". +// +// Timestamps are encoded as RFC3339Nano in UTC. +func DefaultEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string { + if id := strings.TrimSpace(upstreamID); id != "" { + return id + } + + src := strings.TrimSpace(sourceName) + if src == "" { + src = "UNKNOWN_SOURCE" + } + + if effectiveAt != nil && !effectiveAt.IsZero() { + return fmt.Sprintf("%s:%s", src, effectiveAt.UTC().Format(time.RFC3339Nano)) + } + + t := emittedAt.UTC() + if t.IsZero() { + t = time.Now().UTC() + } + + return fmt.Sprintf("%s:%s", src, t.Format(time.RFC3339Nano)) +} + +// SingleEvent constructs, validates, and returns a slice containing exactly one event. +func SingleEvent( + kind event.Kind, + sourceName string, + schema string, + id string, + emittedAt time.Time, + effectiveAt *time.Time, + payload any, +) ([]event.Event, error) { + if emittedAt.IsZero() { + emittedAt = time.Now().UTC() + } else { + emittedAt = emittedAt.UTC() + } + + e := event.Event{ + ID: id, + Kind: kind, + Source: sourceName, + EmittedAt: emittedAt, + EffectiveAt: effectiveAt, + Schema: schema, + Payload: payload, + } + + if err := e.Validate(); err != nil { + return nil, err + } + + return []event.Event{e}, nil +} + +// ValidateExpectedKinds checks that configured source expected kinds are a subset +// of the kinds advertised by the built source, when the source exposes kind +// metadata. If the source does not advertise kinds, the check is skipped. +func ValidateExpectedKinds(cfg config.SourceConfig, in Input) error { + expectedKinds, err := parseExpectedKinds(cfg.ExpectedKinds()) + if err != nil { + return err + } + if len(expectedKinds) == 0 { + return nil + } + + advertisedKinds := advertisedSourceKinds(in) + if len(advertisedKinds) == 0 { + return nil + } + + for kind := range expectedKinds { + if !advertisedKinds[kind] { + return fmt.Errorf( + "configured expected kind %q not advertised by source (configured=%v advertised=%v)", + kind, + sortedKinds(expectedKinds), + sortedKinds(advertisedKinds), + ) + } + } + return nil +} + +func parseExpectedKinds(raw []string) (map[event.Kind]bool, error) { + kinds := map[event.Kind]bool{} + for i, k := range raw { + kind, err := event.ParseKind(k) + if err != nil { + return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err) + } + kinds[kind] = true + } + return kinds, nil +} + +func advertisedSourceKinds(in Input) map[event.Kind]bool { + if in == nil { + return nil + } + + kinds := map[event.Kind]bool{} + if ks, ok := in.(KindsSource); ok { + for _, kind := range ks.Kinds() { + kinds[kind] = true + } + return kinds + } + + if ks, ok := in.(KindSource); ok { + kinds[ks.Kind()] = true + return kinds + } + + return nil +} + +func sortedKinds(kindSet map[event.Kind]bool) []string { + out := make([]string, 0, len(kindSet)) + for kind := range kindSet { + out = append(out, string(kind)) + } + sort.Strings(out) + return out +} diff --git a/sources/helpers_test.go b/sources/helpers_test.go new file mode 100644 index 0000000..57d4f8a --- /dev/null +++ b/sources/helpers_test.go @@ -0,0 +1,131 @@ +package sources + +import ( + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +type testInput struct { + name string +} + +func (s testInput) Name() string { return s.name } + +type testKindSource struct { + testInput + kind event.Kind +} + +func (s testKindSource) Kind() event.Kind { return s.kind } + +type testKindsSource struct { + testInput + kinds []event.Kind +} + +func (s testKindsSource) Kinds() []event.Kind { return s.kinds } + +func TestValidateExpectedKindsLegacyKindFallback(t *testing.T) { + cfg := config.SourceConfig{Kind: "observation"} + in := testKindSource{ + testInput: testInput{name: "test"}, + kind: event.Kind("observation"), + } + + if err := ValidateExpectedKinds(cfg, in); err != nil { + t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) + } +} + +func TestValidateExpectedKindsSubsetAllowed(t *testing.T) { + cfg := config.SourceConfig{Kinds: []string{"observation"}} + in := testKindsSource{ + testInput: testInput{name: "test"}, + kinds: []event.Kind{"observation", "forecast"}, + } + + if err := ValidateExpectedKinds(cfg, in); err != nil { + t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) + } +} + +func TestValidateExpectedKindsMismatchFails(t *testing.T) { + cfg := config.SourceConfig{Kinds: []string{"alert"}} + in := testKindsSource{ + testInput: testInput{name: "test"}, + kinds: []event.Kind{"observation", "forecast"}, + } + + err := ValidateExpectedKinds(cfg, in) + if err == nil { + t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil") + } + if !strings.Contains(err.Error(), "configured expected kind") { + t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err) + } +} + +func TestValidateExpectedKindsNoMetadataSkipsCheck(t *testing.T) { + cfg := config.SourceConfig{Kinds: []string{"alert"}} + in := testInput{name: "test"} + + if err := ValidateExpectedKinds(cfg, in); err != nil { + t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) + } +} + +func TestDefaultEventIDUsesUpstreamID(t *testing.T) { + emittedAt := time.Date(2026, 3, 28, 15, 4, 5, 123, time.UTC) + got := DefaultEventID(" upstream-id ", "source", nil, emittedAt) + if got != "upstream-id" { + t.Fatalf("DefaultEventID() = %q, want upstream-id", got) + } +} + +func TestDefaultEventIDPrefersEffectiveAt(t *testing.T) { + effectiveAt := time.Date(2026, 3, 28, 16, 4, 5, 987654321, time.FixedZone("x", -6*3600)) + emittedAt := time.Date(2026, 3, 28, 15, 4, 5, 123, time.UTC) + + got := DefaultEventID("", "source", &effectiveAt, emittedAt) + want := "source:" + effectiveAt.UTC().Format(time.RFC3339Nano) + if got != want { + t.Fatalf("DefaultEventID() = %q, want %q", got, want) + } +} + +func TestDefaultEventIDFallsBackToEmittedAt(t *testing.T) { + emittedAt := time.Date(2026, 3, 28, 15, 4, 5, 123456789, time.FixedZone("y", 3*3600)) + got := DefaultEventID("", "source", nil, emittedAt) + want := "source:" + emittedAt.UTC().Format(time.RFC3339Nano) + if got != want { + t.Fatalf("DefaultEventID() = %q, want %q", got, want) + } +} + +func TestSingleEventBuildsValidatedSlice(t *testing.T) { + effectiveAt := time.Date(2026, 3, 28, 16, 0, 0, 0, time.UTC) + emittedAt := time.Date(2026, 3, 28, 15, 0, 0, 0, time.FixedZone("z", -5*3600)) + + got, err := SingleEvent( + event.Kind("observation"), + "source-a", + "raw.example.v1", + "evt-1", + emittedAt, + &effectiveAt, + map[string]any{"ok": true}, + ) + if err != nil { + t.Fatalf("SingleEvent() unexpected error: %v", err) + } + if len(got) != 1 { + t.Fatalf("SingleEvent() len = %d, want 1", len(got)) + } + if got[0].EmittedAt != emittedAt.UTC() { + t.Fatalf("SingleEvent() emittedAt = %s, want %s", got[0].EmittedAt, emittedAt.UTC()) + } +}