7 Commits

Author SHA1 Message Date
ea113e2dcc Updated processor/normalizer wiring to track Feedkit v0.7.0
Some checks failed
ci/woodpecker/push/build-image Pipeline failed
2026-03-16 13:35:51 -05:00
38bc162918 Updated go.sum
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-15 19:26:51 -05:00
eae9568afe Updated source configuration to track Feedkit v0.6.0
Some checks failed
ci/woodpecker/push/build-image Pipeline failed
2026-03-15 19:22:57 -05:00
f464592c56 Updates to accommodate the new upstream version of feedkit (v0.5.0), which now supports both polling sources and streaming sources.
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-02-08 15:05:53 -06:00
123e8ff763 Moved the standards package out of internal/ so it can be imported by downstream consumers.
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-02-08 09:15:07 -06:00
5923592b53 Moved the weatherfeeder model out of internal/ so that downstream consumers can import it directly.
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-02-08 08:56:16 -06:00
c96a6bb78b Remove Go test files from .dockerignore; this will enable the CI system to run tests before building. 2026-02-08 08:55:35 -06:00
38 changed files with 510 additions and 228 deletions

View File

@@ -1,6 +1,5 @@
.git .git
.gitignore .gitignore
**/*_test.go
**/*.md **/*.md
dist/ dist/
tmp/ tmp/

View File

@@ -1,7 +1,8 @@
--- ---
sources: sources:
- name: NWSObservationKSTL - name: NWSObservationKSTL
kind: observation mode: poll
kinds: ["observation"]
driver: nws_observation driver: nws_observation
every: 10m every: 10m
params: params:
@@ -9,7 +10,8 @@ sources:
user_agent: "HomeOps (eric@maximumdirect.net)" user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: OpenMeteoObservation # - name: OpenMeteoObservation
# kind: observation # mode: poll
# kinds: ["observation"]
# driver: openmeteo_observation # driver: openmeteo_observation
# every: 10m # every: 10m
# params: # params:
@@ -17,7 +19,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)" # user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: OpenWeatherObservation # - name: OpenWeatherObservation
# kind: observation # mode: poll
# kinds: ["observation"]
# driver: openweather_observation # driver: openweather_observation
# every: 10m # every: 10m
# params: # params:
@@ -25,7 +28,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)" # user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSObservationKSUS # - name: NWSObservationKSUS
# kind: observation # mode: poll
# kinds: ["observation"]
# driver: nws_observation # driver: nws_observation
# every: 10m # every: 10m
# params: # params:
@@ -33,7 +37,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)" # user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSObservationKCPS # - name: NWSObservationKCPS
# kind: observation # mode: poll
# kinds: ["observation"]
# driver: nws_observation # driver: nws_observation
# every: 10m # every: 10m
# params: # params:
@@ -41,7 +46,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)" # user_agent: "HomeOps (eric@maximumdirect.net)"
- name: NWSHourlyForecastSTL - name: NWSHourlyForecastSTL
kind: forecast mode: poll
kinds: ["forecast"]
driver: nws_forecast driver: nws_forecast
every: 45m every: 45m
params: params:
@@ -49,7 +55,8 @@ sources:
user_agent: "HomeOps (eric@maximumdirect.net)" user_agent: "HomeOps (eric@maximumdirect.net)"
- name: OpenMeteoHourlyForecastSTL - name: OpenMeteoHourlyForecastSTL
kind: forecast mode: poll
kinds: ["forecast"]
driver: openmeteo_forecast driver: openmeteo_forecast
every: 60m every: 60m
params: params:
@@ -57,7 +64,8 @@ sources:
user_agent: "HomeOps (eric@maximumdirect.net)" user_agent: "HomeOps (eric@maximumdirect.net)"
- name: NWSAlertsSTL - name: NWSAlertsSTL
kind: alert mode: poll
kinds: ["alert"]
driver: nws_alerts driver: nws_alerts
every: 1m every: 1m
params: params:

View File

@@ -7,6 +7,7 @@ import (
"log" "log"
"os" "os"
"os/signal" "os/signal"
"sort"
"strings" "strings"
"syscall" "syscall"
"time" "time"
@@ -16,6 +17,7 @@ import (
fkevent "gitea.maximumdirect.net/ejr/feedkit/event" fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline" fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler" fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources" fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
@@ -51,30 +53,30 @@ func main() {
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) { sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewNATSSinkFromConfig(cfg) return fksinks.NewNATSSinkFromConfig(cfg)
}) })
// --- Build sources into scheduler jobs --- // --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job var jobs []fkscheduler.Job
for i, sc := range cfg.Sources { for i, sc := range cfg.Sources {
src, err := srcReg.Build(sc) in, err := srcReg.BuildInput(sc) // may be polling or streaming
if err != nil { if err != nil {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err) log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
} }
// Optional safety: if config.kind is set, ensure it matches the source.Kind(). if err := validateSourceExpectedKinds(sc, in); err != nil {
if strings.TrimSpace(sc.Kind) != "" { log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
expectedKind, err := fkevent.ParseKind(sc.Kind)
if err != nil {
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
}
if src.Kind() != expectedKind {
log.Fatalf(
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
)
}
} }
// If this is a polling source, every is required.
if _, ok := in.(fksources.PollSource); ok && sc.Every.Duration <= 0 {
log.Fatalf(
"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
i, sc.Name, sc.Driver,
)
}
// For stream sources, Every is ignored; it is fine if omitted/zero.
jobs = append(jobs, fkscheduler.Job{ jobs = append(jobs, fkscheduler.Job{
Source: src, Source: in,
Every: sc.Every.Duration, Every: sc.Every.Duration,
}) })
} }
@@ -100,17 +102,24 @@ func main() {
// --- Normalization (optional) --- // --- Normalization (optional) ---
// //
// We install feedkit's normalize.Processor even before any normalizers exist. // We install feedkit's normalize.Processor even before any normalizers exist.
// With an empty registry and RequireMatch=false, this is a no-op passthrough. // With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
// It will begin transforming events as soon as: // It will begin transforming events as soon as:
// 1) sources emit raw schemas (raw.*), and // 1) sources emit raw schemas (raw.*), and
// 2) matching normalizers are registered. // 2) matching normalizers are registered.
normReg := &fknormalize.Registry{} normalizers := wfnormalizers.RegisterBuiltins(nil)
wfnormalizers.RegisterBuiltins(normReg)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
log.Fatalf("build processor chain failed: %v", err)
}
pl := &fkpipeline.Pipeline{ pl := &fkpipeline.Pipeline{
Processors: []fkpipeline.Processor{ Processors: chain,
fknormalize.Processor{Registry: normReg},
},
} }
s := &fkscheduler.Scheduler{ s := &fkscheduler.Scheduler{
@@ -193,5 +202,74 @@ func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
} }
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
if err != nil {
return err
}
if len(expectedKinds) == 0 {
return nil
}
advertisedKinds := advertisedSourceKinds(in)
if len(advertisedKinds) == 0 {
return nil
}
for kind := range expectedKinds {
if !advertisedKinds[kind] {
return fmt.Errorf(
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
kind,
sortedKinds(expectedKinds),
sortedKinds(advertisedKinds),
)
}
}
return nil
}
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
kinds := map[fkevent.Kind]bool{}
for i, k := range raw {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
}
kinds[kind] = true
}
return kinds, nil
}
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
if in == nil {
return nil
}
kinds := map[fkevent.Kind]bool{}
if ks, ok := in.(fksources.KindsSource); ok {
for _, kind := range ks.Kinds() {
kinds[kind] = true
}
return kinds
}
if ks, ok := in.(fksources.KindSource); ok {
kinds[ks.Kind()] = true
return kinds
}
return nil
}
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
out := make([]string, 0, len(kindSet))
for kind := range kindSet {
out = append(out, string(kind))
}
sort.Strings(out)
return out
}
// keep time imported (mirrors your previous main.go defensive trick) // keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second var _ = time.Second

View File

@@ -0,0 +1,159 @@
package main
import (
"context"
"reflect"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
)
type testInput struct {
name string
}
func (s testInput) Name() string { return s.name }
type testKindSource struct {
testInput
kind fkevent.Kind
}
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
type testKindsSource struct {
testInput
kinds []fkevent.Kind
}
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
sc := config.SourceConfig{Kind: "observation"}
in := testKindSource{
testInput: testInput{name: "test"},
kind: fkevent.Kind("observation"),
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"observation"}}
in := testKindsSource{
testInput: testInput{name: "test"},
kinds: []fkevent.Kind{"observation", "forecast"},
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testKindsSource{
testInput: testInput{name: "test"},
kinds: []fkevent.Kind{"observation", "forecast"},
}
err := validateSourceExpectedKinds(sc, in)
if err == nil {
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil")
}
if !strings.Contains(err.Error(), "configured expected kind") {
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err)
}
}
func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testInput{name: "test"}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
if _, err := parseExpectedKinds([]string{""}); err == nil {
t.Fatalf("parseExpectedKinds() expected error for empty kind")
}
}
func TestExampleConfigLoads(t *testing.T) {
if _, err := config.Load("config.yml"); err != nil {
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
}
}
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
if len(normalizers) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
if len(chain) != 1 {
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
}
pl := &fkpipeline.Pipeline{Processors: chain}
if len(pl.Processors) != 1 {
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
}
}
func TestNormalizeNoMatchPassThrough(t *testing.T) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
chain, err := procReg.BuildChain([]string{"normalize"})
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
pl := &fkpipeline.Pipeline{Processors: chain}
in := fkevent.Event{
ID: "evt-no-match",
Kind: fkevent.Kind("observation"),
Source: "test",
EmittedAt: time.Now().UTC(),
Schema: "raw.weatherfeeder.unknown.v1",
Payload: map[string]any{
"ok": true,
},
}
out, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("Pipeline.Process() unexpected error: %v", err)
}
if out == nil {
t.Fatalf("Pipeline.Process() returned nil output")
}
if !reflect.DeepEqual(*out, in) {
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
}
}

2
go.mod
View File

@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
go 1.25 go 1.25
require gitea.maximumdirect.net/ejr/feedkit v0.4.1 require gitea.maximumdirect.net/ejr/feedkit v0.6.0
require ( require (
github.com/klauspost/compress v1.17.2 // indirect github.com/klauspost/compress v1.17.2 // indirect

4
go.sum
View File

@@ -1,5 +1,5 @@
gitea.maximumdirect.net/ejr/feedkit v0.4.1 h1:mMFtPCBKp2LXV3euPH21WzjHku/HHx31KQqYW+w1aqU= gitea.maximumdirect.net/ejr/feedkit v0.6.0 h1:GXwyNKvPp1sWN8TS5E5NDGFgimpyHlzerO5E+/qoTXg=
gitea.maximumdirect.net/ejr/feedkit v0.4.1/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls= gitea.maximumdirect.net/ejr/feedkit v0.6.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk= github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=

View File

@@ -16,22 +16,20 @@ import (
// - sources are built by name (cfg.Driver -> factory) // - sources are built by name (cfg.Driver -> factory)
// - normalizers are selected by Match() (event.Schema -> first match wins) // - normalizers are selected by Match() (event.Schema -> first match wins)
// //
// Registration order matters because feedkit normalize.Registry is first match wins. // Registration order matters because feedkit normalize.Processor is "first match wins".
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but // In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
// we still keep ordering stable as a best practice. // we still keep ordering stable as a best practice.
// func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
// If reg is nil, this function is a no-op. out := in
func RegisterBuiltins(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Keep this intentionally boring: delegate registration to provider subpackages // Keep this intentionally boring: delegate registration to provider subpackages
// so main.go stays clean and each provider owns its own mapping logic. // so main.go stays clean and each provider owns its own mapping logic.
// //
// Order here should be stable across releases to reduce surprises when adding // Order here should be stable across releases to reduce surprises when adding
// new normalizers. // new normalizers.
nws.Register(reg) out = nws.Register(out)
openmeteo.Register(reg) out = openmeteo.Register(out)
openweather.Register(reg) out = openweather.Register(out)
return out
} }

View File

@@ -0,0 +1,42 @@
package normalizers
import (
"reflect"
"testing"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
)
func TestRegisterBuiltinsOrder(t *testing.T) {
got := RegisterBuiltins(nil)
if len(got) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
want := []fknormalize.Normalizer{
nws.ObservationNormalizer{},
nws.ForecastNormalizer{},
nws.AlertsNormalizer{},
openmeteo.ObservationNormalizer{},
openmeteo.ForecastNormalizer{},
openweather.ObservationNormalizer{},
}
if len(got) != len(want) {
t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got))
}
for i := range want {
if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) {
t.Fatalf(
"RegisterBuiltins() order mismatch at index %d: got %T, want %T",
i,
got[i],
want[i],
)
}
}
}

View File

@@ -4,7 +4,7 @@ package common
import ( import (
"strings" "strings"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
) )
// WMOFromTextDescription is a cross-provider fallback that tries to infer a WMO code // WMOFromTextDescription is a cross-provider fallback that tries to infer a WMO code

View File

@@ -8,7 +8,7 @@
// transforming provider-specific raw payloads into canonical internal models. // transforming provider-specific raw payloads into canonical internal models.
// //
// This package is domain code (weatherfeeder). feedkits normalize package is // This package is domain code (weatherfeeder). feedkits normalize package is
// infrastructure (registry + processor). // infrastructure (normalizer contracts + processor).
// //
// Directory layout (required) // Directory layout (required)
// --------------------------- // ---------------------------
@@ -136,21 +136,22 @@
// //
// Registration pattern // Registration pattern
// -------------------- // --------------------
// feedkit normalization uses a match-driven registry (first match wins). // feedkit normalization uses an ordered normalizer list ("first match wins").
// //
// Provider subpackages should expose: // Provider subpackages should expose:
// //
// func Register(reg *normalize.Registry) // func Register(in []normalize.Normalizer) []normalize.Normalizer
// //
// And internal/normalizers/builtins.go should provide one entrypoint: // And internal/normalizers/builtins.go should provide one entrypoint:
// //
// func RegisterBuiltins(reg *normalize.Registry) // func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer
// //
// which calls each providers Register() in a stable order. // which appends each provider's normalizers in a stable order and is then passed
// to normalize.NewProcessor(...).
// //
// Registry ordering // Normalizer ordering
// ----------------------------- // -----------------------------
// feedkit normalization uses a match-driven registry (“first match wins”). // feedkit normalization is "first match wins" by list order.
// Therefore order matters: // Therefore order matters:
// //
// - Register more specific normalizers before more general ones. // - Register more specific normalizers before more general ones.

View File

@@ -9,10 +9,10 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// AlertsNormalizer converts: // AlertsNormalizer converts:

View File

@@ -8,10 +8,10 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ForecastNormalizer converts: // ForecastNormalizer converts:

View File

@@ -6,8 +6,8 @@ import (
"strings" "strings"
"unicode" "unicode"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
) )
// centroidLatLon returns a best-effort centroid (lat, lon) from a GeoJSON polygon. // centroidLatLon returns a best-effort centroid (lat, lon) from a GeoJSON polygon.

View File

@@ -8,10 +8,10 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ObservationNormalizer converts: // ObservationNormalizer converts:

View File

@@ -5,18 +5,18 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
) )
// Register registers NWS normalizers into the provided registry. // Register appends NWS normalizers in stable order.
func Register(reg *fknormalize.Registry) { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
if reg == nil { out := in
return
}
// Observations // Observations
reg.Register(ObservationNormalizer{}) out = append(out, ObservationNormalizer{})
// Forecasts // Forecasts
reg.Register(ForecastNormalizer{}) out = append(out, ForecastNormalizer{})
// Alerts // Alerts
reg.Register(AlertsNormalizer{}) out = append(out, AlertsNormalizer{})
return out
} }

View File

@@ -4,8 +4,8 @@ package nws
import ( import (
"strings" "strings"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
) )
// mapNWSToWMO maps NWS signals into a canonical WMO code. // mapNWSToWMO maps NWS signals into a canonical WMO code.

View File

@@ -7,10 +7,10 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ForecastNormalizer converts: // ForecastNormalizer converts:

View File

@@ -8,10 +8,10 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ObservationNormalizer converts: // ObservationNormalizer converts:

View File

@@ -5,14 +5,14 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
) )
// Register registers Open-Meteo normalizers into the provided registry. // Register appends Open-Meteo normalizers in stable order.
func Register(reg *fknormalize.Registry) { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
if reg == nil { out := in
return
}
// Observations // Observations
reg.Register(ObservationNormalizer{}) out = append(out, ObservationNormalizer{})
// Forecasts // Forecasts
reg.Register(ForecastNormalizer{}) out = append(out, ForecastNormalizer{})
return out
} }

View File

@@ -7,9 +7,9 @@ import (
"time" "time"
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common" normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ObservationNormalizer converts: // ObservationNormalizer converts:
@@ -121,7 +121,7 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
TextDescription: canonicalText, TextDescription: canonicalText,
IconURL: iconURL, IconURL: iconURL,
TemperatureC: &tempC, TemperatureC: &tempC,
ApparentTemperatureC: apparentC, ApparentTemperatureC: apparentC,
WindDirectionDegrees: parsed.Wind.Deg, WindDirectionDegrees: parsed.Wind.Deg,

View File

@@ -5,12 +5,12 @@ import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize" fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
) )
// Register registers OpenWeather normalizers into the provided registry. // Register appends OpenWeather normalizers in stable order.
func Register(reg *fknormalize.Registry) { func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
if reg == nil { out := in
return
}
// Observations // Observations
reg.Register(ObservationNormalizer{}) out = append(out, ObservationNormalizer{})
return out
} }

View File

@@ -1,7 +1,7 @@
// FILE: ./internal/normalizers/openweather/wmo_map.go // FILE: ./internal/normalizers/openweather/wmo_map.go
package openweather package openweather
import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" import "gitea.maximumdirect.net/ejr/weatherfeeder/model"
// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into weatherfeeder's // mapOpenWeatherToWMO maps OpenWeather weather condition IDs into weatherfeeder's
// canonical WMO code vocabulary. // canonical WMO code vocabulary.

View File

@@ -13,26 +13,26 @@ import (
// Keeping this in one place makes main.go very readable. // Keeping this in one place makes main.go very readable.
func RegisterBuiltins(r *fksource.Registry) { func RegisterBuiltins(r *fksource.Registry) {
// NWS drivers // NWS drivers
r.Register("nws_observation", func(cfg config.SourceConfig) (fksource.Source, error) { r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewObservationSource(cfg) return nws.NewObservationSource(cfg)
}) })
r.Register("nws_alerts", func(cfg config.SourceConfig) (fksource.Source, error) { r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewAlertsSource(cfg) return nws.NewAlertsSource(cfg)
}) })
r.Register("nws_forecast", func(cfg config.SourceConfig) (fksource.Source, error) { r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewForecastSource(cfg) return nws.NewForecastSource(cfg)
}) })
// Open-Meteo drivers // Open-Meteo drivers
r.Register("openmeteo_observation", func(cfg config.SourceConfig) (fksource.Source, error) { r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewObservationSource(cfg) return openmeteo.NewObservationSource(cfg)
}) })
r.Register("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.Source, error) { r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewForecastSource(cfg) return openmeteo.NewForecastSource(cfg)
}) })
// OpenWeatherMap drivers // OpenWeatherMap drivers
r.Register("openweather_observation", func(cfg config.SourceConfig) (fksource.Source, error) { r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg) return openweather.NewObservationSource(cfg)
}) })
} }

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event. // AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event.

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event. // ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event.

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws" nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event. // ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.

View File

@@ -10,7 +10,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event. // ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.

View File

@@ -10,7 +10,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event. // ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event" "gitea.maximumdirect.net/ejr/feedkit/event"
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather" owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common" "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards" "gitea.maximumdirect.net/ejr/weatherfeeder/standards"
) )
type ObservationSource struct { type ObservationSource struct {

View File

@@ -1,127 +0,0 @@
package standards
import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
// This file provides small, shared helper functions for reasoning about WMO codes.
// These are intentionally "coarse" categories that are useful for business logic,
// dashboards, and alerting decisions.
//
// Example uses:
// - jogging suitability: precipitation? thunderstorm? freezing precip?
// - quick glance: "is it cloudy?" "is there any precip?"
// - downstream normalizers / aggregators
func IsThunderstorm(code model.WMOCode) bool {
switch code {
case 95, 96, 99:
return true
default:
return false
}
}
func IsHail(code model.WMOCode) bool {
switch code {
case 96, 99:
return true
default:
return false
}
}
func IsFog(code model.WMOCode) bool {
switch code {
case 45, 48:
return true
default:
return false
}
}
// IsPrecipitation returns true if the code represents any precipitation
// (drizzle, rain, snow, showers, etc.).
func IsPrecipitation(code model.WMOCode) bool {
switch code {
// Drizzle
case 51, 53, 55, 56, 57:
return true
// Rain
case 61, 63, 65, 66, 67:
return true
// Snow
case 71, 73, 75, 77:
return true
// Showers
case 80, 81, 82, 85, 86:
return true
// Thunderstorm (often includes rain/hail)
case 95, 96, 99:
return true
default:
return false
}
}
func IsRainFamily(code model.WMOCode) bool {
switch code {
// Drizzle + freezing drizzle
case 51, 53, 55, 56, 57:
return true
// Rain + freezing rain
case 61, 63, 65, 66, 67:
return true
// Rain showers
case 80, 81, 82:
return true
// Thunderstorm often implies rain
case 95, 96, 99:
return true
default:
return false
}
}
func IsSnowFamily(code model.WMOCode) bool {
switch code {
// Snow and related
case 71, 73, 75, 77:
return true
// Snow showers
case 85, 86:
return true
default:
return false
}
}
// IsFreezingPrecip returns true if the code represents freezing drizzle/rain.
func IsFreezingPrecip(code model.WMOCode) bool {
switch code {
case 56, 57, 66, 67:
return true
default:
return false
}
}
// IsSkyOnly returns true for codes that represent "sky condition only"
// (clear/mostly/partly/cloudy) rather than fog/precip/etc.
func IsSkyOnly(code model.WMOCode) bool {
switch code {
case 0, 1, 2, 3:
return true
default:
return false
}
}

View File

@@ -15,7 +15,7 @@
// ----------------------- // -----------------------
// For readability and stability, canonical payloads (weather.* schemas) should not emit // For readability and stability, canonical payloads (weather.* schemas) should not emit
// noisy floating-point representations. weatherfeeder enforces this by rounding float // noisy floating-point representations. weatherfeeder enforces this by rounding float
// values in canonical payloads to 2 digits after the decimal point at normalization // values in canonical payloads to 4 digits after the decimal point at normalization
// finalization time. // finalization time.
// //
// Provider-specific decoding helpers and quirks live in internal/providers/<provider>. // Provider-specific decoding helpers and quirks live in internal/providers/<provider>.

View File

@@ -1,9 +1,18 @@
package standards package standards
// This file provides small, shared helper functions for reasoning about WMO codes.
// These are intentionally "coarse" categories that are useful for business logic,
// dashboards, and alerting decisions.
//
// Example uses:
// - jogging suitability: precipitation? thunderstorm? freezing precip?
// - quick glance: "is it cloudy?" "is there any precip?"
// - downstream normalizers / aggregators
import ( import (
"fmt" "fmt"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model" "gitea.maximumdirect.net/ejr/weatherfeeder/model"
) )
type WMODescription struct { type WMODescription struct {
@@ -100,3 +109,118 @@ func IsKnownWMO(code model.WMOCode) bool {
_, ok := WMODescriptions[code] _, ok := WMODescriptions[code]
return ok return ok
} }
func IsThunderstorm(code model.WMOCode) bool {
switch code {
case 95, 96, 99:
return true
default:
return false
}
}
func IsHail(code model.WMOCode) bool {
switch code {
case 96, 99:
return true
default:
return false
}
}
func IsFog(code model.WMOCode) bool {
switch code {
case 45, 48:
return true
default:
return false
}
}
// IsPrecipitation returns true if the code represents any precipitation
// (drizzle, rain, snow, showers, etc.).
func IsPrecipitation(code model.WMOCode) bool {
switch code {
// Drizzle
case 51, 53, 55, 56, 57:
return true
// Rain
case 61, 63, 65, 66, 67:
return true
// Snow
case 71, 73, 75, 77:
return true
// Showers
case 80, 81, 82, 85, 86:
return true
// Thunderstorm (often includes rain/hail)
case 95, 96, 99:
return true
default:
return false
}
}
func IsRainFamily(code model.WMOCode) bool {
switch code {
// Drizzle + freezing drizzle
case 51, 53, 55, 56, 57:
return true
// Rain + freezing rain
case 61, 63, 65, 66, 67:
return true
// Rain showers
case 80, 81, 82:
return true
// Thunderstorm often implies rain
case 95, 96, 99:
return true
default:
return false
}
}
func IsSnowFamily(code model.WMOCode) bool {
switch code {
// Snow and related
case 71, 73, 75, 77:
return true
// Snow showers
case 85, 86:
return true
default:
return false
}
}
// IsFreezingPrecip returns true if the code represents freezing drizzle/rain.
func IsFreezingPrecip(code model.WMOCode) bool {
switch code {
case 56, 57, 66, 67:
return true
default:
return false
}
}
// IsSkyOnly returns true for codes that represent "sky condition only"
// (clear/mostly/partly/cloudy) rather than fog/precip/etc.
func IsSkyOnly(code model.WMOCode) bool {
switch code {
case 0, 1, 2, 3:
return true
default:
return false
}
}