normalizers: added a structure for normalizers; refactoring sources -> sources+normalizers is still todo.
This commit is contained in:
@@ -14,11 +14,13 @@ import (
|
|||||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
||||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||||
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
||||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||||
|
|
||||||
|
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||||
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -90,8 +92,20 @@ func main() {
|
|||||||
|
|
||||||
events := make(chan fkevent.Event, 256)
|
events := make(chan fkevent.Event, 256)
|
||||||
|
|
||||||
|
// --- Normalization (optional) ---
|
||||||
|
//
|
||||||
|
// We install feedkit's normalize.Processor even before any normalizers exist.
|
||||||
|
// With an empty registry and RequireMatch=false, this is a no-op passthrough.
|
||||||
|
// It will begin transforming events as soon as:
|
||||||
|
// 1) sources emit raw schemas (raw.*), and
|
||||||
|
// 2) matching normalizers are registered.
|
||||||
|
normReg := &fknormalize.Registry{}
|
||||||
|
wfnormalizers.RegisterBuiltins(normReg)
|
||||||
|
|
||||||
pl := &fkpipeline.Pipeline{
|
pl := &fkpipeline.Pipeline{
|
||||||
Processors: nil,
|
Processors: []fkpipeline.Processor{
|
||||||
|
fknormalize.Processor{Registry: normReg},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
s := &fkscheduler.Scheduler{
|
s := &fkscheduler.Scheduler{
|
||||||
|
|||||||
37
internal/normalizers/builtins.go
Normal file
37
internal/normalizers/builtins.go
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
// FILE: ./internal/normalizers/builtins.go
|
||||||
|
package normalizers
|
||||||
|
|
||||||
|
import (
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||||
|
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RegisterBuiltins registers all normalizers shipped with this binary.
|
||||||
|
//
|
||||||
|
// This mirrors internal/sources.RegisterBuiltins, but note the selection model:
|
||||||
|
//
|
||||||
|
// - sources are built by name (cfg.Driver -> factory)
|
||||||
|
// - normalizers are selected by Match() (event.Schema -> first match wins)
|
||||||
|
//
|
||||||
|
// Registration order matters because feedkit normalize.Registry is “first match wins”.
|
||||||
|
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
|
||||||
|
// we still keep ordering stable as a best practice.
|
||||||
|
//
|
||||||
|
// If reg is nil, this function is a no-op.
|
||||||
|
func RegisterBuiltins(reg *fknormalize.Registry) {
|
||||||
|
if reg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Keep this intentionally boring: delegate registration to provider subpackages
|
||||||
|
// so main.go stays clean and each provider owns its own mapping logic.
|
||||||
|
//
|
||||||
|
// Order here should be stable across releases to reduce surprises when adding
|
||||||
|
// new normalizers.
|
||||||
|
nws.Register(reg)
|
||||||
|
openmeteo.Register(reg)
|
||||||
|
openweather.Register(reg)
|
||||||
|
}
|
||||||
141
internal/normalizers/doc.go
Normal file
141
internal/normalizers/doc.go
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
// Package normalizers defines weatherfeeder’s **prescriptive** conventions for
|
||||||
|
// writing feedkit normalizers and provides the recommended project layout.
|
||||||
|
//
|
||||||
|
// Summary
|
||||||
|
// -------
|
||||||
|
// weatherfeeder ingests multiple upstream providers whose payloads differ.
|
||||||
|
// Sources should focus on polling/fetching. Normalizers should focus on
|
||||||
|
// transforming provider-specific raw payloads into canonical internal models.
|
||||||
|
//
|
||||||
|
// This package is domain code (weatherfeeder). feedkit’s normalize package is
|
||||||
|
// infrastructure (registry + processor).
|
||||||
|
//
|
||||||
|
// Directory layout (required)
|
||||||
|
// ---------------------------
|
||||||
|
// Normalizers are organized by provider:
|
||||||
|
//
|
||||||
|
// internal/normalizers/<provider>/
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// internal/normalizers/nws/observation.go
|
||||||
|
// internal/normalizers/nws/common.go
|
||||||
|
// internal/normalizers/openweather/observation.go
|
||||||
|
// internal/normalizers/openmeteo/observation.go
|
||||||
|
// internal/normalizers/common/units.go
|
||||||
|
//
|
||||||
|
// Rules:
|
||||||
|
//
|
||||||
|
// 1. One normalizer per file.
|
||||||
|
// Each file contains exactly one Normalizer implementation (one type).
|
||||||
|
//
|
||||||
|
// 2. Provider-level shared helpers live in:
|
||||||
|
// internal/normalizers/<provider>/common.go
|
||||||
|
//
|
||||||
|
// 3. Cross-provider helpers live in:
|
||||||
|
// internal/normalizers/common/
|
||||||
|
//
|
||||||
|
// 4. Matching is standardized on Event.Schema.
|
||||||
|
// (Do not match on event.Source or event.Kind in weatherfeeder normalizers.)
|
||||||
|
//
|
||||||
|
// Schema conventions (required)
|
||||||
|
// -----------------------------
|
||||||
|
// Sources emit RAW events with provider-specific schemas.
|
||||||
|
// Normalizers convert RAW -> CANONICAL schemas.
|
||||||
|
//
|
||||||
|
// Raw schemas:
|
||||||
|
//
|
||||||
|
// raw.<provider>.<thing>.vN
|
||||||
|
//
|
||||||
|
// Canonical schemas:
|
||||||
|
//
|
||||||
|
// weather.<kind>.vN
|
||||||
|
//
|
||||||
|
// weatherfeeder centralizes schema strings in internal/standards/schemas.go.
|
||||||
|
// Always use those constants (do not inline schema strings).
|
||||||
|
//
|
||||||
|
// Example mappings:
|
||||||
|
//
|
||||||
|
// standards.SchemaRawOpenWeatherCurrentV1 -> standards.SchemaWeatherObservationV1
|
||||||
|
// standards.SchemaRawOpenMeteoCurrentV1 -> standards.SchemaWeatherObservationV1
|
||||||
|
// standards.SchemaRawNWSObservationV1 -> standards.SchemaWeatherObservationV1
|
||||||
|
//
|
||||||
|
// Normalizer structure (required template)
|
||||||
|
// ----------------------------------------
|
||||||
|
// Each normalizer file must follow this structure (with helpful comments):
|
||||||
|
//
|
||||||
|
// type OpenWeatherObservationNormalizer struct{}
|
||||||
|
//
|
||||||
|
// func (OpenWeatherObservationNormalizer) Match(e event.Event) bool { ... }
|
||||||
|
//
|
||||||
|
// func (OpenWeatherObservationNormalizer) Normalize(ctx context.Context, in event.Event) (*event.Event, error) {
|
||||||
|
// // 1) Decode raw payload (recommended: json.RawMessage)
|
||||||
|
// // 2) Parse into provider structs
|
||||||
|
// // 3) Map provider -> canonical internal/model types
|
||||||
|
// // 4) Build output event (copy input, modify intentionally)
|
||||||
|
// // 5) Set EffectiveAt if applicable
|
||||||
|
// // 6) Validate out.Validate()
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// Required doc comment content
|
||||||
|
// ----------------------------
|
||||||
|
// Every normalizer type must have a doc comment that states:
|
||||||
|
//
|
||||||
|
// - what it converts (e.g., “OpenWeather current -> WeatherObservation”)
|
||||||
|
// - which raw schema it matches (constant name + value)
|
||||||
|
// - which canonical schema it produces (constant name + value)
|
||||||
|
// - any special caveats (units, day/night inference, missing fields, etc.)
|
||||||
|
//
|
||||||
|
// Event field handling (strong defaults)
|
||||||
|
// --------------------------------------
|
||||||
|
// Normalizers should treat the incoming event envelope as stable identity and
|
||||||
|
// should only change fields intentionally.
|
||||||
|
//
|
||||||
|
// Default behavior:
|
||||||
|
//
|
||||||
|
// - Keep: ID, Kind, Source, EmittedAt
|
||||||
|
// - Set: Schema to the canonical schema
|
||||||
|
// - Set: Payload to the canonical payload (internal/model/*)
|
||||||
|
// - Optional: EffectiveAt (often derived from observation timestamp)
|
||||||
|
// - Avoid changing Kind unless you have a clear “raw kind vs canonical kind” design.
|
||||||
|
//
|
||||||
|
// Always validate the output event:
|
||||||
|
//
|
||||||
|
// if err := out.Validate(); err != nil { ... }
|
||||||
|
//
|
||||||
|
// Payload representation for RAW events
|
||||||
|
// -------------------------------------
|
||||||
|
// weatherfeeder recommends RAW payloads be stored as json.RawMessage for JSON APIs.
|
||||||
|
// This keeps sources small and defers schema-specific decoding to normalizers.
|
||||||
|
//
|
||||||
|
// If a source already decodes into typed provider structs, it can still emit the
|
||||||
|
// raw event; it should simply re-marshal to json.RawMessage (or better: decode
|
||||||
|
// once in the normalizer instead to keep “fetch” separate from “transform”).
|
||||||
|
//
|
||||||
|
// Registration pattern
|
||||||
|
// --------------------
|
||||||
|
// feedkit normalization uses a match-driven registry (“first match wins”).
|
||||||
|
//
|
||||||
|
// Provider subpackages should expose:
|
||||||
|
//
|
||||||
|
// func Register(reg *normalize.Registry)
|
||||||
|
//
|
||||||
|
// And internal/normalizers/builtins.go should provide one entrypoint:
|
||||||
|
//
|
||||||
|
// func RegisterBuiltins(reg *normalize.Registry)
|
||||||
|
//
|
||||||
|
// which calls each provider’s Register() in a stable order.
|
||||||
|
//
|
||||||
|
// Testing guidance (recommended)
|
||||||
|
// ------------------------------
|
||||||
|
// Add a unit test per normalizer:
|
||||||
|
//
|
||||||
|
// internal/normalizers/openweather/observation_test.go
|
||||||
|
//
|
||||||
|
// Tests should:
|
||||||
|
//
|
||||||
|
// - build a RAW event with Schema=standards.SchemaRaw... and Payload=json.RawMessage
|
||||||
|
// - run the normalizer
|
||||||
|
// - assert canonical Schema + key payload fields + EffectiveAt
|
||||||
|
// - assert out.Validate() passes
|
||||||
|
package normalizers
|
||||||
21
internal/normalizers/nws/register.go
Normal file
21
internal/normalizers/nws/register.go
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
package nws
|
||||||
|
|
||||||
|
import (
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register registers NWS normalizers into the provided registry.
|
||||||
|
//
|
||||||
|
// This is intentionally empty as a stub. As normalizers are implemented,
|
||||||
|
// register them here, e.g.:
|
||||||
|
//
|
||||||
|
// reg.Register(ObservationNormalizer{})
|
||||||
|
// reg.Register(ForecastNormalizer{})
|
||||||
|
// reg.Register(AlertsNormalizer{})
|
||||||
|
func Register(reg *fknormalize.Registry) {
|
||||||
|
if reg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: register NWS normalizers here.
|
||||||
|
}
|
||||||
19
internal/normalizers/openmeteo/register.go
Normal file
19
internal/normalizers/openmeteo/register.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package openmeteo
|
||||||
|
|
||||||
|
import (
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register registers Open-Meteo normalizers into the provided registry.
|
||||||
|
//
|
||||||
|
// This is intentionally empty as a stub. As normalizers are implemented,
|
||||||
|
// register them here, e.g.:
|
||||||
|
//
|
||||||
|
// reg.Register(ObservationNormalizer{})
|
||||||
|
func Register(reg *fknormalize.Registry) {
|
||||||
|
if reg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: register Open-Meteo normalizers here.
|
||||||
|
}
|
||||||
19
internal/normalizers/openweather/register.go
Normal file
19
internal/normalizers/openweather/register.go
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
package openweather
|
||||||
|
|
||||||
|
import (
|
||||||
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Register registers OpenWeather normalizers into the provided registry.
|
||||||
|
//
|
||||||
|
// This is intentionally empty as a stub. As normalizers are implemented,
|
||||||
|
// register them here, e.g.:
|
||||||
|
//
|
||||||
|
// reg.Register(ObservationNormalizer{})
|
||||||
|
func Register(reg *fknormalize.Registry) {
|
||||||
|
if reg == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: register OpenWeather normalizers here.
|
||||||
|
}
|
||||||
20
internal/standards/schema.go
Normal file
20
internal/standards/schema.go
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
package standards
|
||||||
|
|
||||||
|
// Schema strings used by weatherfeeder.
|
||||||
|
//
|
||||||
|
// We standardize on schema matching for normalizers (rather than matching on
|
||||||
|
// source names or kinds) because schema strings are explicit, versionable, and
|
||||||
|
// independent of user configuration.
|
||||||
|
//
|
||||||
|
// Conventions:
|
||||||
|
// - Raw upstream payloads: "raw.<provider>.<thing>.vN"
|
||||||
|
// - Canonical domain events: "weather.<kind>.vN"
|
||||||
|
const (
|
||||||
|
// Raw upstream schemas (emitted by sources).
|
||||||
|
SchemaRawNWSObservationV1 = "raw.nws.observation.v1"
|
||||||
|
SchemaRawOpenMeteoCurrentV1 = "raw.openmeteo.current.v1"
|
||||||
|
SchemaRawOpenWeatherCurrentV1 = "raw.openweather.current.v1"
|
||||||
|
|
||||||
|
// Canonical domain schemas (emitted after normalization).
|
||||||
|
SchemaWeatherObservationV1 = "weather.observation.v1"
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user