normalizers/nws: add NWS alerts normalizer and canonical alert mapping
- Introduce AlertsNormalizer to convert raw NWS alerts (SchemaRawNWSAlertsV1) into canonical WeatherAlert runs (SchemaWeatherAlertV1)
- Add minimal NWS alerts response types to support GeoJSON FeatureCollection parsing
- Map NWS alert properties (event, headline, severity, timing, area, references) into model.WeatherAlert with best-effort timestamp handling
- Establish a clear AsOf / EffectiveAt policy for alert runs to support stable deduplication and snapshot semantics
- Register the new alerts normalizer alongside the existing NWS observation and forecast normalizers
This commit is contained in:
@@ -1,16 +1,26 @@
|
||||
// FILE: internal/sources/nws/alerts.go
|
||||
package nws
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
)
|
||||
|
||||
// AlertsSource polls an NWS alerts endpoint and will emit RAW alert Events.
|
||||
// For now Poll remains TODO; this file just migrates to the shared HTTPSource spine.
|
||||
// AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event.
|
||||
//
|
||||
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
|
||||
// minimal metadata for Event.EffectiveAt and Event.ID.
|
||||
//
|
||||
// Output schema:
|
||||
// - standards.SchemaRawNWSAlertsV1
|
||||
type AlertsSource struct {
|
||||
http *common.HTTPSource
|
||||
}
|
||||
@@ -18,7 +28,7 @@ type AlertsSource struct {
|
||||
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
||||
const driver = "nws_alerts"
|
||||
|
||||
// NWS APIs are typically GeoJSON; allow fallback to plain JSON as well.
|
||||
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
|
||||
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -30,10 +40,108 @@ func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
|
||||
// Name returns the name carried by the underlying HTTP source.
func (s *AlertsSource) Name() string {
	return s.http.Name
}
|
||||
|
||||
// Kind is used for routing/policy.
|
||||
// The envelope type is event.Event; payload will eventually normalize into model.WeatherAlert.
|
||||
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
|
||||
|
||||
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||
_ = ctx
|
||||
return nil, fmt.Errorf("nws.AlertsSource.Poll: TODO implement")
|
||||
raw, meta, err := s.fetchRaw(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// EffectiveAt policy for alerts:
|
||||
// Prefer the collection-level "updated" timestamp (best dedupe signal).
|
||||
// If missing, fall back to the most recent per-alert timestamp we can parse.
|
||||
var effectiveAt *time.Time
|
||||
switch {
|
||||
case !meta.ParsedUpdated.IsZero():
|
||||
t := meta.ParsedUpdated.UTC()
|
||||
effectiveAt = &t
|
||||
case !meta.ParsedLatestFeatureTime.IsZero():
|
||||
t := meta.ParsedLatestFeatureTime.UTC()
|
||||
effectiveAt = &t
|
||||
}
|
||||
|
||||
emittedAt := time.Now().UTC()
|
||||
|
||||
// NWS alerts collections do not provide a stable per-snapshot ID.
|
||||
// Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness.
|
||||
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
|
||||
|
||||
return common.SingleRawEvent(
|
||||
s.Kind(),
|
||||
s.http.Name,
|
||||
standards.SchemaRawNWSAlertsV1,
|
||||
eventID,
|
||||
emittedAt,
|
||||
effectiveAt,
|
||||
raw,
|
||||
)
|
||||
}
|
||||
|
||||
// ---- RAW fetch + minimal metadata decode ----
|
||||
|
||||
// alertsMeta is a minimal view of the NWS /alerts FeatureCollection.
//
// Only the fields needed to derive a deterministic EffectiveAt are decoded;
// everything else in the payload is ignored by this type.
type alertsMeta struct {
	// Updated is the collection-level "updated" value, kept as the raw string.
	Updated string `json:"updated"`

	// Features carries the per-alert timestamp strings, also kept raw.
	Features []struct {
		Properties struct {
			Sent      string `json:"sent"`
			Effective string `json:"effective"`
			Expires   string `json:"expires"`
			Ends      string `json:"ends"`
		} `json:"properties"`
	} `json:"features"`

	// The fields below are never read from JSON (tag "-"); they are filled in
	// after decoding and stay zero when nothing could be parsed.
	ParsedUpdated           time.Time `json:"-"`
	ParsedLatestFeatureTime time.Time `json:"-"`
}
|
||||
|
||||
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) {
|
||||
raw, err := s.http.FetchJSON(ctx)
|
||||
if err != nil {
|
||||
return nil, alertsMeta{}, err
|
||||
}
|
||||
|
||||
var meta alertsMeta
|
||||
if err := json.Unmarshal(raw, &meta); err != nil {
|
||||
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
|
||||
return raw, alertsMeta{}, nil
|
||||
}
|
||||
|
||||
// Top-level updated (preferred).
|
||||
if us := strings.TrimSpace(meta.Updated); us != "" {
|
||||
if t, err := nwscommon.ParseTime(us); err == nil {
|
||||
meta.ParsedUpdated = t.UTC()
|
||||
}
|
||||
}
|
||||
|
||||
// If Updated is missing/unparseable, compute a best-effort "latest" feature timestamp.
|
||||
// We prefer Sent/Effective, and fall back to Expires/Ends if needed.
|
||||
var latest time.Time
|
||||
for _, f := range meta.Features {
|
||||
candidates := []string{
|
||||
f.Properties.Sent,
|
||||
f.Properties.Effective,
|
||||
f.Properties.Expires,
|
||||
f.Properties.Ends,
|
||||
}
|
||||
for _, s := range candidates {
|
||||
ts := strings.TrimSpace(s)
|
||||
if ts == "" {
|
||||
continue
|
||||
}
|
||||
t, err := nwscommon.ParseTime(ts)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
t = t.UTC()
|
||||
if latest.IsZero() || t.After(latest) {
|
||||
latest = t
|
||||
}
|
||||
}
|
||||
}
|
||||
meta.ParsedLatestFeatureTime = latest
|
||||
|
||||
return raw, meta, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user