// FILE: internal/sources/nws/alerts.go package nws import ( "context" "encoding/json" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) // AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event. // // It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes // minimal metadata for Event.EffectiveAt and Event.ID. // // Output schema: // - standards.SchemaRawNWSAlertsV1 type AlertsSource struct { http *common.HTTPSource } func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) { const driver = "nws_alerts" // NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well. hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") if err != nil { return nil, err } return &AlertsSource{http: hs}, nil } func (s *AlertsSource) Name() string { return s.http.Name } // Kind is used for routing/policy. func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } // EffectiveAt policy for alerts: // Prefer the collection-level "updated" timestamp (best dedupe signal). // If missing, fall back to the most recent per-alert timestamp we can parse. var effectiveAt *time.Time switch { case !meta.ParsedUpdated.IsZero(): t := meta.ParsedUpdated.UTC() effectiveAt = &t case !meta.ParsedLatestFeatureTime.IsZero(): t := meta.ParsedLatestFeatureTime.UTC() effectiveAt = &t } emittedAt := time.Now().UTC() // NWS alerts collections do not provide a stable per-snapshot ID. // Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness. eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) return common.SingleRawEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSAlertsV1, eventID, emittedAt, effectiveAt, raw, ) } // ---- RAW fetch + minimal metadata decode ---- // alertsMeta is a minimal view of the NWS /alerts FeatureCollection. // We only decode fields used to set EffectiveAt deterministically. type alertsMeta struct { Updated string `json:"updated"` Features []struct { Properties struct { Sent string `json:"sent"` Effective string `json:"effective"` Expires string `json:"expires"` Ends string `json:"ends"` } `json:"properties"` } `json:"features"` ParsedUpdated time.Time `json:"-"` ParsedLatestFeatureTime time.Time `json:"-"` } func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) { raw, err := s.http.FetchJSON(ctx) if err != nil { return nil, alertsMeta{}, err } var meta alertsMeta if err := json.Unmarshal(raw, &meta); err != nil { // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. return raw, alertsMeta{}, nil } // Top-level updated (preferred). if us := strings.TrimSpace(meta.Updated); us != "" { if t, err := nwscommon.ParseTime(us); err == nil { meta.ParsedUpdated = t.UTC() } } // If Updated is missing/unparseable, compute a best-effort "latest" feature timestamp. // We prefer Sent/Effective, and fall back to Expires/Ends if needed. var latest time.Time for _, f := range meta.Features { candidates := []string{ f.Properties.Sent, f.Properties.Effective, f.Properties.Expires, f.Properties.Ends, } for _, s := range candidates { ts := strings.TrimSpace(s) if ts == "" { continue } t, err := nwscommon.ParseTime(ts) if err != nil { continue } t = t.UTC() if latest.IsZero() || t.After(latest) { latest = t } } } meta.ParsedLatestFeatureTime = latest return raw, meta, nil }