From dbaebbbd7a2f5ba0536d44b3ddabd00ac7264253 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Fri, 27 Mar 2026 12:58:23 -0500 Subject: [PATCH] Updates to the nws forecast source and normalizer to separate code specific to hourly forecasts and prepare for upcoming feature addition of daily and narrative forecasts --- cmd/weatherfeeder/config.yml | 2 +- internal/normalizers/nws/forecast.go | 232 +++++++++++------- internal/normalizers/nws/forecast_test.go | 39 ++- internal/normalizers/nws/types.go | 13 +- internal/sources/builtins.go | 4 +- internal/sources/builtins_test.go | 47 ++++ .../nws/{forecast.go => forecast_hourly.go} | 28 +-- 7 files changed, 245 insertions(+), 120 deletions(-) create mode 100644 internal/sources/builtins_test.go rename internal/sources/nws/{forecast.go => forecast_hourly.go} (79%) diff --git a/cmd/weatherfeeder/config.yml b/cmd/weatherfeeder/config.yml index 715ef48..41f6900 100644 --- a/cmd/weatherfeeder/config.yml +++ b/cmd/weatherfeeder/config.yml @@ -48,7 +48,7 @@ sources: - name: NWSHourlyForecastSTL mode: poll kinds: ["forecast"] - driver: nws_forecast + driver: nws_forecast_hourly every: 45m params: url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly" diff --git a/internal/normalizers/nws/forecast.go b/internal/normalizers/nws/forecast.go index b0dec68..fb892e6 100644 --- a/internal/normalizers/nws/forecast.go +++ b/internal/normalizers/nws/forecast.go @@ -18,8 +18,8 @@ import ( // // standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1 // -// It interprets NWS GeoJSON gridpoint *hourly* forecast responses and maps them into -// the canonical model.WeatherForecastRun representation. +// It keeps one NWS forecast normalization entrypoint and dispatches to product-specific +// builders by raw schema. Today only hourly is implemented. // // Caveats / policy: // 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode @@ -29,118 +29,63 @@ import ( type ForecastNormalizer struct{} func (ForecastNormalizer) Match(e event.Event) bool { - s := strings.TrimSpace(e.Schema) - return s == standards.SchemaRawNWSHourlyForecastV1 + switch strings.TrimSpace(e.Schema) { + case standards.SchemaRawNWSHourlyForecastV1: + return true + default: + return false + } } func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps + return normalizeForecastEventBySchema(in) +} + +func normalizeForecastEventBySchema(in event.Event) (*event.Event, error) { + switch strings.TrimSpace(in.Schema) { + case standards.SchemaRawNWSHourlyForecastV1: + return normalizeHourlyForecastEvent(in) + default: + return nil, fmt.Errorf("unsupported nws forecast schema %q", strings.TrimSpace(in.Schema)) + } +} + +func normalizeHourlyForecastEvent(in event.Event) (*event.Event, error) { return normcommon.NormalizeJSON( in, "nws hourly forecast", standards.SchemaWeatherForecastV1, - buildForecast, + buildHourlyForecast, ) } -// buildForecast contains the domain mapping logic (provider -> canonical model). -func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.Time, error) { - // IssuedAt is required by the canonical model. - issuedStr := strings.TrimSpace(parsed.Properties.GeneratedAt) - if issuedStr == "" { - return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing properties.generatedAt") - } - issuedAt, err := nwscommon.ParseTime(issuedStr) +// buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model). +func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) { + issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) if err != nil { - return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err) - } - issuedAt = issuedAt.UTC() - - // UpdatedAt is optional. - var updatedAt *time.Time - if s := strings.TrimSpace(parsed.Properties.UpdateTime); s != "" { - if t, err := nwscommon.ParseTime(s); err == nil { - tt := t.UTC() - updatedAt = &tt - } + return model.WeatherForecastRun{}, time.Time{}, err } // Best-effort location centroid from the GeoJSON polygon (optional). lat, lon := centroidLatLon(parsed.Geometry.Coordinates) - // Schema is explicitly hourly, so product is not a heuristic. - run := model.WeatherForecastRun{ - LocationID: "", - LocationName: "", - - IssuedAt: issuedAt, - UpdatedAt: updatedAt, - Product: model.ForecastProductHourly, - - Latitude: lat, - Longitude: lon, - ElevationMeters: parsed.Properties.Elevation.Value, - - Periods: nil, - } + run := newForecastRunBase( + issuedAt, + updatedAt, + model.ForecastProductHourly, + lat, + lon, + parsed.Properties.Elevation.Value, + ) periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) for i, p := range parsed.Properties.Periods { - startStr := strings.TrimSpace(p.StartTime) - endStr := strings.TrimSpace(p.EndTime) - - if startStr == "" || endStr == "" { - return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", i) - } - - start, err := nwscommon.ParseTime(startStr) + period, err := mapHourlyForecastPeriod(i, p) if err != nil { - return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", i, startStr, err) + return model.WeatherForecastRun{}, time.Time{}, err } - end, err := nwscommon.ParseTime(endStr) - if err != nil { - return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", i, endStr, err) - } - start = start.UTC() - end = end.UTC() - - // NWS hourly supplies isDaytime; make it a pointer to match the canonical model. - var isDay *bool - if p.IsDaytime != nil { - b := *p.IsDaytime - isDay = &b - } - - tempC := tempCFromNWS(p.Temperature, p.TemperatureUnit) - - // Infer WMO from shortForecast (and fall back to icon token). - providerDesc := strings.TrimSpace(p.ShortForecast) - wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC) - - period := model.WeatherForecastPeriod{ - StartTime: start, - EndTime: end, - - Name: strings.TrimSpace(p.Name), - IsDay: isDay, - - ConditionCode: wmo, - - // For forecasts, keep provider short forecast text as the human-facing description. - TextDescription: providerDesc, - - TemperatureC: tempC, - - DewpointC: p.Dewpoint.Value, - RelativeHumidityPercent: p.RelativeHumidity.Value, - - WindDirectionDegrees: parseNWSWindDirectionDegrees(p.WindDirection), - WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed), - - ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value, - } - periods = append(periods, period) } @@ -149,3 +94,108 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T // EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly). return run, issuedAt, nil } + +func parseForecastRunTimes(generatedAt, updateTime string) (time.Time, *time.Time, error) { + issuedStr := strings.TrimSpace(generatedAt) + if issuedStr == "" { + return time.Time{}, nil, fmt.Errorf("missing properties.generatedAt") + } + issuedAt, err := nwscommon.ParseTime(issuedStr) + if err != nil { + return time.Time{}, nil, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err) + } + issuedAt = issuedAt.UTC() + + var updatedAt *time.Time + if s := strings.TrimSpace(updateTime); s != "" { + if t, err := nwscommon.ParseTime(s); err == nil { + tt := t.UTC() + updatedAt = &tt + } + } + + return issuedAt, updatedAt, nil +} + +func newForecastRunBase( + issuedAt time.Time, + updatedAt *time.Time, + product model.ForecastProduct, + lat, lon, elevation *float64, +) model.WeatherForecastRun { + return model.WeatherForecastRun{ + LocationID: "", + LocationName: "", + IssuedAt: issuedAt, + UpdatedAt: updatedAt, + Product: product, + Latitude: lat, + Longitude: lon, + + ElevationMeters: elevation, + Periods: nil, + } +} + +func parseForecastPeriodWindow(startStr, endStr string, idx int) (time.Time, time.Time, error) { + startStr = strings.TrimSpace(startStr) + endStr = strings.TrimSpace(endStr) + + if startStr == "" || endStr == "" { + return time.Time{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", idx) + } + + start, err := nwscommon.ParseTime(startStr) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", idx, startStr, err) + } + end, err := nwscommon.ParseTime(endStr) + if err != nil { + return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", idx, endStr, err) + } + + return start.UTC(), end.UTC(), nil +} + +func mapHourlyForecastPeriod(idx int, p nwsHourlyForecastPeriod) (model.WeatherForecastPeriod, error) { + start, end, err := parseForecastPeriodWindow(p.StartTime, p.EndTime, idx) + if err != nil { + return model.WeatherForecastPeriod{}, err + } + + // NWS hourly supplies isDaytime; make it a pointer to match the canonical model. + var isDay *bool + if p.IsDaytime != nil { + b := *p.IsDaytime + isDay = &b + } + + tempC := tempCFromNWS(p.Temperature, p.TemperatureUnit) + + // Infer WMO from shortForecast (and fall back to icon token). + providerDesc := strings.TrimSpace(p.ShortForecast) + wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC) + + return model.WeatherForecastPeriod{ + StartTime: start, + EndTime: end, + + Name: strings.TrimSpace(p.Name), + IsDay: isDay, + + ConditionCode: wmo, + + // For forecasts, keep provider short forecast text as the human-facing description. + TextDescription: providerDesc, + + TemperatureC: tempC, + + DewpointC: p.Dewpoint.Value, + RelativeHumidityPercent: p.RelativeHumidity.Value, + + WindDirectionDegrees: parseNWSWindDirectionDegrees(p.WindDirection), + WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed), + + ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value, + }, nil +} diff --git a/internal/normalizers/nws/forecast_test.go b/internal/normalizers/nws/forecast_test.go index b3c90fe..5210163 100644 --- a/internal/normalizers/nws/forecast_test.go +++ b/internal/normalizers/nws/forecast_test.go @@ -2,14 +2,18 @@ package nws import ( "encoding/json" + "strings" "testing" "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) -func TestBuildForecastUsesShortForecastAsTextDescription(t *testing.T) { - parsed := nwsForecastResponse{} +func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) { + parsed := nwsHourlyForecastResponse{} parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z" - parsed.Properties.Periods = []nwsForecastPeriod{ + parsed.Properties.Periods = []nwsHourlyForecastPeriod{ { StartTime: "2026-03-16T19:00:00Z", EndTime: "2026-03-16T20:00:00Z", @@ -19,9 +23,9 @@ func TestBuildForecastUsesShortForecastAsTextDescription(t *testing.T) { }, } - run, effectiveAt, err := buildForecast(parsed) + run, effectiveAt, err := buildHourlyForecast(parsed) if err != nil { - t.Fatalf("buildForecast() error = %v", err) + t.Fatalf("buildHourlyForecast() error = %v", err) } if len(run.Periods) != 1 { t.Fatalf("periods len = %d, want 1", len(run.Periods)) @@ -41,6 +45,31 @@ func TestBuildForecastUsesShortForecastAsTextDescription(t *testing.T) { assertNoLegacyForecastDescriptionKeys(t, run.Periods[0]) } +func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) { + _, err := normalizeForecastEventBySchema(event.Event{ + Schema: "raw.nws.daily.forecast.v1", + }) + if err == nil { + t.Fatalf("normalizeForecastEventBySchema() expected unsupported schema error") + } + if !strings.Contains(err.Error(), "unsupported nws forecast schema") { + t.Fatalf("error = %q, want unsupported schema context", err) + } +} + +func TestNormalizeForecastEventBySchemaRoutesHourly(t *testing.T) { + _, err := normalizeForecastEventBySchema(event.Event{ + Schema: standards.SchemaRawNWSHourlyForecastV1, + Payload: map[string]any{"properties": map[string]any{}}, + }) + if err == nil { + t.Fatalf("normalizeForecastEventBySchema() expected build error for missing generatedAt") + } + if !strings.Contains(err.Error(), "missing properties.generatedAt") { + t.Fatalf("error = %q, want missing properties.generatedAt", err) + } +} + func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) { t.Helper() diff --git a/internal/normalizers/nws/types.go b/internal/normalizers/nws/types.go index 8719642..e4c3a30 100644 --- a/internal/normalizers/nws/types.go +++ b/internal/normalizers/nws/types.go @@ -98,12 +98,11 @@ type nwsCloudLayer struct { Amount string `json:"amount"` } -// nwsForecastResponse is a minimal-but-sufficient representation of the NWS -// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun. +// nwsHourlyForecastResponse is a minimal-but-sufficient representation of the NWS +// gridpoint hourly forecast GeoJSON payload needed for mapping into model.WeatherForecastRun. // -// This is currently designed to support the hourly forecast endpoint; revisions may be needed -// to accommodate other forecast endpoints in the future. -type nwsForecastResponse struct { +// Daily and narrative variants should be added as distinct structs in follow-up work. +type nwsHourlyForecastResponse struct { Geometry struct { Type string `json:"type"` Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat] @@ -122,11 +121,11 @@ type nwsForecastResponse struct { Value *float64 `json:"value"` } `json:"elevation"` - Periods []nwsForecastPeriod `json:"periods"` + Periods []nwsHourlyForecastPeriod `json:"periods"` } `json:"properties"` } -type nwsForecastPeriod struct { +type nwsHourlyForecastPeriod struct { Number int `json:"number"` Name string `json:"name"` StartTime string `json:"startTime"` diff --git a/internal/sources/builtins.go b/internal/sources/builtins.go index 22dd475..0011a87 100644 --- a/internal/sources/builtins.go +++ b/internal/sources/builtins.go @@ -19,8 +19,8 @@ func RegisterBuiltins(r *fksource.Registry) { r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }) - r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return nws.NewForecastSource(cfg) + r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) { + return nws.NewHourlyForecastSource(cfg) }) // Open-Meteo drivers diff --git a/internal/sources/builtins_test.go b/internal/sources/builtins_test.go new file mode 100644 index 0000000..9ef013d --- /dev/null +++ b/internal/sources/builtins_test.go @@ -0,0 +1,47 @@ +package sources + +import ( + "strings" + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" + fksource "gitea.maximumdirect.net/ejr/feedkit/sources" +) + +func TestRegisterBuiltinsRegistersNWSHourlyForecastDriver(t *testing.T) { + reg := fksource.NewRegistry() + RegisterBuiltins(reg) + + in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_hourly")) + if err != nil { + t.Fatalf("BuildInput(nws_forecast_hourly) error = %v", err) + } + if _, ok := in.(fksource.PollSource); !ok { + t.Fatalf("BuildInput(nws_forecast_hourly) type = %T, want PollSource", in) + } +} + +func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) { + reg := fksource.NewRegistry() + RegisterBuiltins(reg) + + _, err := reg.BuildInput(sourceConfigForDriver("nws_forecast")) + if err == nil { + t.Fatalf("BuildInput(nws_forecast) expected unknown driver error") + } + if !strings.Contains(err.Error(), `unknown source driver: "nws_forecast"`) { + t.Fatalf("error = %q, want unknown source driver for nws_forecast", err) + } +} + +func sourceConfigForDriver(driver string) config.SourceConfig { + return config.SourceConfig{ + Name: "test-source", + Driver: driver, + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": "https://example.invalid", + "user_agent": "test-agent", + }, + } +} diff --git a/internal/sources/nws/forecast.go b/internal/sources/nws/forecast_hourly.go similarity index 79% rename from internal/sources/nws/forecast.go rename to internal/sources/nws/forecast_hourly.go index bba6f12..886dab6 100644 --- a/internal/sources/nws/forecast.go +++ b/internal/sources/nws/forecast_hourly.go @@ -1,4 +1,4 @@ -// FILE: internal/sources/nws/forecast.go +// FILE: internal/sources/nws/forecast_hourly.go package nws import ( @@ -14,19 +14,19 @@ import ( "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) -// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event. +// HourlyForecastSource polls an NWS hourly forecast endpoint and emits a RAW forecast Event. // // It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes // minimal metadata for Event.EffectiveAt and Event.ID. // // Output schema (current implementation): // - standards.SchemaRawNWSHourlyForecastV1 -type ForecastSource struct { +type HourlyForecastSource struct { http *common.HTTPSource } -func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { - const driver = "nws_forecast" +func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) { + const driver = "nws_forecast_hourly" // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") @@ -34,15 +34,15 @@ func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { return nil, err } - return &ForecastSource{http: hs}, nil + return &HourlyForecastSource{http: hs}, nil } -func (s *ForecastSource) Name() string { return s.http.Name } +func (s *HourlyForecastSource) Name() string { return s.http.Name } // Kind is used for routing/policy. -func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } +func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") } -func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { +func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err @@ -80,7 +80,7 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { // ---- RAW fetch + minimal metadata decode ---- -type forecastMeta struct { +type hourlyForecastMeta struct { // Present for GeoJSON Feature responses, but often stable (endpoint URL). ID string `json:"id"` @@ -94,16 +94,16 @@ type forecastMeta struct { ParsedUpdateTime time.Time `json:"-"` } -func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) { +func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, error) { raw, err := s.http.FetchJSON(ctx) if err != nil { - return nil, forecastMeta{}, err + return nil, hourlyForecastMeta{}, err } - var meta forecastMeta + var meta hourlyForecastMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, forecastMeta{}, nil + return raw, hourlyForecastMeta{}, nil } // generatedAt (preferred)