openweather: refactored the OpenWeather source files to relocate normalization logic to internal/normalizers.
This commit is contained in:
206
internal/normalizers/openweather/observation.go
Normal file
206
internal/normalizers/openweather/observation.go
Normal file
@@ -0,0 +1,206 @@
|
|||||||
|
// FILE: ./internal/normalizers/openweather/observation.go
|
||||||
|
package openweather
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
||||||
|
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ObservationNormalizer converts:
|
||||||
|
//
|
||||||
|
// standards.SchemaRawOpenWeatherCurrentV1 -> standards.SchemaWeatherObservationV1
|
||||||
|
//
|
||||||
|
// It interprets OpenWeatherMap "current weather" JSON and maps it into the canonical
|
||||||
|
// model.WeatherObservation representation.
|
||||||
|
//
|
||||||
|
// Caveats / assumptions:
|
||||||
|
// - Unit system: this normalizer assumes the upstream request used `units=metric`.
|
||||||
|
// The OpenWeather source enforces this invariant (fails fast otherwise).
|
||||||
|
// That means:
|
||||||
|
// - main.temp is °C
|
||||||
|
// - wind.speed and wind.gust are m/s (we convert to km/h)
|
||||||
|
// - pressure fields are hPa (we convert to Pa)
|
||||||
|
//
|
||||||
|
// Day/night handling:
|
||||||
|
// - Prefer the OpenWeather icon suffix ("d" / "n") when available.
|
||||||
|
// - Otherwise fall back to sunrise/sunset bounds (unix seconds).
|
||||||
|
type ObservationNormalizer struct{}
|
||||||
|
|
||||||
|
func (ObservationNormalizer) Match(e event.Event) bool {
|
||||||
|
return strings.TrimSpace(e.Schema) == standards.SchemaRawOpenWeatherCurrentV1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||||
|
_ = ctx // normalization is pure/CPU; keep ctx for future expensive steps
|
||||||
|
|
||||||
|
rawBytes, err := normcommon.PayloadBytes(in)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("openweather observation normalize: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var parsed owmResponse
|
||||||
|
if err := json.Unmarshal(rawBytes, &parsed); err != nil {
|
||||||
|
return nil, fmt.Errorf("openweather observation normalize: decode raw payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obs, effectiveAt, err := buildObservation(parsed)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out := in
|
||||||
|
out.Schema = standards.SchemaWeatherObservationV1
|
||||||
|
out.Payload = obs
|
||||||
|
|
||||||
|
// EffectiveAt is optional; for observations it is naturally the observation timestamp.
|
||||||
|
if !effectiveAt.IsZero() {
|
||||||
|
t := effectiveAt.UTC()
|
||||||
|
out.EffectiveAt = &t
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := out.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildObservation contains the domain mapping logic (provider -> canonical model).
|
||||||
|
func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time, error) {
|
||||||
|
// Timestamp: dt is unix seconds, UTC (OpenWeather contract).
|
||||||
|
var ts time.Time
|
||||||
|
if parsed.Dt > 0 {
|
||||||
|
ts = time.Unix(parsed.Dt, 0).UTC()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Primary weather condition: OpenWeather returns an array; treat [0] as primary.
|
||||||
|
owmID, rawDesc, icon := primaryCondition(parsed.Weather)
|
||||||
|
|
||||||
|
// Day/night inference:
|
||||||
|
// 1) icon suffix "d" or "n"
|
||||||
|
// 2) sunrise/sunset bounds
|
||||||
|
isDay := inferIsDay(icon, parsed.Dt, parsed.Sys.Sunrise, parsed.Sys.Sunset)
|
||||||
|
|
||||||
|
// Unit policy: metric is enforced by the source, so:
|
||||||
|
// - temp is already °C
|
||||||
|
// - wind speed is m/s -> km/h conversion
|
||||||
|
tempC := parsed.Main.Temp
|
||||||
|
rh := parsed.Main.Humidity
|
||||||
|
|
||||||
|
surfacePa := normcommon.PressurePaFromHPa(parsed.Main.Pressure)
|
||||||
|
var seaLevelPa *float64
|
||||||
|
if parsed.Main.SeaLevel != nil {
|
||||||
|
v := normcommon.PressurePaFromHPa(*parsed.Main.SeaLevel)
|
||||||
|
seaLevelPa = &v
|
||||||
|
}
|
||||||
|
|
||||||
|
wsKmh := normcommon.SpeedKmhFromMps(parsed.Wind.Speed)
|
||||||
|
var wgKmh *float64
|
||||||
|
if parsed.Wind.Gust != nil {
|
||||||
|
v := normcommon.SpeedKmhFromMps(*parsed.Wind.Gust)
|
||||||
|
wgKmh = &v
|
||||||
|
}
|
||||||
|
|
||||||
|
var visM *float64
|
||||||
|
if parsed.Visibility != nil {
|
||||||
|
v := *parsed.Visibility
|
||||||
|
visM = &v
|
||||||
|
}
|
||||||
|
|
||||||
|
// Condition mapping: OpenWeather condition IDs -> canonical WMO code vocabulary.
|
||||||
|
wmo := mapOpenWeatherToWMO(owmID)
|
||||||
|
canonicalText := standards.WMOText(wmo, isDay)
|
||||||
|
|
||||||
|
iconURL := openWeatherIconURL(icon)
|
||||||
|
|
||||||
|
stationID := openWeatherStationID(parsed)
|
||||||
|
stationName := strings.TrimSpace(parsed.Name)
|
||||||
|
if stationName == "" {
|
||||||
|
stationName = "OpenWeatherMap"
|
||||||
|
}
|
||||||
|
|
||||||
|
obs := model.WeatherObservation{
|
||||||
|
StationID: stationID,
|
||||||
|
StationName: stationName,
|
||||||
|
Timestamp: ts,
|
||||||
|
|
||||||
|
ConditionCode: wmo,
|
||||||
|
ConditionText: canonicalText,
|
||||||
|
IsDay: isDay,
|
||||||
|
|
||||||
|
ProviderRawDescription: rawDesc,
|
||||||
|
|
||||||
|
// Human-facing legacy fields: populate with canonical text for consistency.
|
||||||
|
TextDescription: canonicalText,
|
||||||
|
IconURL: iconURL,
|
||||||
|
|
||||||
|
TemperatureC: &tempC,
|
||||||
|
|
||||||
|
WindDirectionDegrees: parsed.Wind.Deg,
|
||||||
|
WindSpeedKmh: &wsKmh,
|
||||||
|
WindGustKmh: wgKmh,
|
||||||
|
|
||||||
|
BarometricPressurePa: &surfacePa,
|
||||||
|
SeaLevelPressurePa: seaLevelPa,
|
||||||
|
VisibilityMeters: visM,
|
||||||
|
|
||||||
|
RelativeHumidityPercent: &rh,
|
||||||
|
}
|
||||||
|
|
||||||
|
return obs, ts, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func primaryCondition(list []owmWeather) (id int, desc string, icon string) {
|
||||||
|
if len(list) == 0 {
|
||||||
|
return 0, "", ""
|
||||||
|
}
|
||||||
|
w := list[0]
|
||||||
|
return w.ID, strings.TrimSpace(w.Description), strings.TrimSpace(w.Icon)
|
||||||
|
}
|
||||||
|
|
||||||
|
func inferIsDay(icon string, dt, sunrise, sunset int64) *bool {
|
||||||
|
// Prefer icon suffix.
|
||||||
|
icon = strings.TrimSpace(icon)
|
||||||
|
if icon != "" {
|
||||||
|
last := icon[len(icon)-1]
|
||||||
|
switch last {
|
||||||
|
case 'd':
|
||||||
|
v := true
|
||||||
|
return &v
|
||||||
|
case 'n':
|
||||||
|
v := false
|
||||||
|
return &v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to sunrise/sunset bounds if provided.
|
||||||
|
if dt > 0 && sunrise > 0 && sunset > 0 {
|
||||||
|
v := dt >= sunrise && dt < sunset
|
||||||
|
return &v
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func openWeatherIconURL(icon string) string {
|
||||||
|
icon = strings.TrimSpace(icon)
|
||||||
|
if icon == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon)
|
||||||
|
}
|
||||||
|
|
||||||
|
func openWeatherStationID(parsed owmResponse) string {
|
||||||
|
if parsed.ID != 0 {
|
||||||
|
return fmt.Sprintf("OPENWEATHER(%d)", parsed.ID)
|
||||||
|
}
|
||||||
|
// Fallback: synthesize from coordinates.
|
||||||
|
return fmt.Sprintf("OPENWEATHER(%.5f,%.5f)", parsed.Coord.Lat, parsed.Coord.Lon)
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
// FILE: ./internal/normalizers/openweather/register.go
|
||||||
package openweather
|
package openweather
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -5,15 +6,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Register registers OpenWeather normalizers into the provided registry.
|
// Register registers OpenWeather normalizers into the provided registry.
|
||||||
//
|
|
||||||
// This is intentionally empty as a stub. As normalizers are implemented,
|
|
||||||
// register them here, e.g.:
|
|
||||||
//
|
|
||||||
// reg.Register(ObservationNormalizer{})
|
|
||||||
func Register(reg *fknormalize.Registry) {
|
func Register(reg *fknormalize.Registry) {
|
||||||
if reg == nil {
|
if reg == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: register OpenWeather normalizers here.
|
// Observations
|
||||||
|
reg.Register(ObservationNormalizer{})
|
||||||
}
|
}
|
||||||
|
|||||||
51
internal/normalizers/openweather/types.go
Normal file
51
internal/normalizers/openweather/types.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
// FILE: ./internal/normalizers/openweather/types.go
|
||||||
|
package openweather
|
||||||
|
|
||||||
|
// owmResponse is a minimal-but-sufficient representation of the OpenWeatherMap
|
||||||
|
// "Current weather" payload needed for mapping into model.WeatherObservation.
|
||||||
|
//
|
||||||
|
// NOTE: OpenWeather field presence can vary by location/product tier.
|
||||||
|
// We keep some fields optional (pointers) where the API commonly omits them.
|
||||||
|
type owmResponse struct {
|
||||||
|
Coord struct {
|
||||||
|
Lon float64 `json:"lon"`
|
||||||
|
Lat float64 `json:"lat"`
|
||||||
|
} `json:"coord"`
|
||||||
|
|
||||||
|
Weather []owmWeather `json:"weather"`
|
||||||
|
|
||||||
|
Main struct {
|
||||||
|
Temp float64 `json:"temp"` // °C when units=metric (enforced by source)
|
||||||
|
Pressure float64 `json:"pressure"` // hPa
|
||||||
|
Humidity float64 `json:"humidity"` // %
|
||||||
|
SeaLevel *float64 `json:"sea_level"`
|
||||||
|
} `json:"main"`
|
||||||
|
|
||||||
|
Visibility *float64 `json:"visibility"` // meters (optional)
|
||||||
|
|
||||||
|
Wind struct {
|
||||||
|
Speed float64 `json:"speed"` // m/s when units=metric (enforced by source)
|
||||||
|
Deg *float64 `json:"deg"`
|
||||||
|
Gust *float64 `json:"gust"` // m/s when units=metric (enforced by source)
|
||||||
|
} `json:"wind"`
|
||||||
|
|
||||||
|
Dt int64 `json:"dt"` // unix seconds, UTC
|
||||||
|
|
||||||
|
Sys struct {
|
||||||
|
Country string `json:"country"`
|
||||||
|
Sunrise int64 `json:"sunrise"` // unix, UTC
|
||||||
|
Sunset int64 `json:"sunset"` // unix, UTC
|
||||||
|
} `json:"sys"`
|
||||||
|
|
||||||
|
Timezone int `json:"timezone"` // seconds offset from UTC
|
||||||
|
ID int64 `json:"id"` // city id
|
||||||
|
Name string `json:"name"` // city name
|
||||||
|
Cod int `json:"cod"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type owmWeather struct {
|
||||||
|
ID int `json:"id"`
|
||||||
|
Main string `json:"main"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
Icon string `json:"icon"` // e.g. "04d" or "01n"
|
||||||
|
}
|
||||||
98
internal/normalizers/openweather/wmo_map.go
Normal file
98
internal/normalizers/openweather/wmo_map.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
// FILE: ./internal/normalizers/openweather/wmo_map.go
|
||||||
|
package openweather
|
||||||
|
|
||||||
|
import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
||||||
|
|
||||||
|
// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into weatherfeeder's
|
||||||
|
// canonical WMO code vocabulary.
|
||||||
|
//
|
||||||
|
// This is an approximate semantic mapping between two different code systems.
|
||||||
|
// We map conservatively into the subset currently represented in standards.WMODescriptions.
|
||||||
|
func mapOpenWeatherToWMO(owmID int) model.WMOCode {
|
||||||
|
switch {
|
||||||
|
// 2xx Thunderstorm
|
||||||
|
case owmID >= 200 && owmID <= 232:
|
||||||
|
return 95
|
||||||
|
|
||||||
|
// 3xx Drizzle
|
||||||
|
case owmID >= 300 && owmID <= 321:
|
||||||
|
if owmID == 300 {
|
||||||
|
return 51
|
||||||
|
}
|
||||||
|
if owmID == 302 {
|
||||||
|
return 55
|
||||||
|
}
|
||||||
|
return 53
|
||||||
|
|
||||||
|
// 5xx Rain
|
||||||
|
case owmID >= 500 && owmID <= 531:
|
||||||
|
// 511 is "freezing rain"
|
||||||
|
if owmID == 511 {
|
||||||
|
return 67
|
||||||
|
}
|
||||||
|
|
||||||
|
// showers bucket (520-531)
|
||||||
|
if owmID >= 520 && owmID <= 531 {
|
||||||
|
if owmID == 520 {
|
||||||
|
return 80
|
||||||
|
}
|
||||||
|
if owmID == 522 {
|
||||||
|
return 82
|
||||||
|
}
|
||||||
|
return 81
|
||||||
|
}
|
||||||
|
|
||||||
|
// normal rain intensity
|
||||||
|
if owmID == 500 {
|
||||||
|
return 61
|
||||||
|
}
|
||||||
|
if owmID == 501 {
|
||||||
|
return 63
|
||||||
|
}
|
||||||
|
if owmID >= 502 && owmID <= 504 {
|
||||||
|
return 65
|
||||||
|
}
|
||||||
|
return 63
|
||||||
|
|
||||||
|
// 6xx Snow
|
||||||
|
case owmID >= 600 && owmID <= 622:
|
||||||
|
if owmID == 600 {
|
||||||
|
return 71
|
||||||
|
}
|
||||||
|
if owmID == 601 {
|
||||||
|
return 73
|
||||||
|
}
|
||||||
|
if owmID == 602 {
|
||||||
|
return 75
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snow showers bucket (620-622)
|
||||||
|
if owmID == 620 {
|
||||||
|
return 85
|
||||||
|
}
|
||||||
|
if owmID == 621 || owmID == 622 {
|
||||||
|
return 86
|
||||||
|
}
|
||||||
|
|
||||||
|
return 73
|
||||||
|
|
||||||
|
// 7xx Atmosphere (mist/smoke/haze/dust/fog/etc.)
|
||||||
|
case owmID >= 701 && owmID <= 781:
|
||||||
|
return 45
|
||||||
|
|
||||||
|
// 800 Clear
|
||||||
|
case owmID == 800:
|
||||||
|
return 0
|
||||||
|
|
||||||
|
// 80x Clouds
|
||||||
|
case owmID == 801:
|
||||||
|
return 1
|
||||||
|
case owmID == 802:
|
||||||
|
return 2
|
||||||
|
case owmID == 803 || owmID == 804:
|
||||||
|
return 3
|
||||||
|
|
||||||
|
default:
|
||||||
|
return model.WMOUnknown
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,9 +1,11 @@
|
|||||||
|
// FILE: ./internal/sources/openweather/observation.go
|
||||||
package openweather
|
package openweather
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -11,25 +13,23 @@ import (
|
|||||||
|
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
|
|
||||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits one Observation event.
|
// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits a RAW observation Event.
|
||||||
//
|
//
|
||||||
// Typical URL shape (you provide this via config):
|
// Refactor (mirrors NWS):
|
||||||
|
// - Source responsibility: fetch bytes + emit a valid event envelope.
|
||||||
|
// - Normalizer responsibility: decode JSON + map to canonical model.WeatherObservation.
|
||||||
|
//
|
||||||
|
// Typical URL shape (provided via config):
|
||||||
//
|
//
|
||||||
// https://api.openweathermap.org/data/2.5/weather?lat=...&lon=...&appid=...&units=metric
|
// https://api.openweathermap.org/data/2.5/weather?lat=...&lon=...&appid=...&units=metric
|
||||||
//
|
//
|
||||||
// Unit notes:
|
// IMPORTANT UNIT POLICY (weatherfeeder convention):
|
||||||
// - If `units` is omitted, OpenWeather uses "standard" units (temp Kelvin, wind m/s).
|
// OpenWeather changes units based on the `units` query parameter but does NOT include the unit
|
||||||
// - `units=metric` => temp Celsius, wind m/s.
|
// system in the response body. To keep normalization deterministic, this driver *requires*
|
||||||
// - `units=imperial` => temp Fahrenheit, wind mph.
|
// `units=metric`. If absent (or non-metric), the driver returns an error.
|
||||||
//
|
|
||||||
// weatherd normalizes to:
|
|
||||||
// - TemperatureC in °C
|
|
||||||
// - WindSpeedKmh in km/h
|
|
||||||
// - Pressure in Pa (OpenWeather provides hPa)
|
|
||||||
type ObservationSource struct {
|
type ObservationSource struct {
|
||||||
name string
|
name string
|
||||||
url string
|
url string
|
||||||
@@ -45,18 +45,21 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
return nil, fmt.Errorf("openweather_observation %q: params are required (need params.url)", cfg.Name)
|
return nil, fmt.Errorf("openweather_observation %q: params are required (need params.url)", cfg.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Driver-specific settings live under cfg.Params to keep feedkit domain-agnostic.
|
rawURL, ok := cfg.ParamString("url", "URL")
|
||||||
url, ok := cfg.ParamString("url", "URL")
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("openweather_observation %q: params.url is required", cfg.Name)
|
return nil, fmt.Errorf("openweather_observation %q: params.url is required", cfg.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optional User-Agent.
|
// Fail fast: enforce deterministic unit system.
|
||||||
|
if err := requireMetricUnits(rawURL); err != nil {
|
||||||
|
return nil, fmt.Errorf("openweather_observation %q: %w", cfg.Name, err)
|
||||||
|
}
|
||||||
|
|
||||||
ua := cfg.ParamStringDefault("weatherfeeder (openweather client)", "user_agent", "userAgent")
|
ua := cfg.ParamStringDefault("weatherfeeder (openweather client)", "user_agent", "userAgent")
|
||||||
|
|
||||||
return &ObservationSource{
|
return &ObservationSource{
|
||||||
name: cfg.Name,
|
name: cfg.Name,
|
||||||
url: url,
|
url: rawURL,
|
||||||
userAgent: ua,
|
userAgent: ua,
|
||||||
client: &http.Client{
|
client: &http.Client{
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
@@ -67,19 +70,31 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
|
|||||||
func (s *ObservationSource) Name() string { return s.name }
|
func (s *ObservationSource) Name() string { return s.name }
|
||||||
|
|
||||||
// Kind is used for routing/policy.
|
// Kind is used for routing/policy.
|
||||||
|
// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical.
|
||||||
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") }
|
||||||
|
|
||||||
|
// Poll fetches OpenWeather "current weather" and emits exactly one RAW Event.
|
||||||
|
// The RAW payload is json.RawMessage and Schema is standards.SchemaRawOpenWeatherCurrentV1.
|
||||||
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
||||||
obs, eventID, err := s.fetchAndParse(ctx)
|
// Re-check policy defensively (in case the URL is mutated after construction).
|
||||||
|
if err := requireMetricUnits(s.url); err != nil {
|
||||||
|
return nil, fmt.Errorf("openweather_observation %q: %w", s.name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
raw, meta, err := s.fetchRaw(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// EffectiveAt is optional. If we have a real observation timestamp, use it.
|
eventID := buildEventID(s.name, meta)
|
||||||
// We intentionally take a copy so the pointer is stable and not tied to a struct field.
|
if strings.TrimSpace(eventID) == "" {
|
||||||
|
// Extremely defensive fallback: should never happen, but keep the envelope valid.
|
||||||
|
eventID = fmt.Sprintf("openweather:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano))
|
||||||
|
}
|
||||||
|
|
||||||
var effectiveAt *time.Time
|
var effectiveAt *time.Time
|
||||||
if !obs.Timestamp.IsZero() {
|
if !meta.ParsedTimestamp.IsZero() {
|
||||||
t := obs.Timestamp
|
t := meta.ParsedTimestamp.UTC()
|
||||||
effectiveAt = &t
|
effectiveAt = &t
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -90,8 +105,11 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
EmittedAt: time.Now().UTC(),
|
EmittedAt: time.Now().UTC(),
|
||||||
EffectiveAt: effectiveAt,
|
EffectiveAt: effectiveAt,
|
||||||
|
|
||||||
Schema: "weather.observation.v1",
|
// RAW schema (normalizer matches on this).
|
||||||
Payload: obs,
|
Schema: standards.SchemaRawOpenWeatherCurrentV1,
|
||||||
|
|
||||||
|
// Raw JSON; normalizer will decode and map to canonical model.WeatherObservation.
|
||||||
|
Payload: raw,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := e.Validate(); err != nil {
|
if err := e.Validate(); err != nil {
|
||||||
@@ -101,58 +119,29 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
|
|||||||
return []event.Event{e}, nil
|
return []event.Event{e}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- OpenWeather JSON parsing (minimal subset) ---
|
// ---- RAW fetch + minimal metadata decode ----
|
||||||
|
|
||||||
|
// openWeatherMeta is a *minimal* decode of the OpenWeather payload used only to build
|
||||||
|
// a stable Event.ID and a useful EffectiveAt for the envelope.
|
||||||
|
type openWeatherMeta struct {
|
||||||
|
Dt int64 `json:"dt"` // unix seconds, UTC
|
||||||
|
|
||||||
|
ID int64 `json:"id"` // city id (if present)
|
||||||
|
Name string `json:"name"` // city name (optional)
|
||||||
|
|
||||||
type owmResponse struct {
|
|
||||||
Coord struct {
|
Coord struct {
|
||||||
Lon float64 `json:"lon"`
|
Lon float64 `json:"lon"`
|
||||||
Lat float64 `json:"lat"`
|
Lat float64 `json:"lat"`
|
||||||
} `json:"coord"`
|
} `json:"coord"`
|
||||||
|
|
||||||
Weather []struct {
|
// Convenience fields populated after decode.
|
||||||
ID int `json:"id"`
|
ParsedTimestamp time.Time `json:"-"`
|
||||||
Main string `json:"main"`
|
|
||||||
Description string `json:"description"`
|
|
||||||
Icon string `json:"icon"` // e.g. "04d" or "01n"
|
|
||||||
} `json:"weather"`
|
|
||||||
|
|
||||||
Main struct {
|
|
||||||
Temp float64 `json:"temp"`
|
|
||||||
Pressure float64 `json:"pressure"` // hPa
|
|
||||||
Humidity float64 `json:"humidity"` // %
|
|
||||||
SeaLevel *float64 `json:"sea_level"` // hPa (optional)
|
|
||||||
} `json:"main"`
|
|
||||||
|
|
||||||
Visibility *float64 `json:"visibility"` // meters (optional)
|
|
||||||
|
|
||||||
Wind struct {
|
|
||||||
Speed float64 `json:"speed"` // units depend on `units=...`
|
|
||||||
Deg *float64 `json:"deg"`
|
|
||||||
Gust *float64 `json:"gust"` // units depend on `units=...`
|
|
||||||
} `json:"wind"`
|
|
||||||
|
|
||||||
Clouds struct {
|
|
||||||
All *float64 `json:"all"` // cloudiness %
|
|
||||||
} `json:"clouds"`
|
|
||||||
|
|
||||||
Dt int64 `json:"dt"` // unix seconds, UTC
|
|
||||||
|
|
||||||
Sys struct {
|
|
||||||
Country string `json:"country"`
|
|
||||||
Sunrise int64 `json:"sunrise"` // unix, UTC
|
|
||||||
Sunset int64 `json:"sunset"` // unix, UTC
|
|
||||||
} `json:"sys"`
|
|
||||||
|
|
||||||
Timezone int `json:"timezone"` // seconds offset from UTC
|
|
||||||
ID int64 `json:"id"` // city id
|
|
||||||
Name string `json:"name"` // city name
|
|
||||||
Cod int `json:"cod"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObservation, string, error) {
|
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) {
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", s.url, nil)
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherObservation{}, "", err
|
return nil, openWeatherMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
req.Header.Set("User-Agent", s.userAgent)
|
req.Header.Set("User-Agent", s.userAgent)
|
||||||
@@ -160,279 +149,77 @@ func (s *ObservationSource) fetchAndParse(ctx context.Context) (model.WeatherObs
|
|||||||
|
|
||||||
res, err := s.client.Do(req)
|
res, err := s.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return model.WeatherObservation{}, "", err
|
return nil, openWeatherMeta{}, err
|
||||||
}
|
}
|
||||||
defer res.Body.Close()
|
defer res.Body.Close()
|
||||||
|
|
||||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||||
return model.WeatherObservation{}, "", fmt.Errorf("openweather_observation %q: HTTP %s", s.name, res.Status)
|
return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: HTTP %s", s.name, res.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
var parsed owmResponse
|
b, err := io.ReadAll(res.Body)
|
||||||
if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
|
|
||||||
return model.WeatherObservation{}, "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Timestamp: dt is unix seconds, UTC.
|
|
||||||
ts := time.Unix(parsed.Dt, 0).UTC()
|
|
||||||
|
|
||||||
// Primary weather condition: OpenWeather returns a list; we treat [0] as primary.
|
|
||||||
// If missing, we degrade gracefully.
|
|
||||||
owmID := 0
|
|
||||||
rawDesc := ""
|
|
||||||
icon := ""
|
|
||||||
if len(parsed.Weather) > 0 {
|
|
||||||
owmID = parsed.Weather[0].ID
|
|
||||||
rawDesc = strings.TrimSpace(parsed.Weather[0].Description)
|
|
||||||
icon = strings.TrimSpace(parsed.Weather[0].Icon)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Day/night inference:
|
|
||||||
// - Prefer icon suffix if present ("d" or "n")
|
|
||||||
// - Else fall back to sunrise/sunset bounds
|
|
||||||
var isDay *bool
|
|
||||||
if icon != "" {
|
|
||||||
last := icon[len(icon)-1]
|
|
||||||
switch last {
|
|
||||||
case 'd':
|
|
||||||
v := true
|
|
||||||
isDay = &v
|
|
||||||
case 'n':
|
|
||||||
v := false
|
|
||||||
isDay = &v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if isDay == nil && parsed.Sys.Sunrise > 0 && parsed.Sys.Sunset > 0 {
|
|
||||||
v := parsed.Dt >= parsed.Sys.Sunrise && parsed.Dt < parsed.Sys.Sunset
|
|
||||||
isDay = &v
|
|
||||||
}
|
|
||||||
|
|
||||||
// Units handling based on the request URL.
|
|
||||||
unitSystem := getUnitsFromURL(s.url)
|
|
||||||
|
|
||||||
// Temperature normalization to Celsius.
|
|
||||||
tempC := normalizeTempToC(parsed.Main.Temp, unitSystem)
|
|
||||||
|
|
||||||
// Humidity is already percent.
|
|
||||||
rh := parsed.Main.Humidity
|
|
||||||
|
|
||||||
// Pressure hPa -> Pa
|
|
||||||
surfacePa := parsed.Main.Pressure * 100.0
|
|
||||||
var seaLevelPa *float64
|
|
||||||
if parsed.Main.SeaLevel != nil {
|
|
||||||
v := (*parsed.Main.SeaLevel) * 100.0
|
|
||||||
seaLevelPa = &v
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wind speed normalization to km/h
|
|
||||||
wsKmh := normalizeSpeedToKmh(parsed.Wind.Speed, unitSystem)
|
|
||||||
|
|
||||||
var wgKmh *float64
|
|
||||||
if parsed.Wind.Gust != nil {
|
|
||||||
v := normalizeSpeedToKmh(*parsed.Wind.Gust, unitSystem)
|
|
||||||
wgKmh = &v
|
|
||||||
}
|
|
||||||
|
|
||||||
// Visibility in meters (already matches our model)
|
|
||||||
var visM *float64
|
|
||||||
if parsed.Visibility != nil {
|
|
||||||
v := *parsed.Visibility
|
|
||||||
visM = &v
|
|
||||||
}
|
|
||||||
|
|
||||||
// Map OpenWeather condition IDs -> canonical WMO code (our internal vocabulary).
|
|
||||||
wmo := mapOpenWeatherToWMO(owmID)
|
|
||||||
|
|
||||||
// Canonical text from our shared table.
|
|
||||||
canonicalText := standards.WMOText(wmo, isDay)
|
|
||||||
|
|
||||||
// Icon URL (optional).
|
|
||||||
iconURL := ""
|
|
||||||
if icon != "" {
|
|
||||||
iconURL = fmt.Sprintf("https://openweathermap.org/img/wn/%s@2x.png", icon)
|
|
||||||
}
|
|
||||||
|
|
||||||
stationID := ""
|
|
||||||
if parsed.ID != 0 {
|
|
||||||
stationID = fmt.Sprintf("OPENWEATHER(%d)", parsed.ID)
|
|
||||||
} else {
|
|
||||||
stationID = fmt.Sprintf("OPENWEATHER(%.5f,%.5f)", parsed.Coord.Lat, parsed.Coord.Lon)
|
|
||||||
}
|
|
||||||
|
|
||||||
stationName := strings.TrimSpace(parsed.Name)
|
|
||||||
if stationName == "" {
|
|
||||||
stationName = "OpenWeatherMap"
|
|
||||||
}
|
|
||||||
|
|
||||||
obs := model.WeatherObservation{
|
|
||||||
StationID: stationID,
|
|
||||||
StationName: stationName,
|
|
||||||
Timestamp: ts,
|
|
||||||
|
|
||||||
// Canonical internal representation
|
|
||||||
ConditionCode: wmo,
|
|
||||||
ConditionText: canonicalText,
|
|
||||||
IsDay: isDay,
|
|
||||||
|
|
||||||
// Provider evidence for troubleshooting mappings
|
|
||||||
ProviderRawDescription: rawDesc,
|
|
||||||
|
|
||||||
// Human-facing legacy fields: we populate with canonical text for consistency
|
|
||||||
TextDescription: canonicalText,
|
|
||||||
IconURL: iconURL,
|
|
||||||
|
|
||||||
TemperatureC: &tempC,
|
|
||||||
|
|
||||||
WindDirectionDegrees: parsed.Wind.Deg,
|
|
||||||
WindSpeedKmh: &wsKmh,
|
|
||||||
WindGustKmh: wgKmh,
|
|
||||||
|
|
||||||
BarometricPressurePa: &surfacePa,
|
|
||||||
SeaLevelPressurePa: seaLevelPa,
|
|
||||||
VisibilityMeters: visM,
|
|
||||||
|
|
||||||
RelativeHumidityPercent: &rh,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stable event ID: key by source + timestamp.
|
|
||||||
eventID := fmt.Sprintf("openweather:%s:%s", s.name, obs.Timestamp.UTC().Format(time.RFC3339Nano))
|
|
||||||
|
|
||||||
return obs, eventID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getUnitsFromURL(raw string) string {
|
|
||||||
u, err := url.Parse(raw)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "standard"
|
return nil, openWeatherMeta{}, err
|
||||||
}
|
}
|
||||||
q := u.Query()
|
if len(b) == 0 {
|
||||||
units := strings.TrimSpace(strings.ToLower(q.Get("units")))
|
return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: empty response body", s.name)
|
||||||
if units == "" {
|
|
||||||
return "standard"
|
|
||||||
}
|
}
|
||||||
switch units {
|
|
||||||
case "standard", "metric", "imperial":
|
raw := json.RawMessage(b)
|
||||||
return units
|
|
||||||
default:
|
var meta openWeatherMeta
|
||||||
return "standard"
|
if err := json.Unmarshal(b, &meta); err != nil {
|
||||||
|
// If metadata decode fails, still return raw; envelope will fall back to computed ID.
|
||||||
|
return raw, openWeatherMeta{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if meta.Dt > 0 {
|
||||||
|
meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC()
|
||||||
|
}
|
||||||
|
|
||||||
|
return raw, meta, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func normalizeTempToC(v float64, unitSystem string) float64 {
|
func buildEventID(sourceName string, meta openWeatherMeta) string {
|
||||||
switch unitSystem {
|
// Prefer provider city ID if present; otherwise fall back to lat/lon.
|
||||||
case "metric":
|
locKey := ""
|
||||||
// Already °C
|
if meta.ID != 0 {
|
||||||
return v
|
locKey = fmt.Sprintf("city:%d", meta.ID)
|
||||||
case "imperial":
|
} else if meta.Coord.Lat != 0 || meta.Coord.Lon != 0 {
|
||||||
// °F -> °C
|
locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Coord.Lat, meta.Coord.Lon)
|
||||||
return (v - 32.0) * 5.0 / 9.0
|
} else {
|
||||||
default:
|
locKey = "loc:unknown"
|
||||||
// "standard" => Kelvin -> °C
|
|
||||||
return v - 273.15
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ts := meta.ParsedTimestamp
|
||||||
|
if ts.IsZero() {
|
||||||
|
// We prefer stable IDs, but if the payload didn't decode, use "now" so we still emit.
|
||||||
|
ts = time.Now().UTC()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Example:
|
||||||
|
// openweather:current:<configured-source-name>:city:12345:2026-01-14T17:00:00.123Z
|
||||||
|
return fmt.Sprintf("openweather:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano))
|
||||||
}
|
}
|
||||||
|
|
||||||
func normalizeSpeedToKmh(v float64, unitSystem string) float64 {
|
// requireMetricUnits enforces weatherfeeder's OpenWeather unit policy.
|
||||||
switch unitSystem {
|
|
||||||
case "imperial":
|
|
||||||
// mph -> km/h
|
|
||||||
return v * 1.609344
|
|
||||||
default:
|
|
||||||
// m/s -> km/h
|
|
||||||
return v * 3.6
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into your internal WMO code vocabulary.
|
|
||||||
//
|
//
|
||||||
// This is an approximate semantic mapping between two different code systems.
|
// OpenWeather does not tell us the unit system in the response body. We therefore enforce that
|
||||||
// Your current canonical WMO table is intentionally small and text-focused,
|
// the request URL explicitly contains units=metric; otherwise normalization would be ambiguous.
|
||||||
// so we map into that set (0/1/2/3/45/48/51/.../99) conservatively.
|
func requireMetricUnits(rawURL string) error {
|
||||||
func mapOpenWeatherToWMO(owmID int) model.WMOCode {
|
u, err := url.Parse(strings.TrimSpace(rawURL))
|
||||||
switch {
|
if err != nil {
|
||||||
// 2xx Thunderstorm
|
return fmt.Errorf("invalid url %q: %w", rawURL, err)
|
||||||
case owmID >= 200 && owmID <= 232:
|
|
||||||
return model.WMOCode(95)
|
|
||||||
|
|
||||||
// 3xx Drizzle
|
|
||||||
case owmID >= 300 && owmID <= 321:
|
|
||||||
if owmID == 300 {
|
|
||||||
return model.WMOCode(51)
|
|
||||||
}
|
|
||||||
if owmID == 302 {
|
|
||||||
return model.WMOCode(55)
|
|
||||||
}
|
|
||||||
return model.WMOCode(53)
|
|
||||||
|
|
||||||
// 5xx Rain
|
|
||||||
case owmID >= 500 && owmID <= 531:
|
|
||||||
// 511 is "freezing rain"
|
|
||||||
if owmID == 511 {
|
|
||||||
return model.WMOCode(67)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// showers bucket (520-531)
|
units := strings.ToLower(strings.TrimSpace(u.Query().Get("units")))
|
||||||
if owmID >= 520 && owmID <= 531 {
|
if units != "metric" {
|
||||||
if owmID == 520 {
|
// Treat missing units ("" -> standard) as non-compliant too.
|
||||||
return model.WMOCode(80)
|
if units == "" {
|
||||||
|
units = "(missing; defaults to standard)"
|
||||||
}
|
}
|
||||||
if owmID == 522 {
|
return fmt.Errorf("url must include units=metric (got units=%s)", units)
|
||||||
return model.WMOCode(82)
|
|
||||||
}
|
|
||||||
return model.WMOCode(81)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// normal rain intensity
|
return nil
|
||||||
if owmID == 500 {
|
|
||||||
return model.WMOCode(61)
|
|
||||||
}
|
|
||||||
if owmID == 501 {
|
|
||||||
return model.WMOCode(63)
|
|
||||||
}
|
|
||||||
if owmID >= 502 && owmID <= 504 {
|
|
||||||
return model.WMOCode(65)
|
|
||||||
}
|
|
||||||
return model.WMOCode(63)
|
|
||||||
|
|
||||||
// 6xx Snow
|
|
||||||
case owmID >= 600 && owmID <= 622:
|
|
||||||
if owmID == 600 {
|
|
||||||
return model.WMOCode(71)
|
|
||||||
}
|
|
||||||
if owmID == 601 {
|
|
||||||
return model.WMOCode(73)
|
|
||||||
}
|
|
||||||
if owmID == 602 {
|
|
||||||
return model.WMOCode(75)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snow showers bucket (620-622)
|
|
||||||
if owmID == 620 {
|
|
||||||
return model.WMOCode(85)
|
|
||||||
}
|
|
||||||
if owmID == 621 || owmID == 622 {
|
|
||||||
return model.WMOCode(86)
|
|
||||||
}
|
|
||||||
|
|
||||||
return model.WMOCode(73)
|
|
||||||
|
|
||||||
// 7xx Atmosphere (mist/smoke/haze/dust/fog/etc.)
|
|
||||||
case owmID >= 701 && owmID <= 781:
|
|
||||||
return model.WMOCode(45)
|
|
||||||
|
|
||||||
// 800 Clear
|
|
||||||
case owmID == 800:
|
|
||||||
return model.WMOCode(0)
|
|
||||||
|
|
||||||
// 80x Clouds
|
|
||||||
case owmID == 801:
|
|
||||||
return model.WMOCode(1)
|
|
||||||
case owmID == 802:
|
|
||||||
return model.WMOCode(2)
|
|
||||||
case owmID == 803 || owmID == 804:
|
|
||||||
return model.WMOCode(3)
|
|
||||||
|
|
||||||
default:
|
|
||||||
return model.WMOUnknown
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user