// FILE: ./internal/sources/openweather/observation.go package openweather import ( "context" "encoding/json" "fmt" "net/http" "net/url" "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" "gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) // ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits a RAW observation Event. // // Refactor (mirrors NWS): // - Source responsibility: fetch bytes + emit a valid event envelope. // - Normalizer responsibility: decode JSON + map to canonical model.WeatherObservation. // // Typical URL shape (provided via config): // // https://api.openweathermap.org/data/2.5/weather?lat=...&lon=...&appid=...&units=metric // // IMPORTANT UNIT POLICY (weatherfeeder convention): // OpenWeather changes units based on the `units` query parameter but does NOT include the unit // system in the response body. To keep normalization deterministic, this driver *requires* // `units=metric`. If absent (or non-metric), the driver returns an error. type ObservationSource struct { name string url string userAgent string client *http.Client } func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { if strings.TrimSpace(cfg.Name) == "" { return nil, fmt.Errorf("openweather_observation: name is required") } if cfg.Params == nil { return nil, fmt.Errorf("openweather_observation %q: params are required (need params.url)", cfg.Name) } rawURL, ok := cfg.ParamString("url", "URL") if !ok { return nil, fmt.Errorf("openweather_observation %q: params.url is required", cfg.Name) } // Fail fast: enforce deterministic unit system. if err := requireMetricUnits(rawURL); err != nil { return nil, fmt.Errorf("openweather_observation %q: %w", cfg.Name, err) } ua := cfg.ParamStringDefault("weatherfeeder (openweather client)", "user_agent", "userAgent") return &ObservationSource{ name: cfg.Name, url: rawURL, userAgent: ua, client: &http.Client{ Timeout: 10 * time.Second, }, }, nil } func (s *ObservationSource) Name() string { return s.name } // Kind is used for routing/policy. // We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } // Poll fetches OpenWeather "current weather" and emits exactly one RAW Event. // The RAW payload is json.RawMessage and Schema is standards.SchemaRawOpenWeatherCurrentV1. func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { // Re-check policy defensively (in case the URL is mutated after construction). if err := requireMetricUnits(s.url); err != nil { return nil, fmt.Errorf("openweather_observation %q: %w", s.name, err) } raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } eventID := buildEventID(s.name, meta) if strings.TrimSpace(eventID) == "" { // Extremely defensive fallback: should never happen, but keep the envelope valid. eventID = fmt.Sprintf("openweather:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano)) } var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { t := meta.ParsedTimestamp.UTC() effectiveAt = &t } e := event.Event{ ID: eventID, Kind: s.Kind(), Source: s.name, EmittedAt: time.Now().UTC(), EffectiveAt: effectiveAt, // RAW schema (normalizer matches on this). Schema: standards.SchemaRawOpenWeatherCurrentV1, // Raw JSON; normalizer will decode and map to canonical model.WeatherObservation. Payload: raw, } if err := e.Validate(); err != nil { return nil, err } return []event.Event{e}, nil } // ---- RAW fetch + minimal metadata decode ---- // openWeatherMeta is a *minimal* decode of the OpenWeather payload used only to build // a stable Event.ID and a useful EffectiveAt for the envelope. type openWeatherMeta struct { Dt int64 `json:"dt"` // unix seconds, UTC ID int64 `json:"id"` // city id (if present) Name string `json:"name"` // city name (optional) Coord struct { Lon float64 `json:"lon"` Lat float64 `json:"lat"` } `json:"coord"` // Convenience fields populated after decode. ParsedTimestamp time.Time `json:"-"` } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openWeatherMeta, error) { b, err := common.FetchBody(ctx, s.client, s.url, s.userAgent, "application/json") if err != nil { return nil, openWeatherMeta{}, fmt.Errorf("openweather_observation %q: %w", s.name, err) } raw := json.RawMessage(b) var meta openWeatherMeta if err := json.Unmarshal(b, &meta); err != nil { // If metadata decode fails, still return raw; envelope will fall back to computed ID. return raw, openWeatherMeta{}, nil } if meta.Dt > 0 { meta.ParsedTimestamp = time.Unix(meta.Dt, 0).UTC() } return raw, meta, nil } func buildEventID(sourceName string, meta openWeatherMeta) string { // Prefer provider city ID if present; otherwise fall back to lat/lon. locKey := "" if meta.ID != 0 { locKey = fmt.Sprintf("city:%d", meta.ID) } else if meta.Coord.Lat != 0 || meta.Coord.Lon != 0 { locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Coord.Lat, meta.Coord.Lon) } else { locKey = "loc:unknown" } ts := meta.ParsedTimestamp if ts.IsZero() { // We prefer stable IDs, but if the payload didn't decode, use "now" so we still emit. ts = time.Now().UTC() } // Example: // openweather:current::city:12345:2026-01-14T17:00:00.123Z return fmt.Sprintf("openweather:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) } // requireMetricUnits enforces weatherfeeder's OpenWeather unit policy. // // OpenWeather does not tell us the unit system in the response body. We therefore enforce that // the request URL explicitly contains units=metric; otherwise normalization would be ambiguous. func requireMetricUnits(rawURL string) error { u, err := url.Parse(strings.TrimSpace(rawURL)) if err != nil { return fmt.Errorf("invalid url %q: %w", rawURL, err) } units := strings.ToLower(strings.TrimSpace(u.Query().Get("units"))) if units != "metric" { // Treat missing units ("" -> standard) as non-compliant too. if units == "" { units = "(missing; defaults to standard)" } return fmt.Errorf("url must include units=metric (got units=%s)", units) } return nil }