Updates to the nws forecast source and normalizer to separate code specific to hourly forecasts and prepare for upcoming feature addition of daily and narrative forecasts
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful

This commit is contained in:
2026-03-27 12:58:23 -05:00
parent 88d5727a84
commit dbaebbbd7a
7 changed files with 245 additions and 120 deletions

View File

@@ -48,7 +48,7 @@ sources:
- name: NWSHourlyForecastSTL - name: NWSHourlyForecastSTL
mode: poll mode: poll
kinds: ["forecast"] kinds: ["forecast"]
driver: nws_forecast driver: nws_forecast_hourly
every: 45m every: 45m
params: params:
url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly" url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly"

View File

@@ -18,8 +18,8 @@ import (
// //
// standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1 // standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1
// //
// It interprets NWS GeoJSON gridpoint *hourly* forecast responses and maps them into // It keeps one NWS forecast normalization entrypoint and dispatches to product-specific
// the canonical model.WeatherForecastRun representation. // builders by raw schema. Today only hourly is implemented.
// //
// Caveats / policy: // Caveats / policy:
// 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode // 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode
@@ -29,81 +29,139 @@ import (
type ForecastNormalizer struct{} type ForecastNormalizer struct{}
func (ForecastNormalizer) Match(e event.Event) bool { func (ForecastNormalizer) Match(e event.Event) bool {
s := strings.TrimSpace(e.Schema) switch strings.TrimSpace(e.Schema) {
return s == standards.SchemaRawNWSHourlyForecastV1 case standards.SchemaRawNWSHourlyForecastV1:
return true
default:
return false
}
} }
func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
return normalizeForecastEventBySchema(in)
}
func normalizeForecastEventBySchema(in event.Event) (*event.Event, error) {
switch strings.TrimSpace(in.Schema) {
case standards.SchemaRawNWSHourlyForecastV1:
return normalizeHourlyForecastEvent(in)
default:
return nil, fmt.Errorf("unsupported nws forecast schema %q", strings.TrimSpace(in.Schema))
}
}
func normalizeHourlyForecastEvent(in event.Event) (*event.Event, error) {
return normcommon.NormalizeJSON( return normcommon.NormalizeJSON(
in, in,
"nws hourly forecast", "nws hourly forecast",
standards.SchemaWeatherForecastV1, standards.SchemaWeatherForecastV1,
buildForecast, buildHourlyForecast,
) )
} }
// buildForecast contains the domain mapping logic (provider -> canonical model). // buildHourlyForecast contains hourly forecast mapping logic (provider -> canonical model).
func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.Time, error) { func buildHourlyForecast(parsed nwsHourlyForecastResponse) (model.WeatherForecastRun, time.Time, error) {
// IssuedAt is required by the canonical model. issuedAt, updatedAt, err := parseForecastRunTimes(parsed.Properties.GeneratedAt, parsed.Properties.UpdateTime)
issuedStr := strings.TrimSpace(parsed.Properties.GeneratedAt) if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
// Best-effort location centroid from the GeoJSON polygon (optional).
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
run := newForecastRunBase(
issuedAt,
updatedAt,
model.ForecastProductHourly,
lat,
lon,
parsed.Properties.Elevation.Value,
)
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods))
for i, p := range parsed.Properties.Periods {
period, err := mapHourlyForecastPeriod(i, p)
if err != nil {
return model.WeatherForecastRun{}, time.Time{}, err
}
periods = append(periods, period)
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
}
func parseForecastRunTimes(generatedAt, updateTime string) (time.Time, *time.Time, error) {
issuedStr := strings.TrimSpace(generatedAt)
if issuedStr == "" { if issuedStr == "" {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing properties.generatedAt") return time.Time{}, nil, fmt.Errorf("missing properties.generatedAt")
} }
issuedAt, err := nwscommon.ParseTime(issuedStr) issuedAt, err := nwscommon.ParseTime(issuedStr)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err) return time.Time{}, nil, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err)
} }
issuedAt = issuedAt.UTC() issuedAt = issuedAt.UTC()
// UpdatedAt is optional.
var updatedAt *time.Time var updatedAt *time.Time
if s := strings.TrimSpace(parsed.Properties.UpdateTime); s != "" { if s := strings.TrimSpace(updateTime); s != "" {
if t, err := nwscommon.ParseTime(s); err == nil { if t, err := nwscommon.ParseTime(s); err == nil {
tt := t.UTC() tt := t.UTC()
updatedAt = &tt updatedAt = &tt
} }
} }
// Best-effort location centroid from the GeoJSON polygon (optional). return issuedAt, updatedAt, nil
lat, lon := centroidLatLon(parsed.Geometry.Coordinates)
// Schema is explicitly hourly, so product is not a heuristic.
run := model.WeatherForecastRun{
LocationID: "",
LocationName: "",
IssuedAt: issuedAt,
UpdatedAt: updatedAt,
Product: model.ForecastProductHourly,
Latitude: lat,
Longitude: lon,
ElevationMeters: parsed.Properties.Elevation.Value,
Periods: nil,
} }
periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) func newForecastRunBase(
for i, p := range parsed.Properties.Periods { issuedAt time.Time,
startStr := strings.TrimSpace(p.StartTime) updatedAt *time.Time,
endStr := strings.TrimSpace(p.EndTime) product model.ForecastProduct,
lat, lon, elevation *float64,
) model.WeatherForecastRun {
return model.WeatherForecastRun{
LocationID: "",
LocationName: "",
IssuedAt: issuedAt,
UpdatedAt: updatedAt,
Product: product,
Latitude: lat,
Longitude: lon,
ElevationMeters: elevation,
Periods: nil,
}
}
func parseForecastPeriodWindow(startStr, endStr string, idx int) (time.Time, time.Time, error) {
startStr = strings.TrimSpace(startStr)
endStr = strings.TrimSpace(endStr)
if startStr == "" || endStr == "" { if startStr == "" || endStr == "" {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", i) return time.Time{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", idx)
} }
start, err := nwscommon.ParseTime(startStr) start, err := nwscommon.ParseTime(startStr)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", i, startStr, err) return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", idx, startStr, err)
} }
end, err := nwscommon.ParseTime(endStr) end, err := nwscommon.ParseTime(endStr)
if err != nil { if err != nil {
return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", i, endStr, err) return time.Time{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", idx, endStr, err)
}
return start.UTC(), end.UTC(), nil
}
func mapHourlyForecastPeriod(idx int, p nwsHourlyForecastPeriod) (model.WeatherForecastPeriod, error) {
start, end, err := parseForecastPeriodWindow(p.StartTime, p.EndTime, idx)
if err != nil {
return model.WeatherForecastPeriod{}, err
} }
start = start.UTC()
end = end.UTC()
// NWS hourly supplies isDaytime; make it a pointer to match the canonical model. // NWS hourly supplies isDaytime; make it a pointer to match the canonical model.
var isDay *bool var isDay *bool
@@ -118,7 +176,7 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
providerDesc := strings.TrimSpace(p.ShortForecast) providerDesc := strings.TrimSpace(p.ShortForecast)
wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC) wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC)
period := model.WeatherForecastPeriod{ return model.WeatherForecastPeriod{
StartTime: start, StartTime: start,
EndTime: end, EndTime: end,
@@ -139,13 +197,5 @@ func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.T
WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed), WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed),
ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value, ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value,
} }, nil
periods = append(periods, period)
}
run.Periods = periods
// EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly).
return run, issuedAt, nil
} }

View File

@@ -2,14 +2,18 @@ package nws
import ( import (
"encoding/json" "encoding/json"
"strings"
"testing" "testing"
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
func TestBuildForecastUsesShortForecastAsTextDescription(t *testing.T) { func TestBuildHourlyForecastUsesShortForecastAsTextDescription(t *testing.T) {
parsed := nwsForecastResponse{} parsed := nwsHourlyForecastResponse{}
parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z" parsed.Properties.GeneratedAt = "2026-03-16T18:00:00Z"
parsed.Properties.Periods = []nwsForecastPeriod{ parsed.Properties.Periods = []nwsHourlyForecastPeriod{
{ {
StartTime: "2026-03-16T19:00:00Z", StartTime: "2026-03-16T19:00:00Z",
EndTime: "2026-03-16T20:00:00Z", EndTime: "2026-03-16T20:00:00Z",
@@ -19,9 +23,9 @@ func TestBuildForecastUsesShortForecastAsTextDescription(t *testing.T) {
}, },
} }
run, effectiveAt, err := buildForecast(parsed) run, effectiveAt, err := buildHourlyForecast(parsed)
if err != nil { if err != nil {
t.Fatalf("buildForecast() error = %v", err) t.Fatalf("buildHourlyForecast() error = %v", err)
} }
if len(run.Periods) != 1 { if len(run.Periods) != 1 {
t.Fatalf("periods len = %d, want 1", len(run.Periods)) t.Fatalf("periods len = %d, want 1", len(run.Periods))
@@ -41,6 +45,31 @@ func TestBuildForecastUsesShortForecastAsTextDescription(t *testing.T) {
assertNoLegacyForecastDescriptionKeys(t, run.Periods[0]) assertNoLegacyForecastDescriptionKeys(t, run.Periods[0])
} }
func TestNormalizeForecastEventBySchemaRejectsUnsupportedSchema(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{
Schema: "raw.nws.daily.forecast.v1",
})
if err == nil {
t.Fatalf("normalizeForecastEventBySchema() expected unsupported schema error")
}
if !strings.Contains(err.Error(), "unsupported nws forecast schema") {
t.Fatalf("error = %q, want unsupported schema context", err)
}
}
func TestNormalizeForecastEventBySchemaRoutesHourly(t *testing.T) {
_, err := normalizeForecastEventBySchema(event.Event{
Schema: standards.SchemaRawNWSHourlyForecastV1,
Payload: map[string]any{"properties": map[string]any{}},
})
if err == nil {
t.Fatalf("normalizeForecastEventBySchema() expected build error for missing generatedAt")
}
if !strings.Contains(err.Error(), "missing properties.generatedAt") {
t.Fatalf("error = %q, want missing properties.generatedAt", err)
}
}
func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) { func assertNoLegacyForecastDescriptionKeys(t *testing.T, period any) {
t.Helper() t.Helper()

View File

@@ -98,12 +98,11 @@ type nwsCloudLayer struct {
Amount string `json:"amount"` Amount string `json:"amount"`
} }
// nwsForecastResponse is a minimal-but-sufficient representation of the NWS // nwsHourlyForecastResponse is a minimal-but-sufficient representation of the NWS
// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun. // gridpoint hourly forecast GeoJSON payload needed for mapping into model.WeatherForecastRun.
// //
// This is currently designed to support the hourly forecast endpoint; revisions may be needed // Daily and narrative variants should be added as distinct structs in follow-up work.
// to accommodate other forecast endpoints in the future. type nwsHourlyForecastResponse struct {
type nwsForecastResponse struct {
Geometry struct { Geometry struct {
Type string `json:"type"` Type string `json:"type"`
Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat] Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat]
@@ -122,11 +121,11 @@ type nwsForecastResponse struct {
Value *float64 `json:"value"` Value *float64 `json:"value"`
} `json:"elevation"` } `json:"elevation"`
Periods []nwsForecastPeriod `json:"periods"` Periods []nwsHourlyForecastPeriod `json:"periods"`
} `json:"properties"` } `json:"properties"`
} }
type nwsForecastPeriod struct { type nwsHourlyForecastPeriod struct {
Number int `json:"number"` Number int `json:"number"`
Name string `json:"name"` Name string `json:"name"`
StartTime string `json:"startTime"` StartTime string `json:"startTime"`

View File

@@ -19,8 +19,8 @@ func RegisterBuiltins(r *fksource.Registry) {
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) { r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewAlertsSource(cfg) return nws.NewAlertsSource(cfg)
}) })
r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) { r.RegisterPoll("nws_forecast_hourly", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewForecastSource(cfg) return nws.NewHourlyForecastSource(cfg)
}) })
// Open-Meteo drivers // Open-Meteo drivers

View File

@@ -0,0 +1,47 @@
package sources
import (
"strings"
"testing"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksource "gitea.maximumdirect.net/ejr/feedkit/sources"
)
func TestRegisterBuiltinsRegistersNWSHourlyForecastDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
in, err := reg.BuildInput(sourceConfigForDriver("nws_forecast_hourly"))
if err != nil {
t.Fatalf("BuildInput(nws_forecast_hourly) error = %v", err)
}
if _, ok := in.(fksource.PollSource); !ok {
t.Fatalf("BuildInput(nws_forecast_hourly) type = %T, want PollSource", in)
}
}
func TestRegisterBuiltinsDoesNotRegisterLegacyNWSForecastDriver(t *testing.T) {
reg := fksource.NewRegistry()
RegisterBuiltins(reg)
_, err := reg.BuildInput(sourceConfigForDriver("nws_forecast"))
if err == nil {
t.Fatalf("BuildInput(nws_forecast) expected unknown driver error")
}
if !strings.Contains(err.Error(), `unknown source driver: "nws_forecast"`) {
t.Fatalf("error = %q, want unknown source driver for nws_forecast", err)
}
}
func sourceConfigForDriver(driver string) config.SourceConfig {
return config.SourceConfig{
Name: "test-source",
Driver: driver,
Mode: config.SourceModePoll,
Params: map[string]any{
"url": "https://example.invalid",
"user_agent": "test-agent",
},
}
}

View File

@@ -1,4 +1,4 @@
// FILE: internal/sources/nws/forecast.go // FILE: internal/sources/nws/forecast_hourly.go
package nws package nws
import ( import (
@@ -14,19 +14,19 @@ import (
"gitea.maximumdirect.net/ejr/weatherfeeder/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event. // HourlyForecastSource polls an NWS hourly forecast endpoint and emits a RAW forecast Event.
// //
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes // It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
// minimal metadata for Event.EffectiveAt and Event.ID. // minimal metadata for Event.EffectiveAt and Event.ID.
// //
// Output schema (current implementation): // Output schema (current implementation):
// - standards.SchemaRawNWSHourlyForecastV1 // - standards.SchemaRawNWSHourlyForecastV1
type ForecastSource struct { type HourlyForecastSource struct {
http *common.HTTPSource http *common.HTTPSource
} }
func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { func NewHourlyForecastSource(cfg config.SourceConfig) (*HourlyForecastSource, error) {
const driver = "nws_forecast" const driver = "nws_forecast_hourly"
// NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json).
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
@@ -34,15 +34,15 @@ func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) {
return nil, err return nil, err
} }
return &ForecastSource{http: hs}, nil return &HourlyForecastSource{http: hs}, nil
} }
func (s *ForecastSource) Name() string { return s.http.Name } func (s *HourlyForecastSource) Name() string { return s.http.Name }
// Kind is used for routing/policy. // Kind is used for routing/policy.
func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *HourlyForecastSource) Kind() event.Kind { return event.Kind("forecast") }
func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { func (s *HourlyForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx) raw, meta, err := s.fetchRaw(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -80,7 +80,7 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) {
// ---- RAW fetch + minimal metadata decode ---- // ---- RAW fetch + minimal metadata decode ----
type forecastMeta struct { type hourlyForecastMeta struct {
// Present for GeoJSON Feature responses, but often stable (endpoint URL). // Present for GeoJSON Feature responses, but often stable (endpoint URL).
ID string `json:"id"` ID string `json:"id"`
@@ -94,16 +94,16 @@ type forecastMeta struct {
ParsedUpdateTime time.Time `json:"-"` ParsedUpdateTime time.Time `json:"-"`
} }
func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) { func (s *HourlyForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, hourlyForecastMeta, error) {
raw, err := s.http.FetchJSON(ctx) raw, err := s.http.FetchJSON(ctx)
if err != nil { if err != nil {
return nil, forecastMeta{}, err return nil, hourlyForecastMeta{}, err
} }
var meta forecastMeta var meta hourlyForecastMeta
if err := json.Unmarshal(raw, &meta); err != nil { if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, forecastMeta{}, nil return raw, hourlyForecastMeta{}, nil
} }
// generatedAt (preferred) // generatedAt (preferred)