From efc44e8c6a826f053d83b163785155fd95da047d Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Wed, 14 Jan 2026 10:35:16 -0600 Subject: [PATCH] normalizers: added a structure for normalizers; refactoring sources -> sources+normalizers is still todo. --- cmd/weatherfeeder/main.go | 16 ++- internal/normalizers/builtins.go | 37 +++++ internal/normalizers/doc.go | 141 +++++++++++++++++++ internal/normalizers/nws/register.go | 21 +++ internal/normalizers/openmeteo/register.go | 19 +++ internal/normalizers/openweather/register.go | 19 +++ internal/standards/schema.go | 20 +++ 7 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 internal/normalizers/builtins.go create mode 100644 internal/normalizers/doc.go create mode 100644 internal/normalizers/nws/register.go create mode 100644 internal/normalizers/openmeteo/register.go create mode 100644 internal/normalizers/openweather/register.go create mode 100644 internal/standards/schema.go diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go index 74044a7..0e39fac 100644 --- a/cmd/weatherfeeder/main.go +++ b/cmd/weatherfeeder/main.go @@ -14,11 +14,13 @@ import ( "gitea.maximumdirect.net/ejr/feedkit/config" fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch" fkevent "gitea.maximumdirect.net/ejr/feedkit/event" + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksources "gitea.maximumdirect.net/ejr/feedkit/sources" + wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers" wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources" ) @@ -90,8 +92,20 @@ func main() { events := make(chan fkevent.Event, 256) + // --- Normalization (optional) --- + // + // We install feedkit's normalize.Processor even before any normalizers exist. 
+ // With an empty registry and RequireMatch=false, this is a no-op passthrough. + // It will begin transforming events as soon as: + // 1) sources emit raw schemas (raw.*), and + // 2) matching normalizers are registered. + normReg := &fknormalize.Registry{} + wfnormalizers.RegisterBuiltins(normReg) + pl := &fkpipeline.Pipeline{ - Processors: nil, + Processors: []fkpipeline.Processor{ + fknormalize.Processor{Registry: normReg}, + }, } s := &fkscheduler.Scheduler{ diff --git a/internal/normalizers/builtins.go b/internal/normalizers/builtins.go new file mode 100644 index 0000000..05079c9 --- /dev/null +++ b/internal/normalizers/builtins.go @@ -0,0 +1,37 @@ +// FILE: ./internal/normalizers/builtins.go +package normalizers + +import ( + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" + + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo" + "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather" +) + +// RegisterBuiltins registers all normalizers shipped with this binary. +// +// This mirrors internal/sources.RegisterBuiltins, but note the selection model: +// +// - sources are built by name (cfg.Driver -> factory) +// - normalizers are selected by Match() (event.Schema -> first match wins) +// +// Registration order matters because feedkit normalize.Registry is “first match wins”. +// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but +// we still keep ordering stable as a best practice. +// +// If reg is nil, this function is a no-op. +func RegisterBuiltins(reg *fknormalize.Registry) { + if reg == nil { + return + } + + // Keep this intentionally boring: delegate registration to provider subpackages + // so main.go stays clean and each provider owns its own mapping logic. + // + // Order here should be stable across releases to reduce surprises when adding + // new normalizers. 
+ nws.Register(reg) + openmeteo.Register(reg) + openweather.Register(reg) +} diff --git a/internal/normalizers/doc.go b/internal/normalizers/doc.go new file mode 100644 index 0000000..0f92748 --- /dev/null +++ b/internal/normalizers/doc.go @@ -0,0 +1,141 @@ +// Package normalizers defines weatherfeeder’s **prescriptive** conventions for +// writing feedkit normalizers and provides the recommended project layout. +// +// Summary +// ------- +// weatherfeeder ingests multiple upstream providers whose payloads differ. +// Sources should focus on polling/fetching. Normalizers should focus on +// transforming provider-specific raw payloads into canonical internal models. +// +// This package is domain code (weatherfeeder). feedkit’s normalize package is +// infrastructure (registry + processor). +// +// Directory layout (required) +// --------------------------- +// Normalizers are organized by provider: +// +// internal/normalizers/<provider>/ +// +// Example: +// +// internal/normalizers/nws/observation.go +// internal/normalizers/nws/common.go +// internal/normalizers/openweather/observation.go +// internal/normalizers/openmeteo/observation.go +// internal/normalizers/common/units.go +// +// Rules: +// +// 1. One normalizer per file. +// Each file contains exactly one Normalizer implementation (one type). +// +// 2. Provider-level shared helpers live in: +// internal/normalizers/<provider>/common.go +// +// 3. Cross-provider helpers live in: +// internal/normalizers/common/ +// +// 4. Matching is standardized on Event.Schema. +// (Do not match on event.Source or event.Kind in weatherfeeder normalizers.) +// +// Schema conventions (required) +// ----------------------------- +// Sources emit RAW events with provider-specific schemas. +// Normalizers convert RAW -> CANONICAL schemas. +// +// Raw schemas: +// +// raw.<provider>.<payload>.vN +// +// Canonical schemas: +// +// weather.<type>.vN +// +// weatherfeeder centralizes schema strings in internal/standards/schema.go. 
+// Always use those constants (do not inline schema strings). +// +// Example mappings: +// +// standards.SchemaRawOpenWeatherCurrentV1 -> standards.SchemaWeatherObservationV1 +// standards.SchemaRawOpenMeteoCurrentV1 -> standards.SchemaWeatherObservationV1 +// standards.SchemaRawNWSObservationV1 -> standards.SchemaWeatherObservationV1 +// +// Normalizer structure (required template) +// ---------------------------------------- +// Each normalizer file must follow this structure (with helpful comments): +// +// type OpenWeatherObservationNormalizer struct{} +// +// func (OpenWeatherObservationNormalizer) Match(e event.Event) bool { ... } +// +// func (OpenWeatherObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) { +// // 1) Decode raw payload (recommended: json.RawMessage) +// // 2) Parse into provider structs +// // 3) Map provider -> canonical internal/model types +// // 4) Build output event (copy input, modify intentionally) +// // 5) Set EffectiveAt if applicable +// // 6) Validate out.Validate() +// } +// +// Required doc comment content +// ---------------------------- +// Every normalizer type must have a doc comment that states: +// +// - what it converts (e.g., “OpenWeather current -> WeatherObservation”) +// - which raw schema it matches (constant name + value) +// - which canonical schema it produces (constant name + value) +// - any special caveats (units, day/night inference, missing fields, etc.) +// +// Event field handling (strong defaults) +// -------------------------------------- +// Normalizers should treat the incoming event envelope as stable identity and +// should only change fields intentionally. 
+// +// Default behavior: +// +// - Keep: ID, Kind, Source, EmittedAt +// - Set: Schema to the canonical schema +// - Set: Payload to the canonical payload (internal/model/*) +// - Optional: EffectiveAt (often derived from observation timestamp) +// - Avoid changing Kind unless you have a clear “raw kind vs canonical kind” design. +// +// Always validate the output event: +// +// if err := out.Validate(); err != nil { ... } +// +// Payload representation for RAW events +// ------------------------------------- +// weatherfeeder recommends RAW payloads be stored as json.RawMessage for JSON APIs. +// This keeps sources small and defers schema-specific decoding to normalizers. +// +// If a source already decodes into typed provider structs, it can still emit the +// raw event; it should simply re-marshal to json.RawMessage (or better: decode +// once in the normalizer instead to keep “fetch” separate from “transform”). +// +// Registration pattern +// -------------------- +// feedkit normalization uses a match-driven registry (“first match wins”). +// +// Provider subpackages should expose: +// +// func Register(reg *normalize.Registry) +// +// And internal/normalizers/builtins.go should provide one entrypoint: +// +// func RegisterBuiltins(reg *normalize.Registry) +// +// which calls each provider’s Register() in a stable order. +// +// Testing guidance (recommended) +// ------------------------------ +// Add a unit test per normalizer: +// +// internal/normalizers/openweather/observation_test.go +// +// Tests should: +// +// - build a RAW event with Schema=standards.SchemaRaw... 
and Payload=json.RawMessage +// - run the normalizer +// - assert canonical Schema + key payload fields + EffectiveAt +// - assert out.Validate() passes +package normalizers diff --git a/internal/normalizers/nws/register.go b/internal/normalizers/nws/register.go new file mode 100644 index 0000000..64c31fa --- /dev/null +++ b/internal/normalizers/nws/register.go @@ -0,0 +1,21 @@ +package nws + +import ( + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" +) + +// Register registers NWS normalizers into the provided registry. +// +// This is intentionally empty as a stub. As normalizers are implemented, +// register them here, e.g.: +// +// reg.Register(ObservationNormalizer{}) +// reg.Register(ForecastNormalizer{}) +// reg.Register(AlertsNormalizer{}) +func Register(reg *fknormalize.Registry) { + if reg == nil { + return + } + + // TODO: register NWS normalizers here. +} diff --git a/internal/normalizers/openmeteo/register.go b/internal/normalizers/openmeteo/register.go new file mode 100644 index 0000000..4401ae2 --- /dev/null +++ b/internal/normalizers/openmeteo/register.go @@ -0,0 +1,19 @@ +package openmeteo + +import ( + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" +) + +// Register registers Open-Meteo normalizers into the provided registry. +// +// This is intentionally empty as a stub. As normalizers are implemented, +// register them here, e.g.: +// +// reg.Register(ObservationNormalizer{}) +func Register(reg *fknormalize.Registry) { + if reg == nil { + return + } + + // TODO: register Open-Meteo normalizers here. +} diff --git a/internal/normalizers/openweather/register.go b/internal/normalizers/openweather/register.go new file mode 100644 index 0000000..346cf3b --- /dev/null +++ b/internal/normalizers/openweather/register.go @@ -0,0 +1,19 @@ +package openweather + +import ( + fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" +) + +// Register registers OpenWeather normalizers into the provided registry. 
+// +// This is intentionally empty as a stub. As normalizers are implemented, +// register them here, e.g.: +// +// reg.Register(ObservationNormalizer{}) +func Register(reg *fknormalize.Registry) { + if reg == nil { + return + } + + // TODO: register OpenWeather normalizers here. +} diff --git a/internal/standards/schema.go b/internal/standards/schema.go new file mode 100644 index 0000000..fe86c38 --- /dev/null +++ b/internal/standards/schema.go @@ -0,0 +1,20 @@ +package standards + +// Schema strings used by weatherfeeder. +// +// We standardize on schema matching for normalizers (rather than matching on +// source names or kinds) because schema strings are explicit, versionable, and +// independent of user configuration. +// +// Conventions: +// - Raw upstream payloads: "raw.<provider>.<payload>.vN" +// - Canonical domain events: "weather.<type>.vN" +const ( + // Raw upstream schemas (emitted by sources). + SchemaRawNWSObservationV1 = "raw.nws.observation.v1" + SchemaRawOpenMeteoCurrentV1 = "raw.openmeteo.current.v1" + SchemaRawOpenWeatherCurrentV1 = "raw.openweather.current.v1" + + // Canonical domain schemas (emitted after normalization). + SchemaWeatherObservationV1 = "weather.observation.v1" +)