package postgres import ( "context" "encoding/json" "strings" "testing" "time" fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" "gitea.maximumdirect.net/ejr/weatherfeeder/model" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) func TestMapPostgresEventObservationStructPayload(t *testing.T) { isDay := true temp := 21.5 obs := model.WeatherObservation{ StationID: "KSTL", StationName: "St. Louis", Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), ConditionCode: model.WMOCode(1), IsDay: &isDay, TextDescription: "few clouds", TemperatureC: &temp, PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}}, } writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherObservationV1, "observation", obs)) if err != nil { t.Fatalf("mapPostgresEvent() error = %v", err) } if len(writes) != 2 { t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes)) } if writes[0].Table != tableObservations { t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations) } if got := writes[0].Values["station_id"]; got != "KSTL" { t.Fatalf("observations station_id = %#v, want KSTL", got) } if writes[1].Table != tableObservationPresentWeather { t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationPresentWeather) } if got := writes[1].Values["raw_text"]; got != `{"a":1,"b":"x"}` { t.Fatalf("present_weather raw_text = %#v, want compact JSON", got) } assertAllWritesIncludeAllColumns(t, writes) } func TestMapPostgresEventForecastStructPayload(t *testing.T) { isDay := true temp := 10.5 run := model.WeatherForecastRun{ LocationID: "LOC-1", LocationName: "St. Louis", IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), Product: model.ForecastProductHourly, Periods: []model.WeatherForecastPeriod{ { StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC), IsDay: &isDay, ConditionCode: model.WMOCode(2), TemperatureC: &temp, }, { StartTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC), EndTime: time.Date(2026, 3, 16, 21, 0, 0, 0, time.UTC), ConditionCode: model.WMOCode(3), }, }, } writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", run)) if err != nil { t.Fatalf("mapPostgresEvent() error = %v", err) } if len(writes) != 3 { t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes)) } if writes[0].Table != tableForecasts { t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts) } if got := writes[0].Values["period_count"]; got != 2 { t.Fatalf("forecasts period_count = %#v, want 2", got) } if writes[1].Table != tableForecastPeriods || writes[2].Table != tableForecastPeriods { t.Fatalf("forecast period writes not in expected order") } if got := writes[1].Values["period_index"]; got != 0 { t.Fatalf("first period index = %#v, want 0", got) } assertAllWritesIncludeAllColumns(t, writes) } func TestMapPostgresEventAlertStructPayload(t *testing.T) { sent := time.Date(2026, 3, 16, 17, 0, 0, 0, time.UTC) run := model.WeatherAlertRun{ AsOf: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), Alerts: []model.WeatherAlert{ { ID: "urn:alert:1", Headline: "Winter Weather Advisory", Severity: "Moderate", References: []model.AlertReference{ {ID: "urn:ref:1", Sent: &sent}, {Identifier: "ref-two"}, }, }, { ID: "urn:alert:2", Headline: "Second alert", }, }, } writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherAlertV1, "alert", run)) if err != nil { t.Fatalf("mapPostgresEvent() error = %v", err) } if len(writes) != 5 { t.Fatalf("mapPostgresEvent() writes len = %d, want 5", len(writes)) } counts := map[string]int{} for _, w := range writes { counts[w.Table]++ } if counts[tableAlertRuns] != 1 || counts[tableAlerts] != 2 || counts[tableAlertReferences] != 2 { t.Fatalf("unexpected table write counts: %#v", counts) } firstAlert, ok := firstWriteForTable(writes, tableAlerts) if !ok { t.Fatalf("missing alerts write") } if got := firstAlert.Values["reference_count"]; got != 2 { t.Fatalf("alerts reference_count = %#v, want 2", got) } assertAllWritesIncludeAllColumns(t, writes) } func TestMapPostgresEventMapPayload(t *testing.T) { run := model.WeatherForecastRun{ IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), Product: model.ForecastProductHourly, Periods: []model.WeatherForecastPeriod{ { StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC), ConditionCode: model.WMOCode(2), }, }, } b, err := json.Marshal(run) if err != nil { t.Fatalf("json.Marshal() error = %v", err) } var payload map[string]any if err := json.Unmarshal(b, &payload); err != nil { t.Fatalf("json.Unmarshal() error = %v", err) } writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", payload)) if err != nil { t.Fatalf("mapPostgresEvent() error = %v", err) } if len(writes) != 2 { t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes)) } if writes[0].Table != tableForecasts { t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts) } assertAllWritesIncludeAllColumns(t, writes) } func TestMapPostgresEventUnknownSchemaNoOp(t *testing.T) { writes, err := mapPostgresEvent(context.Background(), testEvent("weather.unknown.v1", "observation", map[string]any{"x": 1})) if err != nil { t.Fatalf("mapPostgresEvent() error = %v", err) } if len(writes) != 0 { t.Fatalf("mapPostgresEvent() writes len = %d, want 0", len(writes)) } } func TestMapPostgresEventMalformedPayload(t *testing.T) { _, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", "bad")) if err == nil { t.Fatalf("mapPostgresEvent() expected error for malformed payload") } if !strings.Contains(err.Error(), "decode forecast payload") { t.Fatalf("error = %q, want decode forecast payload context", err) } } func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event { effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC) return fkevent.Event{ ID: "evt-1", Kind: kind, Source: "test-source", Schema: schema, EmittedAt: time.Date(2026, 3, 16, 18, 31, 0, 0, time.UTC), EffectiveAt: &effectiveAt, Payload: payload, } } func firstWriteForTable(writes []fksinks.PostgresWrite, table string) (fksinks.PostgresWrite, bool) { for _, w := range writes { if w.Table == table { return w, true } } return fksinks.PostgresWrite{}, false } func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWrite) { t.Helper() colCounts := tableColumnCounts() for i, w := range writes { expectedCount, ok := colCounts[w.Table] if !ok { t.Fatalf("writes[%d] references unknown table %q", i, w.Table) } if len(w.Values) != expectedCount { t.Fatalf("writes[%d] table=%q has %d values, want %d", i, w.Table, len(w.Values), expectedCount) } } } func tableColumnCounts() map[string]int { s := weatherPostgresSchema() m := make(map[string]int, len(s.Tables)) for _, tbl := range s.Tables { m[tbl.Name] = len(tbl.Columns) } return m }