From 00e811f8f7a256258d8709d0205fbfa94ab248c7 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Fri, 16 Jan 2026 21:40:20 -0600 Subject: [PATCH] normalizers/nws: add NWS alerts normalizer and canonical alert mapping - Introduce AlertsNormalizer to convert Raw NWS Alerts (SchemaRawNWSAlertsV1) into canonical WeatherAlert runs (SchemaWeatherAlertV1) - Add minimal NWS alerts response/types to support GeoJSON FeatureCollection parsing - Map NWS alert properties (event, headline, severity, timing, area, references) into model.WeatherAlert with best-effort timestamp handling - Establish clear AsOf / EffectiveAt policy for alert runs to support stable deduplication and snapshot semantics - Register the new alerts normalizer alongside existing NWS observation and forecast normalizers --- cmd/weatherfeeder/config.yml | 32 ++-- internal/model/alert.go | 70 ++++++- internal/normalizers/nws/alerts.go | 268 +++++++++++++++++++++++++++ internal/normalizers/nws/register.go | 3 + internal/normalizers/nws/types.go | 65 +++++++ internal/sources/nws/alerts.go | 122 +++++++++++- internal/standards/schema.go | 3 + 7 files changed, 536 insertions(+), 27 deletions(-) create mode 100644 internal/normalizers/nws/alerts.go diff --git a/cmd/weatherfeeder/config.yml b/cmd/weatherfeeder/config.yml index 2c862dd..e88d4a6 100644 --- a/cmd/weatherfeeder/config.yml +++ b/cmd/weatherfeeder/config.yml @@ -48,22 +48,30 @@ sources: # url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast" # user_agent: "HomeOps (eric@maximumdirect.net)" - - name: NWSHourlyForecastSTL - kind: forecast - driver: nws_forecast - every: 1m - params: - url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly" - user_agent: "HomeOps (eric@maximumdirect.net)" - -# - name: NWSAlertsSTL -# kind: alert -# driver: nws_alerts +# - name: NWSHourlyForecastSTL +# kind: forecast +# driver: nws_forecast # every: 1m # params: -# url: "https://api.weather.gov/alerts?point=38.6239,-90.3571&limit=500" +# url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly" # user_agent: "HomeOps (eric@maximumdirect.net)" + - name: NWSAlertsSTL + kind: alert + driver: nws_alerts + every: 1m + params: + url: "https://api.weather.gov/alerts?point=38.6239,-90.3571&limit=500" + user_agent: "HomeOps (eric@maximumdirect.net)" + + - name: NWSAlertsIowa + kind: alert + driver: nws_alerts + every: 1m + params: + url: "https://api.weather.gov/alerts/active/zone/IAZ048" + user_agent: "HomeOps (eric@maximumdirect.net)" + sinks: - name: stdout driver: stdout diff --git a/internal/model/alert.go b/internal/model/alert.go index 141d933..f0c5a77 100644 --- a/internal/model/alert.go +++ b/internal/model/alert.go @@ -3,22 +3,76 @@ package model import "time" -// Placeholder for NWS alerts (GeoJSON feature properties are rich). +// WeatherAlertRun is a snapshot of *active* alerts for a location as-of a point in time. +// +// This mirrors WeatherForecastRun's "one issued snapshot -> many contained items" shape: +// +// - A single run may contain zero, one, or many alerts. +// - Runs are intended to be immutable snapshots (“provider asserted X at AsOf”). +// +// Normalizers should prefer to set AsOf from a provider-supplied “updated/generated” timestamp. +// If unavailable, AsOf may be set to the poll/emit time as a fallback. +type WeatherAlertRun struct { + // Optional location metadata (provider-dependent). + LocationID string `json:"locationId,omitempty"` + LocationName string `json:"locationName,omitempty"` + + // AsOf is when the provider asserted this alert snapshot is current (required). + AsOf time.Time `json:"asOf"` + + // Optional spatial context. + Latitude *float64 `json:"latitude,omitempty"` + Longitude *float64 `json:"longitude,omitempty"` + + // Active alerts contained in this snapshot (order is provider-dependent). + Alerts []WeatherAlert `json:"alerts"` +} + +// WeatherAlert is the canonical representation of a single alert. +// +// This is intentionally a “useful subset” of rich provider payloads. +// Normalizers may populate ProviderExtras for structured provider-specific fields +// that don’t cleanly fit the canonical shape. type WeatherAlert struct { + // Provider-stable identifier (often a URL/URI). ID string `json:"id"` - Event string `json:"event,omitempty"` - Headline string `json:"headline,omitempty"` + // Classification / headline fields. + Event string `json:"event,omitempty"` + Headline string `json:"headline,omitempty"` + + Severity string `json:"severity,omitempty"` // e.g. Extreme/Severe/Moderate/Minor/Unknown + Urgency string `json:"urgency,omitempty"` // e.g. Immediate/Expected/Future/Past/Unknown + Certainty string `json:"certainty,omitempty"` // e.g. Observed/Likely/Possible/Unlikely/Unknown + + Status string `json:"status,omitempty"` // e.g. Actual/Exercise/Test/System/Unknown + MessageType string `json:"messageType,omitempty"` // e.g. Alert/Update/Cancel + Category string `json:"category,omitempty"` // e.g. Met/Geo/Safety/Rescue/Fire/Health/Env/Transport/Infra/CBRNE/Other + Response string `json:"response,omitempty"` // e.g. Shelter/Evacuate/Prepare/Execute/Avoid/Monitor/Assess/AllClear/None + + // Narrative. Description string `json:"description,omitempty"` Instruction string `json:"instruction,omitempty"` - Severity string `json:"severity,omitempty"` - Urgency string `json:"urgency,omitempty"` - Certainty string `json:"certainty,omitempty"` - + // Timing (all optional; provider-dependent). Sent *time.Time `json:"sent,omitempty"` Effective *time.Time `json:"effective,omitempty"` + Onset *time.Time `json:"onset,omitempty"` Expires *time.Time `json:"expires,omitempty"` - Areas []string `json:"areas,omitempty"` + // Scope / affected area. + AreaDescription string `json:"areaDescription,omitempty"` // often a provider string + + // Provenance. + SenderName string `json:"senderName,omitempty"` + + References []AlertReference `json:"references,omitempty"` +} + +// AlertReference is a reference to a related alert (updates, replacements, etc.). +type AlertReference struct { + ID string `json:"id,omitempty"` // provider reference ID/URI + Identifier string `json:"identifier,omitempty"` // provider identifier string, if distinct + Sender string `json:"sender,omitempty"` + Sent *time.Time `json:"sent,omitempty"` } diff --git a/internal/normalizers/nws/alerts.go b/internal/normalizers/nws/alerts.go new file mode 100644 index 0000000..3c1fd09 --- /dev/null +++ b/internal/normalizers/nws/alerts.go @@ -0,0 +1,268 @@ +// FILE: internal/normalizers/nws/alerts.go +package nws + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// AlertsNormalizer converts: +// +// standards.SchemaRawNWSAlertsV1 -> standards.SchemaWeatherAlertV1 +// +// It interprets NWS /alerts FeatureCollection payloads (GeoJSON-ish) and maps them into +// the canonical model.WeatherAlertRun representation. +// +// Caveats / policy: +// 1. Run.AsOf prefers the collection-level "updated" timestamp. If missing/unparseable, +// we fall back to the latest per-alert timestamp, and then to the input event’s +// EffectiveAt/EmittedAt. +// 2. Alert timing fields are best-effort parsed; invalid timestamps do not fail the +// entire normalization (they are left nil). +// 3. Some fields are intentionally passed through as strings (severity/urgency/etc.) +// since canonical vocabularies may evolve later. +type AlertsNormalizer struct{} + +func (AlertsNormalizer) Match(e event.Event) bool { + return strings.TrimSpace(e.Schema) == standards.SchemaRawNWSAlertsV1 +} + +func (AlertsNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps + + parsed, err := normcommon.DecodeJSONPayload[nwsAlertsResponse](in) + if err != nil { + return nil, fmt.Errorf("nws alerts normalize: %w", err) + } + + // If we can't derive AsOf from the payload, fall back to the existing event envelope. + fallbackAsOf := in.EmittedAt.UTC() + if in.EffectiveAt != nil && !in.EffectiveAt.IsZero() { + fallbackAsOf = in.EffectiveAt.UTC() + } + + payload, effectiveAt, err := buildAlerts(parsed, fallbackAsOf) + if err != nil { + return nil, fmt.Errorf("nws alerts normalize: build: %w", err) + } + + out, err := normcommon.Finalize(in, standards.SchemaWeatherAlertV1, payload, effectiveAt) + if err != nil { + return nil, fmt.Errorf("nws alerts normalize: %w", err) + } + + return out, nil +} + +// buildAlerts contains the domain mapping logic (provider -> canonical model). +func buildAlerts(parsed nwsAlertsResponse, fallbackAsOf time.Time) (model.WeatherAlertRun, time.Time, error) { + // 1) Determine AsOf (required by canonical model; also used as EffectiveAt). + asOf := parseNWSTimeUTC(parsed.Updated) + if asOf.IsZero() { + asOf = latestAlertTimestamp(parsed.Features) + } + if asOf.IsZero() { + asOf = fallbackAsOf.UTC() + } + + run := model.WeatherAlertRun{ + LocationID: "", + LocationName: strings.TrimSpace(parsed.Title), + + AsOf: asOf, + + Latitude: nil, + Longitude: nil, + + Alerts: nil, + } + + // 2) Map each feature into a canonical WeatherAlert. + alerts := make([]model.WeatherAlert, 0, len(parsed.Features)) + for i, f := range parsed.Features { + p := f.Properties + + id := firstNonEmpty(strings.TrimSpace(f.ID), strings.TrimSpace(p.ID), strings.TrimSpace(p.Identifier)) + if id == "" { + // NWS usually supplies an ID, but be defensive. Prefer a stable-ish synth ID. + // Include the run as-of time to reduce collisions across snapshots. + id = fmt.Sprintf("nws:alert:%s:%d", asOf.UTC().Format(time.RFC3339Nano), i) + } + + sent := parseNWSTimePtr(p.Sent) + effective := parseNWSTimePtr(p.Effective) + onset := parseNWSTimePtr(p.Onset) + + // Expires: prefer "expires"; fall back to "ends" if present. + expires := parseNWSTimePtr(p.Expires) + if expires == nil { + expires = parseNWSTimePtr(p.Ends) + } + + refs := parseNWSAlertReferences(p.References) + + alert := model.WeatherAlert{ + ID: id, + + Event: strings.TrimSpace(p.Event), + Headline: strings.TrimSpace(p.Headline), + + Severity: strings.TrimSpace(p.Severity), + Urgency: strings.TrimSpace(p.Urgency), + Certainty: strings.TrimSpace(p.Certainty), + + Status: strings.TrimSpace(p.Status), + MessageType: strings.TrimSpace(p.MessageType), + Category: strings.TrimSpace(p.Category), + Response: strings.TrimSpace(p.Response), + + Description: strings.TrimSpace(p.Description), + Instruction: strings.TrimSpace(p.Instruction), + + Sent: sent, + Effective: effective, + Onset: onset, + Expires: expires, + + AreaDescription: strings.TrimSpace(p.AreaDesc), + + SenderName: firstNonEmpty(strings.TrimSpace(p.SenderName), strings.TrimSpace(p.Sender)), + + References: refs, + } + + // Headline fallback (NWS commonly provides it, but not guaranteed). + if alert.Headline == "" { + alert.Headline = alert.Event + } + + alerts = append(alerts, alert) + } + + run.Alerts = alerts + + // EffectiveAt policy for alerts: treat AsOf as the effective time (dedupe-friendly). + return run, asOf, nil +} + +// latestAlertTimestamp scans alert features for the most recent timestamp. +// It prefers Sent/Effective, and falls back to Expires/Ends when needed. +func latestAlertTimestamp(features []nwsAlertFeature) time.Time { + var latest time.Time + for _, f := range features { + p := f.Properties + candidates := []string{ + p.Sent, + p.Effective, + p.Expires, + p.Ends, + p.Onset, + } + for _, s := range candidates { + t := parseNWSTimeUTC(s) + if t.IsZero() { + continue + } + if latest.IsZero() || t.After(latest) { + latest = t + } + } + } + return latest +} + +// parseNWSTimeUTC parses an NWS timestamp string into UTC time.Time. +// Returns zero time on empty/unparseable input (best-effort; alerts should be resilient). +func parseNWSTimeUTC(s string) time.Time { + s = strings.TrimSpace(s) + if s == "" { + return time.Time{} + } + t, err := nwscommon.ParseTime(s) + if err != nil { + return time.Time{} + } + return t.UTC() +} + +func parseNWSTimePtr(s string) *time.Time { + t := parseNWSTimeUTC(s) + if t.IsZero() { + return nil + } + return &t +} + +func firstNonEmpty(vals ...string) string { + for _, v := range vals { + if strings.TrimSpace(v) != "" { + return strings.TrimSpace(v) + } + } + return "" +} + +// parseNWSAlertReferences tries to interpret the NWS "references" field, which may +// vary by endpoint/version. We accept the common object-array form and a few +// degraded shapes (string array / single string). +func parseNWSAlertReferences(raw json.RawMessage) []model.AlertReference { + if len(raw) == 0 { + return nil + } + + // Most common: array of objects. + var objs []nwsAlertReference + if err := json.Unmarshal(raw, &objs); err == nil && len(objs) > 0 { + out := make([]model.AlertReference, 0, len(objs)) + for _, r := range objs { + ref := model.AlertReference{ + ID: strings.TrimSpace(r.ID), + Identifier: strings.TrimSpace(r.Identifier), + Sender: strings.TrimSpace(r.Sender), + Sent: parseNWSTimePtr(r.Sent), + } + // If only Identifier is present, preserve it as ID too (useful downstream). + if ref.ID == "" && ref.Identifier != "" { + ref.ID = ref.Identifier + } + out = append(out, ref) + } + return out + } + + // Sometimes: array of strings. + var strs []string + if err := json.Unmarshal(raw, &strs); err == nil && len(strs) > 0 { + out := make([]model.AlertReference, 0, len(strs)) + for _, s := range strs { + id := strings.TrimSpace(s) + if id == "" { + continue + } + out = append(out, model.AlertReference{ID: id}) + } + if len(out) > 0 { + return out + } + } + + // Rare: single string. + var single string + if err := json.Unmarshal(raw, &single); err == nil { + id := strings.TrimSpace(single) + if id != "" { + return []model.AlertReference{{ID: id}} + } + } + + return nil +} diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index f2a4f52..fb5900f 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -16,4 +16,7 @@ func Register(reg *fknormalize.Registry) { // Forecasts reg.Register(ForecastNormalizer{}) + + // Alerts + reg.Register(AlertsNormalizer{}) } diff --git a/internal/normalizers/nws/types.go b/internal/normalizers/nws/types.go index 83e2359..3024ab0 100644 --- a/internal/normalizers/nws/types.go +++ b/internal/normalizers/nws/types.go @@ -1,6 +1,10 @@ // FILE: ./internal/normalizers/nws/types.go package nws +import ( + "encoding/json" +) + // nwsObservationResponse is a minimal-but-sufficient representation of the NWS // station observation GeoJSON payload needed for mapping into model.WeatherObservation. type nwsObservationResponse struct { @@ -150,3 +154,64 @@ type nwsForecastPeriod struct { ShortForecast string `json:"shortForecast"` DetailedForecast string `json:"detailedForecast"` } + +// nwsAlertsResponse is a minimal-but-sufficient representation of the NWS /alerts +// FeatureCollection payload needed for mapping into model.WeatherAlertRun. +type nwsAlertsResponse struct { + Updated string `json:"updated"` + Title string `json:"title"` + + Features []nwsAlertFeature `json:"features"` +} + +type nwsAlertFeature struct { + ID string `json:"id"` + + Properties nwsAlertProperties `json:"properties"` +} + +type nwsAlertProperties struct { + // Identifiers. + ID string `json:"id"` // often a URL/URI + Identifier string `json:"identifier"` // CAP identifier string (sometimes distinct) + + // Classification / headline. + Event string `json:"event"` + Headline string `json:"headline"` + + Severity string `json:"severity"` + Urgency string `json:"urgency"` + Certainty string `json:"certainty"` + + Status string `json:"status"` + MessageType string `json:"messageType"` + Category string `json:"category"` + Response string `json:"response"` + + // Narrative. + Description string `json:"description"` + Instruction string `json:"instruction"` + + // Timing. + Sent string `json:"sent"` + Effective string `json:"effective"` + Onset string `json:"onset"` + Expires string `json:"expires"` + Ends string `json:"ends"` + + // Area / provenance. + AreaDesc string `json:"areaDesc"` + Sender string `json:"sender"` + SenderName string `json:"senderName"` + + // Related alerts (updates/replacements). + // Keep flexible: some NWS payloads use an object array; others may degrade to strings. + References json.RawMessage `json:"references"` +} + +type nwsAlertReference struct { + ID string `json:"id"` + Identifier string `json:"identifier"` + Sender string `json:"sender"` + Sent string `json:"sent"` +} diff --git a/internal/sources/nws/alerts.go b/internal/sources/nws/alerts.go index 9533e17..30fe007 100644 --- a/internal/sources/nws/alerts.go +++ b/internal/sources/nws/alerts.go @@ -1,16 +1,26 @@ +// FILE: internal/sources/nws/alerts.go package nws import ( "context" - "fmt" + "encoding/json" + "strings" + "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) -// AlertsSource polls an NWS alerts endpoint and will emit RAW alert Events. -// For now Poll remains TODO; this file just migrates to the shared HTTPSource spine. +// AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event. +// +// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes +// minimal metadata for Event.EffectiveAt and Event.ID. +// +// Output schema: +// - standards.SchemaRawNWSAlertsV1 type AlertsSource struct { http *common.HTTPSource } @@ -18,7 +28,7 @@ type AlertsSource struct { func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) { const driver = "nws_alerts" - // NWS APIs are typically GeoJSON; allow fallback to plain JSON as well. + // NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well. hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err @@ -30,10 +40,108 @@ func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) { func (s *AlertsSource) Name() string { return s.http.Name } // Kind is used for routing/policy. -// The envelope type is event.Event; payload will eventually normalize into model.WeatherAlert. func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { - _ = ctx - return nil, fmt.Errorf("nws.AlertsSource.Poll: TODO implement") + raw, meta, err := s.fetchRaw(ctx) + if err != nil { + return nil, err + } + + // EffectiveAt policy for alerts: + // Prefer the collection-level "updated" timestamp (best dedupe signal). + // If missing, fall back to the most recent per-alert timestamp we can parse. + var effectiveAt *time.Time + switch { + case !meta.ParsedUpdated.IsZero(): + t := meta.ParsedUpdated.UTC() + effectiveAt = &t + case !meta.ParsedLatestFeatureTime.IsZero(): + t := meta.ParsedLatestFeatureTime.UTC() + effectiveAt = &t + } + + emittedAt := time.Now().UTC() + + // NWS alerts collections do not provide a stable per-snapshot ID. + // Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness. + eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + + return common.SingleRawEvent( + s.Kind(), + s.http.Name, + standards.SchemaRawNWSAlertsV1, + eventID, + emittedAt, + effectiveAt, + raw, + ) +} + +// ---- RAW fetch + minimal metadata decode ---- + +// alertsMeta is a minimal view of the NWS /alerts FeatureCollection. +// We only decode fields used to set EffectiveAt deterministically. +type alertsMeta struct { + Updated string `json:"updated"` + Features []struct { + Properties struct { + Sent string `json:"sent"` + Effective string `json:"effective"` + Expires string `json:"expires"` + Ends string `json:"ends"` + } `json:"properties"` + } `json:"features"` + + ParsedUpdated time.Time `json:"-"` + ParsedLatestFeatureTime time.Time `json:"-"` +} + +func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) { + raw, err := s.http.FetchJSON(ctx) + if err != nil { + return nil, alertsMeta{}, err + } + + var meta alertsMeta + if err := json.Unmarshal(raw, &meta); err != nil { + // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. + return raw, alertsMeta{}, nil + } + + // Top-level updated (preferred). + if us := strings.TrimSpace(meta.Updated); us != "" { + if t, err := nwscommon.ParseTime(us); err == nil { + meta.ParsedUpdated = t.UTC() + } + } + + // If Updated is missing/unparseable, compute a best-effort "latest" feature timestamp. + // We prefer Sent/Effective, and fall back to Expires/Ends if needed. + var latest time.Time + for _, f := range meta.Features { + candidates := []string{ + f.Properties.Sent, + f.Properties.Effective, + f.Properties.Expires, + f.Properties.Ends, + } + for _, s := range candidates { + ts := strings.TrimSpace(s) + if ts == "" { + continue + } + t, err := nwscommon.ParseTime(ts) + if err != nil { + continue + } + t = t.UTC() + if latest.IsZero() || t.After(latest) { + latest = t + } + } + } + meta.ParsedLatestFeatureTime = latest + + return raw, meta, nil } diff --git a/internal/standards/schema.go b/internal/standards/schema.go index 32fed1d..e2a15f4 100644 --- a/internal/standards/schema.go +++ b/internal/standards/schema.go @@ -19,7 +19,10 @@ const ( SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1" SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1" + SchemaRawNWSAlertsV1 = "raw.nws.alerts.v1" + // Canonical domain schemas (emitted after normalization). SchemaWeatherObservationV1 = "weather.observation.v1" SchemaWeatherForecastV1 = "weather.forecast.v1" + SchemaWeatherAlertV1 = "weather.alert.v1" )