All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
239 lines
6.7 KiB
Go
239 lines
6.7 KiB
Go
// FILE: internal/normalizers/nws/alerts.go
|
||
package nws
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strings"
|
||
"time"
|
||
|
||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||
)
|
||
|
||
// AlertsNormalizer converts:
|
||
//
|
||
// standards.SchemaRawNWSAlertsV1 -> standards.SchemaWeatherAlertV1
|
||
//
|
||
// It interprets NWS /alerts FeatureCollection payloads (GeoJSON-ish) and maps them into
|
||
// the canonical model.WeatherAlertRun representation.
|
||
//
|
||
// Caveats / policy:
|
||
// 1. Run.AsOf prefers the collection-level "updated" timestamp. If missing/unparseable,
|
||
// we fall back to the latest per-alert timestamp, and then to the input event’s
|
||
// EffectiveAt/EmittedAt.
|
||
// 2. Alert timing fields are best-effort parsed; invalid timestamps do not fail the
|
||
// entire normalization (they are left nil).
|
||
// 3. Some fields are intentionally passed through as strings (severity/urgency/etc.)
|
||
// since canonical vocabularies may evolve later.
|
||
type AlertsNormalizer struct{}
|
||
|
||
func (AlertsNormalizer) Match(e event.Event) bool {
|
||
return strings.TrimSpace(e.Schema) == standards.SchemaRawNWSAlertsV1
|
||
}
|
||
|
||
func (AlertsNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||
|
||
// If we can't derive AsOf from the payload, fall back to the existing event envelope.
|
||
fallbackAsOf := in.EmittedAt.UTC()
|
||
if in.EffectiveAt != nil && !in.EffectiveAt.IsZero() {
|
||
fallbackAsOf = in.EffectiveAt.UTC()
|
||
}
|
||
|
||
return normcommon.NormalizeJSON(
|
||
in,
|
||
"nws alerts",
|
||
standards.SchemaWeatherAlertV1,
|
||
func(parsed nwsAlertsResponse) (model.WeatherAlertRun, time.Time, error) {
|
||
return buildAlerts(parsed, fallbackAsOf)
|
||
},
|
||
)
|
||
}
|
||
|
||
// buildAlerts contains the domain mapping logic (provider -> canonical model).
|
||
func buildAlerts(parsed nwsAlertsResponse, fallbackAsOf time.Time) (model.WeatherAlertRun, time.Time, error) {
|
||
// 1) Determine AsOf (required by canonical model; also used as EffectiveAt).
|
||
asOf := nwscommon.ParseTimeBestEffort(parsed.Updated)
|
||
if asOf.IsZero() {
|
||
asOf = latestAlertTimestamp(parsed.Features)
|
||
}
|
||
if asOf.IsZero() {
|
||
asOf = fallbackAsOf.UTC()
|
||
}
|
||
|
||
run := model.WeatherAlertRun{
|
||
LocationID: "",
|
||
LocationName: strings.TrimSpace(parsed.Title),
|
||
|
||
AsOf: asOf,
|
||
|
||
Latitude: nil,
|
||
Longitude: nil,
|
||
|
||
Alerts: nil,
|
||
}
|
||
|
||
// 2) Map each feature into a canonical WeatherAlert.
|
||
alerts := make([]model.WeatherAlert, 0, len(parsed.Features))
|
||
for i, f := range parsed.Features {
|
||
p := f.Properties
|
||
|
||
id := firstNonEmpty(strings.TrimSpace(f.ID), strings.TrimSpace(p.ID), strings.TrimSpace(p.Identifier))
|
||
if id == "" {
|
||
// NWS usually supplies an ID, but be defensive. Prefer a stable-ish synth ID.
|
||
// Include the run as-of time to reduce collisions across snapshots.
|
||
id = fmt.Sprintf("nws:alert:%s:%d", asOf.UTC().Format(time.RFC3339Nano), i)
|
||
}
|
||
|
||
sent := nwscommon.ParseTimePtr(p.Sent)
|
||
effective := nwscommon.ParseTimePtr(p.Effective)
|
||
onset := nwscommon.ParseTimePtr(p.Onset)
|
||
|
||
// Expires: prefer "expires"; fall back to "ends" if present.
|
||
expires := nwscommon.ParseTimePtr(p.Expires)
|
||
if expires == nil {
|
||
expires = nwscommon.ParseTimePtr(p.Ends)
|
||
}
|
||
|
||
refs := parseNWSAlertReferences(p.References)
|
||
|
||
alert := model.WeatherAlert{
|
||
ID: id,
|
||
|
||
Event: strings.TrimSpace(p.Event),
|
||
Headline: strings.TrimSpace(p.Headline),
|
||
|
||
Severity: strings.TrimSpace(p.Severity),
|
||
Urgency: strings.TrimSpace(p.Urgency),
|
||
Certainty: strings.TrimSpace(p.Certainty),
|
||
|
||
Status: strings.TrimSpace(p.Status),
|
||
MessageType: strings.TrimSpace(p.MessageType),
|
||
Category: strings.TrimSpace(p.Category),
|
||
Response: strings.TrimSpace(p.Response),
|
||
|
||
Description: strings.TrimSpace(p.Description),
|
||
Instruction: strings.TrimSpace(p.Instruction),
|
||
|
||
Sent: sent,
|
||
Effective: effective,
|
||
Onset: onset,
|
||
Expires: expires,
|
||
|
||
AreaDescription: strings.TrimSpace(p.AreaDesc),
|
||
|
||
SenderName: firstNonEmpty(strings.TrimSpace(p.SenderName), strings.TrimSpace(p.Sender)),
|
||
|
||
References: refs,
|
||
}
|
||
|
||
// Headline fallback (NWS commonly provides it, but not guaranteed).
|
||
if alert.Headline == "" {
|
||
alert.Headline = alert.Event
|
||
}
|
||
|
||
alerts = append(alerts, alert)
|
||
}
|
||
|
||
run.Alerts = alerts
|
||
|
||
// EffectiveAt policy for alerts: treat AsOf as the effective time (dedupe-friendly).
|
||
return run, asOf, nil
|
||
}
|
||
|
||
// latestAlertTimestamp scans alert features for the most recent timestamp.
|
||
// It prefers Sent/Effective, and falls back to Expires/Ends when needed.
|
||
func latestAlertTimestamp(features []nwsAlertFeature) time.Time {
|
||
var latest time.Time
|
||
for _, f := range features {
|
||
p := f.Properties
|
||
candidates := []string{
|
||
p.Sent,
|
||
p.Effective,
|
||
p.Expires,
|
||
p.Ends,
|
||
p.Onset,
|
||
}
|
||
for _, s := range candidates {
|
||
t := nwscommon.ParseTimeBestEffort(s)
|
||
if t.IsZero() {
|
||
continue
|
||
}
|
||
if latest.IsZero() || t.After(latest) {
|
||
latest = t
|
||
}
|
||
}
|
||
}
|
||
return latest
|
||
}
|
||
|
||
func firstNonEmpty(vals ...string) string {
|
||
for _, v := range vals {
|
||
if strings.TrimSpace(v) != "" {
|
||
return strings.TrimSpace(v)
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// parseNWSAlertReferences tries to interpret the NWS "references" field, which may
|
||
// vary by endpoint/version. We accept the common object-array form and a few
|
||
// degraded shapes (string array / single string).
|
||
func parseNWSAlertReferences(raw json.RawMessage) []model.AlertReference {
|
||
if len(raw) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// Most common: array of objects.
|
||
var objs []nwsAlertReference
|
||
if err := json.Unmarshal(raw, &objs); err == nil && len(objs) > 0 {
|
||
out := make([]model.AlertReference, 0, len(objs))
|
||
for _, r := range objs {
|
||
ref := model.AlertReference{
|
||
ID: strings.TrimSpace(r.ID),
|
||
Identifier: strings.TrimSpace(r.Identifier),
|
||
Sender: strings.TrimSpace(r.Sender),
|
||
Sent: nwscommon.ParseTimePtr(r.Sent),
|
||
}
|
||
// If only Identifier is present, preserve it as ID too (useful downstream).
|
||
if ref.ID == "" && ref.Identifier != "" {
|
||
ref.ID = ref.Identifier
|
||
}
|
||
out = append(out, ref)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// Sometimes: array of strings.
|
||
var strs []string
|
||
if err := json.Unmarshal(raw, &strs); err == nil && len(strs) > 0 {
|
||
out := make([]model.AlertReference, 0, len(strs))
|
||
for _, s := range strs {
|
||
id := strings.TrimSpace(s)
|
||
if id == "" {
|
||
continue
|
||
}
|
||
out = append(out, model.AlertReference{ID: id})
|
||
}
|
||
if len(out) > 0 {
|
||
return out
|
||
}
|
||
}
|
||
|
||
// Rare: single string.
|
||
var single string
|
||
if err := json.Unmarshal(raw, &single); err == nil {
|
||
id := strings.TrimSpace(single)
|
||
if id != "" {
|
||
return []model.AlertReference{{ID: id}}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|