Files
weatherfeeder/internal/sources/openmeteo/observation.go

222 lines
7.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// FILE: ./internal/sources/openmeteo/observation.go
package openmeteo
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
)
// ObservationSource polls a configured Open-Meteo endpoint and emits a single
// RAW observation event per poll.
//
// Division of responsibility (the same split used for NWS/OpenWeather):
//   - Source: fetch the response bytes and wrap them in a valid event envelope.
//   - Normalizer: decode the JSON and map it to the canonical model.WeatherObservation.
//
// The endpoint URL is supplied through config. A typical shape is:
//
//	https://api.open-meteo.com/v1/forecast?latitude=...&longitude=...&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,wind_gusts_10m,surface_pressure,pressure_msl&timezone=GMT
type ObservationSource struct {
	// name is the configured source name; it is reported by Name and used in
	// error messages, the event Source field, and the event ID.
	name string
	// url is the full Open-Meteo request URL that is fetched on every poll.
	url string
	// userAgent is sent as the User-Agent header on every request.
	userAgent string
	// client performs the HTTP requests.
	client *http.Client
}
// NewObservationSource builds an ObservationSource from cfg.
//
// It returns an error when cfg.Name is blank, when cfg.Params is nil, or when
// cfg.ParamString reports no value for "url"/"URL". The User-Agent is taken
// from "user_agent"/"userAgent" with a built-in default, and the HTTP client
// is created with a 10 second timeout.
func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) {
	switch {
	case strings.TrimSpace(cfg.Name) == "":
		return nil, fmt.Errorf("openmeteo_observation: name is required")
	case cfg.Params == nil:
		return nil, fmt.Errorf("openmeteo_observation %q: params are required (need params.url)", cfg.Name)
	}

	endpoint, found := cfg.ParamString("url", "URL")
	if !found {
		return nil, fmt.Errorf("openmeteo_observation %q: params.url is required", cfg.Name)
	}

	// Open-Meteo does not demand a particular User-Agent, but identifying
	// ourselves is the polite thing to do.
	agent := cfg.ParamStringDefault("weatherfeeder (open-meteo client)", "user_agent", "userAgent")

	httpClient := &http.Client{Timeout: 10 * time.Second}

	src := &ObservationSource{
		name:      cfg.Name,
		url:       endpoint,
		userAgent: agent,
		client:    httpClient,
	}
	return src, nil
}
// Name returns the configured name of this source.
func (s *ObservationSource) Name() string {
	return s.name
}
// Kind reports the event kind used for routing/policy.
//
// The kind stays canonical ("observation") even though this source emits raw
// events; the event Schema is what differentiates raw from canonical.
func (s *ObservationSource) Kind() event.Kind {
	const observationKind = "observation"
	return event.Kind(observationKind)
}
// Poll fetches the Open-Meteo "current" payload and emits exactly one RAW event.
//
// The event Payload is the response body as json.RawMessage and its Schema is
// standards.SchemaRawOpenMeteoCurrentV1. Any fetch error, or a failure of the
// envelope's own Validate, is returned and no event is emitted.
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) {
	payload, meta, err := s.fetchRaw(ctx)
	if err != nil {
		return nil, err
	}

	id := buildEventID(s.name, meta)
	if strings.TrimSpace(id) == "" {
		// Extremely defensive fallback: keep the envelope valid no matter what.
		id = fmt.Sprintf("openmeteo:current:%s:%s", s.name, time.Now().UTC().Format(time.RFC3339Nano))
	}

	ev := event.Event{
		ID:        id,
		Kind:      s.Kind(),
		Source:    s.name,
		EmittedAt: time.Now().UTC(),

		// RAW schema (the normalizer matches on this).
		Schema: standards.SchemaRawOpenMeteoCurrentV1,

		// Raw JSON; the normalizer decodes it and maps it to the canonical
		// model.WeatherObservation.
		Payload: payload,
	}

	// EffectiveAt is optional; for observations it's naturally the observation
	// timestamp, so it is only set when that timestamp could be parsed.
	if observed := meta.ParsedTimestamp; !observed.IsZero() {
		observedUTC := observed.UTC()
		ev.EffectiveAt = &observedUTC
	}

	if err := ev.Validate(); err != nil {
		return nil, err
	}
	return []event.Event{ev}, nil
}
// ---- RAW fetch + minimal metadata decode ----
// openMeteoMeta holds the handful of Open-Meteo response fields that the source
// needs to build a stable Event.ID and a useful EffectiveAt for the envelope.
//
// It is deliberately minimal: mapping the payload into internal/model is the
// normalizer's job, not this type's.
type openMeteoMeta struct {
	// Latitude and Longitude feed the location part of the event ID.
	Latitude  float64 `json:"latitude"`
	Longitude float64 `json:"longitude"`

	// Timezone and UTCOffsetSeconds are hints for interpreting Current.Time
	// when it carries no timezone suffix of its own.
	Timezone         string `json:"timezone"`
	UTCOffsetSeconds int    `json:"utc_offset_seconds"`

	Current struct {
		// Time is the observation time as sent by the API,
		// e.g. "2026-01-10T12:30" (often without a timezone suffix).
		Time string `json:"time"`
	} `json:"current"`

	// ParsedTimestamp is a convenience field filled in after decoding; it is
	// never read from or written to JSON.
	ParsedTimestamp time.Time `json:"-"`
}
// fetchRaw performs an HTTP GET against the configured URL and returns the
// response body verbatim together with minimal envelope metadata.
//
// An error is returned when the request cannot be built or sent, the status is
// not 2xx, the body cannot be read, the body is empty, or the body is not
// syntactically valid JSON. Every error carries the source name; underlying
// errors are wrapped with %w so errors.Is/errors.As (for example against
// context.Canceled) keep working.
//
// Metadata decoding is best-effort: when the body is valid JSON but does not
// fit openMeteoMeta, the raw body is still returned with zero-value metadata
// and a nil error. Likewise an unparseable current.time simply leaves
// ParsedTimestamp at its zero value.
func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, openMeteoMeta, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
	if err != nil {
		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: build request: %w", s.name, err)
	}
	req.Header.Set("User-Agent", s.userAgent)
	req.Header.Set("Accept", "application/json")

	res, err := s.client.Do(req)
	if err != nil {
		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: request: %w", s.name, err)
	}
	defer res.Body.Close()

	if res.StatusCode < 200 || res.StatusCode >= 300 {
		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: HTTP %s", s.name, res.Status)
	}

	b, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: read body: %w", s.name, err)
	}
	if len(b) == 0 {
		return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: empty response body", s.name)
	}

	raw := json.RawMessage(b)

	var meta openMeteoMeta
	if err := json.Unmarshal(b, &meta); err != nil {
		// A body that is not JSON at all (e.g. an HTML error page served with a
		// 2xx status) must not be emitted as a json.RawMessage payload.
		if !json.Valid(b) {
			return nil, openMeteoMeta{}, fmt.Errorf("openmeteo_observation %q: response body is not valid JSON: %w", s.name, err)
		}
		// Valid JSON with an unexpected shape: still return raw; the envelope
		// falls back to a computed ID without EffectiveAt.
		return raw, openMeteoMeta{}, nil
	}

	// Best-effort parse of current.time so the envelope carries a meaningful
	// EffectiveAt. This duplicates the normalizer's parsing, but ONLY for
	// envelope metadata.
	if t, err := parseOpenMeteoTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil {
		meta.ParsedTimestamp = t.UTC()
	}

	return raw, meta, nil
}
// buildEventID assembles the event ID for one Open-Meteo "current" observation.
//
// The ID has the form
//
//	openmeteo:current:<sourceName>:<location key>:<RFC3339Nano timestamp>
//
// for example
//
//	openmeteo:current:<configured-source-name>:coord:38.62390,-90.35710:2026-01-14T18:00:00Z
//
// The location key is "coord:<lat>,<lon>" (five decimals) unless both
// coordinates are zero, in which case it is "loc:unknown". The timestamp is
// meta.ParsedTimestamp, or the current UTC time when that is the zero time.
func buildEventID(sourceName string, meta openMeteoMeta) string {
	// Prefer a stable location key derived from lat/lon when present.
	locKey := "loc:unknown"
	if meta.Latitude != 0 || meta.Longitude != 0 {
		locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Latitude, meta.Longitude)
	}

	stamp := meta.ParsedTimestamp
	if stamp.IsZero() {
		// current.time could not be parsed; use "now" so an event is still emitted.
		stamp = time.Now().UTC()
	}

	return "openmeteo:current:" + sourceName + ":" + locKey + ":" + stamp.Format(time.RFC3339Nano)
}
// parseOpenMeteoTime converts an Open-Meteo "current.time" string into a
// time.Time, using the timezone name and UTC offset as hints.
//
// Resolution order, after trimming surrounding whitespace from s:
//  1. An empty string is an error.
//  2. A full RFC3339 timestamp is authoritative and returned as parsed.
//  3. Otherwise s must look like "2006-01-02T15:04" and is interpreted in the
//     location named by tz when tz is non-empty and loadable (e.g. "GMT",
//     "America/Chicago"), or else in a fixed zone at utcOffsetSeconds
//     (an offset of 0 means UTC).
func parseOpenMeteoTime(s string, tz string, utcOffsetSeconds int) (time.Time, error) {
	// Layout Open-Meteo commonly uses when the timezone is reported separately.
	const localLayout = "2006-01-02T15:04"

	value := strings.TrimSpace(s)
	if value == "" {
		return time.Time{}, fmt.Errorf("empty time")
	}

	if stamped, err := time.Parse(time.RFC3339, value); err == nil {
		return stamped, nil
	}

	// Start from the fixed-offset zone and upgrade to the named zone when the
	// name can be resolved.
	zone := time.FixedZone("open-meteo", utcOffsetSeconds)
	if tz != "" {
		if named, err := time.LoadLocation(tz); err == nil {
			zone = named
		}
	}

	return time.ParseInLocation(localLayout, value, zone)
}