diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go index 7dde7f4..4d58c35 100644 --- a/cmd/weatherfeeder/main.go +++ b/cmd/weatherfeeder/main.go @@ -23,6 +23,7 @@ import ( fksources "gitea.maximumdirect.net/ejr/feedkit/sources" wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" + wfpgsink "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sinks/postgres" wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources" ) @@ -37,6 +38,9 @@ func main() { if err != nil { log.Fatalf("config load failed: %v", err) } + if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil { + log.Fatalf("postgres schema registration failed: %v", err) + } // --- Registries --- srcReg := fksources.NewRegistry() diff --git a/internal/sinks/postgres/map.go b/internal/sinks/postgres/map.go new file mode 100644 index 0000000..4365fb5 --- /dev/null +++ b/internal/sinks/postgres/map.go @@ -0,0 +1,343 @@ +package postgres + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + fkevent "gitea.maximumdirect.net/ejr/feedkit/event" + fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" + "gitea.maximumdirect.net/ejr/weatherfeeder/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWrite, error) { + schema := strings.TrimSpace(e.Schema) + switch schema { + case standards.SchemaWeatherObservationV1: + return mapObservationEvent(e) + case standards.SchemaWeatherForecastV1: + return mapForecastEvent(e) + case standards.SchemaWeatherAlertV1: + return mapAlertEvent(e) + default: + return nil, nil + } +} + +func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { + obs, err := decodePayload[model.WeatherObservation](e.Payload) + if err != nil { + return nil, fmt.Errorf("decode observation payload: %w", err) + } + if obs.Timestamp.IsZero() { + return nil, fmt.Errorf("decode observation payload: timestamp is required") + } + + observedAt := obs.Timestamp.UTC() + writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.CloudLayers)+len(obs.PresentWeather)) + + writes = append(writes, fksinks.PostgresWrite{ + Table: tableObservations, + Values: map[string]any{ + "event_id": e.ID, + "event_kind": string(e.Kind), + "event_source": e.Source, + "event_schema": e.Schema, + "event_emitted_at": e.EmittedAt.UTC(), + "event_effective_at": nullableTime(e.EffectiveAt), + "station_id": nullableString(obs.StationID), + "station_name": nullableString(obs.StationName), + "observed_at": observedAt, + "condition_code": int(obs.ConditionCode), + "condition_text": nullableString(obs.ConditionText), + "is_day": nullableBool(obs.IsDay), + "provider_raw_description": nullableString(obs.ProviderRawDescription), + "text_description": nullableString(obs.TextDescription), + "icon_url": nullableString(obs.IconURL), + "temperature_c": nullableFloat64(obs.TemperatureC), + "dewpoint_c": nullableFloat64(obs.DewpointC), + "wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees), + "wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh), + "wind_gust_kmh": nullableFloat64(obs.WindGustKmh), + "barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa), + "sea_level_pressure_pa": nullableFloat64(obs.SeaLevelPressurePa), + "visibility_meters": nullableFloat64(obs.VisibilityMeters), + "relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent), + "apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC), + "elevation_meters": nullableFloat64(obs.ElevationMeters), + "raw_message": nullableString(obs.RawMessage), + }, + }) + + for i, cl := range obs.CloudLayers { + writes = append(writes, fksinks.PostgresWrite{ + Table: tableObservationCloudLayers, + Values: map[string]any{ + "event_id": e.ID, + "layer_index": i, + "observed_at": observedAt, + "base_meters": nullableFloat64(cl.BaseMeters), + "amount": nullableString(cl.Amount), + }, + }) + } + + for i, pw := range obs.PresentWeather { + rawText, err := compactJSONText(pw.Raw) + if err != nil { + return nil, fmt.Errorf("observation presentWeather[%d].raw: %w", i, err) + } + writes = append(writes, fksinks.PostgresWrite{ + Table: tableObservationPresentWeather, + Values: map[string]any{ + "event_id": e.ID, + "weather_index": i, + "observed_at": observedAt, + "raw_text": rawText, + }, + }) + } + + return writes, nil +} + +func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { + run, err := decodePayload[model.WeatherForecastRun](e.Payload) + if err != nil { + return nil, fmt.Errorf("decode forecast payload: %w", err) + } + if run.IssuedAt.IsZero() { + return nil, fmt.Errorf("decode forecast payload: issuedAt is required") + } + if strings.TrimSpace(string(run.Product)) == "" { + return nil, fmt.Errorf("decode forecast payload: product is required") + } + + issuedAt := run.IssuedAt.UTC() + writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Periods)) + + writes = append(writes, fksinks.PostgresWrite{ + Table: tableForecasts, + Values: map[string]any{ + "event_id": e.ID, + "event_kind": string(e.Kind), + "event_source": e.Source, + "event_schema": e.Schema, + "event_emitted_at": e.EmittedAt.UTC(), + "event_effective_at": nullableTime(e.EffectiveAt), + "location_id": nullableString(run.LocationID), + "location_name": nullableString(run.LocationName), + "issued_at": issuedAt, + "updated_at": nullableTime(run.UpdatedAt), + "product": string(run.Product), + "latitude": nullableFloat64(run.Latitude), + "longitude": nullableFloat64(run.Longitude), + "elevation_meters": nullableFloat64(run.ElevationMeters), + "period_count": len(run.Periods), + }, + }) + + for i, p := range run.Periods { + if p.StartTime.IsZero() || p.EndTime.IsZero() { + return nil, fmt.Errorf("decode forecast payload: periods[%d] startTime/endTime are required", i) + } + writes = append(writes, fksinks.PostgresWrite{ + Table: tableForecastPeriods, + Values: map[string]any{ + "run_event_id": e.ID, + "period_index": i, + "issued_at": issuedAt, + "start_time": p.StartTime.UTC(), + "end_time": p.EndTime.UTC(), + "name": nullableString(p.Name), + "is_day": nullableBool(p.IsDay), + "condition_code": int(p.ConditionCode), + "condition_text": nullableString(p.ConditionText), + "provider_raw_description": nullableString(p.ProviderRawDescription), + "text_description": nullableString(p.TextDescription), + "detailed_text": nullableString(p.DetailedText), + "icon_url": nullableString(p.IconURL), + "temperature_c": nullableFloat64(p.TemperatureC), + "temperature_c_min": nullableFloat64(p.TemperatureCMin), + "temperature_c_max": nullableFloat64(p.TemperatureCMax), + "dewpoint_c": nullableFloat64(p.DewpointC), + "relative_humidity_percent": nullableFloat64(p.RelativeHumidityPercent), + "wind_direction_degrees": nullableFloat64(p.WindDirectionDegrees), + "wind_speed_kmh": nullableFloat64(p.WindSpeedKmh), + "wind_gust_kmh": nullableFloat64(p.WindGustKmh), + "barometric_pressure_pa": nullableFloat64(p.BarometricPressurePa), + "visibility_meters": nullableFloat64(p.VisibilityMeters), + "apparent_temperature_c": nullableFloat64(p.ApparentTemperatureC), + "cloud_cover_percent": nullableFloat64(p.CloudCoverPercent), + "probability_of_precipitation_percent": nullableFloat64(p.ProbabilityOfPrecipitationPercent), + "precipitation_amount_mm": nullableFloat64(p.PrecipitationAmountMm), + "snowfall_depth_mm": nullableFloat64(p.SnowfallDepthMM), + "uv_index": nullableFloat64(p.UVIndex), + }, + }) + } + + return writes, nil +} + +func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) { + run, err := decodePayload[model.WeatherAlertRun](e.Payload) + if err != nil { + return nil, fmt.Errorf("decode alert payload: %w", err) + } + if run.AsOf.IsZero() { + return nil, fmt.Errorf("decode alert payload: asOf is required") + } + + asOf := run.AsOf.UTC() + writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Alerts)+countAlertReferences(run.Alerts)) + + writes = append(writes, fksinks.PostgresWrite{ + Table: tableAlertRuns, + Values: map[string]any{ + "event_id": e.ID, + "event_kind": string(e.Kind), + "event_source": e.Source, + "event_schema": e.Schema, + "event_emitted_at": e.EmittedAt.UTC(), + "event_effective_at": nullableTime(e.EffectiveAt), + "location_id": nullableString(run.LocationID), + "location_name": nullableString(run.LocationName), + "as_of": asOf, + "latitude": nullableFloat64(run.Latitude), + "longitude": nullableFloat64(run.Longitude), + "alert_count": len(run.Alerts), + }, + }) + + for i, a := range run.Alerts { + if strings.TrimSpace(a.ID) == "" { + return nil, fmt.Errorf("decode alert payload: alerts[%d].id is required", i) + } + + writes = append(writes, fksinks.PostgresWrite{ + Table: tableAlerts, + Values: map[string]any{ + "run_event_id": e.ID, + "alert_index": i, + "as_of": asOf, + "alert_id": a.ID, + "event": nullableString(a.Event), + "headline": nullableString(a.Headline), + "severity": nullableString(a.Severity), + "urgency": nullableString(a.Urgency), + "certainty": nullableString(a.Certainty), + "status": nullableString(a.Status), + "message_type": nullableString(a.MessageType), + "category": nullableString(a.Category), + "response": nullableString(a.Response), + "description": nullableString(a.Description), + "instruction": nullableString(a.Instruction), + "sent": nullableTime(a.Sent), + "effective": nullableTime(a.Effective), + "onset": nullableTime(a.Onset), + "expires": nullableTime(a.Expires), + "area_description": nullableString(a.AreaDescription), + "sender_name": nullableString(a.SenderName), + "reference_count": len(a.References), + }, + }) + + for j, ref := range a.References { + writes = append(writes, fksinks.PostgresWrite{ + Table: tableAlertReferences, + Values: map[string]any{ + "run_event_id": e.ID, + "alert_index": i, + "reference_index": j, + "as_of": asOf, + "id": nullableString(ref.ID), + "identifier": nullableString(ref.Identifier), + "sender": nullableString(ref.Sender), + "sent": nullableTime(ref.Sent), + }, + }) + } + } + + return writes, nil +} + +func decodePayload[T any](payload any) (T, error) { + var out T + if payload == nil { + return out, fmt.Errorf("payload is nil") + } + + if typed, ok := payload.(T); ok { + return typed, nil + } + if ptr, ok := payload.(*T); ok { + if ptr == nil { + return out, fmt.Errorf("payload pointer is nil") + } + return *ptr, nil + } + + b, err := json.Marshal(payload) + if err != nil { + return out, fmt.Errorf("marshal payload: %w", err) + } + if err := json.Unmarshal(b, &out); err != nil { + return out, fmt.Errorf("unmarshal payload: %w", err) + } + return out, nil +} + +func countAlertReferences(alerts []model.WeatherAlert) int { + total := 0 + for _, a := range alerts { + total += len(a.References) + } + return total +} + +func nullableString(s string) any { + if strings.TrimSpace(s) == "" { + return nil + } + return s +} + +func nullableFloat64(v *float64) any { + if v == nil { + return nil + } + return *v +} + +func nullableBool(v *bool) any { + if v == nil { + return nil + } + return *v +} + +func nullableTime(v *time.Time) any { + if v == nil || v.IsZero() { + return nil + } + return v.UTC() +} + +func compactJSONText(v any) (any, error) { + if v == nil { + return nil, nil + } + b, err := json.Marshal(v) + if err != nil { + return nil, err + } + if string(b) == "null" { + return nil, nil + } + return string(b), nil +} diff --git a/internal/sinks/postgres/map_test.go b/internal/sinks/postgres/map_test.go new file mode 100644 index 0000000..dad0fd9 --- /dev/null +++ b/internal/sinks/postgres/map_test.go @@ -0,0 +1,256 @@ +package postgres + +import ( + "context" + "encoding/json" + "strings" + "testing" + "time" + + fkevent "gitea.maximumdirect.net/ejr/feedkit/event" + fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" + "gitea.maximumdirect.net/ejr/weatherfeeder/model" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +func TestMapPostgresEventObservationStructPayload(t *testing.T) { + isDay := true + temp := 21.5 + base := 1200.0 + obs := model.WeatherObservation{ + StationID: "KSTL", + StationName: "St. Louis", + Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), + ConditionCode: model.WMOCode(1), + ConditionText: "Mainly Sunny", + IsDay: &isDay, + ProviderRawDescription: "few clouds", + TextDescription: "Mainly Sunny", + IconURL: "https://example/icon.png", + TemperatureC: &temp, + CloudLayers: []model.CloudLayer{{BaseMeters: &base, Amount: "FEW"}}, + PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}}, + } + + writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherObservationV1, "observation", obs)) + if err != nil { + t.Fatalf("mapPostgresEvent() error = %v", err) + } + if len(writes) != 3 { + t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes)) + } + if writes[0].Table != tableObservations { + t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations) + } + if got := writes[0].Values["station_id"]; got != "KSTL" { + t.Fatalf("observations station_id = %#v, want KSTL", got) + } + if writes[1].Table != tableObservationCloudLayers { + t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationCloudLayers) + } + if writes[2].Table != tableObservationPresentWeather { + t.Fatalf("writes[2].Table = %q, want %q", writes[2].Table, tableObservationPresentWeather) + } + if got := writes[2].Values["raw_text"]; got != `{"a":1,"b":"x"}` { + t.Fatalf("present_weather raw_text = %#v, want compact JSON", got) + } + + assertAllWritesIncludeAllColumns(t, writes) +} + +func TestMapPostgresEventForecastStructPayload(t *testing.T) { + isDay := true + temp := 10.5 + run := model.WeatherForecastRun{ + LocationID: "LOC-1", + LocationName: "St. Louis", + IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), + Product: model.ForecastProductHourly, + Periods: []model.WeatherForecastPeriod{ + { + StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), + EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC), + IsDay: &isDay, + ConditionCode: model.WMOCode(2), + ConditionText: "Partly Cloudy", + TemperatureC: &temp, + }, + { + StartTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC), + EndTime: time.Date(2026, 3, 16, 21, 0, 0, 0, time.UTC), + ConditionCode: model.WMOCode(3), + }, + }, + } + + writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", run)) + if err != nil { + t.Fatalf("mapPostgresEvent() error = %v", err) + } + if len(writes) != 3 { + t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes)) + } + + if writes[0].Table != tableForecasts { + t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts) + } + if got := writes[0].Values["period_count"]; got != 2 { + t.Fatalf("forecasts period_count = %#v, want 2", got) + } + if writes[1].Table != tableForecastPeriods || writes[2].Table != tableForecastPeriods { + t.Fatalf("forecast period writes not in expected order") + } + if got := writes[1].Values["period_index"]; got != 0 { + t.Fatalf("first period index = %#v, want 0", got) + } + + assertAllWritesIncludeAllColumns(t, writes) +} + +func TestMapPostgresEventAlertStructPayload(t *testing.T) { + sent := time.Date(2026, 3, 16, 17, 0, 0, 0, time.UTC) + run := model.WeatherAlertRun{ + AsOf: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), + Alerts: []model.WeatherAlert{ + { + ID: "urn:alert:1", + Headline: "Winter Weather Advisory", + Severity: "Moderate", + References: []model.AlertReference{ + {ID: "urn:ref:1", Sent: &sent}, + {Identifier: "ref-two"}, + }, + }, + { + ID: "urn:alert:2", + Headline: "Second alert", + }, + }, + } + + writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherAlertV1, "alert", run)) + if err != nil { + t.Fatalf("mapPostgresEvent() error = %v", err) + } + if len(writes) != 5 { + t.Fatalf("mapPostgresEvent() writes len = %d, want 5", len(writes)) + } + + counts := map[string]int{} + for _, w := range writes { + counts[w.Table]++ + } + if counts[tableAlertRuns] != 1 || counts[tableAlerts] != 2 || counts[tableAlertReferences] != 2 { + t.Fatalf("unexpected table write counts: %#v", counts) + } + + firstAlert, ok := firstWriteForTable(writes, tableAlerts) + if !ok { + t.Fatalf("missing alerts write") + } + if got := firstAlert.Values["reference_count"]; got != 2 { + t.Fatalf("alerts reference_count = %#v, want 2", got) + } + + assertAllWritesIncludeAllColumns(t, writes) +} + +func TestMapPostgresEventMapPayload(t *testing.T) { + run := model.WeatherForecastRun{ + IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), + Product: model.ForecastProductHourly, + Periods: []model.WeatherForecastPeriod{ + { + StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC), + EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC), + ConditionCode: model.WMOCode(2), + }, + }, + } + b, err := json.Marshal(run) + if err != nil { + t.Fatalf("json.Marshal() error = %v", err) + } + var payload map[string]any + if err := json.Unmarshal(b, &payload); err != nil { + t.Fatalf("json.Unmarshal() error = %v", err) + } + + writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", payload)) + if err != nil { + t.Fatalf("mapPostgresEvent() error = %v", err) + } + if len(writes) != 2 { + t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes)) + } + if writes[0].Table != tableForecasts { + t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts) + } + + assertAllWritesIncludeAllColumns(t, writes) +} + +func TestMapPostgresEventUnknownSchemaNoOp(t *testing.T) { + writes, err := mapPostgresEvent(context.Background(), testEvent("weather.unknown.v1", "observation", map[string]any{"x": 1})) + if err != nil { + t.Fatalf("mapPostgresEvent() error = %v", err) + } + if len(writes) != 0 { + t.Fatalf("mapPostgresEvent() writes len = %d, want 0", len(writes)) + } +} + +func TestMapPostgresEventMalformedPayload(t *testing.T) { + _, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", "bad")) + if err == nil { + t.Fatalf("mapPostgresEvent() expected error for malformed payload") + } + if !strings.Contains(err.Error(), "decode forecast payload") { + t.Fatalf("error = %q, want decode forecast payload context", err) + } +} + +func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event { + effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC) + return fkevent.Event{ + ID: "evt-1", + Kind: kind, + Source: "test-source", + Schema: schema, + EmittedAt: time.Date(2026, 3, 16, 18, 31, 0, 0, time.UTC), + EffectiveAt: &effectiveAt, + Payload: payload, + } +} + +func firstWriteForTable(writes []fksinks.PostgresWrite, table string) (fksinks.PostgresWrite, bool) { + for _, w := range writes { + if w.Table == table { + return w, true + } + } + return fksinks.PostgresWrite{}, false +} + +func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWrite) { + t.Helper() + colCounts := tableColumnCounts() + for i, w := range writes { + expectedCount, ok := colCounts[w.Table] + if !ok { + t.Fatalf("writes[%d] references unknown table %q", i, w.Table) + } + if len(w.Values) != expectedCount { + t.Fatalf("writes[%d] table=%q has %d values, want %d", i, w.Table, len(w.Values), expectedCount) + } + } +} + +func tableColumnCounts() map[string]int { + s := weatherPostgresSchema() + m := make(map[string]int, len(s.Tables)) + for _, tbl := range s.Tables { + m[tbl.Name] = len(tbl.Columns) + } + return m +} diff --git a/internal/sinks/postgres/schema.go b/internal/sinks/postgres/schema.go new file mode 100644 index 0000000..3c18ce7 --- /dev/null +++ b/internal/sinks/postgres/schema.go @@ -0,0 +1,264 @@ +package postgres + +import ( + "fmt" + "strings" + + "gitea.maximumdirect.net/ejr/feedkit/config" + fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" +) + +const ( + tableObservations = "observations" + tableObservationCloudLayers = "observation_cloud_layers" + tableObservationPresentWeather = "observation_present_weather" + tableForecasts = "forecasts" + tableForecastPeriods = "forecast_periods" + tableAlertRuns = "alert_runs" + tableAlerts = "alerts" + tableAlertReferences = "alert_references" +) + +// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each +// configured sink using driver=postgres. +func RegisterPostgresSchemas(cfg *config.Config) error { + if cfg == nil { + return fmt.Errorf("register postgres schemas: config is nil") + } + + schema := weatherPostgresSchema() + for i, sk := range cfg.Sinks { + if !isPostgresDriver(sk.Driver) { + continue + } + if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil { + return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err) + } + } + + return nil +} + +func isPostgresDriver(driver string) bool { + return strings.EqualFold(strings.TrimSpace(driver), "postgres") +} + +func weatherPostgresSchema() fksinks.PostgresSchema { + return fksinks.PostgresSchema{ + Tables: []fksinks.PostgresTable{ + { + Name: tableObservations, + Columns: []fksinks.PostgresColumn{ + {Name: "event_id", Type: "TEXT", Nullable: false}, + {Name: "event_kind", Type: "TEXT", Nullable: false}, + {Name: "event_source", Type: "TEXT", Nullable: false}, + {Name: "event_schema", Type: "TEXT", Nullable: false}, + {Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "station_id", Type: "TEXT", Nullable: true}, + {Name: "station_name", Type: "TEXT", Nullable: true}, + {Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "condition_code", Type: "INTEGER", Nullable: false}, + {Name: "condition_text", Type: "TEXT", Nullable: true}, + {Name: "is_day", Type: "BOOLEAN", Nullable: true}, + {Name: "provider_raw_description", Type: "TEXT", Nullable: true}, + {Name: "text_description", Type: "TEXT", Nullable: true}, + {Name: "icon_url", Type: "TEXT", Nullable: true}, + {Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "sea_level_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "raw_message", Type: "TEXT", Nullable: true}, + }, + PrimaryKey: []string{"event_id"}, + PruneColumn: "observed_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_obs_station_observed_at", Columns: []string{"station_id", "observed_at"}}, + {Name: "idx_wf_obs_observed_at", Columns: []string{"observed_at"}}, + {Name: "idx_wf_obs_condition_code", Columns: []string{"condition_code"}}, + }, + }, + { + Name: tableObservationCloudLayers, + Columns: []fksinks.PostgresColumn{ + {Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false}, + {Name: "layer_index", Type: "INTEGER", Nullable: false}, + {Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "base_meters", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "amount", Type: "TEXT", Nullable: true}, + }, + PrimaryKey: []string{"event_id", "layer_index"}, + PruneColumn: "observed_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_obs_cloud_observed_at", Columns: []string{"observed_at"}}, + }, + }, + { + Name: tableObservationPresentWeather, + Columns: []fksinks.PostgresColumn{ + {Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false}, + {Name: "weather_index", Type: "INTEGER", Nullable: false}, + {Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "raw_text", Type: "TEXT", Nullable: true}, + }, + PrimaryKey: []string{"event_id", "weather_index"}, + PruneColumn: "observed_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_obs_present_observed_at", Columns: []string{"observed_at"}}, + }, + }, + { + Name: tableForecasts, + Columns: []fksinks.PostgresColumn{ + {Name: "event_id", Type: "TEXT", Nullable: false}, + {Name: "event_kind", Type: "TEXT", Nullable: false}, + {Name: "event_source", Type: "TEXT", Nullable: false}, + {Name: "event_schema", Type: "TEXT", Nullable: false}, + {Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "location_id", Type: "TEXT", Nullable: true}, + {Name: "location_name", Type: "TEXT", Nullable: true}, + {Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "product", Type: "TEXT", Nullable: false}, + {Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "period_count", Type: "INTEGER", Nullable: false}, + }, + PrimaryKey: []string{"event_id"}, + PruneColumn: "issued_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_fc_location_product_issued_at", Columns: []string{"location_id", "product", "issued_at"}}, + {Name: "idx_wf_fc_issued_at", Columns: []string{"issued_at"}}, + {Name: "idx_wf_fc_product_issued_at", Columns: []string{"product", "issued_at"}}, + }, + }, + { + Name: tableForecastPeriods, + Columns: []fksinks.PostgresColumn{ + {Name: "run_event_id", Type: "TEXT REFERENCES forecasts(event_id) ON DELETE CASCADE", Nullable: false}, + {Name: "period_index", Type: "INTEGER", Nullable: false}, + {Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "start_time", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "end_time", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "name", Type: "TEXT", Nullable: true}, + {Name: "is_day", Type: "BOOLEAN", Nullable: true}, + {Name: "condition_code", Type: "INTEGER", Nullable: false}, + {Name: "condition_text", Type: "TEXT", Nullable: true}, + {Name: "provider_raw_description", Type: "TEXT", Nullable: true}, + {Name: "text_description", Type: "TEXT", Nullable: true}, + {Name: "detailed_text", Type: "TEXT", Nullable: true}, + {Name: "icon_url", Type: "TEXT", Nullable: true}, + {Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "cloud_cover_percent", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "probability_of_precipitation_percent", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "precipitation_amount_mm", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "snowfall_depth_mm", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "uv_index", Type: "DOUBLE PRECISION", Nullable: true}, + }, + PrimaryKey: []string{"run_event_id", "period_index"}, + PruneColumn: "issued_at", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_fc_period_start_time", Columns: []string{"start_time"}}, + {Name: "idx_wf_fc_period_end_time", Columns: []string{"end_time"}}, + {Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}}, + }, + }, + { + Name: tableAlertRuns, + Columns: []fksinks.PostgresColumn{ + {Name: "event_id", Type: "TEXT", Nullable: false}, + {Name: "event_kind", Type: "TEXT", Nullable: false}, + {Name: "event_source", Type: "TEXT", Nullable: false}, + {Name: "event_schema", Type: "TEXT", Nullable: false}, + {Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "location_id", Type: "TEXT", Nullable: true}, + {Name: "location_name", Type: "TEXT", Nullable: true}, + {Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true}, + {Name: "alert_count", Type: "INTEGER", Nullable: false}, + }, + PrimaryKey: []string{"event_id"}, + PruneColumn: "as_of", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_alert_run_location_as_of", Columns: []string{"location_id", "as_of"}}, + {Name: "idx_wf_alert_run_as_of", Columns: []string{"as_of"}}, + }, + }, + { + Name: tableAlerts, + Columns: []fksinks.PostgresColumn{ + {Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false}, + {Name: "alert_index", Type: "INTEGER", Nullable: false}, + {Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "alert_id", Type: "TEXT", Nullable: false}, + {Name: "event", Type: "TEXT", Nullable: true}, + {Name: "headline", Type: "TEXT", Nullable: true}, + {Name: "severity", Type: "TEXT", Nullable: true}, + {Name: "urgency", Type: "TEXT", Nullable: true}, + {Name: "certainty", Type: "TEXT", Nullable: true}, + {Name: "status", Type: "TEXT", Nullable: true}, + {Name: "message_type", Type: "TEXT", Nullable: true}, + {Name: "category", Type: "TEXT", Nullable: true}, + {Name: "response", Type: "TEXT", Nullable: true}, + {Name: "description", Type: "TEXT", Nullable: true}, + {Name: "instruction", Type: "TEXT", Nullable: true}, + {Name: "sent", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "effective", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "onset", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "expires", Type: "TIMESTAMPTZ", Nullable: true}, + {Name: "area_description", Type: "TEXT", Nullable: true}, + {Name: "sender_name", Type: "TEXT", Nullable: true}, + {Name: "reference_count", Type: "INTEGER", Nullable: false}, + }, + PrimaryKey: []string{"run_event_id", "alert_index"}, + PruneColumn: "as_of", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_alerts_alert_id", Columns: []string{"alert_id"}}, + {Name: "idx_wf_alerts_severity_expires", Columns: []string{"severity", "expires"}}, + {Name: "idx_wf_alerts_as_of", Columns: []string{"as_of"}}, + }, + }, + { + Name: tableAlertReferences, + Columns: []fksinks.PostgresColumn{ + {Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false}, + {Name: "alert_index", Type: "INTEGER", Nullable: false}, + {Name: "reference_index", Type: "INTEGER", Nullable: false}, + {Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false}, + {Name: "id", Type: "TEXT", Nullable: true}, + {Name: "identifier", Type: "TEXT", Nullable: true}, + {Name: "sender", Type: "TEXT", Nullable: true}, + {Name: "sent", Type: "TIMESTAMPTZ", Nullable: true}, + }, + PrimaryKey: []string{"run_event_id", "alert_index", "reference_index"}, + PruneColumn: "as_of", + Indexes: []fksinks.PostgresIndex{ + {Name: "idx_wf_alert_refs_as_of", Columns: []string{"as_of"}}, + {Name: "idx_wf_alert_refs_sent", Columns: []string{"sent"}}, + }, + }, + }, + MapEvent: mapPostgresEvent, + } +} diff --git a/internal/sinks/postgres/schema_test.go b/internal/sinks/postgres/schema_test.go new file mode 100644 index 0000000..719875a --- /dev/null +++ b/internal/sinks/postgres/schema_test.go @@ -0,0 +1,96 @@ +package postgres + +import ( + "fmt" + "strings" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" +) + +func TestRegisterPostgresSchemasNilConfig(t *testing.T) { + err := RegisterPostgresSchemas(nil) + if err == nil { + t.Fatalf("RegisterPostgresSchemas(nil) expected error") + } + if !strings.Contains(err.Error(), "config is nil") { + t.Fatalf("error = %q, want config is nil", err) + } +} + +func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) { + cfg := &config.Config{ + Sinks: []config.SinkConfig{ + {Name: "stdout_only", Driver: "stdout"}, + {Name: "nats_only", Driver: "nats"}, + }, + } + + if err := RegisterPostgresSchemas(cfg); err != nil { + t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err) + } +} + +func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) { + sinkName := uniqueSinkName("pg_test") + cfg := &config.Config{ + Sinks: []config.SinkConfig{ + {Name: sinkName, Driver: "postgres"}, + }, + } + + if err := RegisterPostgresSchemas(cfg); err != nil { + t.Fatalf("first RegisterPostgresSchemas() error = %v", err) + } + + err := RegisterPostgresSchemas(cfg) + if err == nil { + t.Fatalf("second RegisterPostgresSchemas() expected duplicate error") + } + if !strings.Contains(err.Error(), "already registered") { + t.Fatalf("error = %q, want already registered", err) + } +} + +func TestWeatherPostgresSchemaShape(t *testing.T) { + s := weatherPostgresSchema() + if s.MapEvent == nil { + t.Fatalf("weatherPostgresSchema().MapEvent is nil") + } + + wantTables := map[string]bool{ + tableObservations: true, + tableObservationCloudLayers: true, + tableObservationPresentWeather: true, + tableForecasts: true, + tableForecastPeriods: true, + tableAlertRuns: true, + tableAlerts: true, + tableAlertReferences: true, + } + + if len(s.Tables) != len(wantTables) { + t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables)) + } + + seenIndexes := map[string]bool{} + for _, tbl := range s.Tables { + if !wantTables[tbl.Name] { + t.Fatalf("unexpected table %q in schema", tbl.Name) + } + if tbl.PruneColumn == "" { + t.Fatalf("table %q missing prune column", tbl.Name) + } + for _, idx := range tbl.Indexes { + if seenIndexes[idx.Name] { + t.Fatalf("duplicate index name %q", idx.Name) + } + seenIndexes[idx.Name] = true + } + } +} + +func uniqueSinkName(prefix string) string { + return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano()) +}