package sinks

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/config"
	"gitea.maximumdirect.net/ejr/feedkit/event"
)

// TestRegisterPostgresSchemaForConfiguredSinksNilConfig checks that calling
// RegisterPostgresSchemaForConfiguredSinks with a nil config returns an error
// whose message contains "config is nil".
func TestRegisterPostgresSchemaForConfiguredSinksNilConfig(t *testing.T) {
	got := RegisterPostgresSchemaForConfiguredSinks(nil, testPostgresSchema())
	if got == nil {
		t.Fatalf("RegisterPostgresSchemaForConfiguredSinks(nil) expected error")
	}

	if msg := got.Error(); !strings.Contains(msg, "config is nil") {
		t.Fatalf("error = %q, want config is nil", got)
	}
}

// TestRegisterPostgresSchemaForConfiguredSinksNonPostgresNoOp checks that a
// config containing only "stdout" and "nats" sinks registers without error.
func TestRegisterPostgresSchemaForConfiguredSinksNonPostgresNoOp(t *testing.T) {
	sinkList := []config.SinkConfig{
		{Name: uniqueSinkName("stdout"), Driver: "stdout"},
		{Name: uniqueSinkName("nats"), Driver: "nats"},
	}
	conf := &config.Config{Sinks: sinkList}

	regErr := RegisterPostgresSchemaForConfiguredSinks(conf, testPostgresSchema())
	if regErr != nil {
		t.Fatalf("RegisterPostgresSchemaForConfiguredSinks(non-postgres) error = %v", regErr)
	}
}

// TestRegisterPostgresSchemaForConfiguredSinksDuplicateRegistrationFails
// registers a schema twice against the same "postgres" sink config and expects
// the first call to succeed and the second to fail with an error containing
// "already registered".
func TestRegisterPostgresSchemaForConfiguredSinksDuplicateRegistrationFails(t *testing.T) {
	conf := &config.Config{
		Sinks: []config.SinkConfig{
			{Name: uniqueSinkName("pg"), Driver: "postgres"},
		},
	}

	// Both attempts use the same config and a freshly built schema.
	register := func() error {
		return RegisterPostgresSchemaForConfiguredSinks(conf, testPostgresSchema())
	}

	first := register()
	if first != nil {
		t.Fatalf("first RegisterPostgresSchemaForConfiguredSinks() error = %v", first)
	}

	second := register()
	if second == nil {
		t.Fatalf("second RegisterPostgresSchemaForConfiguredSinks() expected duplicate error")
	}
	if msg := second.Error(); !strings.Contains(msg, "already registered") {
		t.Fatalf("error = %q, want already registered", second)
	}
}

// testPostgresSchema builds the schema fixture shared by the tests in this
// file: a single "events" table with non-nullable event_id (TEXT) and
// emitted_at (TIMESTAMPTZ) columns, event_id as primary key, emitted_at as the
// prune column, and a MapEvent function that produces one write per event.
func testPostgresSchema() PostgresSchema {
	eventsTable := PostgresTable{
		Name: "events",
		Columns: []PostgresColumn{
			{Name: "event_id", Type: "TEXT", Nullable: false},
			{Name: "emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
		},
		PrimaryKey:  []string{"event_id"},
		PruneColumn: "emitted_at",
	}

	// toWrites maps an event to a single row in the "events" table; the
	// context argument is unused and the returned error is always nil.
	toWrites := func(_ context.Context, ev event.Event) ([]PostgresWrite, error) {
		row := PostgresWrite{
			Table: "events",
			Values: map[string]any{
				"event_id":   ev.ID,
				"emitted_at": ev.EmittedAt,
			},
		}
		return []PostgresWrite{row}, nil
	}

	return PostgresSchema{
		Tables:   []PostgresTable{eventsTable},
		MapEvent: toWrites,
	}
}

// uniqueSinkName returns prefix followed by an underscore and the current Unix
// time in nanoseconds.
// NOTE(review): presumably this keeps sink names from colliding in a shared
// registration table across tests — confirm against the registry code.
func uniqueSinkName(prefix string) string {
	stamp := time.Now().UnixNano()
	return fmt.Sprintf("%s_%d", prefix, stamp)
}