diff --git a/internal/sources/common/event.go b/internal/sources/common/event.go index 584487d..16ae1bd 100644 --- a/internal/sources/common/event.go +++ b/internal/sources/common/event.go @@ -9,17 +9,34 @@ import ( // SingleRawEvent constructs, validates, and returns a slice containing exactly one event. // -// This removes the repetitive "event envelope ceremony" from individual sources. +// This removes repetitive "event envelope ceremony" from individual sources. // Sources remain responsible for: // - fetching bytes (raw payload) // - choosing Schema (raw schema identifier) -// - computing a stable Event.ID and (optional) EffectiveAt -func SingleRawEvent(kind event.Kind, sourceName string, schema string, id string, effectiveAt *time.Time, payload any) ([]event.Event, error) { +// - computing Event.ID and (optional) EffectiveAt +// +// emittedAt is explicit so callers can compute IDs using the same timestamp (or +// so tests can provide a stable value). +func SingleRawEvent( + kind event.Kind, + sourceName string, + schema string, + id string, + emittedAt time.Time, + effectiveAt *time.Time, + payload any, +) ([]event.Event, error) { + if emittedAt.IsZero() { + emittedAt = time.Now().UTC() + } else { + emittedAt = emittedAt.UTC() + } + e := event.Event{ ID: id, Kind: kind, Source: sourceName, - EmittedAt: time.Now().UTC(), + EmittedAt: emittedAt, EffectiveAt: effectiveAt, // RAW schema (normalizer matches on this). diff --git a/internal/sources/common/id.go b/internal/sources/common/id.go new file mode 100644 index 0000000..2bfd5ac --- /dev/null +++ b/internal/sources/common/id.go @@ -0,0 +1,39 @@ +// FILE: ./internal/sources/common/id.go +package common + +import ( + "fmt" + "strings" + "time" +) + +// ChooseEventID applies weatherfeeder's opinionated Event.ID policy: +// +// - If upstream provides an ID, use it (trimmed). +// - Otherwise, ID is "<Source>:<EffectiveAt>" when available. +// - If EffectiveAt is unavailable, fall back to "<Source>:<EmittedAt>".
+// +// Timestamps are encoded as RFC3339Nano in UTC. +func ChooseEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string { + if id := strings.TrimSpace(upstreamID); id != "" { + return id + } + + src := strings.TrimSpace(sourceName) + if src == "" { + src = "UNKNOWN_SOURCE" + } + + // Prefer EffectiveAt for dedupe friendliness. + if effectiveAt != nil && !effectiveAt.IsZero() { + return fmt.Sprintf("%s:%s", src, effectiveAt.UTC().Format(time.RFC3339Nano)) + } + + // Fall back to EmittedAt (still stable within a poll invocation). + t := emittedAt.UTC() + if t.IsZero() { + t = time.Now().UTC() + } + + return fmt.Sprintf("%s:%s", src, t.Format(time.RFC3339Nano)) +} diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index ce20c9c..d9625ce 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -4,7 +4,6 @@ package nws import ( "context" "encoding/json" - "fmt" "strings" "time" @@ -15,10 +14,6 @@ import ( ) // ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event. -// -// This corresponds to URLs like: -// -// https://api.weather.gov/stations/KSTL/observations/latest type ObservationSource struct { http *common.HTTPSource } @@ -36,35 +31,14 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { func (s *ObservationSource) Name() string { return s.http.Name } -// Kind is used for routing/policy. -// We keep Kind canonical (observation) even for raw events; Schema differentiates raw vs canonical. func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } -// Poll fetches NWS "latest observation" and emits exactly one RAW Event. -// The RAW payload is json.RawMessage and Schema is standards.SchemaRawNWSObservationV1. 
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } - // Event.ID must be set BEFORE normalization (feedkit requires it). - // Prefer NWS-provided "id" (stable URL). Fallback to a stable computed key. - eventID := strings.TrimSpace(meta.ID) - if eventID == "" { - ts := meta.ParsedTimestamp - if ts.IsZero() { - ts = time.Now().UTC() - } - - station := strings.TrimSpace(meta.StationID) - if station == "" { - station = "UNKNOWN" - } - - eventID = fmt.Sprintf("nws:observation:%s:%s:%s", s.http.Name, station, ts.UTC().Format(time.RFC3339Nano)) - } - // EffectiveAt is optional; for observations it’s naturally the observation timestamp. var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { @@ -72,11 +46,15 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { effectiveAt = &t } + emittedAt := time.Now().UTC() + eventID := common.ChooseEventID(meta.ID, s.http.Name, effectiveAt, emittedAt) + return common.SingleRawEvent( s.Kind(), s.http.Name, standards.SchemaRawNWSObservationV1, eventID, + emittedAt, effectiveAt, raw, ) @@ -84,18 +62,13 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { // ---- RAW fetch + minimal metadata decode ---- -// observationMeta is a *minimal* decode of the NWS payload used only to build -// a stable Event.ID and a useful EffectiveAt for the envelope. type observationMeta struct { ID string `json:"id"` Properties struct { - StationID string `json:"stationId"` Timestamp string `json:"timestamp"` } `json:"properties"` - // Convenience fields populated after decode. 
 ParsedTimestamp time.Time `json:"-"` - StationID string `json:"-"` } func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, observationMeta, error) { @@ -106,12 +79,10 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, obse var meta observationMeta if err := json.Unmarshal(raw, &meta); err != nil { - // If metadata decode fails, still return raw; envelope will fall back to computed ID. + // If metadata decode fails, still return raw; with no EffectiveAt the envelope ID falls back to Source:EmittedAt. return raw, observationMeta{}, nil } - meta.StationID = strings.TrimSpace(meta.Properties.StationID) - tsStr := strings.TrimSpace(meta.Properties.Timestamp) if tsStr != "" { if t, err := time.Parse(time.RFC3339, tsStr); err == nil { diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index 2318d1b..268e7de 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -4,8 +4,6 @@ package openmeteo import ( "context" "encoding/json" - "fmt" - "strings" "time" "gitea.maximumdirect.net/ejr/feedkit/config" @@ -23,8 +21,6 @@ type ObservationSource struct { func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { const driver = "openmeteo_observation" - // We require params.user_agent for uniformity across sources (even though Open-Meteo - // itself does not strictly require a special User-Agent). hs, err := common.NewHTTPSource(driver, cfg, "application/json") if err != nil { return nil, err @@ -37,30 +33,27 @@ func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } -// Poll fetches Open-Meteo "current" and emits exactly one RAW Event.
func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, err := s.fetchRaw(ctx) if err != nil { return nil, err } - eventID := buildEventID(s.http.Name, meta) - if strings.TrimSpace(eventID) == "" { - // Extremely defensive fallback: keep the envelope valid no matter what. - eventID = fmt.Sprintf("openmeteo:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano)) - } - var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { t := meta.ParsedTimestamp.UTC() effectiveAt = &t } + emittedAt := time.Now().UTC() + eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + return common.SingleRawEvent( s.Kind(), s.http.Name, standards.SchemaRawOpenMeteoCurrentV1, eventID, + emittedAt, effectiveAt, raw, ) @@ -69,10 +62,8 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { // ---- RAW fetch + minimal metadata decode ---- type openMeteoMeta struct { - Latitude float64 `json:"latitude"` - Longitude float64 `json:"longitude"` - Timezone string `json:"timezone"` - UTCOffsetSeconds int `json:"utc_offset_seconds"` + Timezone string `json:"timezone"` + UTCOffsetSeconds int `json:"utc_offset_seconds"` Current struct { Time string `json:"time"` @@ -89,31 +80,13 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open var meta openMeteoMeta if err := json.Unmarshal(raw, &meta); err != nil { - // If metadata decode fails, still return raw; envelope will fall back to computed ID without EffectiveAt. + // If metadata decode fails, still return raw; envelope will omit EffectiveAt. return raw, openMeteoMeta{}, nil } - // Best effort: compute a stable EffectiveAt + event ID component. - // If parsing fails, we simply omit EffectiveAt and fall back to time.Now() in buildEventID. 
if t, err := openmeteo.ParseTime(meta.Current.Time, meta.Timezone, meta.UTCOffsetSeconds); err == nil { meta.ParsedTimestamp = t.UTC() } return raw, meta, nil } - -func buildEventID(sourceName string, meta openMeteoMeta) string { - locKey := "" - if meta.Latitude != 0 || meta.Longitude != 0 { - locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Latitude, meta.Longitude) - } else { - locKey = "loc:unknown" - } - - ts := meta.ParsedTimestamp - if ts.IsZero() { - ts = time.Now().UTC() - } - - return fmt.Sprintf("openmeteo:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) -} diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 998eca4..6ec3b96 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -15,12 +15,6 @@ import ( "gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" ) -// ObservationSource polls the OpenWeatherMap "Current weather" endpoint and emits a RAW observation Event. -// -// IMPORTANT UNIT POLICY (weatherfeeder convention): -// OpenWeather changes units based on the `units` query parameter but does NOT include the unit -// system in the response body. To keep normalization deterministic, this driver *requires* -// `units=metric`. If absent (or non-metric), the driver returns an error. type ObservationSource struct { http *common.HTTPSource } @@ -45,7 +39,6 @@ func (s *ObservationSource) Name() string { return s.http.Name } func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { - // Re-check policy defensively (in case the URL is mutated after construction). 
if err := requireMetricUnits(s.http.URL); err != nil { return nil, fmt.Errorf("%s %q: %w", s.http.Driver, s.http.Name, err) } @@ -55,22 +48,21 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { return nil, err } - eventID := buildEventID(s.http.Name, meta) - if strings.TrimSpace(eventID) == "" { - eventID = fmt.Sprintf("openweather:current:%s:%s", s.http.Name, time.Now().UTC().Format(time.RFC3339Nano)) - } - var effectiveAt *time.Time if !meta.ParsedTimestamp.IsZero() { t := meta.ParsedTimestamp.UTC() effectiveAt = &t } + emittedAt := time.Now().UTC() + eventID := common.ChooseEventID("", s.http.Name, effectiveAt, emittedAt) + return common.SingleRawEvent( s.Kind(), s.http.Name, standards.SchemaRawOpenWeatherCurrentV1, eventID, + emittedAt, effectiveAt, raw, ) @@ -81,14 +73,6 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { type openWeatherMeta struct { Dt int64 `json:"dt"` // unix seconds, UTC - ID int64 `json:"id"` - Name string `json:"name"` - - Coord struct { - Lon float64 `json:"lon"` - Lat float64 `json:"lat"` - } `json:"coord"` - ParsedTimestamp time.Time `json:"-"` } @@ -110,24 +94,6 @@ func (s *ObservationSource) fetchRaw(ctx context.Context) (json.RawMessage, open return raw, meta, nil } -func buildEventID(sourceName string, meta openWeatherMeta) string { - locKey := "" - if meta.ID != 0 { - locKey = fmt.Sprintf("city:%d", meta.ID) - } else if meta.Coord.Lat != 0 || meta.Coord.Lon != 0 { - locKey = fmt.Sprintf("coord:%.5f,%.5f", meta.Coord.Lat, meta.Coord.Lon) - } else { - locKey = "loc:unknown" - } - - ts := meta.ParsedTimestamp - if ts.IsZero() { - ts = time.Now().UTC() - } - - return fmt.Sprintf("openweather:current:%s:%s:%s", sourceName, locKey, ts.Format(time.RFC3339Nano)) -} - func requireMetricUnits(rawURL string) error { u, err := url.Parse(strings.TrimSpace(rawURL)) if err != nil {