Files
Eric Rakestraw 123e8ff763
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
Moved the standards package out of internal/ so it can be imported by downstream consumers.
2026-02-08 09:15:07 -06:00

148 lines
3.9 KiB
Go

// FILE: internal/sources/nws/alerts.go
package nws
import (
"context"
"encoding/json"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event.
//
// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes
// minimal metadata for Event.EffectiveAt and Event.ID.
//
// Output schema:
// - standards.SchemaRawNWSAlertsV1
type AlertsSource struct {
http *common.HTTPSource
}
func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) {
const driver = "nws_alerts"
// NWS alerts responses are GeoJSON-ish; allow fallback to plain JSON as well.
hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json")
if err != nil {
return nil, err
}
return &AlertsSource{http: hs}, nil
}
func (s *AlertsSource) Name() string { return s.http.Name }
// Kind is used for routing/policy.
func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") }
func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) {
raw, meta, err := s.fetchRaw(ctx)
if err != nil {
return nil, err
}
// EffectiveAt policy for alerts:
// Prefer the collection-level "updated" timestamp (best dedupe signal).
// If missing, fall back to the most recent per-alert timestamp we can parse.
var effectiveAt *time.Time
switch {
case !meta.ParsedUpdated.IsZero():
t := meta.ParsedUpdated.UTC()
effectiveAt = &t
case !meta.ParsedLatestFeatureTime.IsZero():
t := meta.ParsedLatestFeatureTime.UTC()
effectiveAt = &t
}
emittedAt := time.Now().UTC()
// NWS alerts collections do not provide a stable per-snapshot ID.
// Use Source:EffectiveAt (or Source:EmittedAt fallback) for dedupe friendliness.
eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt)
return common.SingleRawEvent(
s.Kind(),
s.http.Name,
standards.SchemaRawNWSAlertsV1,
eventID,
emittedAt,
effectiveAt,
raw,
)
}
// ---- RAW fetch + minimal metadata decode ----
// alertsMeta is a minimal view of the NWS /alerts FeatureCollection.
// We only decode fields used to set EffectiveAt deterministically.
type alertsMeta struct {
Updated string `json:"updated"`
Features []struct {
Properties struct {
Sent string `json:"sent"`
Effective string `json:"effective"`
Expires string `json:"expires"`
Ends string `json:"ends"`
} `json:"properties"`
} `json:"features"`
ParsedUpdated time.Time `json:"-"`
ParsedLatestFeatureTime time.Time `json:"-"`
}
func (s *AlertsSource) fetchRaw(ctx context.Context) (json.RawMessage, alertsMeta, error) {
raw, err := s.http.FetchJSON(ctx)
if err != nil {
return nil, alertsMeta{}, err
}
var meta alertsMeta
if err := json.Unmarshal(raw, &meta); err != nil {
// If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt.
return raw, alertsMeta{}, nil
}
// Top-level updated (preferred).
if us := strings.TrimSpace(meta.Updated); us != "" {
if t, err := nwscommon.ParseTime(us); err == nil {
meta.ParsedUpdated = t.UTC()
}
}
// If Updated is missing/unparseable, compute a best-effort "latest" feature timestamp.
// We prefer Sent/Effective, and fall back to Expires/Ends if needed.
var latest time.Time
for _, f := range meta.Features {
candidates := []string{
f.Properties.Sent,
f.Properties.Effective,
f.Properties.Expires,
f.Properties.Ends,
}
for _, s := range candidates {
ts := strings.TrimSpace(s)
if ts == "" {
continue
}
t, err := nwscommon.ParseTime(ts)
if err != nil {
continue
}
t = t.UTC()
if latest.IsZero() || t.After(latest) {
latest = t
}
}
}
meta.ParsedLatestFeatureTime = latest
return raw, meta, nil
}