From 759fa317621287a1e121ecea1eb441017a604855 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Wed, 14 Jan 2026 11:59:17 -0600 Subject: [PATCH] openweather: refactored the OpenWeather source files to relocate normalization logic to internal/normalizers. --- .../normalizers/openweather/observation.go | 206 +++++++++ internal/normalizers/openweather/register.go | 9 +- internal/normalizers/openweather/types.go | 51 ++ internal/normalizers/openweather/wmo_map.go | 98 ++++ internal/sources/openweather/observation.go | 435 +++++------------- 5 files changed, 469 insertions(+), 330 deletions(-) create mode 100644 internal/normalizers/openweather/observation.go create mode 100644 internal/normalizers/openweather/types.go create mode 100644 internal/normalizers/openweather/wmo_map.go diff --git a/internal/normalizers/openweather/observation.go b/internal/normalizers/openweather/observation.go new file mode 100644 index 0000000..b21190e --- /dev/null +++ b/internal/normalizers/openweather/observation.go @@ -0,0 +1,206 @@ +// FILE: ./internal/normalizers/openweather/observation.go +package openweather + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ObservationNormalizer converts: +// +// standards.SchemaRawOpenWeatherCurrentV1 -> standards.SchemaWeatherObservationV1 +// +// It interprets OpenWeatherMap "current weather" JSON and maps it into the canonical +// model.WeatherObservation representation. +// +// Caveats / assumptions: +// - Unit system: this normalizer assumes the upstream request used `units=metric`. +// The OpenWeather source enforces this invariant (fails fast otherwise). +// That means: +// - main.temp is °C +// - wind.speed and wind.gust are m/s (we convert to km/h) +// - pressure fields are hPa (we convert to Pa) +// +// Day/night handling: +// - Prefer the OpenWeather icon suffix ("d" / "n") when available. +// - Otherwise fall back to sunrise/sunset bounds (unix seconds). +type ObservationNormalizer struct{} + +func (ObservationNormalizer) Match(e event.Event) bool { + return strings.TrimSpace(e.Schema) == standards.SchemaRawOpenWeatherCurrentV1 +} + +func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps + + rawBytes, err := normcommon.PayloadBytes(in) + if err != nil { + return nil, fmt.Errorf("openweather observation normalize: %w", err) + } + + var parsed owmResponse + if err := json.Unmarshal(rawBytes, &parsed); err != nil { + return nil, fmt.Errorf("openweather observation normalize: decode raw payload: %w", err) + } + + obs, effectiveAt, err := buildObservation(parsed) + if err != nil { + return nil, err + } + + out := in + out.Schema = standards.SchemaWeatherObservationV1 + out.Payload = obs + + // EffectiveAt is optional; for observations it is naturally the observation timestamp. + if !effectiveAt.IsZero() { + t := effectiveAt.UTC() + out.EffectiveAt = &t + } + + if err := out.Validate(); err != nil { + return nil, err + } + return &out, nil +} + +// buildObservation contains the domain mapping logic (provider -> canonical model). +func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time, error) { + // Timestamp: dt is unix seconds, UTC (OpenWeather contract). + var ts time.Time + if parsed.Dt > 0 { + ts = time.Unix(parsed.Dt, 0).UTC() + } + + // Primary weather condition: OpenWeather returns an array; treat [0] as primary. + owmID, rawDesc, icon := primaryCondition(parsed.Weather) + + // Day/night inference: + // 1) icon suffix "d" or "n" + // 2) sunrise/sunset bounds + isDay := inferIsDay(icon, parsed.Dt, parsed.Sys.Sunrise, parsed.Sys.Sunset) + + // Unit policy: metric is enforced by the source, so: + // - temp is already °C + // - wind speed is m/s -> km/h conversion + tempC := parsed.Main.Temp + rh := parsed.Main.Humidity + + surfacePa := normcommon.PressurePaFromHPa(parsed.Main.Pressure) + var seaLevelPa *float64 + if parsed.Main.SeaLevel != nil { + v := normcommon.PressurePaFromHPa(*parsed.Main.SeaLevel) + seaLevelPa = &v + } + + wsKmh := normcommon.SpeedKmhFromMps(parsed.Wind.Speed) + var wgKmh *float64 + if parsed.Wind.Gust != nil { + v := normcommon.SpeedKmhFromMps(*parsed.Wind.Gust) + wgKmh = &v + } + + var visM *float64 + if parsed.Visibility != nil { + v := *parsed.Visibility + visM = &v + } + + // Condition mapping: OpenWeather condition IDs -> canonical WMO code vocabulary. + wmo := mapOpenWeatherToWMO(owmID) + canonicalText := standards.WMOText(wmo, isDay) + + iconURL := openWeatherIconURL(icon) + + stationID := openWeatherStationID(parsed) + stationName := strings.TrimSpace(parsed.Name) + if stationName == "" { + stationName = "OpenWeatherMap" + } + + obs := model.WeatherObservation{ + StationID: stationID, + StationName: stationName, + Timestamp: ts, + + ConditionCode: wmo, + ConditionText: canonicalText, + IsDay: isDay, + + ProviderRawDescription: rawDesc, + + // Human-facing legacy fields: populate with canonical text for consistency. + TextDescription: canonicalText, + IconURL: iconURL, + + TemperatureC: &tempC, + + WindDirectionDegrees: parsed.Wind.Deg, + WindSpeedKmh: &wsKmh, + WindGustKmh: wgKmh, + + BarometricPressurePa: &surfacePa, + SeaLevelPressurePa: seaLevelPa, + VisibilityMeters: visM, + + RelativeHumidityPercent: &rh, + } + + return obs, ts, nil +} + +func primaryCondition(list []owmWeather) (id int, desc string, icon string) { + if len(list) == 0 { + return 0, "", "" + } + w := list[0] + return w.ID, strings.TrimSpace(w.Description), strings.TrimSpace(w.Icon) +} + +func inferIsDay(icon string, dt, sunrise, sunset int64) *bool { + // Prefer icon suffix. + icon = strings.TrimSpace(icon) + if icon != "" { + last := icon[len(icon)-1] + switch last { + case 'd': + v := true + return &v + case 'n': + v := false + return &v + } + } + + // Fall back to sunrise/sunset bounds if provided. + if dt > 0 && sunrise > 0 && sunset > 0 { + v := dt >= sunrise && dt < sunset + return &v + } + + return nil +} + +func openWeatherIconURL(icon string) string { + icon = strings.TrimSpace(icon) + if icon == "" { + return "" + } + return fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon) +} + +func openWeatherStationID(parsed owmResponse) string { + if parsed.ID != 0 { + return fmt.Sprintf("OPENWEATHER(%d)", parsed.ID) + } + // Fallback: synthesize from coordinates. + return fmt.Sprintf("OPENWEATHER(%.5f,%.5f)", parsed.Coord.Lat, parsed.Coord.Lon) +} diff --git a/internal/normalizers/openweather/register.go b/internal/normalizers/openweather/register.go index 346cf3b..0e2f16c 100644 --- a/internal/normalizers/openweather/register.go +++ b/internal/normalizers/openweather/register.go @@ -1,3 +1,4 @@ +// FILE: ./internal/normalizers/openweather/register.go package openweather import ( @@ -5,15 +6,11 @@ import ( ) // Register registers OpenWeather normalizers into the provided registry. -// -// This is intentionally empty as a stub. As normalizers are implemented, -// register them here, e.g.: -// -// reg.Register(ObservationNormalizer{}) func Register(reg *fknormalize.Registry) { if reg == nil { return } - // TODO: register OpenWeather normalizers here. + // Observations + reg.Register(ObservationNormalizer{}) } diff --git a/internal/normalizers/openweather/types.go b/internal/normalizers/openweather/types.go new file mode 100644 index 0000000..ce76a5f --- /dev/null +++ b/internal/normalizers/openweather/types.go @@ -0,0 +1,51 @@ +// FILE: ./internal/normalizers/openweather/types.go +package openweather + +// owmResponse is a minimal-but-sufficient representation of the OpenWeatherMap +// "Current weather" payload needed for mapping into model.WeatherObservation. +// +// NOTE: OpenWeather field presence can vary by location/product tier. +// We keep some fields optional (pointers) where the API commonly omits them. +type owmResponse struct { + Coord struct { + Lon float64 `json:"lon"` + Lat float64 `json:"lat"` + } `json:"coord"` + + Weather []owmWeather `json:"weather"` + + Main struct { + Temp float64 `json:"temp"` // °C when units=metric (enforced by source) + Pressure float64 `json:"pressure"` // hPa + Humidity float64 `json:"humidity"` // % + SeaLevel *float64 `json:"sea_level"` + } `json:"main"` + + Visibility *float64 `json:"visibility"` // meters (optional) + + Wind struct { + Speed float64 `json:"speed"` // m/s when units=metric (enforced by source) + Deg *float64 `json:"deg"` + Gust *float64 `json:"gust"` // m/s when units=metric (enforced by source) + } `json:"wind"` + + Dt int64 `json:"dt"` // unix seconds, UTC + + Sys struct { + Country string `json:"country"` + Sunrise int64 `json:"sunrise"` // unix, UTC + Sunset int64 `json:"sunset"` // unix, UTC + } `json:"sys"` + + Timezone int `json:"timezone"` // seconds offset from UTC + ID int64 `json:"id"` // city id + Name string `json:"name"` // city name + Cod int `json:"cod"` +} + +type owmWeather struct { + ID int `json:"id"` + Main string `json:"main"` + Description string `json:"description"` + Icon string `json:"icon"` // e.g. "04d" or "01n" +} diff --git a/internal/normalizers/openweather/wmo_map.go b/internal/normalizers/openweather/wmo_map.go new file mode 100644 index 0000000..cc9c4ad --- /dev/null +++ b/internal/normalizers/openweather/wmo_map.go @@ -0,0 +1,98 @@ +// FILE: ./internal/normalizers/openweather/wmo_map.go +package openweather + +import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + +// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into weatherfeeder's +// canonical WMO code vocabulary. +// +// This is an approximate semantic mapping between two different code systems. +// We map conservatively into the subset currently represented in standards.WMODescriptions. +func mapOpenWeatherToWMO(owmID int) model.WMOCode { + switch { + // 2xx Thunderstorm + case owmID >= 200 && owmID <= 232: + return 95 + + // 3xx Drizzle + case owmID >= 300 && owmID <= 321: + if owmID == 300 { + return 51 + } + if owmID == 302 { + return 55 + } + return 53 + + // 5xx Rain + case owmID >= 500 && owmID <= 531: + // 511 is "freezing rain" + if owmID == 511 { + return 67 + } + + // showers bucket (520-531) + if owmID >= 520 && owmID <= 531 { + if owmID == 520 { + return 80 + } + if owmID == 522 { + return 82 + } + return 81 + } + + // normal rain intensity + if owmID == 500 { + return 61 + } + if owmID == 501 { + return 63 + } + if owmID >= 502 && owmID <= 504 { + return 65 + } + return 63 + + // 6xx Snow + case owmID >= 600 && owmID <= 622: + if owmID == 600 { + return 71 + } + if owmID == 601 { + return 73 + } + if owmID == 602 { + return 75 + } + + // Snow showers bucket (620-622) + if owmID == 620 { + return 85 + } + if owmID == 621 || owmID == 622 { + return 86 + } + + return 73 + + // 7xx Atmosphere (mist/smoke/haze/dust/fog/etc.) + case owmID >= 701 && owmID <= 781: + return 45 + + // 800 Clear + case owmID == 800: + return 0 + + // 80x Clouds + case owmID == 801: + return 1 + case owmID == 802: + return 2 + case owmID == 803 || owmID == 804: + return 3 + + default: + return model.WMOUnknown + } +} diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 18992e2..231858a 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -1,9 +1,11 @@ +// FILE: ./internal/sources/openweather/observation.go package openweather import ( "context" "encoding/json" "fmt" + "io" "net/http" "net/url" "strings" @@ -11,25 +13,23 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) -// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits one Observation event. +// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits a RAW observation Event. // -// Typical URL shape (you provide this via config): +// Refactor (mirrors NWS): +// - Source responsibility: fetch bytes + emit a valid event envelope. +// - Normalizer responsibility: decode JSON + map to canonical model.WeatherObservation. +// +// Typical URL shape (provided via config): // // https://api.openweathermap.org/data/2.5/weather?lat=...&lon=...&appid=...&units=metric // -// Unit notes: -// - If `units` is omitted, OpenWeather uses "standard" units (temp Kelvin, wind m/s). -// - `units=metric` => temp Celsius, wind m/s. -// - `units=imperial` => temp Fahrenheit, wind mph. -// -// weatherd normalizes to: -// - TemperatureC in °C -// - WindSpeedKmh in km/h -// - Pressure in Pa (OpenWeather provides hPa) +// IMPORTANT UNIT POLICY (weatherfeeder convention): +// OpenWeather changes units based on the `units` query parameter but does NOT include the unit +// system in the response body. To keep normalization deterministic, this driver *requires* +// `units=metric`. If absent (or non-metric), the driver returns an error. type ObservationSource struct { name string url string @@ -45,18 +45,21 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { return nil, fmt.Errorf("openweather_observation %q: params are required (need params.url)", cfg.Name) } - // Driver-specific settings live under cfg.Params to keep feedkit domain-agnostic. - url, ok := cfg.ParamString("url", "URL") + rawURL, ok := cfg.ParamString("url", "URL") if !ok { return nil, fmt.Errorf("openweather_observation %q: params.url is required", cfg.Name) } - // Optional User-Agent. + // Fail fast: enforce deterministic unit system. + if err := requireMetricUnits(rawURL); err != nil { + return nil, fmt.Errorf("openweather_observation %q: %w", cfg.Name, err) + } + ua := cfg.ParamStringDefault("weatherfeeder (openweather client)", "user_agent", "userAgent") return &ObservationSource{ name: cfg.Name, - url: url, + url: rawURL, userAgent: ua, client: &http.Client{ Timeout: 10 * time.Second, @@ -67,19 +70,31 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { func (s *ObservationSource) Name() string { return s.name } // Kind is used for routing/policy. +// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } +// Poll fetches OpenWeather "current weather" and emits exactly one RAW Event. +// The RAW payload is json.RawMessage and Schema is standards.SchemaRawOpenWeatherCurrentV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { - obs, eventID, err := s.fetchAndParse(ctx) + // Re-check policy defensively (in case the URL is mutated after construction). + if err := requireMetricUnits(s.url); err != nil { + return nil, fmt.Errorf("openweather_observation %q: %w", s.name, err) + } + + raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } - // EffectiveAt is optional. If we have a real observation timestamp, use it. - // We intentionally take a copy so the pointer is stable and not tied to a struct field. + eventID := buildEventID(s.name, meta) + if strings.TrimSpace(eventID) == "" { + // Extremely defensive fallback: should never happen, but keep the envelope valid. + eventID = fmt.Sprintf("openweather:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) + } + var effectiveAt *time.Time - if !obs.Timestamp.IsZero() { - t := obs.Timestamp + if !meta.ParsedTimestamp.IsZero() { + t := meta.ParsedTimestamp.UTC() effectiveAt = &t } @@ -90,8 +105,11 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { EmittedAt: time.Now().UTC(), EffectiveAt: effectiveAt, - Schema: "weather.observation.v1", - Payload: obs, + // RAW schema (normalizer matches on this). + Schema: standards.SchemaRawOpenWeatherCurrentV1, + + // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. + Payload: raw, } if err := e.Validate(); err != nil { @@ -101,58 +119,29 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return []event.Event{e}, nil } -// --- OpenWeather JSON parsing (minimal subset) --- +// ---- RAW fetch + minimal metadata decode ---- + +// openWeatherMeta is a *minimal* decode of the OpenWeather payload used only to build +// a stable Event.ID and a useful EffectiveAt for the envelope. +type openWeatherMeta struct { + Dt int64 `json:"dt"` // unix seconds, UTC + + ID int64 `json:"id"` // city id (if present) + Name string `json:"name"` // city name (optional) -type owmResponse struct { Coord struct { Lon float64 `json:"lon"` Lat float64 `json:"lat"` } `json:"coord"` - Weather []struct { - ID int `json:"id"` - Main string `json:"main"` - Description string `json:"description"` - Icon string `json:"icon"` // e.g. "04d" or "01n" - } `json:"weather"` - - Main struct { - Temp float64 `json:"temp"` - Pressure float64 `json:"pressure"` // hPa - Humidity float64 `json:"humidity"` // % - SeaLevel *float64 `json:"sea_level"` // hPa (optional) - } `json:"main"` - - Visibility *float64 `json:"visibility"` // meters (optional) - - Wind struct { - Speed float64 `json:"speed"` // units depend on `units=...` - Deg *float64 `json:"deg"` - Gust *float64 `json:"gust"` // units depend on `units=...` - } `json:"wind"` - - Clouds struct { - All *float64 `json:"all"` // cloudiness % - } `json:"clouds"` - - Dt int64 `json:"dt"` // unix seconds, UTC - - Sys struct { - Country string `json:"country"` - Sunrise int64 `json:"sunrise"` // unix, UTC - Sunset int64 `json:"sunset"` // unix, UTC - } `json:"sys"` - - Timezone int `json:"timezone"` // seconds offset from UTC - ID int64 `json:"id"` // city id - Name string `json:"name"` // city name - Cod int `json:"cod"` + // Convenience fields populated after decode. + ParsedTimestamp time.Time `json:"-"` } -func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, string, error) { - req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil) +func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil) if err != nil { - return model.WeatherObservation{}, "", err + return nil, openWeatherMeta{}, err } req.Header.Set("User-Agent", s.userAgent) @@ -160,279 +149,77 @@ func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObs res, err := s.client.Do(req) if err != nil { - return model.WeatherObservation{}, "", err + return nil, openWeatherMeta{}, err } defer res.Body.Close() if res.StatusCode < 200 || res.StatusCode >= 300 { - return model.WeatherObservation{}, "", fmt.Errorf("openweather_observation %q: HTTP %s", s.name, res.Status) + return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: HTTP %s", s.name, res.Status) } - var parsed owmResponse - if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil { - return model.WeatherObservation{}, "", err - } - - // Timestamp: dt is unix seconds, UTC. - ts := time.Unix(parsed.Dt, 0).UTC() - - // Primary weather condition: OpenWeather returns a list; we treat [0] as primary. - // If missing, we degrade gracefully. - owmID := 0 - rawDesc := "" - icon := "" - if len(parsed.Weather) > 0 { - owmID = parsed.Weather[0].ID - rawDesc = strings.TrimSpace(parsed.Weather[0].Description) - icon = strings.TrimSpace(parsed.Weather[0].Icon) - } - - // Day/night inference: - // - Prefer icon suffix if present ("d" or "n") - // - Else fall back to sunrise/sunset bounds - var isDay *bool - if icon != "" { - last := icon[len(icon)-1] - switch last { - case 'd': - v := true - isDay = &v - case 'n': - v := false - isDay = &v - } - } - if isDay == nil && parsed.Sys.Sunrise > 0 && parsed.Sys.Sunset > 0 { - v := parsed.Dt >= parsed.Sys.Sunrise && parsed.Dt < parsed.Sys.Sunset - isDay = &v - } - - // Units handling based on the request URL. - unitSystem := getUnitsFromURL(s.url) - - // Temperature normalization to Celsius. - tempC := normalizeTempToC(parsed.Main.Temp, unitSystem) - - // Humidity is already percent. - rh := parsed.Main.Humidity - - // Pressure hPa -> Pa - surfacePa := parsed.Main.Pressure * 100.0 - var seaLevelPa *float64 - if parsed.Main.SeaLevel != nil { - v := (*parsed.Main.SeaLevel) * 100.0 - seaLevelPa = &v - } - - // Wind speed normalization to km/h - wsKmh := normalizeSpeedToKmh(parsed.Wind.Speed, unitSystem) - - var wgKmh *float64 - if parsed.Wind.Gust != nil { - v := normalizeSpeedToKmh(*parsed.Wind.Gust, unitSystem) - wgKmh = &v - } - - // Visibility in meters (already matches our model) - var visM *float64 - if parsed.Visibility != nil { - v := *parsed.Visibility - visM = &v - } - - // Map OpenWeather condition IDs -> canonical WMO code (our internal vocabulary). - wmo := mapOpenWeatherToWMO(owmID) - - // Canonical text from our shared table. - canonicalText := standards.WMOText(wmo, isDay) - - // Icon URL (optional). - iconURL := "" - if icon != "" { - iconURL = fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon) - } - - stationID := "" - if parsed.ID != 0 { - stationID = fmt.Sprintf("OPENWEATHER(%d)", parsed.ID) - } else { - stationID = fmt.Sprintf("OPENWEATHER(%.5f,%.5f)", parsed.Coord.Lat, parsed.Coord.Lon) - } - - stationName := strings.TrimSpace(parsed.Name) - if stationName == "" { - stationName = "OpenWeatherMap" - } - - obs := model.WeatherObservation{ - StationID: stationID, - StationName: stationName, - Timestamp: ts, - - // Canonical internal representation - ConditionCode: wmo, - ConditionText: canonicalText, - IsDay: isDay, - - // Provider evidence for troubleshooting mappings - ProviderRawDescription: rawDesc, - - // Human-facing legacy fields: we populate with canonical text for consistency - TextDescription: canonicalText, - IconURL: iconURL, - - TemperatureC: &tempC, - - WindDirectionDegrees: parsed.Wind.Deg, - WindSpeedKmh: &wsKmh, - WindGustKmh: wgKmh, - - BarometricPressurePa: &surfacePa, - SeaLevelPressurePa: seaLevelPa, - VisibilityMeters: visM, - - RelativeHumidityPercent: &rh, - } - - // Stable event ID: key by source + timestamp. - eventID := fmt.Sprintf("openweather:%s:%s", s.name, obs.Timestamp.UTC().Format(time.RFC3339Nano)) - - return obs, eventID, nil -} - -func getUnitsFromURL(raw string) string { - u, err := url.Parse(raw) + b, err := io.ReadAll(res.Body) if err != nil { - return "standard" + return nil, openWeatherMeta{}, err } - q := u.Query() - units := strings.TrimSpace(strings.ToLower(q.Get("units"))) - if units == "" { - return "standard" + if len(b) == 0 { + return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: empty response body", s.name) } - switch units { - case "standard", "metric", "imperial": - return units - default: - return "standard" + + raw := json.RawMessage(b) + + var meta openWeatherMeta + if err := json.Unmarshal(b, &meta); err != nil { + // If metadata decode fails, still return raw; envelope will fall back to computed ID. + return raw, openWeatherMeta{}, nil } + + if meta.Dt > 0 { + meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC() + } + + return raw, meta, nil } -func normalizeTempToC(v float64, unitSystem string) float64 { - switch unitSystem { - case "metric": - // Already °C - return v - case "imperial": - // °F -> °C - return (v - 32.0) * 5.0 / 9.0 - default: - // "standard" => Kelvin -> °C - return v - 273.15 +func buildEventID(sourceName string, meta openWeatherMeta) string { + // Prefer provider city ID if present; otherwise fall back to lat/lon. + locKey := "" + if meta.ID != 0 { + locKey = fmt.Sprintf("city:%d", meta.ID) + } else if meta.Coord.Lat != 0 || meta.Coord.Lon != 0 { + locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Coord.Lat, meta.Coord.Lon) + } else { + locKey = "loc:unknown" } + + ts := meta.ParsedTimestamp + if ts.IsZero() { + // We prefer stable IDs, but if the payload didn't decode, use "now" so we still emit. + ts = time.Now().UTC() + } + + // Example: + // openweather:current::city:12345:2026-01-14T17:00:00.123Z + return fmt.Sprintf("openweather:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) } -func normalizeSpeedToKmh(v float64, unitSystem string) float64 { - switch unitSystem { - case "imperial": - // mph -> km/h - return v * 1.609344 - default: - // m/s -> km/h - return v * 3.6 - } -} - -// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into your internal WMO code vocabulary. +// requireMetricUnits enforces weatherfeeder's OpenWeather unit policy. // -// This is an approximate semantic mapping between two different code systems. -// Your current canonical WMO table is intentionally small and text-focused, -// so we map into that set (0/1/2/3/45/48/51/.../99) conservatively. -func mapOpenWeatherToWMO(owmID int) model.WMOCode { - switch { - // 2xx Thunderstorm - case owmID >= 200 && owmID <= 232: - return model.WMOCode(95) - - // 3xx Drizzle - case owmID >= 300 && owmID <= 321: - if owmID == 300 { - return model.WMOCode(51) - } - if owmID == 302 { - return model.WMOCode(55) - } - return model.WMOCode(53) - - // 5xx Rain - case owmID >= 500 && owmID <= 531: - // 511 is "freezing rain" - if owmID == 511 { - return model.WMOCode(67) - } - - // showers bucket (520-531) - if owmID >= 520 && owmID <= 531 { - if owmID == 520 { - return model.WMOCode(80) - } - if owmID == 522 { - return model.WMOCode(82) - } - return model.WMOCode(81) - } - - // normal rain intensity - if owmID == 500 { - return model.WMOCode(61) - } - if owmID == 501 { - return model.WMOCode(63) - } - if owmID >= 502 && owmID <= 504 { - return model.WMOCode(65) - } - return model.WMOCode(63) - - // 6xx Snow - case owmID >= 600 && owmID <= 622: - if owmID == 600 { - return model.WMOCode(71) - } - if owmID == 601 { - return model.WMOCode(73) - } - if owmID == 602 { - return model.WMOCode(75) - } - - // Snow showers bucket (620-622) - if owmID == 620 { - return model.WMOCode(85) - } - if owmID == 621 || owmID == 622 { - return model.WMOCode(86) - } - - return model.WMOCode(73) - - // 7xx Atmosphere (mist/smoke/haze/dust/fog/etc.) - case owmID >= 701 && owmID <= 781: - return model.WMOCode(45) - - // 800 Clear - case owmID == 800: - return model.WMOCode(0) - - // 80x Clouds - case owmID == 801: - return model.WMOCode(1) - case owmID == 802: - return model.WMOCode(2) - case owmID == 803 || owmID == 804: - return model.WMOCode(3) - - default: - return model.WMOUnknown +// OpenWeather does not tell us the unit system in the response body. We therefore enforce that +// the request URL explicitly contains units=metric; otherwise normalization would be ambiguous. +func requireMetricUnits(rawURL string) error { + u, err := url.Parse(strings.TrimSpace(rawURL)) + if err != nil { + return fmt.Errorf("invalid url %q: %w", rawURL, err) } + + units := strings.ToLower(strings.TrimSpace(u.Query().Get("units"))) + if units != "metric" { + // Treat missing units ("" -> standard) as non-compliant too. + if units == "" { + units = "(missing; defaults to standard)" + } + return fmt.Errorf("url must include units=metric (got units=%s)", units) + } + + return nil }