From f43babdfd2867269d69d1234a11e29f7ef2deed6 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Wed, 14 Jan 2026 12:10:32 -0600 Subject: [PATCH] openmeteo: refactored the OpenMeteo source files to relocate normalization logic to internal/normalizers. --- internal/normalizers/openmeteo/common.go | 40 ++++ internal/normalizers/openmeteo/observation.go | 180 ++++++++++++++++ internal/normalizers/openmeteo/register.go | 9 +- internal/normalizers/openmeteo/types.go | 33 +++ internal/sources/openmeteo/observation.go | 201 ++++++++---------- 5 files changed, 348 insertions(+), 115 deletions(-) create mode 100644 internal/normalizers/openmeteo/common.go create mode 100644 internal/normalizers/openmeteo/observation.go create mode 100644 internal/normalizers/openmeteo/types.go diff --git a/internal/normalizers/openmeteo/common.go b/internal/normalizers/openmeteo/common.go new file mode 100644 index 0000000..e948485 --- /dev/null +++ b/internal/normalizers/openmeteo/common.go @@ -0,0 +1,40 @@ +// FILE: ./internal/normalizers/openmeteo/common.go +package openmeteo + +import ( + "fmt" + "strings" + "time" +) + +// parseOpenMeteoTime parses Open-Meteo timestamps. +// +// Open-Meteo commonly returns "YYYY-MM-DDTHH:MM" (no timezone suffix) when timezone +// is provided separately. When a timezone suffix is present (RFC3339), we accept it too. +// +// This is provider-specific because it relies on Open-Meteo's timezone and offset fields. +func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) { + s = strings.TrimSpace(s) + if s == "" { + return time.Time{}, fmt.Errorf("empty time") + } + + // If the server returned an RFC3339 timestamp with timezone, treat it as authoritative. + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t, nil + } + + // Typical Open-Meteo format: "2006-01-02T15:04" + const layout = "2006-01-02T15:04" + + // Best effort: try to load the timezone as an IANA name. + if tz != "" { + if loc, err := time.LoadLocation(tz); err == nil { + return time.ParseInLocation(layout, s, loc) + } + } + + // Fallback: use a fixed zone from the offset seconds. + loc := time.FixedZone("open-meteo", utcOffsetSeconds) + return time.ParseInLocation(layout, s, loc) +} diff --git a/internal/normalizers/openmeteo/observation.go b/internal/normalizers/openmeteo/observation.go new file mode 100644 index 0000000..12b2c3a --- /dev/null +++ b/internal/normalizers/openmeteo/observation.go @@ -0,0 +1,180 @@ +// FILE: ./internal/normalizers/openmeteo/observation.go +package openmeteo + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "gitea.maximumdirect.net/ejr/feedkit/event" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" + normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" +) + +// ObservationNormalizer converts: +// +// standards.SchemaRawOpenMeteoCurrentV1 -> standards.SchemaWeatherObservationV1 +// +// It interprets Open-Meteo "current weather" JSON and maps it into the canonical +// model.WeatherObservation representation. +// +// Caveats / assumptions: +// +// - Open-Meteo is not a station feed; StationID is synthesized from lat/lon. +// - Open-Meteo provides WMO weather_code directly; we treat it as authoritative. +// - Day/night handling uses Open-Meteo's is_day flag when present. +// - Pressure fields are typically hPa; we convert to Pa to match our model. +// +// Timestamp handling: +// +// - Open-Meteo "current.time" is often "YYYY-MM-DDTHH:MM" with timezone specified +// separately; we interpret it using the returned timezone / utc_offset_seconds. +// - If parsing fails, Timestamp may be zero and EffectiveAt will be omitted. +type ObservationNormalizer struct{} + +func (ObservationNormalizer) Match(e event.Event) bool { + return strings.TrimSpace(e.Schema) == standards.SchemaRawOpenMeteoCurrentV1 +} + +func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { + _ = ctx // normalization is pure/CPU; keep ctx for future expensive steps + + rawBytes, err := normcommon.PayloadBytes(in) + if err != nil { + return nil, fmt.Errorf("openmeteo observation normalize: %w", err) + } + + var parsed omResponse + if err := json.Unmarshal(rawBytes, &parsed); err != nil { + return nil, fmt.Errorf("openmeteo observation normalize: decode raw payload: %w", err) + } + + obs, effectiveAt, err := buildObservation(parsed) + if err != nil { + return nil, err + } + + out := in + out.Schema = standards.SchemaWeatherObservationV1 + out.Payload = obs + + // EffectiveAt is optional; for observations it is naturally the observation timestamp. + if !effectiveAt.IsZero() { + t := effectiveAt.UTC() + out.EffectiveAt = &t + } + + if err := out.Validate(); err != nil { + return nil, err + } + return &out, nil +} + +// buildObservation contains the domain mapping logic (provider -> canonical model). +func buildObservation(parsed omResponse) (model.WeatherObservation, time.Time, error) { + // Parse current.time. + var ts time.Time + if s := strings.TrimSpace(parsed.Current.Time); s != "" { + t, err := parseOpenMeteoTime(s, parsed.Timezone, parsed.UTCOffsetSeconds) + if err != nil { + return model.WeatherObservation{}, time.Time{}, fmt.Errorf("openmeteo observation normalize: parse time %q: %w", s, err) + } + ts = t.UTC() + } + + // Day/night: optional. + var isDay *bool + if parsed.Current.IsDay != nil { + v := *parsed.Current.IsDay == 1 + isDay = &v + } + + // WMO weather code: optional. + wmo := model.WMOUnknown + if parsed.Current.WeatherCode != nil { + wmo = model.WMOCode(*parsed.Current.WeatherCode) + } + + canonicalText := standards.WMOText(wmo, isDay) + + // Station identity: Open-Meteo is not a station feed; synthesize from coordinates. + stationID := "" + if parsed.Latitude != nil || parsed.Longitude != nil { + lat := 0.0 + lon := 0.0 + if parsed.Latitude != nil { + lat = *parsed.Latitude + } + if parsed.Longitude != nil { + lon = *parsed.Longitude + } + stationID = fmt.Sprintf("OPENMETEO(%.5f,%.5f)", lat, lon) + } + + obs := model.WeatherObservation{ + StationID: stationID, + StationName: "Open-Meteo", + Timestamp: ts, + + ConditionCode: wmo, + ConditionText: canonicalText, + IsDay: isDay, + + // Open-Meteo does not provide a separate human text description for "current" + // when using weather_code; we leave provider evidence empty. + ProviderRawDescription: "", + + // Transitional / human-facing: + // keep output consistent by populating TextDescription from canonical text. + TextDescription: canonicalText, + + // IconURL: Open-Meteo does not provide an icon URL in this endpoint. + IconURL: "", + } + + // Measurements (all optional; only set when present). + if parsed.Current.Temperature2m != nil { + v := *parsed.Current.Temperature2m + obs.TemperatureC = &v + } + + if parsed.Current.RelativeHumidity2m != nil { + v := *parsed.Current.RelativeHumidity2m + obs.RelativeHumidityPercent = &v + } + + if parsed.Current.WindDirection10m != nil { + v := *parsed.Current.WindDirection10m + obs.WindDirectionDegrees = &v + } + + if parsed.Current.WindSpeed10m != nil { + v := *parsed.Current.WindSpeed10m // Open-Meteo returns km/h for wind_speed_10m + obs.WindSpeedKmh = &v + } + + if parsed.Current.WindGusts10m != nil { + v := *parsed.Current.WindGusts10m // Open-Meteo returns km/h for wind_gusts_10m + obs.WindGustKmh = &v + } + + if parsed.Current.SurfacePressure != nil { + v := normcommon.PressurePaFromHPa(*parsed.Current.SurfacePressure) + obs.BarometricPressurePa = &v + } + + if parsed.Current.PressureMSL != nil { + v := normcommon.PressurePaFromHPa(*parsed.Current.PressureMSL) + obs.SeaLevelPressurePa = &v + } + + if parsed.Elevation != nil { + v := *parsed.Elevation + obs.ElevationMeters = &v + } + + return obs, ts, nil +} diff --git a/internal/normalizers/openmeteo/register.go b/internal/normalizers/openmeteo/register.go index 4401ae2..4616c1b 100644 --- a/internal/normalizers/openmeteo/register.go +++ b/internal/normalizers/openmeteo/register.go @@ -1,3 +1,4 @@ +// FILE: ./internal/normalizers/openmeteo/register.go package openmeteo import ( @@ -5,15 +6,11 @@ import ( ) // Register registers Open-Meteo normalizers into the provided registry. -// -// This is intentionally empty as a stub. As normalizers are implemented, -// register them here, e.g.: -// -// reg.Register(ObservationNormalizer{}) func Register(reg *fknormalize.Registry) { if reg == nil { return } - // TODO: register Open-Meteo normalizers here. + // Observations + reg.Register(ObservationNormalizer{}) } diff --git a/internal/normalizers/openmeteo/types.go b/internal/normalizers/openmeteo/types.go new file mode 100644 index 0000000..c5a21fc --- /dev/null +++ b/internal/normalizers/openmeteo/types.go @@ -0,0 +1,33 @@ +// FILE: ./internal/normalizers/openmeteo/types.go +package openmeteo + +// omResponse is a minimal-but-sufficient representation of the Open-Meteo "current" +// payload needed for mapping into model.WeatherObservation. +// +// We use pointers for many fields so "missing" is distinguishable from "zero". +type omResponse struct { + Latitude *float64 `json:"latitude"` + Longitude *float64 `json:"longitude"` + Timezone string `json:"timezone"` + UTCOffsetSeconds int `json:"utc_offset_seconds"` + Elevation *float64 `json:"elevation"` + + Current omCurrent `json:"current"` +} + +type omCurrent struct { + Time string `json:"time"` // e.g. "2026-01-10T12:30" (often no timezone suffix) + + Temperature2m *float64 `json:"temperature_2m"` + RelativeHumidity2m *float64 `json:"relative_humidity_2m"` + WeatherCode *int `json:"weather_code"` + + WindSpeed10m *float64 `json:"wind_speed_10m"` // km/h (per Open-Meteo docs for these fields) + WindDirection10m *float64 `json:"wind_direction_10m"` // degrees + WindGusts10m *float64 `json:"wind_gusts_10m"` // km/h + + SurfacePressure *float64 `json:"surface_pressure"` // hPa + PressureMSL *float64 `json:"pressure_msl"` // hPa + + IsDay *int `json:"is_day"` // 0/1 +} diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index 443e3a2..08206f7 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -1,20 +1,25 @@ +// FILE: ./internal/sources/openmeteo/observation.go package openmeteo import ( "context" "encoding/json" "fmt" + "io" "net/http" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" - "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) -// ObservationSource polls an Open-Meteo endpoint and emits one Observation event. +// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event. +// +// Refactor (mirrors NWS/OpenWeather): +// - Source responsibility: fetch bytes + emit a valid event envelope +// - Normalizer responsibility: decode JSON + map to canonical model.WeatherObservation // // Typical URL shape (you provide this via config): // @@ -34,14 +39,12 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { return nil, fmt.Errorf("openmeteo_observation %q: params are required (need params.url)", cfg.Name) } - // Open-Meteo needs only a URL; everything else is optional. url, ok := cfg.ParamString("url", "URL") if !ok { return nil, fmt.Errorf("openmeteo_observation %q: params.url is required", cfg.Name) } // Open-Meteo doesn't require a special User-Agent, but including one is polite. - // If the caller doesn't provide one, we supply a reasonable default. ua := cfg.ParamStringDefault("weatherfeeder (open-meteo client)", "user_agent", "userAgent") return &ObservationSource{ @@ -56,31 +59,43 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { func (s *ObservationSource) Name() string { return s.name } -// Kind is used for routing/policy. Note that the TYPE is domain-agnostic (event.Kind). +// Kind is used for routing/policy. +// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } +// Poll fetches Open-Meteo "current" and emits exactly one RAW Event. +// The RAW payload is json.RawMessage and Schema is standards.SchemaRawOpenMeteoCurrentV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { - obs, effectiveAt, eventID, err := s.fetchAndParse(ctx) + raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } - // Make EffectiveAt a stable pointer. - effectiveAtCopy := effectiveAt + eventID := buildEventID(s.name, meta) + if strings.TrimSpace(eventID) == "" { + // Extremely defensive fallback: keep the envelope valid no matter what. + eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) + } + + // EffectiveAt is optional; for observations it’s naturally the observation timestamp. + var effectiveAt *time.Time + if !meta.ParsedTimestamp.IsZero() { + t := meta.ParsedTimestamp.UTC() + effectiveAt = &t + } e := event.Event{ ID: eventID, Kind: s.Kind(), Source: s.name, EmittedAt: time.Now().UTC(), - EffectiveAt: &effectiveAtCopy, + EffectiveAt: effectiveAt, - // Optional but useful for downstream consumers once multiple event types exist. - Schema: "weather.observation.v1", + // RAW schema (normalizer matches on this). + Schema: standards.SchemaRawOpenMeteoCurrentV1, - // The payload domain-specific (model.WeatherObservation). - // feedkit treats this as opaque. - Payload: obs, + // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. + Payload: raw, } if err := e.Validate(); err != nil { @@ -90,43 +105,30 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return []event.Event{e}, nil } -// ---- Open-Meteo JSON parsing ---- +// ---- RAW fetch + minimal metadata decode ---- -type omResponse struct { +// openMeteoMeta is a *minimal* decode of the Open-Meteo payload used only to build +// a stable Event.ID and a useful EffectiveAt for the envelope. +// +// Important: this is NOT where we map into internal/model. That belongs in the normalizer. +type openMeteoMeta struct { Latitude float64 `json:"latitude"` Longitude float64 `json:"longitude"` Timezone string `json:"timezone"` UTCOffsetSeconds int `json:"utc_offset_seconds"` - Elevation float64 `json:"elevation"` - Current omCurrent `json:"current"` + Current struct { + Time string `json:"time"` // e.g. "2026-01-10T12:30" (often no timezone suffix) + } `json:"current"` + + // Convenience fields populated after decode. + ParsedTimestamp time.Time `json:"-"` } -type omCurrent struct { - Time string `json:"time"` // e.g. "2026-01-10T12:30" - Interval int `json:"interval"` - Temperature2m float64 `json:"temperature_2m"` - RelativeHumidity2m float64 `json:"relative_humidity_2m"` - WeatherCode int `json:"weather_code"` - - WindSpeed10m float64 `json:"wind_speed_10m"` // km/h - WindDirection10m float64 `json:"wind_direction_10m"` // degrees - WindGusts10m float64 `json:"wind_gusts_10m"` // km/h - - Precipitation float64 `json:"precipitation"` - - SurfacePressure float64 `json:"surface_pressure"` // hPa - PressureMSL float64 `json:"pressure_msl"` // hPa - - CloudCover float64 `json:"cloud_cover"` - ApparentTemperature float64 `json:"apparent_temperature"` - IsDay int `json:"is_day"` -} - -func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, time.Time, string, error) { - req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil) +func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil) if err != nil { - return model.WeatherObservation{}, time.Time{}, "", err + return nil, openMeteoMeta{}, err } req.Header.Set("User-Agent", s.userAgent) @@ -134,92 +136,74 @@ func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObs res, err := s.client.Do(req) if err != nil { - return model.WeatherObservation{}, time.Time{}, "", err + return nil, openMeteoMeta{}, err } defer res.Body.Close() if res.StatusCode < 200 || res.StatusCode >= 300 { - return model.WeatherObservation{}, time.Time{}, "", fmt.Errorf("openmeteo_observation %q: HTTP %s", s.name, res.Status) + return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: HTTP %s", s.name, res.Status) } - var parsed omResponse - if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil { - return model.WeatherObservation{}, time.Time{}, "", err - } - - // Parse current.time. - // Open-Meteo "time" commonly looks like "YYYY-MM-DDTHH:MM" (no timezone suffix). - // We'll interpret it in the timezone returned by the API (best-effort). - t, err := parseOpenMeteoTime(parsed.Current.Time, parsed.Timezone, parsed.UTCOffsetSeconds) + b, err := io.ReadAll(res.Body) if err != nil { - return model.WeatherObservation{}, time.Time{}, "", fmt.Errorf("openmeteo_observation %q: parse time %q: %w", s.name, parsed.Current.Time, err) + return nil, openMeteoMeta{}, err + } + if len(b) == 0 { + return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: empty response body", s.name) } - // Normalize to UTC inside the domain model; presentation can localize later. - effectiveAt := t.UTC() + raw := json.RawMessage(b) - // Measurements - tempC := parsed.Current.Temperature2m - rh := parsed.Current.RelativeHumidity2m - wdir := parsed.Current.WindDirection10m - wsKmh := parsed.Current.WindSpeed10m - wgKmh := parsed.Current.WindGusts10m - - surfacePa := parsed.Current.SurfacePressure * 100.0 - mslPa := parsed.Current.PressureMSL * 100.0 - - elevM := parsed.Elevation - - // Canonical condition (WMO) - isDay := parsed.Current.IsDay == 1 - wmo := model.WMOCode(parsed.Current.WeatherCode) - canonicalText := standards.WMOText(wmo, &isDay) - - obs := model.WeatherObservation{ - // Open-Meteo isn't a station feed; we’ll label this with a synthetic identifier. - StationID: fmt.Sprintf("OPENMETEO(%.5f,%.5f)", parsed.Latitude, parsed.Longitude), - StationName: "Open-Meteo", - - Timestamp: effectiveAt, - - // Canonical conditions - ConditionCode: wmo, - ConditionText: canonicalText, - IsDay: &isDay, - - // Provider evidence (Open-Meteo does not provide a separate raw description here) - ProviderRawDescription: "", - - // Human-facing fields: - // Populate TextDescription with canonical text so downstream output remains consistent. - TextDescription: canonicalText, - - TemperatureC: &tempC, - RelativeHumidityPercent: &rh, - - WindDirectionDegrees: &wdir, - WindSpeedKmh: &wsKmh, - WindGustKmh: &wgKmh, - - BarometricPressurePa: &surfacePa, - SeaLevelPressurePa: &mslPa, - - ElevationMeters: &elevM, + var meta openMeteoMeta + if err := json.Unmarshal(b, &meta); err != nil { + // If metadata decode fails, still return raw; envelope will fall back to computed ID without EffectiveAt. + return raw, openMeteoMeta{}, nil } - // Build a stable event ID. - // Open-Meteo doesn't supply a unique ID, so we key by source + effective time. - eventID := fmt.Sprintf("openmeteo:%s:%s", s.name, effectiveAt.Format(time.RFC3339Nano)) + // Best-effort parse of current.time so the envelope carries a meaningful EffectiveAt. + // This duplicates the parsing logic in the normalizer, but ONLY for envelope metadata. + if t, err := parseOpenMeteoTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil { + meta.ParsedTimestamp = t.UTC() + } - return obs, effectiveAt, eventID, nil + return raw, meta, nil } +func buildEventID(sourceName string, meta openMeteoMeta) string { + // Prefer stable location key from lat/lon if present. + locKey := "" + if meta.Latitude != 0 || meta.Longitude != 0 { + locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Latitude, meta.Longitude) + } else { + locKey = "loc:unknown" + } + + ts := meta.ParsedTimestamp + if ts.IsZero() { + // If we couldn't parse current.time, use "now" so we still emit. + ts = time.Now().UTC() + } + + // Example: + // openmeteo:current::coord:38.62390,-90.35710:2026-01-14T18:00:00.000Z + return fmt.Sprintf("openmeteo:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) +} + +// parseOpenMeteoTime parses Open-Meteo "current.time" using timezone/offset hints. +// +// Open-Meteo commonly returns "YYYY-MM-DDTHH:MM" (no timezone suffix) when timezone +// is provided separately. When a timezone suffix is present (RFC3339), we accept it too. func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) { s = strings.TrimSpace(s) if s == "" { return time.Time{}, fmt.Errorf("empty time") } + // If the server returned an RFC3339 timestamp with timezone, take it as authoritative. + if t, err := time.Parse(time.RFC3339, s); err == nil { + return t, nil + } + // Typical Open-Meteo format: "2006-01-02T15:04" const layout = "2006-01-02T15:04" @@ -231,8 +215,7 @@ func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, e } } - // Fallback: use the offset seconds to create a fixed zone. - // (If offset is 0, this is UTC.) + // Fallback: use the offset seconds to create a fixed zone. (If offset is 0, this is UTC.) loc := time.FixedZone("open-meteo", utcOffsetSeconds) return time.ParseInLocation(layout, s, loc) }