All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
248 lines
7.4 KiB
Go
248 lines
7.4 KiB
Go
package postgres
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
|
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
|
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
|
)
|
|
|
|
func TestMapPostgresEventObservationStructPayload(t *testing.T) {
|
|
isDay := true
|
|
temp := 21.5
|
|
obs := model.WeatherObservation{
|
|
StationID: "KSTL",
|
|
StationName: "St. Louis",
|
|
Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
|
ConditionCode: model.WMOCode(1),
|
|
IsDay: &isDay,
|
|
TextDescription: "few clouds",
|
|
TemperatureC: &temp,
|
|
PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}},
|
|
}
|
|
|
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherObservationV1, "observation", obs))
|
|
if err != nil {
|
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
|
}
|
|
if len(writes) != 2 {
|
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
|
|
}
|
|
if writes[0].Table != tableObservations {
|
|
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations)
|
|
}
|
|
if got := writes[0].Values["station_id"]; got != "KSTL" {
|
|
t.Fatalf("observations station_id = %#v, want KSTL", got)
|
|
}
|
|
if writes[1].Table != tableObservationPresentWeather {
|
|
t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationPresentWeather)
|
|
}
|
|
if got := writes[1].Values["raw_text"]; got != `{"a":1,"b":"x"}` {
|
|
t.Fatalf("present_weather raw_text = %#v, want compact JSON", got)
|
|
}
|
|
|
|
assertAllWritesIncludeAllColumns(t, writes)
|
|
}
|
|
|
|
func TestMapPostgresEventForecastStructPayload(t *testing.T) {
|
|
isDay := true
|
|
temp := 10.5
|
|
run := model.WeatherForecastRun{
|
|
LocationID: "LOC-1",
|
|
LocationName: "St. Louis",
|
|
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
|
Product: model.ForecastProductHourly,
|
|
Periods: []model.WeatherForecastPeriod{
|
|
{
|
|
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
|
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
|
IsDay: &isDay,
|
|
ConditionCode: model.WMOCode(2),
|
|
TemperatureC: &temp,
|
|
},
|
|
{
|
|
StartTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
|
EndTime: time.Date(2026, 3, 16, 21, 0, 0, 0, time.UTC),
|
|
ConditionCode: model.WMOCode(3),
|
|
},
|
|
},
|
|
}
|
|
|
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", run))
|
|
if err != nil {
|
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
|
}
|
|
if len(writes) != 3 {
|
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
|
|
}
|
|
|
|
if writes[0].Table != tableForecasts {
|
|
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
|
|
}
|
|
if got := writes[0].Values["period_count"]; got != 2 {
|
|
t.Fatalf("forecasts period_count = %#v, want 2", got)
|
|
}
|
|
if writes[1].Table != tableForecastPeriods || writes[2].Table != tableForecastPeriods {
|
|
t.Fatalf("forecast period writes not in expected order")
|
|
}
|
|
if got := writes[1].Values["period_index"]; got != 0 {
|
|
t.Fatalf("first period index = %#v, want 0", got)
|
|
}
|
|
|
|
assertAllWritesIncludeAllColumns(t, writes)
|
|
}
|
|
|
|
func TestMapPostgresEventAlertStructPayload(t *testing.T) {
|
|
sent := time.Date(2026, 3, 16, 17, 0, 0, 0, time.UTC)
|
|
run := model.WeatherAlertRun{
|
|
AsOf: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
|
Alerts: []model.WeatherAlert{
|
|
{
|
|
ID: "urn:alert:1",
|
|
Headline: "Winter Weather Advisory",
|
|
Severity: "Moderate",
|
|
References: []model.AlertReference{
|
|
{ID: "urn:ref:1", Sent: &sent},
|
|
{Identifier: "ref-two"},
|
|
},
|
|
},
|
|
{
|
|
ID: "urn:alert:2",
|
|
Headline: "Second alert",
|
|
},
|
|
},
|
|
}
|
|
|
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherAlertV1, "alert", run))
|
|
if err != nil {
|
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
|
}
|
|
if len(writes) != 5 {
|
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 5", len(writes))
|
|
}
|
|
|
|
counts := map[string]int{}
|
|
for _, w := range writes {
|
|
counts[w.Table]++
|
|
}
|
|
if counts[tableAlertRuns] != 1 || counts[tableAlerts] != 2 || counts[tableAlertReferences] != 2 {
|
|
t.Fatalf("unexpected table write counts: %#v", counts)
|
|
}
|
|
|
|
firstAlert, ok := firstWriteForTable(writes, tableAlerts)
|
|
if !ok {
|
|
t.Fatalf("missing alerts write")
|
|
}
|
|
if got := firstAlert.Values["reference_count"]; got != 2 {
|
|
t.Fatalf("alerts reference_count = %#v, want 2", got)
|
|
}
|
|
|
|
assertAllWritesIncludeAllColumns(t, writes)
|
|
}
|
|
|
|
func TestMapPostgresEventMapPayload(t *testing.T) {
|
|
run := model.WeatherForecastRun{
|
|
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
|
|
Product: model.ForecastProductHourly,
|
|
Periods: []model.WeatherForecastPeriod{
|
|
{
|
|
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
|
|
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
|
|
ConditionCode: model.WMOCode(2),
|
|
},
|
|
},
|
|
}
|
|
b, err := json.Marshal(run)
|
|
if err != nil {
|
|
t.Fatalf("json.Marshal() error = %v", err)
|
|
}
|
|
var payload map[string]any
|
|
if err := json.Unmarshal(b, &payload); err != nil {
|
|
t.Fatalf("json.Unmarshal() error = %v", err)
|
|
}
|
|
|
|
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", payload))
|
|
if err != nil {
|
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
|
}
|
|
if len(writes) != 2 {
|
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
|
|
}
|
|
if writes[0].Table != tableForecasts {
|
|
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
|
|
}
|
|
|
|
assertAllWritesIncludeAllColumns(t, writes)
|
|
}
|
|
|
|
func TestMapPostgresEventUnknownSchemaNoOp(t *testing.T) {
|
|
writes, err := mapPostgresEvent(context.Background(), testEvent("weather.unknown.v1", "observation", map[string]any{"x": 1}))
|
|
if err != nil {
|
|
t.Fatalf("mapPostgresEvent() error = %v", err)
|
|
}
|
|
if len(writes) != 0 {
|
|
t.Fatalf("mapPostgresEvent() writes len = %d, want 0", len(writes))
|
|
}
|
|
}
|
|
|
|
func TestMapPostgresEventMalformedPayload(t *testing.T) {
|
|
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", "bad"))
|
|
if err == nil {
|
|
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
|
|
}
|
|
if !strings.Contains(err.Error(), "decode forecast payload") {
|
|
t.Fatalf("error = %q, want decode forecast payload context", err)
|
|
}
|
|
}
|
|
|
|
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
|
|
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
|
|
return fkevent.Event{
|
|
ID: "evt-1",
|
|
Kind: kind,
|
|
Source: "test-source",
|
|
Schema: schema,
|
|
EmittedAt: time.Date(2026, 3, 16, 18, 31, 0, 0, time.UTC),
|
|
EffectiveAt: &effectiveAt,
|
|
Payload: payload,
|
|
}
|
|
}
|
|
|
|
func firstWriteForTable(writes []fksinks.PostgresWrite, table string) (fksinks.PostgresWrite, bool) {
|
|
for _, w := range writes {
|
|
if w.Table == table {
|
|
return w, true
|
|
}
|
|
}
|
|
return fksinks.PostgresWrite{}, false
|
|
}
|
|
|
|
func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWrite) {
|
|
t.Helper()
|
|
colCounts := tableColumnCounts()
|
|
for i, w := range writes {
|
|
expectedCount, ok := colCounts[w.Table]
|
|
if !ok {
|
|
t.Fatalf("writes[%d] references unknown table %q", i, w.Table)
|
|
}
|
|
if len(w.Values) != expectedCount {
|
|
t.Fatalf("writes[%d] table=%q has %d values, want %d", i, w.Table, len(w.Values), expectedCount)
|
|
}
|
|
}
|
|
}
|
|
|
|
func tableColumnCounts() map[string]int {
|
|
s := PostgresSchema()
|
|
m := make(map[string]int, len(s.Tables))
|
|
for _, tbl := range s.Tables {
|
|
m[tbl.Name] = len(tbl.Columns)
|
|
}
|
|
return m
|
|
}
|