diff --git a/internal/normalizers/common/doc.go b/internal/normalizers/common/doc.go new file mode 100644 index 0000000..a9552a3 --- /dev/null +++ b/internal/normalizers/common/doc.go @@ -0,0 +1,49 @@ +// Package common contains cross-provider helper code used by weatherfeeder normalizers. +// +// Purpose +// ------- +// Normalizers convert provider-specific RAW payloads into canonical internal/model types. +// Some small utilities are naturally reusable across multiple providers (unit conversions, +// payload extraction, common parsing, shared fallbacks). Those belong here. +// +// This package is intentionally "boring": +// - pure helpers (deterministic, no I/O) +// - minimal abstractions (prefer straightforward functions) +// - easy to unit test +// +// What belongs here +// ----------------- +// Put code in internal/normalizers/common when it is: +// +// - potentially reusable by more than one provider +// - provider-agnostic (no NWS/OpenWeather/Open-Meteo specific assumptions) +// - stable, small, and readable +// +// Typical examples: +// - unit conversion helpers (°F <-> °C, m/s <-> km/h, hPa <-> Pa, etc.) +// - json.RawMessage payload extraction helpers (with good error messages) +// - shared parsing helpers (timestamps, simple numeric coercions) +// - generic fallbacks (e.g., mapping a human text description into a coarse canonical code), +// so long as the logic truly applies across providers +// +// What does NOT belong here +// ------------------------- +// Do NOT put the following in this package: +// +// - Normalizer implementations (types that satisfy feedkit/normalize.Normalizer) +// - provider-specific JSON structs or mapping logic (put those under +// internal/normalizers/<provider>/) +// - network or filesystem I/O (sources fetch; normalizers transform) +// - code that depends on event.Source naming, config fields, or driver-specific params +// +// Style and API guidelines +// ------------------------ +// - Prefer small, single-purpose functions. 
+// - Keep function names explicit (avoid clever generic “DoThing” helpers). +// - Return typed errors with context (include schema/field names where helpful). +// - Keep dependencies minimal: standard library + weatherfeeder packages only. +// - Add unit tests for any non-trivial logic (especially parsing and fallbacks). +// +// Keeping this clean matters: common is shared by all providers, so complexity here +// multiplies across the project. +package common diff --git a/internal/normalizers/common/payload.go b/internal/normalizers/common/payload.go new file mode 100644 index 0000000..00da9e3 --- /dev/null +++ b/internal/normalizers/common/payload.go @@ -0,0 +1,54 @@ +// FILE: ./internal/normalizers/common/payload.go +package common + +import ( + "encoding/json" + "fmt" + + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +// PayloadBytes extracts a JSON-ish payload into bytes suitable for json.Unmarshal. +// +// Supported payload shapes (weatherfeeder convention): +// - json.RawMessage (recommended for raw events) +// - []byte +// - string (assumed to contain JSON) +// - map[string]any (re-marshaled to JSON) +// +// If you add other raw representations later, extend this function. 
+func PayloadBytes(e event.Event) ([]byte, error) { + if e.Payload == nil { + return nil, fmt.Errorf("payload is nil") + } + + switch v := e.Payload.(type) { + case json.RawMessage: + if len(v) == 0 { + return nil, fmt.Errorf("payload is empty json.RawMessage") + } + return []byte(v), nil + + case []byte: + if len(v) == 0 { + return nil, fmt.Errorf("payload is empty []byte") + } + return v, nil + + case string: + if v == "" { + return nil, fmt.Errorf("payload is empty string") + } + return []byte(v), nil + + case map[string]any: + b, err := json.Marshal(v) + if err != nil { + return nil, fmt.Errorf("marshal map payload: %w", err) + } + return b, nil + + default: + return nil, fmt.Errorf("unsupported payload type %T", e.Payload) + } +} diff --git a/internal/normalizers/common/units.go b/internal/normalizers/common/units.go new file mode 100644 index 0000000..d64c7a9 --- /dev/null +++ b/internal/normalizers/common/units.go @@ -0,0 +1,15 @@ +// FILE: ./internal/normalizers/common/units.go +package common + +// Common unit conversions used across providers. +// +// These helpers are intentionally small and “obvious” and are meant to remove +// duplication across normalizers (and eventually across sources, once refactored). 
// TempCFromF converts a temperature in degrees Fahrenheit to degrees Celsius.
func TempCFromF(f float64) float64 {
	const freezingPointF = 32.0
	return (f - freezingPointF) * 5.0 / 9.0
}

// TempCFromK converts a temperature in kelvins to degrees Celsius.
func TempCFromK(k float64) float64 {
	const zeroCelsiusInKelvin = 273.15
	return k - zeroCelsiusInKelvin
}

// SpeedKmhFromMps converts a speed in metres per second to kilometres per hour.
func SpeedKmhFromMps(ms float64) float64 {
	const kmhPerMps = 3.6
	return ms * kmhPerMps
}

// SpeedKmhFromMph converts a speed in miles per hour to kilometres per hour.
func SpeedKmhFromMph(mph float64) float64 {
	const kmPerMile = 1.609344
	return mph * kmPerMile
}

// PressurePaFromHPa converts a pressure in hectopascals to pascals.
func PressurePaFromHPa(hpa float64) float64 {
	const paPerHPa = 100.0
	return hpa * paPerHPa
}
+func WMOFromTextDescription(desc string) model.WMOCode { + s := strings.ToLower(strings.TrimSpace(desc)) + if s == "" { + return model.WMOUnknown + } + + // Thunder / hail + if strings.Contains(s, "thunder") { + if strings.Contains(s, "hail") { + return 99 + } + return 95 + } + + // Freezing hazards + if strings.Contains(s, "freezing rain") { + if strings.Contains(s, "light") { + return 66 + } + return 67 + } + if strings.Contains(s, "freezing drizzle") { + if strings.Contains(s, "light") { + return 56 + } + return 57 + } + + // Drizzle + if strings.Contains(s, "drizzle") { + if strings.Contains(s, "heavy") || strings.Contains(s, "dense") { + return 55 + } + if strings.Contains(s, "light") { + return 51 + } + return 53 + } + + // Showers + if strings.Contains(s, "showers") { + if strings.Contains(s, "heavy") { + return 82 + } + if strings.Contains(s, "light") { + return 80 + } + return 81 + } + + // Rain + if strings.Contains(s, "rain") { + if strings.Contains(s, "heavy") { + return 65 + } + if strings.Contains(s, "light") { + return 61 + } + return 63 + } + + // Snow (check snow showers first) + if strings.Contains(s, "snow showers") { + if strings.Contains(s, "light") { + return 85 + } + return 86 + } + if strings.Contains(s, "snow grains") { + return 77 + } + if strings.Contains(s, "snow") { + if strings.Contains(s, "heavy") { + return 75 + } + if strings.Contains(s, "light") { + return 71 + } + return 73 + } + + // Fog / mist + if strings.Contains(s, "rime fog") { + return 48 + } + if strings.Contains(s, "fog") || strings.Contains(s, "mist") { + return 45 + } + + // Sky-only + if strings.Contains(s, "overcast") { + return 3 + } + if strings.Contains(s, "cloudy") { + return 3 + } + if strings.Contains(s, "partly cloudy") { + return 2 + } + if strings.Contains(s, "mostly sunny") || strings.Contains(s, "mostly clear") || + strings.Contains(s, "mainly sunny") || strings.Contains(s, "mainly clear") { + return 1 + } + if strings.Contains(s, "clear") || 
strings.Contains(s, "sunny") { + return 0 + } + + return model.WMOUnknown +} diff --git a/internal/normalizers/doc.go b/internal/normalizers/doc.go index 0f92748..a8bb850 100644 --- a/internal/normalizers/doc.go +++ b/internal/normalizers/doc.go @@ -19,7 +19,8 @@ // Example: // // internal/normalizers/nws/observation.go -// internal/normalizers/nws/common.go +// internal/normalizers/nws/types.go +// internal/normalizers/nws/wmo_map.go // internal/normalizers/openweather/observation.go // internal/normalizers/openmeteo/observation.go // internal/normalizers/common/units.go @@ -27,14 +28,30 @@ // Rules: // // 1. One normalizer per file. -// Each file contains exactly one Normalizer implementation (one type). +// Each file contains exactly one Normalizer implementation (one type that +// satisfies feedkit/normalize.Normalizer). +// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long +// as they do not define additional Normalizer types. // -// 2. Provider-level shared helpers live in: -// internal/normalizers//common.go +// 2. Provider-level shared helpers live under the provider directory: +// internal/normalizers// +// +// You may use multiple helper files (recommended) when it improves clarity: +// - types.go (provider JSON structs) +// - common.go (provider-shared helpers) +// - mapping.go (provider mapping logic) +// Use common.go only when you truly have “shared across multiple normalizers +// within this provider” helpers. // // 3. Cross-provider helpers live in: // internal/normalizers/common/ // +// Prefer extracting small, pure helpers here when they are reused by ≥2 providers. +// Keep these helpers: +// - deterministic (no I/O) +// - side-effect free +// - easy to read (avoid clever abstractions) +// // 4. Matching is standardized on Event.Schema. // (Do not match on event.Source or event.Kind in weatherfeeder normalizers.) 
// @@ -82,10 +99,13 @@ // Every normalizer type must have a doc comment that states: // // - what it converts (e.g., “OpenWeather current -> WeatherObservation”) -// - which raw schema it matches (constant name + value) -// - which canonical schema it produces (constant name + value) +// - which raw schema it matches (constant identifier from internal/standards) +// - which canonical schema it produces (constant identifier from internal/standards) // - any special caveats (units, day/night inference, missing fields, etc.) // +// Including literal schema string values is optional, +// but the constant identifiers are required. +// // Event field handling (strong defaults) // -------------------------------------- // Normalizers should treat the incoming event envelope as stable identity and @@ -126,6 +146,22 @@ // // which calls each provider’s Register() in a stable order. // +// Registry ordering +// ----------------------------- +// feedkit normalization uses a match-driven registry (“first match wins”). +// Therefore order matters: +// +// - Register more specific normalizers before more general ones. +// - Avoid “catch-all” Match() implementations. +// - Keep Match() cheap and deterministic (Schema equality checks are ideal). +// +// Reuse guidance (strong recommendation) +// -------------------------------------- +// Before adding provider-specific logic, check internal/normalizers/common for an +// existing helper (payload extraction, unit conversions, text fallbacks, etc.). +// If you discover logic that could potentially apply to another provider, prefer extracting +// it into internal/normalizers/common as appropriate. 
// metarPhenomenon is the typed form of a single entry in the NWS
// "presentWeather" array.
type metarPhenomenon struct {
	Intensity  *string `json:"intensity"` // "light", "heavy", or null
	Modifier   *string `json:"modifier"`  // "freezing", "showers", etc., or null
	Weather    string  `json:"weather"`   // e.g. "rain", "snow", "fog_mist", ...
	RawString  string  `json:"rawString"`
	InVicinity *bool   `json:"inVicinity"`
}

// decodeOnePhenomenon converts one generic presentWeather object into a
// metarPhenomenon by round-tripping it through JSON. ok is false when the
// object cannot be marshaled or does not fit the typed struct.
func decodeOnePhenomenon(obj map[string]any) (p metarPhenomenon, ok bool) {
	encoded, err := json.Marshal(obj)
	if err != nil {
		return metarPhenomenon{}, false
	}
	if err := json.Unmarshal(encoded, &p); err != nil {
		return metarPhenomenon{}, false
	}

	// Normalize once here so later comparisons can use plain equality.
	p.Weather = strings.ToLower(strings.TrimSpace(p.Weather))
	p.RawString = strings.TrimSpace(p.RawString)
	return p, true
}

// decodeMetarPhenomena converts the generic presentWeather objects into typed
// phenomena. Entries that fail to decode are skipped (best effort). It returns
// nil for empty input and a non-nil slice otherwise, even if every entry was
// skipped.
func decodeMetarPhenomena(raw []map[string]any) []metarPhenomenon {
	if len(raw) == 0 {
		return nil
	}

	decoded := make([]metarPhenomenon, 0, len(raw))
	for i := range raw {
		if p, ok := decodeOnePhenomenon(raw[i]); ok {
			decoded = append(decoded, p)
		}
	}
	return decoded
}

// containsWeather reports whether any phenomenon has the given weather value.
// The argument is trimmed and lower-cased before comparison.
func containsWeather(phenomena []metarPhenomenon, weather string) bool {
	want := strings.ToLower(strings.TrimSpace(weather))
	for i := range phenomena {
		if phenomena[i].Weather == want {
			return true
		}
	}
	return false
}
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ObservationNormalizer converts: +// +// standards.SchemaRawNWSObservationV1 -> standards.SchemaWeatherObservationV1 +// +// It interprets NWS GeoJSON station observations and maps them into the +// canonical model.WeatherObservation representation. +// +// Precedence for determining ConditionCode (WMO): +// 1. presentWeather (METAR phenomena objects) — strongest signal +// 2. textDescription keyword fallback — reusable across providers +// 3. cloudLayers sky-only fallback — NWS/METAR-specific +type ObservationNormalizer struct{} + +func (ObservationNormalizer) Match(e event.Event) bool { + return strings.TrimSpace(e.Schema) == standards.SchemaRawNWSObservationV1 +} + +func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps + + rawBytes, err := normcommon.PayloadBytes(in) + if err != nil { + return nil, fmt.Errorf("nws observation normalize: %w", err) + } + + var parsed nwsObservationResponse + if err := json.Unmarshal(rawBytes, &parsed); err != nil { + return nil, fmt.Errorf("nws observation normalize: decode raw payload: %w", err) + } + + obs, effectiveAt, err := buildObservation(parsed) + if err != nil { + return nil, err + } + + out := in + out.Schema = standards.SchemaWeatherObservationV1 + out.Payload = obs + + // EffectiveAt is optional; for observations it is naturally the observation timestamp. + if !effectiveAt.IsZero() { + t := effectiveAt.UTC() + out.EffectiveAt = &t + } + + if err := out.Validate(); err != nil { + return nil, err + } + return &out, nil +} + +// buildObservation contains the domain mapping logic (provider -> canonical model). 
+func buildObservation(parsed nwsObservationResponse) (model.WeatherObservation, time.Time, error) { + // Timestamp (RFC3339) + var ts time.Time + if s := strings.TrimSpace(parsed.Properties.Timestamp); s != "" { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + return model.WeatherObservation{}, time.Time{}, fmt.Errorf("nws observation normalize: invalid timestamp %q: %w", s, err) + } + ts = t + } + + cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers)) + for _, cl := range parsed.Properties.CloudLayers { + cloudLayers = append(cloudLayers, model.CloudLayer{ + BaseMeters: cl.Base.Value, + Amount: cl.Amount, + }) + } + + // Preserve raw presentWeather objects (for troubleshooting / drift analysis). + present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather)) + for _, pw := range parsed.Properties.PresentWeather { + present = append(present, model.PresentWeather{Raw: pw}) + } + + // Decode presentWeather into typed METAR phenomena for mapping. + phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather) + + providerDesc := strings.TrimSpace(parsed.Properties.TextDescription) + + // Determine canonical WMO condition code. + wmo := mapNWSToWMO(providerDesc, cloudLayers, phenomena) + + // Canonical condition text comes from our WMO table. + // NWS observation responses typically do not include a day/night flag -> nil. + canonicalText := standards.WMOText(wmo, nil) + + obs := model.WeatherObservation{ + StationID: parsed.Properties.StationID, + StationName: parsed.Properties.StationName, + Timestamp: ts, + + ConditionCode: wmo, + ConditionText: canonicalText, + IsDay: nil, + + ProviderRawDescription: providerDesc, + + // Transitional / human-facing: + // keep output consistent by populating TextDescription from canonical text. 
+ TextDescription: canonicalText, + IconURL: parsed.Properties.Icon, + + TemperatureC: parsed.Properties.Temperature.Value, + DewpointC: parsed.Properties.Dewpoint.Value, + + WindDirectionDegrees: parsed.Properties.WindDirection.Value, + WindSpeedKmh: parsed.Properties.WindSpeed.Value, + WindGustKmh: parsed.Properties.WindGust.Value, + + BarometricPressurePa: parsed.Properties.BarometricPressure.Value, + SeaLevelPressurePa: parsed.Properties.SeaLevelPressure.Value, + VisibilityMeters: parsed.Properties.Visibility.Value, + + RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value, + WindChillC: parsed.Properties.WindChill.Value, + HeatIndexC: parsed.Properties.HeatIndex.Value, + + ElevationMeters: parsed.Properties.Elevation.Value, + RawMessage: parsed.Properties.RawMessage, + + PresentWeather: present, + CloudLayers: cloudLayers, + } + + return obs, ts, nil +} diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index 64c31fa..01c635f 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -1,3 +1,4 @@ +// FILE: ./internal/normalizers/nws/register.go package nws import ( @@ -5,17 +6,11 @@ import ( ) // Register registers NWS normalizers into the provided registry. -// -// This is intentionally empty as a stub. As normalizers are implemented, -// register them here, e.g.: -// -// reg.Register(ObservationNormalizer{}) -// reg.Register(ForecastNormalizer{}) -// reg.Register(AlertsNormalizer{}) func Register(reg *fknormalize.Registry) { if reg == nil { return } - // TODO: register NWS normalizers here. 
// nwsQuantity is the {unitCode, value} object NWS uses for measured fields.
// Value is nil when the payload carries JSON null or omits the value.
type nwsQuantity struct {
	UnitCode string   `json:"unitCode"`
	Value    *float64 `json:"value"`
}

// nwsObservationResponse is a minimal-but-sufficient representation of the NWS
// station observation GeoJSON payload needed for mapping into
// model.WeatherObservation.
type nwsObservationResponse struct {
	ID         string `json:"id"`
	Properties struct {
		StationID       string `json:"stationId"`
		StationName     string `json:"stationName"`
		Timestamp       string `json:"timestamp"`
		TextDescription string `json:"textDescription"`
		Icon            string `json:"icon"`

		RawMessage string `json:"rawMessage"`

		Elevation nwsQuantity `json:"elevation"`

		Temperature nwsQuantity `json:"temperature"`
		Dewpoint    nwsQuantity `json:"dewpoint"`

		WindDirection nwsQuantity `json:"windDirection"`
		WindSpeed     nwsQuantity `json:"windSpeed"`
		WindGust      nwsQuantity `json:"windGust"`

		BarometricPressure nwsQuantity `json:"barometricPressure"`
		SeaLevelPressure   nwsQuantity `json:"seaLevelPressure"`
		Visibility         nwsQuantity `json:"visibility"`

		RelativeHumidity nwsQuantity `json:"relativeHumidity"`
		WindChill        nwsQuantity `json:"windChill"`
		HeatIndex        nwsQuantity `json:"heatIndex"`

		// NWS returns "presentWeather" as decoded METAR phenomena objects.
		// They are kept as generic maps here and optionally interpreted in metar.go.
		PresentWeather []map[string]any `json:"presentWeather"`

		CloudLayers []struct {
			Base   nwsQuantity `json:"base"`
			Amount string      `json:"amount"`
		} `json:"cloudLayers"`
	} `json:"properties"`
}
+ if code := wmoFromCloudLayers(cloudLayers); code != model.WMOUnknown { + return code + } + + return model.WMOUnknown +} + +func wmoFromPhenomena(phenomena []metarPhenomenon) model.WMOCode { + if len(phenomena) == 0 { + return model.WMOUnknown + } + + intensityOf := func(p metarPhenomenon) string { + if p.Intensity == nil { + return "" + } + return strings.ToLower(strings.TrimSpace(*p.Intensity)) + } + modifierOf := func(p metarPhenomenon) string { + if p.Modifier == nil { + return "" + } + return strings.ToLower(strings.TrimSpace(*p.Modifier)) + } + + // Pass 1: thunder + hail overrides everything (hazard). + hasThunder := false + hailIntensity := "" + for _, p := range phenomena { + switch p.Weather { + case "thunderstorms": + hasThunder = true + case "hail": + if hailIntensity == "" { + hailIntensity = intensityOf(p) + } + } + } + if hasThunder { + if hailIntensity != "" || containsWeather(phenomena, "hail") { + if hailIntensity == "heavy" { + return 99 + } + return 96 + } + return 95 + } + + // Pass 2: freezing hazards. + for _, p := range phenomena { + if modifierOf(p) != "freezing" { + continue + } + switch p.Weather { + case "rain": + if intensityOf(p) == "light" { + return 66 + } + return 67 + case "drizzle": + if intensityOf(p) == "light" { + return 56 + } + return 57 + case "fog", "fog_mist": + return 48 + } + } + + // Pass 3: fog / obscuration. + for _, p := range phenomena { + switch p.Weather { + case "fog", "fog_mist": + return 45 + case "haze", "smoke", "dust", "sand", "spray", "volcanic_ash": + return 45 + } + } + + // Pass 4: precip families. 
+ for _, p := range phenomena { + inten := intensityOf(p) + mod := modifierOf(p) + + if mod == "showers" { + switch p.Weather { + case "rain": + if inten == "light" { + return 80 + } + if inten == "heavy" { + return 82 + } + return 81 + case "snow": + if inten == "light" { + return 85 + } + return 86 + } + } + + switch p.Weather { + case "drizzle": + if inten == "heavy" { + return 55 + } + if inten == "light" { + return 51 + } + return 53 + + case "rain": + if inten == "heavy" { + return 65 + } + if inten == "light" { + return 61 + } + return 63 + + case "snow": + if inten == "heavy" { + return 75 + } + if inten == "light" { + return 71 + } + return 73 + + case "snow_grains": + return 77 + + case "ice_pellets", "snow_pellets": + return 73 + } + } + + return model.WMOUnknown +} + +func wmoFromCloudLayers(cloudLayers []model.CloudLayer) model.WMOCode { + // NWS cloud layer amount values commonly include: + // OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky) + // + // Conservative mapping within our current WMO subset: + // - OVC / BKN / VV => Cloudy (3) + // - SCT => Partly Cloudy (2) + // - FEW => Mainly Sunny/Clear (1) + // - CLR / SKC => Sunny/Clear (0) + // + // Multiple layers: bias toward “most cloudy”. 
+ mostCloudy := "" + + for _, cl := range cloudLayers { + a := strings.ToUpper(strings.TrimSpace(cl.Amount)) + if a == "" { + continue + } + + switch a { + case "OVC": + return 3 + case "BKN", "VV": + if mostCloudy != "OVC" { + mostCloudy = a + } + case "SCT": + if mostCloudy == "" { + mostCloudy = "SCT" + } + case "FEW": + if mostCloudy == "" { + mostCloudy = "FEW" + } + case "CLR", "SKC": + if mostCloudy == "" { + mostCloudy = "CLR" + } + } + } + + switch mostCloudy { + case "BKN", "VV": + return 3 + case "SCT": + return 2 + case "FEW": + return 1 + case "CLR": + return 0 + default: + return model.WMOUnknown + } +} diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index db47778..d4b9140 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -1,20 +1,25 @@ +// FILE: ./internal/sources/nws/observation.go package nws import ( "context" "encoding/json" "fmt" + "io" "net/http" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) -// ObservationSource polls an NWS station observation endpoint and emits a single Observation Event. +// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event. +// +// Key refactor: +// - Source responsibility: fetch bytes + emit a valid event envelope. +// - Normalizer responsibility: interpret raw JSON + map to canonical domain model. // // This corresponds to URLs like: // @@ -34,8 +39,6 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { return nil, fmt.Errorf("nws_observation %q: params are required (need params.url and params.user_agent)", cfg.Name) } - // feedkit keeps config domain-agnostic by storing driver-specific settings in Params. 
- // Use ParamString so we don't have to type-assert cfg.Params["url"] everywhere. url, ok := cfg.ParamString("url", "URL") if !ok { return nil, fmt.Errorf("nws_observation %q: params.url is required", cfg.Name) @@ -46,37 +49,49 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { return nil, fmt.Errorf("nws_observation %q: params.user_agent is required", cfg.Name) } - // A small timeout is good hygiene for daemons: you want polls to fail fast, - // not hang forever and block subsequent ticks. - client := &http.Client{ - Timeout: 10 * time.Second, - } - return &ObservationSource{ name: cfg.Name, url: url, userAgent: ua, - client: client, + client: &http.Client{ + Timeout: 10 * time.Second, + }, }, nil } func (s *ObservationSource) Name() string { return s.name } // Kind is used for routing/policy. +// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } -// Poll fetches "current conditions" and emits exactly one Event (under normal conditions). +// Poll fetches NWS "latest observation" and emits exactly one RAW Event. +// The RAW payload is json.RawMessage and Schema is standards.SchemaRawNWSObservationV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { - obs, eventID, err := s.fetchAndParse(ctx) + raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } - // EffectiveAt is optional. - // For observations, the natural effective time is the observation timestamp. + // Event.ID must be set BEFORE normalization (feedkit requires it). + // Prefer NWS-provided "id" (stable URL). Fallback to a stable-ish computed key. 
+ eventID := strings.TrimSpace(meta.ID) + if eventID == "" { + ts := meta.ParsedTimestamp + if ts.IsZero() { + ts = time.Now().UTC() + } + station := strings.TrimSpace(meta.StationID) + if station == "" { + station = "UNKNOWN" + } + eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.name, station, ts.UTC().Format(time.RFC3339Nano)) + } + + // EffectiveAt is optional; for observations it’s naturally the observation timestamp. var effectiveAt *time.Time - if !obs.Timestamp.IsZero() { - t := obs.Timestamp + if !meta.ParsedTimestamp.IsZero() { + t := meta.ParsedTimestamp.UTC() effectiveAt = &t } @@ -87,11 +102,11 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { EmittedAt: time.Now().UTC(), EffectiveAt: effectiveAt, - // Optional: makes downstream decoding/inspection easier. - Schema: "weather.observation.v1", + // RAW schema (normalizer matches on this). + Schema: standards.SchemaRawNWSObservationV1, - // Payload remains domain-specific for now. - Payload: obs, + // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. + Payload: raw, } if err := e.Validate(); err != nil { @@ -101,609 +116,65 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return []event.Event{e}, nil } -// --- JSON parsing (minimal model of NWS observation payload) --- +// ---- RAW fetch + minimal metadata decode ---- -type nwsObservationResponse struct { - ID string `json:"id"` // a stable unique identifier URL in the payload you pasted +// observationMeta is a *minimal* decode of the NWS payload used only to build +// a stable Event.ID and a useful EffectiveAt for the envelope. 
+type observationMeta struct { + ID string `json:"id"` Properties struct { - StationID string `json:"stationId"` - StationName string `json:"stationName"` - Timestamp string `json:"timestamp"` - TextDescription string `json:"textDescription"` - Icon string `json:"icon"` - - RawMessage string `json:"rawMessage"` - - Elevation struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"elevation"` - - Temperature struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"temperature"` - - Dewpoint struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"dewpoint"` - - WindDirection struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"windDirection"` - - WindSpeed struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"windSpeed"` - - WindGust struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"windGust"` - - BarometricPressure struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"barometricPressure"` - - SeaLevelPressure struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"seaLevelPressure"` - - Visibility struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"visibility"` - - RelativeHumidity struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"relativeHumidity"` - - WindChill struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"windChill"` - - HeatIndex struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"heatIndex"` - - // NWS returns "presentWeather" as decoded METAR phenomena objects. 
- // We decode these initially as generic maps so we can: - // 1) preserve the raw objects in model.PresentWeather{Raw: ...} - // 2) also decode them into a typed struct for our WMO mapping logic. - PresentWeather []map[string]any `json:"presentWeather"` - - CloudLayers []struct { - Base struct { - UnitCode string `json:"unitCode"` - Value *float64 `json:"value"` - } `json:"base"` - Amount string `json:"amount"` - } `json:"cloudLayers"` + StationID string `json:"stationId"` + Timestamp string `json:"timestamp"` } `json:"properties"` + + // Convenience fields populated after decode. + ParsedTimestamp time.Time `json:"-"` + StationID string `json:"-"` } -// metarPhenomenon is a typed view of NWS presentWeather objects. -// You provided the schema for these values (intensity/modifier/weather/rawString). -type metarPhenomenon struct { - Intensity *string `json:"intensity"` // "light", "heavy", or null - Modifier *string `json:"modifier"` // "freezing", "showers", etc., or null - Weather string `json:"weather"` // e.g., "rain", "snow", "fog_mist", ... - RawString string `json:"rawString"` - // InVicinity exists in the schema; we ignore it for now because WMO codes - // don't directly represent "in vicinity" semantics. - InVicinity *bool `json:"inVicinity"` -} - -func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, string, error) { - req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil) +func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil) if err != nil { - return model.WeatherObservation{}, "", err + return nil, observationMeta{}, err } - // NWS requests: a real User-Agent with contact info is strongly recommended. 
req.Header.Set("User-Agent", s.userAgent) req.Header.Set("Accept", "application/geo+json, application/json") res, err := s.client.Do(req) if err != nil { - return model.WeatherObservation{}, "", err + return nil, observationMeta{}, err } defer res.Body.Close() if res.StatusCode < 200 || res.StatusCode >= 300 { - return model.WeatherObservation{}, "", fmt.Errorf("nws_observation %q: HTTP %s", s.name, res.Status) + return nil, observationMeta{}, fmt.Errorf("nws_observation %q: HTTP %s", s.name, res.Status) } - var parsed nwsObservationResponse - if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil { - return model.WeatherObservation{}, "", err + b, err := io.ReadAll(res.Body) + if err != nil { + return nil, observationMeta{}, err + } + if len(b) == 0 { + return nil, observationMeta{}, fmt.Errorf("nws_observation %q: empty response body", s.name) } - // Parse timestamp (RFC3339) - var ts time.Time - if strings.TrimSpace(parsed.Properties.Timestamp) != "" { - t, err := time.Parse(time.RFC3339, parsed.Properties.Timestamp) - if err != nil { - return model.WeatherObservation{}, "", fmt.Errorf("nws_observation %q: invalid timestamp %q: %w", - s.name, parsed.Properties.Timestamp, err) + raw := json.RawMessage(b) + + var meta observationMeta + if err := json.Unmarshal(b, &meta); err != nil { + // If metadata decode fails, still return raw; envelope will fall back to computed ID. 
+ return raw, observationMeta{}, nil + } + + meta.StationID = strings.TrimSpace(meta.Properties.StationID) + + tsStr := strings.TrimSpace(meta.Properties.Timestamp) + if tsStr != "" { + if t, err := time.Parse(time.RFC3339, tsStr); err == nil { + meta.ParsedTimestamp = t } - ts = t } - cloudLayers := make([]model.CloudLayer, 0, len(parsed.Properties.CloudLayers)) - for _, cl := range parsed.Properties.CloudLayers { - cloudLayers = append(cloudLayers, model.CloudLayer{ - BaseMeters: cl.Base.Value, - Amount: cl.Amount, - }) - } - - // Preserve the raw presentWeather objects (as before) in the domain model. - present := make([]model.PresentWeather, 0, len(parsed.Properties.PresentWeather)) - for _, pw := range parsed.Properties.PresentWeather { - present = append(present, model.PresentWeather{Raw: pw}) - } - - // Decode presentWeather into a typed slice for improved mapping. - phenomena := decodeMetarPhenomena(parsed.Properties.PresentWeather) - - // Provider description (NWS vocabulary). We store this for troubleshooting only. - providerDesc := strings.TrimSpace(parsed.Properties.TextDescription) - - // Map NWS -> canonical WMO code using best-effort heuristics: - // 1) presentWeather (METAR phenomena) if present - // 2) provider textDescription keywords - // 3) cloud layers fallback - wmo := mapNWSToWMO(providerDesc, cloudLayers, phenomena) - - // Canonical text comes from our shared WMO table. - // NWS does not give us an explicit day/night flag here, so we leave it nil. - canonicalText := standards.WMOText(wmo, nil) - - obs := model.WeatherObservation{ - StationID: parsed.Properties.StationID, - StationName: parsed.Properties.StationName, - Timestamp: ts, - - // Canonical conditions - ConditionCode: wmo, - ConditionText: canonicalText, - IsDay: nil, - - // Provider evidence (for troubleshooting mapping) - ProviderRawDescription: providerDesc, - - // Human-facing fields: - // Populate TextDescription with canonical text so downstream output stays consistent. 
- TextDescription: canonicalText, - IconURL: parsed.Properties.Icon, - - TemperatureC: parsed.Properties.Temperature.Value, - DewpointC: parsed.Properties.Dewpoint.Value, - - WindDirectionDegrees: parsed.Properties.WindDirection.Value, - WindSpeedKmh: parsed.Properties.WindSpeed.Value, - WindGustKmh: parsed.Properties.WindGust.Value, - - BarometricPressurePa: parsed.Properties.BarometricPressure.Value, - SeaLevelPressurePa: parsed.Properties.SeaLevelPressure.Value, - VisibilityMeters: parsed.Properties.Visibility.Value, - - RelativeHumidityPercent: parsed.Properties.RelativeHumidity.Value, - WindChillC: parsed.Properties.WindChill.Value, - HeatIndexC: parsed.Properties.HeatIndex.Value, - - ElevationMeters: parsed.Properties.Elevation.Value, - RawMessage: parsed.Properties.RawMessage, - - PresentWeather: present, - CloudLayers: cloudLayers, - } - - // Event ID: prefer the NWS-provided "id" (stable unique URL), else fall back to computed. - eventID := strings.TrimSpace(parsed.ID) - if eventID == "" { - eventID = fmt.Sprintf("observation:%s:%s:%s", - s.name, - obs.StationID, - obs.Timestamp.UTC().Format(time.RFC3339Nano), - ) - } - - return obs, eventID, nil -} - -func decodeMetarPhenomena(raw []map[string]any) []metarPhenomenon { - if len(raw) == 0 { - return nil - } - - out := make([]metarPhenomenon, 0, len(raw)) - for _, m := range raw { - // Encode/decode is slightly inefficient, but it's simple and very readable. - // presentWeather payloads are small; this is fine for a polling daemon. - b, err := json.Marshal(m) - if err != nil { - continue - } - - var p metarPhenomenon - if err := json.Unmarshal(b, &p); err != nil { - continue - } - - p.Weather = strings.ToLower(strings.TrimSpace(p.Weather)) - p.RawString = strings.TrimSpace(p.RawString) - out = append(out, p) - } - return out -} - -// mapNWSToWMO maps NWS signals into a canonical WMO code. -// -// Precedence: -// 1. METAR phenomena (presentWeather) — most reliable for precip/hazards -// 2. 
textDescription keywords — weaker, but still useful -// 3. cloud layers fallback — only for sky-only conditions -func mapNWSToWMO(providerDesc string, cloudLayers []model.CloudLayer, phenomena []metarPhenomenon) model.WMOCode { - // 1) Prefer METAR phenomena if present. - if code := wmoFromPhenomena(phenomena); code != model.WMOUnknown { - return code - } - - // 2) Fall back to provider textDescription keywords. - if code := wmoFromTextDescription(providerDesc); code != model.WMOUnknown { - return code - } - - // 3) Fall back to cloud layers. - if code := wmoFromCloudLayers(cloudLayers); code != model.WMOUnknown { - return code - } - - return model.WMOUnknown -} - -func wmoFromPhenomena(phenomena []metarPhenomenon) model.WMOCode { - if len(phenomena) == 0 { - return model.WMOUnknown - } - - // Helper accessors (avoid repeating nil checks everywhere). - intensityOf := func(p metarPhenomenon) string { - if p.Intensity == nil { - return "" - } - return strings.ToLower(strings.TrimSpace(*p.Intensity)) - } - modifierOf := func(p metarPhenomenon) string { - if p.Modifier == nil { - return "" - } - return strings.ToLower(strings.TrimSpace(*p.Modifier)) - } - - // Pass 1: thunder + hail overrides everything (hazard). - // - // WMO provides: - // 95 = thunderstorm - // 96 = light thunderstorms with hail - // 99 = thunderstorms with hail - hasThunder := false - hailIntensity := "" - for _, p := range phenomena { - switch p.Weather { - case "thunderstorms": - hasThunder = true - case "hail": - if hailIntensity == "" { - hailIntensity = intensityOf(p) - } - } - } - if hasThunder { - if hailIntensity != "" || containsWeather(phenomena, "hail") { - if hailIntensity == "heavy" { - return 99 - } - // Default to "light" hail when unknown - return 96 - } - return 95 - } - - // Pass 2: freezing hazards. - // - // Modifier includes "freezing". 
- for _, p := range phenomena { - if modifierOf(p) != "freezing" { - continue - } - - switch p.Weather { - case "rain": - if intensityOf(p) == "light" { - return 66 - } - // Default to freezing rain when unknown/heavy. - return 67 - - case "drizzle": - if intensityOf(p) == "light" { - return 56 - } - return 57 - - case "fog", "fog_mist": - // "Freezing fog" isn't a perfect match for "Rime Fog", - // but within our current WMO subset, 48 is the closest. - return 48 - } - } - - // Pass 3: fog / obscuration. - for _, p := range phenomena { - switch p.Weather { - case "fog", "fog_mist": - return 45 - case "haze", "smoke", "dust", "sand", "spray", "volcanic_ash": - // Our current WMO table subset doesn't include haze/smoke/dust codes. - // "Foggy" (45) is a reasonable umbrella for "visibility obscured". - return 45 - } - } - - // Pass 4: precip families. - for _, p := range phenomena { - inten := intensityOf(p) - mod := modifierOf(p) - - // Handle "showers" modifier explicitly (rain vs snow showers). - if mod == "showers" { - switch p.Weather { - case "rain": - if inten == "light" { - return 80 - } - if inten == "heavy" { - return 82 - } - return 81 - - case "snow": - if inten == "light" { - return 85 - } - return 86 - } - } - - switch p.Weather { - // Drizzle - case "drizzle": - if inten == "heavy" { - return 55 - } - if inten == "light" { - return 51 - } - return 53 - - // Rain - case "rain": - if inten == "heavy" { - return 65 - } - if inten == "light" { - return 61 - } - return 63 - - // Snow - case "snow": - if inten == "heavy" { - return 75 - } - if inten == "light" { - return 71 - } - return 73 - - // Snow grains - case "snow_grains": - return 77 - - // We don’t currently have sleet/ice pellet codes in our shared WMO subset. - // We make conservative choices within the available codes. - case "ice_pellets", "snow_pellets": - // Closest within our subset is "Snow" (73). If you later expand the WMO table - // to include sleet/ice pellet codes, update this mapping. 
- return 73 - } - } - - return model.WMOUnknown -} - -func containsWeather(phenomena []metarPhenomenon, weather string) bool { - weather = strings.ToLower(strings.TrimSpace(weather)) - for _, p := range phenomena { - if p.Weather == weather { - return true - } - } - return false -} - -func wmoFromTextDescription(providerDesc string) model.WMOCode { - s := strings.ToLower(strings.TrimSpace(providerDesc)) - if s == "" { - return model.WMOUnknown - } - - // Thunder / hail - if strings.Contains(s, "thunder") { - if strings.Contains(s, "hail") { - return 99 - } - return 95 - } - - // Freezing hazards - if strings.Contains(s, "freezing rain") { - if strings.Contains(s, "light") { - return 66 - } - return 67 - } - if strings.Contains(s, "freezing drizzle") { - if strings.Contains(s, "light") { - return 56 - } - return 57 - } - - // Drizzle - if strings.Contains(s, "drizzle") { - if strings.Contains(s, "heavy") || strings.Contains(s, "dense") { - return 55 - } - if strings.Contains(s, "light") { - return 51 - } - return 53 - } - - // Showers - if strings.Contains(s, "showers") { - if strings.Contains(s, "heavy") { - return 82 - } - if strings.Contains(s, "light") { - return 80 - } - return 81 - } - - // Rain - if strings.Contains(s, "rain") { - if strings.Contains(s, "heavy") { - return 65 - } - if strings.Contains(s, "light") { - return 61 - } - return 63 - } - - // Snow - if strings.Contains(s, "snow showers") { - if strings.Contains(s, "light") { - return 85 - } - return 86 - } - if strings.Contains(s, "snow grains") { - return 77 - } - if strings.Contains(s, "snow") { - if strings.Contains(s, "heavy") { - return 75 - } - if strings.Contains(s, "light") { - return 71 - } - return 73 - } - - // Fog - if strings.Contains(s, "rime fog") { - return 48 - } - if strings.Contains(s, "fog") || strings.Contains(s, "mist") { - return 45 - } - - // Sky-only - if strings.Contains(s, "overcast") { - return 3 - } - if strings.Contains(s, "cloudy") { - return 3 - } - if 
strings.Contains(s, "partly cloudy") { - return 2 - } - if strings.Contains(s, "mostly sunny") || strings.Contains(s, "mostly clear") || - strings.Contains(s, "mainly sunny") || strings.Contains(s, "mainly clear") { - return 1 - } - if strings.Contains(s, "clear") || strings.Contains(s, "sunny") { - return 0 - } - - return model.WMOUnknown -} - -func wmoFromCloudLayers(cloudLayers []model.CloudLayer) model.WMOCode { - // NWS cloud layer amount values commonly include: - // OVC, BKN, SCT, FEW, SKC, CLR, VV (vertical visibility / obscured sky) - // - // We interpret these conservatively: - // - OVC / BKN / VV => Cloudy (3) - // - SCT => Partly Cloudy (2) - // - FEW => Mainly Sunny/Clear (1) - // - CLR / SKC => Sunny/Clear (0) - // - // If multiple layers exist, we bias toward the "most cloudy" layer. - mostCloudy := "" - - for _, cl := range cloudLayers { - a := strings.ToUpper(strings.TrimSpace(cl.Amount)) - if a == "" { - continue - } - - switch a { - case "OVC": - return 3 - case "BKN", "VV": - if mostCloudy != "OVC" { - mostCloudy = a - } - case "SCT": - if mostCloudy == "" { - mostCloudy = "SCT" - } - case "FEW": - if mostCloudy == "" { - mostCloudy = "FEW" - } - case "CLR", "SKC": - if mostCloudy == "" { - mostCloudy = "CLR" - } - } - } - - switch mostCloudy { - case "BKN", "VV": - return 3 - case "SCT": - return 2 - case "FEW": - return 1 - case "CLR": - return 0 - default: - return model.WMOUnknown - } + return raw, meta, nil }