diff --git a/internal/normalizers/builtins.go b/internal/normalizers/builtins.go index ab77673..193c9f8 100644 --- a/internal/normalizers/builtins.go +++ b/internal/normalizers/builtins.go @@ -9,6 +9,12 @@ import ( "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" ) +var builtinRegistrations = []func([]fknormalize.Normalizer) []fknormalize.Normalizer{ + nws.Register, + openmeteo.Register, + openweather.Register, +} + // RegisterBuiltins registers all normalizers shipped with this binary. // // This mirrors internal/sources.RegisterBuiltins, but note the selection model: @@ -27,9 +33,9 @@ func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer { // // Order here should be stable across releases to reduce surprises when adding // new normalizers. - out = nws.Register(out) - out = openmeteo.Register(out) - out = openweather.Register(out) + for _, register := range builtinRegistrations { + out = register(out) + } return out } diff --git a/internal/normalizers/nws/forecast.go b/internal/normalizers/nws/forecast.go index a029341..f3a7693 100644 --- a/internal/normalizers/nws/forecast.go +++ b/internal/normalizers/nws/forecast.go @@ -75,62 +75,63 @@ func normalizeNarrativeForecastEvent(in event.Event) (*event.Event, error) { ) } +type forecastPeriodMapper[T any] func(idx int, period T) (model.WeatherForecastPeriod, error) + // buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model). func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) { - issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) - if err != nil { - return model.WeatherForecastRun{}, time.Time{}, err - } - - // Best-effort location centroid from the GeoJSON polygon (optional). - lat, lon := centroidLatLon(parsed.Geometry.Coordinates) - - run := newForecastRunBase( - issuedAt, - updatedAt, - model.ForecastProductHourly, - lat, - lon, + return buildForecastRun( + parsed.Properties.GeneratedAt, + parsed.Properties.UpdateTime, + parsed.Geometry.Coordinates, parsed.Properties.Elevation.Value, + model.ForecastProductHourly, + parsed.Properties.Periods, + mapHourlyForecastPeriod, ) - - periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) - for i, p := range parsed.Properties.Periods { - period, err := mapHourlyForecastPeriod(i, p) - if err != nil { - return model.WeatherForecastRun{}, time.Time{}, err - } - periods = append(periods, period) - } - - run.Periods = periods - - // EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly). - return run, issuedAt, nil } // buildNarrativeForecast contains narrative forecast mapping logic (provider -> canonical model). func buildNarrativeForecast(parsed nwsNarrativeForecastResponse) (model.WeatherForecastRun, time.Time, error) { - issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime) + return buildForecastRun( + parsed.Properties.GeneratedAt, + parsed.Properties.UpdateTime, + parsed.Geometry.Coordinates, + parsed.Properties.Elevation.Value, + model.ForecastProductNarrative, + parsed.Properties.Periods, + mapNarrativeForecastPeriod, + ) +} + +func buildForecastRun[T any]( + generatedAt string, + updateTime string, + coordinates [][][]float64, + elevation *float64, + product model.ForecastProduct, + srcPeriods []T, + mapPeriod forecastPeriodMapper[T], +) (model.WeatherForecastRun, time.Time, error) { + issuedAt, updatedAt, err := parseForecastRunTimes(generatedAt, updateTime) if err != nil { return model.WeatherForecastRun{}, time.Time{}, err } // Best-effort location centroid from the GeoJSON polygon (optional). - lat, lon := centroidLatLon(parsed.Geometry.Coordinates) + lat, lon := centroidLatLon(coordinates) run := newForecastRunBase( issuedAt, updatedAt, - model.ForecastProductNarrative, + product, lat, lon, - parsed.Properties.Elevation.Value, + elevation, ) - periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) - for i, p := range parsed.Properties.Periods { - period, err := mapNarrativeForecastPeriod(i, p) + periods := make([]model.WeatherForecastPeriod, 0, len(srcPeriods)) + for i, p := range srcPeriods { + period, err := mapPeriod(i, p) if err != nil { return model.WeatherForecastRun{}, time.Time{}, err } diff --git a/internal/normalizers/nws/forecast_test.go b/internal/normalizers/nws/forecast_test.go index 4e5d71c..ec3dbca 100644 --- a/internal/normalizers/nws/forecast_test.go +++ b/internal/normalizers/nws/forecast_test.go @@ -47,6 +47,55 @@ func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) { assertNoLegacyForecastDescriptionKeys(t, run.Periods[0]) } +func TestBuildHourlyForecastPreservesUpdatedAtCentroidAndElevation(t *testing.T) { + parsed := nwsHourlyForecastResponse{} + parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z" + parsed.Properties.UpdateTime = "2026-03-16T18:30:00Z" + elevation := 123.4 + parsed.Properties.Elevation.Value = &elevation + parsed.Geometry.Coordinates = [][][]float64{ + { + {-90.0, 38.0}, + {-89.0, 38.0}, + {-89.0, 39.0}, + {-90.0, 39.0}, + }, + } + parsed.Properties.Periods = []nwsHourlyForecastPeriod{ + { + StartTime: "2026-03-16T19:00:00Z", + EndTime: "2026-03-16T20:00:00Z", + ShortForecast: "Cloudy", + }, + } + + run, effectiveAt, err := buildHourlyForecast(parsed) + if err != nil { + t.Fatalf("buildHourlyForecast() error = %v", err) + } + + wantIssued := time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC) + wantUpdated := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC) + if !run.IssuedAt.Equal(wantIssued) { + t.Fatalf("IssuedAt = %s, want %s", run.IssuedAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339)) + } + if run.UpdatedAt == nil || !run.UpdatedAt.Equal(wantUpdated) { + t.Fatalf("UpdatedAt = %v, want %s", run.UpdatedAt, wantUpdated.Format(time.RFC3339)) + } + if run.Latitude == nil || math.Abs(*run.Latitude-38.5) > 0.0001 { + t.Fatalf("Latitude = %v, want 38.5", run.Latitude) + } + if run.Longitude == nil || math.Abs(*run.Longitude+89.5) > 0.0001 { + t.Fatalf("Longitude = %v, want -89.5", run.Longitude) + } + if run.ElevationMeters == nil || math.Abs(*run.ElevationMeters-elevation) > 0.0001 { + t.Fatalf("ElevationMeters = %v, want %.1f", run.ElevationMeters, elevation) + } + if !effectiveAt.Equal(wantIssued) { + t.Fatalf("effectiveAt = %s, want %s", effectiveAt.Format(time.RFC3339), wantIssued.Format(time.RFC3339)) + } +} + func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) { _, err := normalizeForecastEventBySchema(event.Event{ Schema: "raw.nws.daily.forecast.v1", @@ -85,6 +134,70 @@ func TestNormalizeForecastEventBySchemaRoutesNarrative(t *testing.T) { } } +func TestNormalizeForecastEventBySchemaProducesCanonicalWeatherForecastSchema(t *testing.T) { + tests := []struct { + name string + schema string + payload map[string]any + }{ + { + name: "hourly", + schema: standards.SchemaRawNWSHourlyForecastV1, + payload: map[string]any{ + "properties": map[string]any{ + "generatedAt": "2026-03-16T18:00:00Z", + "periods": []map[string]any{ + { + "startTime": "2026-03-16T19:00:00Z", + "endTime": "2026-03-16T20:00:00Z", + "shortForecast": "Cloudy", + }, + }, + }, + }, + }, + { + name: "narrative", + schema: standards.SchemaRawNWSNarrativeForecastV1, + payload: map[string]any{ + "properties": map[string]any{ + "generatedAt": "2026-03-16T18:00:00Z", + "periods": []map[string]any{ + { + "startTime": "2026-03-16T19:00:00Z", + "endTime": "2026-03-16T20:00:00Z", + "shortForecast": "Cloudy", + "detailedForecast": "Cloudy", + }, + }, + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + out, err := normalizeForecastEventBySchema(event.Event{ + ID: "evt-1", + Kind: event.Kind("forecast"), + Source: "nws-test", + EmittedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC), + Schema: tt.schema, + Payload: tt.payload, + }) + if err != nil { + t.Fatalf("normalizeForecastEventBySchema() error = %v", err) + } + if out == nil { + t.Fatalf("normalizeForecastEventBySchema() returned nil output") + } + if out.Schema != standards.SchemaWeatherForecastV1 { + t.Fatalf("Schema = %q, want %q", out.Schema, standards.SchemaWeatherForecastV1) + } + }) + } +} + func TestBuildNarrativeForecastMapsExpectedFields(t *testing.T) { parsed := nwsNarrativeForecastResponse{} parsed.Properties.GeneratedAt = "2026-03-27T15:17:01Z" diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index 1b4475b..c1ca0bf 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -5,18 +5,13 @@ import ( fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) +var builtins = []fknormalize.Normalizer{ + ObservationNormalizer{}, + ForecastNormalizer{}, + AlertsNormalizer{}, +} + // Register appends NWS normalizers in stable order. func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { - out := in - - // Observations - out = append(out, ObservationNormalizer{}) - - // Forecasts - out = append(out, ForecastNormalizer{}) - - // Alerts - out = append(out, AlertsNormalizer{}) - - return out + return append(in, builtins...) } diff --git a/internal/normalizers/openmeteo/register.go b/internal/normalizers/openmeteo/register.go index 915cab2..0ad012d 100644 --- a/internal/normalizers/openmeteo/register.go +++ b/internal/normalizers/openmeteo/register.go @@ -5,14 +5,12 @@ import ( fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) +var builtins = []fknormalize.Normalizer{ + ObservationNormalizer{}, + ForecastNormalizer{}, +} + // Register appends Open-Meteo normalizers in stable order. func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { - out := in - - // Observations - out = append(out, ObservationNormalizer{}) - // Forecasts - out = append(out, ForecastNormalizer{}) - - return out + return append(in, builtins...) } diff --git a/internal/normalizers/openweather/register.go b/internal/normalizers/openweather/register.go index fe6f87a..d19feed 100644 --- a/internal/normalizers/openweather/register.go +++ b/internal/normalizers/openweather/register.go @@ -5,12 +5,11 @@ import ( fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize" ) +var builtins = []fknormalize.Normalizer{ + ObservationNormalizer{}, +} + // Register appends OpenWeather normalizers in stable order. func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer { - out := in - - // Observations - out = append(out, ObservationNormalizer{}) - - return out + return append(in, builtins...) } diff --git a/internal/sources/builtins.go b/internal/sources/builtins.go index 805916f..35fe9ad 100644 --- a/internal/sources/builtins.go +++ b/internal/sources/builtins.go @@ -9,33 +9,30 @@ import ( fksource "gitea.maximumdirect.net/ejr/feedkit/sources" ) +type pollDriverRegistration struct { + driver string + factory func(config.SourceConfig) (fksource.PollSource, error) +} + +var pollDriverRegistrations = []pollDriverRegistration{ + {driver: "nws_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewObservationSource(cfg) }}, + {driver: "nws_alerts", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewAlertsSource(cfg) }}, + {driver: "nws_forecast_hourly", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewHourlyForecastSource(cfg) }}, + {driver: "nws_forecast_narrative", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return nws.NewNarrativeForecastSource(cfg) }}, + {driver: "openmeteo_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewObservationSource(cfg) }}, + {driver: "openmeteo_forecast", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { return openmeteo.NewForecastSource(cfg) }}, + {driver: "openweather_observation", factory: func(cfg config.SourceConfig) (fksource.PollSource, error) { + return openweather.NewObservationSource(cfg) + }}, +} + // RegisterBuiltins registers the source drivers that ship with this binary. // Keeping this in one place makes main.go very readable. func RegisterBuiltins(r *fksource.Registry) { - // NWS drivers - r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return nws.NewObservationSource(cfg) - }) - r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return nws.NewAlertsSource(cfg) - }) - r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return nws.NewHourlyForecastSource(cfg) - }) - r.RegisterPoll("nws_forecast_narrative", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return nws.NewNarrativeForecastSource(cfg) - }) - - // Open-Meteo drivers - r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return openmeteo.NewObservationSource(cfg) - }) - r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return openmeteo.NewForecastSource(cfg) - }) - - // OpenWeatherMap drivers - r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) { - return openweather.NewObservationSource(cfg) - }) + for _, reg := range pollDriverRegistrations { + reg := reg + r.RegisterPoll(reg.driver, func(cfg config.SourceConfig) (fksource.PollSource, error) { + return reg.factory(cfg) + }) + } } diff --git a/internal/sources/builtins_test.go b/internal/sources/builtins_test.go index d5cce92..0319417 100644 --- a/internal/sources/builtins_test.go +++ b/internal/sources/builtins_test.go @@ -47,13 +47,42 @@ func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) { } } +func TestRegisterBuiltinsRegistersAllCurrentDrivers(t *testing.T) { + reg := fksource.NewRegistry() + RegisterBuiltins(reg) + + drivers := []string{ + "nws_observation", + "nws_alerts", + "nws_forecast_hourly", + "nws_forecast_narrative", + "openmeteo_observation", + "openmeteo_forecast", + "openweather_observation", + } + + for _, driver := range drivers { + in, err := reg.BuildInput(sourceConfigForDriver(driver)) + if err != nil { + t.Fatalf("BuildInput(%s) error = %v", driver, err) + } + if _, ok := in.(fksource.PollSource); !ok { + t.Fatalf("BuildInput(%s) type = %T, want PollSource", driver, in) + } + } +} + func sourceConfigForDriver(driver string) config.SourceConfig { + url := "https://example.invalid" + if driver == "openweather_observation" { + url = "https://example.invalid?units=metric" + } return config.SourceConfig{ Name: "test-source", Driver: driver, Mode: config.SourceModePoll, Params: map[string]any{ - "url": "https://example.invalid", + "url": url, "user_agent": "test-agent", }, } diff --git a/internal/sources/nws/forecast_common.go b/internal/sources/nws/forecast_common.go new file mode 100644 index 0000000..c17372a --- /dev/null +++ b/internal/sources/nws/forecast_common.go @@ -0,0 +1,114 @@ +package nws + +import ( + "context" + "encoding/json" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + fksources "gitea.maximumdirect.net/ejr/feedkit/sources" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" +) + +const nwsForecastAccept = "application/geo+json, application/json" + +type forecastSource struct { + http *fksources.HTTPSource + rawSchema string +} + +type forecastMeta struct { + Properties struct { + GeneratedAt string `json:"generatedAt"` + UpdateTime string `json:"updateTime"` + Updated string `json:"updated"` + } `json:"properties"` + + ParsedGeneratedAt time.Time `json:"-"` + ParsedUpdateTime time.Time `json:"-"` +} + +func newForecastSource(cfg config.SourceConfig, driver, rawSchema string) (*forecastSource, error) { + hs, err := fksources.NewHTTPSource(driver, cfg, nwsForecastAccept) + if err != nil { + return nil, err + } + + return &forecastSource{ + http: hs, + rawSchema: rawSchema, + }, nil +} + +func (s *forecastSource) Name() string { return s.http.Name } + +func (s *forecastSource) Kind() event.Kind { return event.Kind("forecast") } + +func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) { + raw, meta, changed, err := s.fetchRaw(ctx) + if err != nil { + return nil, err + } + if !changed { + return nil, nil + } + + var effectiveAt *time.Time + switch { + case !meta.ParsedGeneratedAt.IsZero(): + t := meta.ParsedGeneratedAt.UTC() + effectiveAt = &t + case !meta.ParsedUpdateTime.IsZero(): + t := meta.ParsedUpdateTime.UTC() + effectiveAt = &t + } + + emittedAt := time.Now().UTC() + eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) + + return fksources.SingleEvent( + s.Kind(), + s.http.Name, + s.rawSchema, + eventID, + emittedAt, + effectiveAt, + raw, + ) +} + +func (s *forecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, bool, error) { + raw, changed, err := s.http.FetchJSONIfChanged(ctx) + if err != nil { + return nil, forecastMeta{}, false, err + } + if !changed { + return nil, forecastMeta{}, false, nil + } + + var meta forecastMeta + if err := json.Unmarshal(raw, &meta); err != nil { + return raw, forecastMeta{}, true, nil + } + + genStr := strings.TrimSpace(meta.Properties.GeneratedAt) + if genStr != "" { + if t, err := nwscommon.ParseTime(genStr); err == nil { + meta.ParsedGeneratedAt = t.UTC() + } + } + + updStr := strings.TrimSpace(meta.Properties.UpdateTime) + if updStr == "" { + updStr = strings.TrimSpace(meta.Properties.Updated) + } + if updStr != "" { + if t, err := nwscommon.ParseTime(updStr); err == nil { + meta.ParsedUpdateTime = t.UTC() + } + } + + return raw, meta, true, nil +} diff --git a/internal/sources/nws/forecast_hourly.go b/internal/sources/nws/forecast_hourly.go index ccf4feb..3cf5e14 100644 --- a/internal/sources/nws/forecast_hourly.go +++ b/internal/sources/nws/forecast_hourly.go @@ -2,15 +2,7 @@ package nws import ( - "context" - "encoding/json" - "strings" - "time" - "gitea.maximumdirect.net/ejr/feedkit/config" - "gitea.maximumdirect.net/ejr/feedkit/event" - fksources "gitea.maximumdirect.net/ejr/feedkit/sources" - nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -22,114 +14,15 @@ import ( // Output schema (current implementation): // - standards.SchemaRawNWSHourlyForecastV1 type HourlyForecastSource struct { - http *fksources.HTTPSource + *forecastSource } func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) { const driver = "nws_forecast_hourly" - - // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). - hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSHourlyForecastV1) if err != nil { return nil, err } - return &HourlyForecastSource{http: hs}, nil -} - -func (s *HourlyForecastSource) Name() string { return s.http.Name } - -// Kind is used for routing/policy. -func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") } - -func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, changed, err := s.fetchRaw(ctx) - if err != nil { - return nil, err - } - if !changed { - return nil, nil - } - - // EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time. - // NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated. - var effectiveAt *time.Time - switch { - case !meta.ParsedGeneratedAt.IsZero(): - t := meta.ParsedGeneratedAt.UTC() - effectiveAt = &t - case !meta.ParsedUpdateTime.IsZero(): - t := meta.ParsedUpdateTime.UTC() - effectiveAt = &t - } - - emittedAt := time.Now().UTC() - - // NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL. - // That is *not* unique per issued run, so we intentionally do not use it for Event.ID. - // Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback). - eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - - return fksources.SingleEvent( - s.Kind(), - s.http.Name, - standards.SchemaRawNWSHourlyForecastV1, - eventID, - emittedAt, - effectiveAt, - raw, - ) -} - -// ---- RAW fetch + minimal metadata decode ---- - -type hourlyForecastMeta struct { - // Present for GeoJSON Feature responses, but often stable (endpoint URL). - ID string `json:"id"` - - Properties struct { - GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time - UpdateTime string `json:"updateTime"` // last update time of underlying data - Updated string `json:"updated"` // deprecated alias for updateTime - } `json:"properties"` - - ParsedGeneratedAt time.Time `json:"-"` - ParsedUpdateTime time.Time `json:"-"` -} - -func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, bool, error) { - raw, changed, err := s.http.FetchJSONIfChanged(ctx) - if err != nil { - return nil, hourlyForecastMeta{}, false, err - } - if !changed { - return nil, hourlyForecastMeta{}, false, nil - } - - var meta hourlyForecastMeta - if err := json.Unmarshal(raw, &meta); err != nil { - // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, hourlyForecastMeta{}, true, nil - } - - // generatedAt (preferred) - genStr := strings.TrimSpace(meta.Properties.GeneratedAt) - if genStr != "" { - if t, err := nwscommon.ParseTime(genStr); err == nil { - meta.ParsedGeneratedAt = t.UTC() - } - } - - // updateTime, with fallback to deprecated "updated" - updStr := strings.TrimSpace(meta.Properties.UpdateTime) - if updStr == "" { - updStr = strings.TrimSpace(meta.Properties.Updated) - } - if updStr != "" { - if t, err := nwscommon.ParseTime(updStr); err == nil { - meta.ParsedUpdateTime = t.UTC() - } - } - - return raw, meta, true, nil + return &HourlyForecastSource{forecastSource: src}, nil } diff --git a/internal/sources/nws/forecast_narrative.go b/internal/sources/nws/forecast_narrative.go index 16b70ec..ab02560 100644 --- a/internal/sources/nws/forecast_narrative.go +++ b/internal/sources/nws/forecast_narrative.go @@ -2,15 +2,7 @@ package nws import ( - "context" - "encoding/json" - "strings" - "time" - "gitea.maximumdirect.net/ejr/feedkit/config" - "gitea.maximumdirect.net/ejr/feedkit/event" - fksources "gitea.maximumdirect.net/ejr/feedkit/sources" - nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/standards" ) @@ -22,114 +14,15 @@ import ( // Output schema: // - standards.SchemaRawNWSNarrativeForecastV1 type NarrativeForecastSource struct { - http *fksources.HTTPSource + *forecastSource } func NewNarrativeForecastSource(cfg config.SourceConfig) (*NarrativeForecastSource, error) { const driver = "nws_forecast_narrative" - - // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). - hs, err := fksources.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + src, err := newForecastSource(cfg, driver, standards.SchemaRawNWSNarrativeForecastV1) if err != nil { return nil, err } - return &NarrativeForecastSource{http: hs}, nil -} - -func (s *NarrativeForecastSource) Name() string { return s.http.Name } - -// Kind is used for routing/policy. -func (s *NarrativeForecastSource) Kind() event.Kind { return event.Kind("forecast") } - -func (s *NarrativeForecastSource) Poll(ctx context.Context) ([]event.Event, error) { - raw, meta, changed, err := s.fetchRaw(ctx) - if err != nil { - return nil, err - } - if !changed { - return nil, nil - } - - // EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time. - // NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated. - var effectiveAt *time.Time - switch { - case !meta.ParsedGeneratedAt.IsZero(): - t := meta.ParsedGeneratedAt.UTC() - effectiveAt = &t - case !meta.ParsedUpdateTime.IsZero(): - t := meta.ParsedUpdateTime.UTC() - effectiveAt = &t - } - - emittedAt := time.Now().UTC() - - // NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL. - // That is *not* unique per issued run, so we intentionally do not use it for Event.ID. - // Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback). - eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) - - return fksources.SingleEvent( - s.Kind(), - s.http.Name, - standards.SchemaRawNWSNarrativeForecastV1, - eventID, - emittedAt, - effectiveAt, - raw, - ) -} - -// ---- RAW fetch + minimal metadata decode ---- - -type narrativeForecastMeta struct { - // Present for GeoJSON Feature responses, but often stable (endpoint URL). - ID string `json:"id"` - - Properties struct { - GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time - UpdateTime string `json:"updateTime"` // last update time of underlying data - Updated string `json:"updated"` // deprecated alias for updateTime - } `json:"properties"` - - ParsedGeneratedAt time.Time `json:"-"` - ParsedUpdateTime time.Time `json:"-"` -} - -func (s *NarrativeForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, narrativeForecastMeta, bool, error) { - raw, changed, err := s.http.FetchJSONIfChanged(ctx) - if err != nil { - return nil, narrativeForecastMeta{}, false, err - } - if !changed { - return nil, narrativeForecastMeta{}, false, nil - } - - var meta narrativeForecastMeta - if err := json.Unmarshal(raw, &meta); err != nil { - // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. - return raw, narrativeForecastMeta{}, true, nil - } - - // generatedAt (preferred) - genStr := strings.TrimSpace(meta.Properties.GeneratedAt) - if genStr != "" { - if t, err := nwscommon.ParseTime(genStr); err == nil { - meta.ParsedGeneratedAt = t.UTC() - } - } - - // updateTime, with fallback to deprecated "updated" - updStr := strings.TrimSpace(meta.Properties.UpdateTime) - if updStr == "" { - updStr = strings.TrimSpace(meta.Properties.Updated) - } - if updStr != "" { - if t, err := nwscommon.ParseTime(updStr); err == nil { - meta.ParsedUpdateTime = t.UTC() - } - } - - return raw, meta, true, nil + return &NarrativeForecastSource{forecastSource: src}, nil } diff --git a/internal/sources/nws/forecast_test.go b/internal/sources/nws/forecast_test.go new file mode 100644 index 0000000..0dfc8bc --- /dev/null +++ b/internal/sources/nws/forecast_test.go @@ -0,0 +1,184 @@ +package nws + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/standards" +) + +type forecastPoller interface { + Poll(ctx context.Context) ([]event.Event, error) +} + +func TestForecastSourcesEmitExpectedSchemaAndPreferGeneratedAt(t *testing.T) { + tests := []struct { + name string + driver string + wantSchema string + newSource func(config.SourceConfig) (forecastPoller, error) + }{ + { + name: "hourly", + driver: "nws_forecast_hourly", + wantSchema: standards.SchemaRawNWSHourlyForecastV1, + newSource: func(cfg config.SourceConfig) (forecastPoller, error) { + return NewHourlyForecastSource(cfg) + }, + }, + { + name: "narrative", + driver: "nws_forecast_narrative", + wantSchema: standards.SchemaRawNWSNarrativeForecastV1, + newSource: func(cfg config.SourceConfig) (forecastPoller, error) { + return NewNarrativeForecastSource(cfg) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"properties":{"generatedAt":"2026-03-28T12:00:00Z","updateTime":"2026-03-28T11:00:00Z"}}`)) + })) + defer srv.Close() + + src, err := tt.newSource(forecastSourceConfig(tt.driver, srv.URL)) + if err != nil { + t.Fatalf("newSource() error = %v", err) + } + + got, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("Poll() error = %v", err) + } + if len(got) != 1 { + t.Fatalf("Poll() len = %d, want 1", len(got)) + } + if got[0].Schema != tt.wantSchema { + t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, tt.wantSchema) + } + if got[0].Kind != event.Kind("forecast") { + t.Fatalf("Poll() kind = %q, want forecast", got[0].Kind) + } + + wantEffectiveAt := time.Date(2026, 3, 28, 12, 0, 0, 0, time.UTC) + if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(wantEffectiveAt) { + t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, wantEffectiveAt) + } + }) + } +} + +func TestForecastSourcePollEffectiveAtFallbackOrder(t *testing.T) { + tests := []struct { + name string + body string + wantEffectiveAt *time.Time + }{ + { + name: "updateTime fallback", + body: `{"properties":{"updateTime":"2026-03-28T11:00:00Z"}}`, + wantEffectiveAt: func() *time.Time { + t := time.Date(2026, 3, 28, 11, 0, 0, 0, time.UTC) + return &t + }(), + }, + { + name: "updated fallback", + body: `{"properties":{"updated":"2026-03-28T10:00:00Z"}}`, + wantEffectiveAt: func() *time.Time { + t := time.Date(2026, 3, 28, 10, 0, 0, 0, time.UTC) + return &t + }(), + }, + { + name: "omitted when metadata lacks timestamps", + body: `{"properties":{}}`, + wantEffectiveAt: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(tt.body)) + })) + defer srv.Close() + + src, err := NewHourlyForecastSource(forecastSourceConfig("nws_forecast_hourly", srv.URL)) + if err != nil { + t.Fatalf("NewHourlyForecastSource() error = %v", err) + } + + got, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("Poll() error = %v", err) + } + if len(got) != 1 { + t.Fatalf("Poll() len = %d, want 1", len(got)) + } + if tt.wantEffectiveAt == nil { + if got[0].EffectiveAt != nil { + t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt) + } + return + } + if got[0].EffectiveAt == nil || !got[0].EffectiveAt.Equal(*tt.wantEffectiveAt) { + t.Fatalf("Poll() effectiveAt = %v, want %s", got[0].EffectiveAt, *tt.wantEffectiveAt) + } + }) + } +} + +func TestForecastSourcePollMetadataDecodeFailureStillEmitsRawEvent(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`not-json`)) + })) + defer srv.Close() + + src, err := NewNarrativeForecastSource(forecastSourceConfig("nws_forecast_narrative", srv.URL)) + if err != nil { + t.Fatalf("NewNarrativeForecastSource() error = %v", err) + } + + got, err := src.Poll(context.Background()) + if err != nil { + t.Fatalf("Poll() error = %v", err) + } + if len(got) != 1 { + t.Fatalf("Poll() len = %d, want 1", len(got)) + } + if got[0].EffectiveAt != nil { + t.Fatalf("Poll() effectiveAt = %v, want nil", got[0].EffectiveAt) + } + if got[0].Schema != standards.SchemaRawNWSNarrativeForecastV1 { + t.Fatalf("Poll() schema = %q, want %q", got[0].Schema, standards.SchemaRawNWSNarrativeForecastV1) + } + + raw, ok := got[0].Payload.(json.RawMessage) + if !ok { + t.Fatalf("Poll() payload type = %T, want json.RawMessage", got[0].Payload) + } + if string(raw) != "not-json" { + t.Fatalf("Poll() payload = %q, want %q", string(raw), "not-json") + } +} + +func forecastSourceConfig(driver, url string) config.SourceConfig { + return config.SourceConfig{ + Name: "test-forecast-source", + Driver: driver, + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": url, + "user_agent": "test-agent", + }, + } +}