- Adopt an opinionated Event.ID policy across sources:
  - use the upstream-provided ID when available
  - otherwise derive a stable ID from Source:EffectiveAt (RFC3339Nano, UTC)
  - fall back to Source:EmittedAt when EffectiveAt is unavailable
- Add a common/id helper to centralize ID selection logic and keep sources consistent
- Simplify common event construction by collapsing SingleRawEventAt/SingleRawEvent into a single explicit SingleRawEvent helper (emittedAt passed in)
- Update the NWS/Open-Meteo/OpenWeather observation sources to:
  - compute EffectiveAt first
  - generate IDs via the shared helper
  - build envelopes via the unified SingleRawEvent helper
- Improve determinism and dedupe-friendliness without changing schemas or payloads
40 lines
1.0 KiB
Go
40 lines
1.0 KiB
Go
// FILE: ./internal/sources/common/id.go
|
|
package common
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// ChooseEventID applies weatherfeeder's opinionated Event.ID policy.
//
// Selection order:
//
//  1. If upstreamID is non-empty after trimming whitespace, the trimmed
//     value is returned as-is (no source prefix is added).
//  2. Otherwise, if effectiveAt is non-nil and non-zero, the ID is
//     "<Source>:<EffectiveAt>".
//  3. Otherwise, if emittedAt is non-zero, the ID is "<Source>:<EmittedAt>".
//  4. As a last resort (both timestamps missing) the current wall-clock time
//     is used in place of EmittedAt. IDs produced this way differ from call
//     to call, so callers that need reproducible IDs must supply a non-zero
//     emittedAt.
//
// <Source> is sourceName with surrounding whitespace trimmed; an empty name
// is replaced by "UNKNOWN_SOURCE". Timestamps are converted to UTC and
// encoded with the time.RFC3339Nano layout.
func ChooseEventID(upstreamID, sourceName string, effectiveAt *time.Time, emittedAt time.Time) string {
	if id := strings.TrimSpace(upstreamID); id != "" {
		return id
	}

	src := strings.TrimSpace(sourceName)
	if src == "" {
		src = "UNKNOWN_SOURCE"
	}

	// Pick the timestamp first so the ID is formatted in exactly one place.
	ts := emittedAt
	switch {
	case effectiveAt != nil && !effectiveAt.IsZero():
		// Prefer EffectiveAt for dedupe friendliness.
		ts = *effectiveAt
	case ts.IsZero():
		// Neither timestamp is usable; fall back to "now" so the ID is at
		// least non-empty and unlikely to collide.
		ts = time.Now()
	}

	return fmt.Sprintf("%s:%s", src, ts.UTC().Format(time.RFC3339Nano))
}
|