From 2eb2d4b90f491a67a5aae658d1848dbbe7c50975 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Fri, 16 Jan 2026 10:28:32 -0600 Subject: [PATCH] feat(nws, normalizers): add NWS hourly forecast normalization and enforce canonical float rounding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Implement full NWS hourly forecast normalizer (raw.nws.hourly.forecast.v1 → weather.forecast.v1) - Add GeoJSON forecast types and helpers for NWS gridpoint hourly payloads - Normalize temperatures, winds, humidity, PoP, and infer WMO condition codes from forecast text/icons - Treat forecast IssuedAt as EffectiveAt for stable, dedupe-friendly event IDs - Introduce project-wide float rounding at normalization finalization - Round all float values in canonical payloads to 2 decimal places - Apply consistently across pointers, slices, maps, and nested structs - Preserve opaque structs (e.g., time.Time) unchanged - Add SchemaRawNWSHourlyForecastV1 and align schema matching/comments - Clean up NWS helper organization and comments - Update documentation to reflect numeric wire-format and normalization policies This establishes a complete, deterministic hourly forecast pipeline for NWS and improves JSON output stability across all canonical weather schemas. 
--- cmd/weatherfeeder/config.yml | 58 +++--- internal/normalizers/common/doc.go | 40 +--- internal/normalizers/common/finalize.go | 5 +- internal/normalizers/common/round.go | 215 ++++++++++++++++++++++ internal/normalizers/nws/forecast.go | 159 ++++++++++++++++ internal/normalizers/nws/helpers.go | 235 ++++++++++++++++++++++++ internal/normalizers/nws/register.go | 3 + internal/normalizers/nws/types.go | 63 +++++++ internal/sources/nws/forecast.go | 132 ++++++++++--- internal/standards/doc.go | 8 + internal/standards/schema.go | 5 + 11 files changed, 831 insertions(+), 92 deletions(-) create mode 100644 internal/normalizers/common/round.go create mode 100644 internal/normalizers/nws/forecast.go create mode 100644 internal/normalizers/nws/helpers.go diff --git a/cmd/weatherfeeder/config.yml b/cmd/weatherfeeder/config.yml index c73b5d1..2c862dd 100644 --- a/cmd/weatherfeeder/config.yml +++ b/cmd/weatherfeeder/config.yml @@ -1,28 +1,28 @@ --- sources: - - name: NWSObservationKSTL - kind: observation - driver: nws_observation - every: 12m - params: - url: "https://api.weather.gov/stations/KSTL/observations/latest" - user_agent: "HomeOps (eric@maximumdirect.net)" +# - name: NWSObservationKSTL +# kind: observation +# driver: nws_observation +# every: 12m +# params: +# url: "https://api.weather.gov/stations/KSTL/observations/latest" +# user_agent: "HomeOps (eric@maximumdirect.net)" - - name: OpenMeteoObservation - kind: observation - driver: openmeteo_observation - every: 12m - params: - url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1" - user_agent: "HomeOps (eric@maximumdirect.net)" +# - name: OpenMeteoObservation +# kind: observation +# driver: openmeteo_observation +# every: 12m +# params: +# url: 
"https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1" +# user_agent: "HomeOps (eric@maximumdirect.net)" - - name: OpenWeatherObservation - kind: observation - driver: openweather_observation - every: 12m - params: - url: "https://api.openweathermap.org/data/2.5/weather?lat=38.6239&lon=-90.3571&appid=c954f2566cb7ccb56b43737b52e88fc6&units=metric" - user_agent: "HomeOps (eric@maximumdirect.net)" +# - name: OpenWeatherObservation +# kind: observation +# driver: openweather_observation +# every: 12m +# params: +# url: "https://api.openweathermap.org/data/2.5/weather?lat=38.6239&lon=-90.3571&appid=c954f2566cb7ccb56b43737b52e88fc6&units=metric" +# user_agent: "HomeOps (eric@maximumdirect.net)" # - name: NWSObservationKSUS # kind: observation @@ -48,13 +48,13 @@ sources: # url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast" # user_agent: "HomeOps (eric@maximumdirect.net)" -# - name: NWSHourlyForecastSTL -# kind: forecast -# driver: nws_forecast -# every: 1m -# params: -# url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly" -# user_agent: "HomeOps (eric@maximumdirect.net)" + - name: NWSHourlyForecastSTL + kind: forecast + driver: nws_forecast + every: 1m + params: + url: "https://api.weather.gov/gridpoints/LSX/90,74/forecast/hourly" + user_agent: "HomeOps (eric@maximumdirect.net)" # - name: NWSAlertsSTL # kind: alert @@ -76,7 +76,7 @@ sinks: routes: - sink: stdout - kinds: ["observation"] + kinds: ["observation", "forecast", "alert"] # - sink: logfile # kinds: ["observation", "alert", "forecast"] diff --git a/internal/normalizers/common/doc.go b/internal/normalizers/common/doc.go index a9552a3..0373059 100644 --- a/internal/normalizers/common/doc.go +++ b/internal/normalizers/common/doc.go @@ -11,39 +11,9 @@ 
// - minimal abstractions (prefer straightforward functions) // - easy to unit test // -// What belongs here -// ----------------- -// Put code in internal/normalizers/common when it is: -// -// - potentially reusable by more than one provider -// - provider-agnostic (no NWS/OpenWeather/Open-Meteo specific assumptions) -// - stable, small, and readable -// -// Typical examples: -// - unit conversion helpers (°F <-> °C, m/s <-> km/h, hPa <-> Pa, etc.) -// - json.RawMessage payload extraction helpers (with good error messages) -// - shared parsing helpers (timestamps, simple numeric coercions) -// - generic fallbacks (e.g., mapping a human text description into a coarse canonical code), -// so long as the logic truly applies across providers -// -// What does NOT belong here -// ------------------------- -// Do NOT put the following in this package: -// -// - Normalizer implementations (types that satisfy feedkit/normalize.Normalizer) -// - provider-specific JSON structs or mapping logic (put those under -// internal/normalizers//) -// - network or filesystem I/O (sources fetch; normalizers transform) -// - code that depends on event.Source naming, config fields, or driver-specific params -// -// Style and API guidelines -// ------------------------ -// - Prefer small, single-purpose functions. -// - Keep function names explicit (avoid clever generic “DoThing” helpers). -// - Return typed errors with context (include schema/field names where helpful). -// - Keep dependencies minimal: standard library + weatherfeeder packages only. -// - Add unit tests for any non-trivial logic (especially parsing and fallbacks). -// -// Keeping this clean matters: common is shared by all providers, so complexity here -// multiplies across the project. +// Numeric wire policy +// ------------------- +// Canonical payloads are intended for sinks/serialization. 
To keep output stable and readable, +// weatherfeeder rounds floating-point values in canonical payloads to a small, fixed precision +// at finalization time (see round.go). package common diff --git a/internal/normalizers/common/finalize.go b/internal/normalizers/common/finalize.go index 0ada934..368460e 100644 --- a/internal/normalizers/common/finalize.go +++ b/internal/normalizers/common/finalize.go @@ -14,10 +14,13 @@ import ( // - ID/Kind/Source/EmittedAt are preserved by copying the input event. // - EffectiveAt is only overwritten when effectiveAt is non-zero. // If effectiveAt is zero, any existing in.EffectiveAt is preserved. +// - Payload floats are rounded to a stable wire-friendly precision (see round.go). func Finalize(in event.Event, outSchema string, outPayload any, effectiveAt time.Time) (*event.Event, error) { out := in out.Schema = outSchema - out.Payload = outPayload + + // Enforce stable numeric presentation for sinks: round floats in the canonical payload. + out.Payload = RoundFloats(outPayload, DefaultFloatPrecision) if !effectiveAt.IsZero() { t := effectiveAt.UTC() diff --git a/internal/normalizers/common/round.go b/internal/normalizers/common/round.go new file mode 100644 index 0000000..c83a940 --- /dev/null +++ b/internal/normalizers/common/round.go @@ -0,0 +1,215 @@ +// FILE: ./internal/normalizers/common/round.go +package common + +import ( + "math" + "reflect" +) + +// DefaultFloatPrecision is the project-wide wire-format policy for floating-point +// values in canonical payloads (weather.* schemas). +// +// Note: encoding/json will not necessarily print trailing zeros (e.g. 1.50 -> 1.5), +// but values will be *rounded* to this number of digits after the decimal point. +const DefaultFloatPrecision = 2 + +// RoundFloats returns a copy of v with all float32/float64 values (including pointers, +// slices, arrays, maps, and nested exported-struct fields) rounded to `decimals` digits +// after the decimal point. 
+// +// This is a best-effort helper meant for presentation stability. If reflection hits an +// unsupported/opaque type (e.g. structs with unexported fields like time.Time), that +// subtree is left unchanged. +func RoundFloats(v any, decimals int) any { + if v == nil || decimals < 0 { + return v + } + + defer func() { + // Never let presentation formatting crash the pipeline. + _ = recover() + }() + + rv := reflect.ValueOf(v) + out := roundValue(rv, decimals) + if !out.IsValid() { + return v + } + return out.Interface() +} + +func roundValue(v reflect.Value, decimals int) reflect.Value { + if !v.IsValid() { + return v + } + + // Unwrap interfaces. + if v.Kind() == reflect.Interface { + if v.IsNil() { + return v + } + elem := roundValue(v.Elem(), decimals) + + // Re-wrap in the same interface type. + out := reflect.New(v.Type()).Elem() + if elem.IsValid() && elem.Type().AssignableTo(v.Type()) { + out.Set(elem) + return out + } + if elem.IsValid() && elem.Type().AssignableTo(v.Type()) { + out.Set(elem) + return out + } + if elem.IsValid() && elem.Type().ConvertibleTo(v.Type()) { + out.Set(elem.Convert(v.Type())) + return out + } + // If we can't sensibly re-wrap, just keep the original. + return v + } + + // Copy pointers (and round their targets). + if v.Kind() == reflect.Pointer { + if v.IsNil() { + return v + } + + // If the pointed-to type is an opaque struct (e.g. time.Time), keep as-is. 
+ if v.Elem().Kind() == reflect.Struct && isOpaqueStruct(v.Elem().Type()) { + return v + } + + elem := roundValue(v.Elem(), decimals) + p := reflect.New(v.Type().Elem()) + if elem.IsValid() && elem.Type().AssignableTo(v.Type().Elem()) { + p.Elem().Set(elem) + } else if elem.IsValid() && elem.Type().ConvertibleTo(v.Type().Elem()) { + p.Elem().Set(elem.Convert(v.Type().Elem())) + } else { + p.Elem().Set(v.Elem()) + } + return p + } + + switch v.Kind() { + case reflect.Float32, reflect.Float64: + f := v.Convert(reflect.TypeOf(float64(0))).Float() + r := roundFloat64(f, decimals) + return reflect.ValueOf(r).Convert(v.Type()) + + case reflect.Struct: + // Avoid reconstructing opaque structs (time.Time has unexported fields). + if isOpaqueStruct(v.Type()) { + return v + } + + out := reflect.New(v.Type()).Elem() + out.Set(v) // start from a copy, then replace rounded fields + + t := v.Type() + for i := 0; i < v.NumField(); i++ { + sf := t.Field(i) + + // Only exported fields are safely settable across packages. 
+ if sf.PkgPath != "" { + continue + } + + fv := v.Field(i) + rf := roundValue(fv, decimals) + + of := out.Field(i) + if !of.CanSet() { + continue + } + + if rf.IsValid() && rf.Type().AssignableTo(of.Type()) { + of.Set(rf) + } else if rf.IsValid() && rf.Type().ConvertibleTo(of.Type()) { + of.Set(rf.Convert(of.Type())) + } + } + return out + + case reflect.Slice: + if v.IsNil() { + return v + } + out := reflect.MakeSlice(v.Type(), v.Len(), v.Len()) + for i := 0; i < v.Len(); i++ { + ev := v.Index(i) + re := roundValue(ev, decimals) + if re.IsValid() && re.Type().AssignableTo(out.Index(i).Type()) { + out.Index(i).Set(re) + } else if re.IsValid() && re.Type().ConvertibleTo(out.Index(i).Type()) { + out.Index(i).Set(re.Convert(out.Index(i).Type())) + } else { + out.Index(i).Set(ev) + } + } + return out + + case reflect.Array: + out := reflect.New(v.Type()).Elem() + out.Set(v) + for i := 0; i < v.Len(); i++ { + ev := v.Index(i) + re := roundValue(ev, decimals) + if re.IsValid() && re.Type().AssignableTo(out.Index(i).Type()) { + out.Index(i).Set(re) + } else if re.IsValid() && re.Type().ConvertibleTo(out.Index(i).Type()) { + out.Index(i).Set(re.Convert(out.Index(i).Type())) + } else { + out.Index(i).Set(ev) + } + } + return out + + case reflect.Map: + if v.IsNil() { + return v + } + out := reflect.MakeMapWithSize(v.Type(), v.Len()) + iter := v.MapRange() + for iter.Next() { + k := iter.Key() + mv := iter.Value() + rv := roundValue(mv, decimals) + + if rv.IsValid() && rv.Type().AssignableTo(v.Type().Elem()) { + out.SetMapIndex(k, rv) + } else if rv.IsValid() && rv.Type().ConvertibleTo(v.Type().Elem()) { + out.SetMapIndex(k, rv.Convert(v.Type().Elem())) + } else { + out.SetMapIndex(k, mv) + } + } + return out + + default: + // ints, strings, bools, time.Time (handled as opaque), etc. 
+ return v + } +} + +func roundFloat64(f float64, decimals int) float64 { + if decimals <= 0 { + return math.Round(f) + } + pow := math.Pow10(decimals) + return math.Round(f*pow) / pow +} + +// isOpaqueStruct returns true for structs that are unsafe/unhelpful to reconstruct via reflection. +// Any struct containing unexported fields (e.g. time.Time) is treated as opaque. +func isOpaqueStruct(t reflect.Type) bool { + if t.Kind() != reflect.Struct { + return false + } + for i := 0; i < t.NumField(); i++ { + if t.Field(i).PkgPath != "" { + return true + } + } + return false +} diff --git a/internal/normalizers/nws/forecast.go b/internal/normalizers/nws/forecast.go new file mode 100644 index 0000000..4011b8f --- /dev/null +++ b/internal/normalizers/nws/forecast.go @@ -0,0 +1,159 @@ +// FILE: internal/normalizers/nws/forecast.go +package nws + +import ( + "context" + "fmt" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ForecastNormalizer converts: +// +// standards.SchemaRawNWSHourlyForecastV1 -> standards.SchemaWeatherForecastV1 +// +// It interprets NWS GeoJSON gridpoint *hourly* forecast responses and maps them into +// the canonical model.WeatherForecastRun representation. +// +// Caveats / policy: +// 1. NWS forecast periods do not include METAR presentWeather phenomena, so ConditionCode +// is inferred from period.shortForecast (with a conservative icon-based fallback). +// 2. Temperature is converted to °C when NWS supplies °F. +// 3. WindSpeed is parsed from strings like "9 mph" / "10 to 15 mph" and converted to km/h. 
+type ForecastNormalizer struct{} + +func (ForecastNormalizer) Match(e event.Event) bool { + s := strings.TrimSpace(e.Schema) + return s == standards.SchemaRawNWSHourlyForecastV1 +} + +func (ForecastNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps + + return normcommon.NormalizeJSON( + in, + "nws hourly forecast", + standards.SchemaWeatherForecastV1, + buildForecast, + ) +} + +// buildForecast contains the domain mapping logic (provider -> canonical model). +func buildForecast(parsed nwsForecastResponse) (model.WeatherForecastRun, time.Time, error) { + // IssuedAt is required by the canonical model. + issuedStr := strings.TrimSpace(parsed.Properties.GeneratedAt) + if issuedStr == "" { + return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("missing properties.generatedAt") + } + issuedAt, err := nwscommon.ParseTime(issuedStr) + if err != nil { + return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("invalid properties.generatedAt %q: %w", issuedStr, err) + } + issuedAt = issuedAt.UTC() + + // UpdatedAt is optional. + var updatedAt *time.Time + if s := strings.TrimSpace(parsed.Properties.UpdateTime); s != "" { + if t, err := nwscommon.ParseTime(s); err == nil { + tt := t.UTC() + updatedAt = &tt + } + } + + // Best-effort location centroid from the GeoJSON polygon (optional). + lat, lon := centroidLatLon(parsed.Geometry.Coordinates) + + // Schema is explicitly hourly, so product is not a heuristic. 
+ run := model.WeatherForecastRun{ + LocationID: "", + LocationName: "", + + IssuedAt: issuedAt, + UpdatedAt: updatedAt, + Product: model.ForecastProductHourly, + + Latitude: lat, + Longitude: lon, + ElevationMeters: parsed.Properties.Elevation.Value, + + Periods: nil, + } + + periods := make([]model.WeatherForecastPeriod, 0, len(parsed.Properties.Periods)) + for i, p := range parsed.Properties.Periods { + startStr := strings.TrimSpace(p.StartTime) + endStr := strings.TrimSpace(p.EndTime) + + if startStr == "" || endStr == "" { + return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d]: missing startTime/endTime", i) + } + + start, err := nwscommon.ParseTime(startStr) + if err != nil { + return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].startTime invalid %q: %w", i, startStr, err) + } + end, err := nwscommon.ParseTime(endStr) + if err != nil { + return model.WeatherForecastRun{}, time.Time{}, fmt.Errorf("periods[%d].endTime invalid %q: %w", i, endStr, err) + } + start = start.UTC() + end = end.UTC() + + // NWS hourly supplies isDaytime; make it a pointer to match the canonical model. + var isDay *bool + if p.IsDaytime != nil { + b := *p.IsDaytime + isDay = &b + } + + tempC := tempCFromNWS(p.Temperature, p.TemperatureUnit) + + // Infer WMO from shortForecast (and fall back to icon token). + providerDesc := strings.TrimSpace(p.ShortForecast) + wmo := wmoFromNWSForecast(providerDesc, p.Icon, tempC) + + canonicalText := standards.WMOText(wmo, isDay) + + period := model.WeatherForecastPeriod{ + StartTime: start, + EndTime: end, + + Name: strings.TrimSpace(p.Name), + IsDay: isDay, + + ConditionCode: wmo, + ConditionText: canonicalText, + + ProviderRawDescription: providerDesc, + + // For forecasts, keep provider text as the human-facing description. 
+ TextDescription: strings.TrimSpace(p.ShortForecast), + DetailedText: strings.TrimSpace(p.DetailedForecast), + + IconURL: strings.TrimSpace(p.Icon), + + TemperatureC: tempC, + + DewpointC: p.Dewpoint.Value, + RelativeHumidityPercent: p.RelativeHumidity.Value, + + WindDirectionDegrees: parseNWSWindDirectionDegrees(p.WindDirection), + WindSpeedKmh: parseNWSWindSpeedKmh(p.WindSpeed), + + ProbabilityOfPrecipitationPercent: p.ProbabilityOfPrecipitation.Value, + } + + periods = append(periods, period) + } + + run.Periods = periods + + // EffectiveAt policy for forecasts: treat IssuedAt as the effective time (dedupe-friendly). + return run, issuedAt, nil +} diff --git a/internal/normalizers/nws/helpers.go b/internal/normalizers/nws/helpers.go new file mode 100644 index 0000000..c6bb968 --- /dev/null +++ b/internal/normalizers/nws/helpers.go @@ -0,0 +1,235 @@ +// FILE: internal/normalizers/nws/helpers.go +package nws + +import ( + "strconv" + "strings" + "unicode" + + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" +) + +// centroidLatLon returns a best-effort centroid (lat, lon) from a GeoJSON polygon. +// If geometry is missing or malformed, returns (nil, nil). +func centroidLatLon(coords [][][]float64) (lat *float64, lon *float64) { + if len(coords) == 0 || len(coords[0]) == 0 { + return nil, nil + } + + var sumLon, sumLat float64 + var n float64 + + for _, pt := range coords[0] { + if len(pt) < 2 { + continue + } + sumLon += pt[0] + sumLat += pt[1] + n++ + } + + if n == 0 { + return nil, nil + } + + avgLon := sumLon / n + avgLat := sumLat / n + + return &avgLat, &avgLon +} + +func tempCFromNWS(v *float64, unit string) *float64 { + if v == nil { + return nil + } + + u := strings.ToUpper(strings.TrimSpace(unit)) + switch u { + case "F": + c := normcommon.TempCFromF(*v) + return &c + case "C": + c := *v + return &c + default: + // Unknown unit; be conservative. 
+ return nil + } +} + +// wmoFromNWSForecast infers a canonical WMO code for a forecast period. +// +// Strategy: +// 1. Try to infer from shortForecast using the cross-provider fallback. +// 2. Special-case mixed rain+snow using temperature when available (since our WMO table +// does not include a “mixed precip” code). +// 3. Fall back to an icon token (e.g., "rain", "snow", "ovc", "bkn", "sct", ...). +func wmoFromNWSForecast(shortForecast, iconURL string, tempC *float64) model.WMOCode { + sf := strings.TrimSpace(shortForecast) + s := strings.ToLower(sf) + + // Mixed precip heuristic: choose rain vs snow based on temperature. + if strings.Contains(s, "rain") && strings.Contains(s, "snow") { + if tempC != nil && *tempC <= 0.0 { + return 73 // Snow + } + return 63 // Rain + } + + if code := normcommon.WMOFromTextDescription(sf); code != model.WMOUnknown { + return code + } + + // Icon fallback: token is usually the last path segment (before any comma/query). + if token := nwsIconToken(iconURL); token != "" { + // Try the general text fallback first (works for "rain", "snow", etc.). + if code := normcommon.WMOFromTextDescription(token); code != model.WMOUnknown { + return code + } + + // Sky-condition icon tokens are common; map conservatively. + switch token { + case "ovc", "bkn", "cloudy", "ovcast": + return 3 + case "sct", "bkn-sct": + return 2 + case "few": + return 1 + case "skc", "clr", "clear": + return 0 + } + } + + return model.WMOUnknown +} + +func nwsIconToken(iconURL string) string { + u := strings.TrimSpace(iconURL) + if u == "" { + return "" + } + + // Drop query string. + base := strings.SplitN(u, "?", 2)[0] + + // Take last path segment. + parts := strings.Split(base, "/") + if len(parts) == 0 { + return "" + } + last := parts[len(parts)-1] + if last == "" && len(parts) > 1 { + last = parts[len(parts)-2] + } + + // Some icons look like "rain,30" or "snow,20". 
+ last = strings.SplitN(last, ",", 2)[0] + last = strings.ToLower(strings.TrimSpace(last)) + + return last +} + +// parseNWSWindSpeedKmh parses NWS wind speed strings like: +// - "9 mph" +// - "10 to 15 mph" +// +// and converts to km/h. +// +// Policy: if a range is present, we use the midpoint (best effort). +func parseNWSWindSpeedKmh(s string) *float64 { + raw := strings.ToLower(strings.TrimSpace(s)) + if raw == "" { + return nil + } + + nums := extractFloats(raw) + if len(nums) == 0 { + return nil + } + + val := nums[0] + if len(nums) >= 2 && (strings.Contains(raw, " to ") || strings.Contains(raw, "-")) { + val = (nums[0] + nums[1]) / 2.0 + } + + switch { + case strings.Contains(raw, "mph"): + k := normcommon.SpeedKmhFromMph(val) + return &k + + case strings.Contains(raw, "km/h") || strings.Contains(raw, "kph"): + k := val + return &k + + case strings.Contains(raw, "kt") || strings.Contains(raw, "kts") || strings.Contains(raw, "knot"): + // 1 knot = 1.852 km/h + k := val * 1.852 + return &k + + default: + // Unknown unit; be conservative. + return nil + } +} + +// parseNWSWindDirectionDegrees maps compass directions to degrees. +// Returns nil if direction is empty/unknown. +func parseNWSWindDirectionDegrees(dir string) *float64 { + d := strings.ToUpper(strings.TrimSpace(dir)) + if d == "" { + return nil + } + + // 16-wind compass. + m := map[string]float64{ + "N": 0, + "NNE": 22.5, + "NE": 45, + "ENE": 67.5, + "E": 90, + "ESE": 112.5, + "SE": 135, + "SSE": 157.5, + "S": 180, + "SSW": 202.5, + "SW": 225, + "WSW": 247.5, + "W": 270, + "WNW": 292.5, + "NW": 315, + "NNW": 337.5, + } + + if deg, ok := m[d]; ok { + return &deg + } + return nil +} + +func extractFloats(s string) []float64 { + var out []float64 + var buf strings.Builder + + flush := func() { + if buf.Len() == 0 { + return + } + v, err := strconv.ParseFloat(buf.String(), 64) + if err == nil { + out = append(out, v) + } + buf.Reset() + } + + for _, r := range s { + if unicode.IsDigit(r) || r == '.' 
{ + buf.WriteRune(r) + continue + } + flush() + } + flush() + + return out +} diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go index 01c635f..f2a4f52 100644 --- a/internal/normalizers/nws/register.go +++ b/internal/normalizers/nws/register.go @@ -13,4 +13,7 @@ func Register(reg *fknormalize.Registry) { // Observations reg.Register(ObservationNormalizer{}) + + // Forecasts + reg.Register(ForecastNormalizer{}) } diff --git a/internal/normalizers/nws/types.go b/internal/normalizers/nws/types.go index a19f36d..83e2359 100644 --- a/internal/normalizers/nws/types.go +++ b/internal/normalizers/nws/types.go @@ -87,3 +87,66 @@ type nwsObservationResponse struct { } `json:"cloudLayers"` } `json:"properties"` } + +// nwsForecastResponse is a minimal-but-sufficient representation of the NWS +// gridpoint forecast GeoJSON payload needed for mapping into model.WeatherForecastRun. +// +// This is currently designed to support the hourly forecast endpoint; revisions may be needed +// to accommodate other forecast endpoints in the future. +type nwsForecastResponse struct { + Geometry struct { + Type string `json:"type"` + Coordinates [][][]float64 `json:"coordinates"` // GeoJSON polygon: [ring][point][lon,lat] + } `json:"geometry"` + + Properties struct { + Units string `json:"units"` // "us" or "si" (often "us" for hourly) + ForecastGenerator string `json:"forecastGenerator"` // e.g. 
"HourlyForecastGenerator" + + GeneratedAt string `json:"generatedAt"` // RFC3339-ish + UpdateTime string `json:"updateTime"` // RFC3339-ish + ValidTimes string `json:"validTimes"` + + Elevation struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"elevation"` + + Periods []nwsForecastPeriod `json:"periods"` + } `json:"properties"` +} + +type nwsForecastPeriod struct { + Number int `json:"number"` + Name string `json:"name"` + StartTime string `json:"startTime"` + EndTime string `json:"endTime"` + + IsDaytime *bool `json:"isDaytime"` + + Temperature *float64 `json:"temperature"` + TemperatureUnit string `json:"temperatureUnit"` // "F" or "C" + TemperatureTrend any `json:"temperatureTrend"` + + ProbabilityOfPrecipitation struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"probabilityOfPrecipitation"` + + Dewpoint struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"dewpoint"` + + RelativeHumidity struct { + UnitCode string `json:"unitCode"` + Value *float64 `json:"value"` + } `json:"relativeHumidity"` + + WindSpeed string `json:"windSpeed"` // e.g. "9 mph", "10 to 15 mph" + WindDirection string `json:"windDirection"` // e.g. 
"W", "NW" + + Icon string `json:"icon"` + ShortForecast string `json:"shortForecast"` + DetailedForecast string `json:"detailedForecast"` +} diff --git a/internal/sources/nws/forecast.go b/internal/sources/nws/forecast.go index d183067..0572416 100644 --- a/internal/sources/nws/forecast.go +++ b/internal/sources/nws/forecast.go @@ -1,51 +1,129 @@ +// FILE: internal/sources/nws/forecast.go package nws import ( "context" - "fmt" + "encoding/json" "strings" + "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" + nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) +// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event. +// +// It intentionally emits the *entire* upstream payload as json.RawMessage and only decodes +// minimal metadata for Event.EffectiveAt and Event.ID. +// +// Output schema (current implementation): +// - standards.SchemaRawNWSHourlyForecastV1 type ForecastSource struct { - name string - url string - userAgent string + http *common.HTTPSource } func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { - if strings.TrimSpace(cfg.Name) == "" { - return nil, fmt.Errorf("nws_forecast: name is required") - } - if cfg.Params == nil { - return nil, fmt.Errorf("nws_forecast %q: params are required (need params.url and params.user_agent)", cfg.Name) + const driver = "nws_forecast" + + // NWS forecast endpoints are GeoJSON (and sometimes also advertise json-ld/json). 
+ hs, err := common.NewHTTPSource(driver, cfg, "application/geo+json, application/json") + if err != nil { + return nil, err } - url, ok := cfg.ParamString("url", "URL") - if !ok { - return nil, fmt.Errorf("nws_forecast %q: params.url is required", cfg.Name) - } - - ua, ok := cfg.ParamString("user_agent", "userAgent") - if !ok { - return nil, fmt.Errorf("nws_forecast %q: params.user_agent is required", cfg.Name) - } - - return &ForecastSource{ - name: cfg.Name, - url: url, - userAgent: ua, - }, nil + return &ForecastSource{http: hs}, nil } -func (s *ForecastSource) Name() string { return s.name } +func (s *ForecastSource) Name() string { return s.http.Name } // Kind is used for routing/policy. func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { - _ = ctx - return nil, fmt.Errorf("nws.ForecastSource.Poll: TODO implement (url=%s)", s.url) + raw, meta, err := s.fetchRaw(ctx) + if err != nil { + return nil, err + } + + // EffectiveAt is optional; for forecasts it’s most naturally the run “issued” time. + // NWS gridpoint forecasts expose generatedAt (preferred) and updateTime/updated. + var effectiveAt *time.Time + switch { + case !meta.ParsedGeneratedAt.IsZero(): + t := meta.ParsedGeneratedAt.UTC() + effectiveAt = &t + case !meta.ParsedUpdateTime.IsZero(): + t := meta.ParsedUpdateTime.UTC() + effectiveAt = &t + } + + emittedAt := time.Now().UTC() + + // NWS gridpoint forecast GeoJSON commonly has a stable "id" equal to the endpoint URL. + // That is *not* unique per issued run, so we intentionally do not use it for Event.ID. + // Instead we rely on Source:EffectiveAt (or Source:EmittedAt fallback). 
+ eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + + return common.SingleRawEvent( + s.Kind(), + s.http.Name, + standards.SchemaRawNWSHourlyForecastV1, + eventID, + emittedAt, + effectiveAt, + raw, + ) +} + +// ---- RAW fetch + minimal metadata decode ---- + +type forecastMeta struct { + // Present for GeoJSON Feature responses, but often stable (endpoint URL). + ID string `json:"id"` + + Properties struct { + GeneratedAt string `json:"generatedAt"` // preferred “issued/run generated” time + UpdateTime string `json:"updateTime"` // last update time of underlying data + Updated string `json:"updated"` // deprecated alias for updateTime + } `json:"properties"` + + ParsedGeneratedAt time.Time `json:"-"` + ParsedUpdateTime time.Time `json:"-"` +} + +func (s *ForecastSource) fetchRaw(ctx context.Context) (json.RawMessage, forecastMeta, error) { + raw, err := s.http.FetchJSON(ctx) + if err != nil { + return nil, forecastMeta{}, err + } + + var meta forecastMeta + if err := json.Unmarshal(raw, &meta); err != nil { + // If metadata decode fails, still return raw; Poll will fall back to Source:EmittedAt. + return raw, forecastMeta{}, nil + } + + // generatedAt (preferred) + genStr := strings.TrimSpace(meta.Properties.GeneratedAt) + if genStr != "" { + if t, err := nwscommon.ParseTime(genStr); err == nil { + meta.ParsedGeneratedAt = t.UTC() + } + } + + // updateTime, with fallback to deprecated "updated" + updStr := strings.TrimSpace(meta.Properties.UpdateTime) + if updStr == "" { + updStr = strings.TrimSpace(meta.Properties.Updated) + } + if updStr != "" { + if t, err := nwscommon.ParseTime(updStr); err == nil { + meta.ParsedUpdateTime = t.UTC() + } + } + + return raw, meta, nil } diff --git a/internal/standards/doc.go b/internal/standards/doc.go index 1764688..54729f3 100644 --- a/internal/standards/doc.go +++ b/internal/standards/doc.go @@ -5,11 +5,19 @@ // - Schema identifiers and versioning conventions (see schema.go). 
// - Canonical interpretations / cross-provider mappings that are not specific to a // single upstream API (e.g., shared code tables, text heuristics, unit policy). +// - Wire-format conventions for canonical payloads. // // Standards are used by both sources and normalizers. Keep this package free of // provider-specific logic and free of dependencies on internal/sources/* or // internal/normalizers/* to avoid import cycles. // +// Wire-format conventions +// ----------------------- +// For readability and stability, canonical payloads (weather.* schemas) should not emit +// noisy floating-point representations. weatherfeeder enforces this by rounding float +// values in canonical payloads to 2 digits after the decimal point at normalization +// finalization time. +// // Provider-specific decoding helpers and quirks live in internal/providers/. // Normalizer implementations and canonical mapping logic live in internal/normalizers/. package standards diff --git a/internal/standards/schema.go b/internal/standards/schema.go index fe86c38..32fed1d 100644 --- a/internal/standards/schema.go +++ b/internal/standards/schema.go @@ -15,6 +15,11 @@ const ( SchemaRawOpenMeteoCurrentV1 = "raw.openmeteo.current.v1" SchemaRawOpenWeatherCurrentV1 = "raw.openweather.current.v1" + SchemaRawNWSHourlyForecastV1 = "raw.nws.hourly.forecast.v1" + SchemaRawOpenMeteoHourlyForecastV1 = "raw.openmeteo.hourly.forecast.v1" + SchemaRawOpenWeatherHourlyForecastV1 = "raw.openweather.hourly.forecast.v1" + // Canonical domain schemas (emitted after normalization). SchemaWeatherObservationV1 = "weather.observation.v1" + SchemaWeatherForecastV1 = "weather.forecast.v1" )