Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ea113e2dcc | |||
| 38bc162918 | |||
| eae9568afe | |||
| f464592c56 | |||
| 123e8ff763 |
@@ -1,7 +1,8 @@
|
||||
---
|
||||
sources:
|
||||
- name: NWSObservationKSTL
|
||||
kind: observation
|
||||
mode: poll
|
||||
kinds: ["observation"]
|
||||
driver: nws_observation
|
||||
every: 10m
|
||||
params:
|
||||
@@ -9,7 +10,8 @@ sources:
|
||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
# - name: OpenMeteoObservation
|
||||
# kind: observation
|
||||
# mode: poll
|
||||
# kinds: ["observation"]
|
||||
# driver: openmeteo_observation
|
||||
# every: 10m
|
||||
# params:
|
||||
@@ -17,7 +19,8 @@ sources:
|
||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
# - name: OpenWeatherObservation
|
||||
# kind: observation
|
||||
# mode: poll
|
||||
# kinds: ["observation"]
|
||||
# driver: openweather_observation
|
||||
# every: 10m
|
||||
# params:
|
||||
@@ -25,7 +28,8 @@ sources:
|
||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
# - name: NWSObservationKSUS
|
||||
# kind: observation
|
||||
# mode: poll
|
||||
# kinds: ["observation"]
|
||||
# driver: nws_observation
|
||||
# every: 10m
|
||||
# params:
|
||||
@@ -33,7 +37,8 @@ sources:
|
||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
# - name: NWSObservationKCPS
|
||||
# kind: observation
|
||||
# mode: poll
|
||||
# kinds: ["observation"]
|
||||
# driver: nws_observation
|
||||
# every: 10m
|
||||
# params:
|
||||
@@ -41,7 +46,8 @@ sources:
|
||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
- name: NWSHourlyForecastSTL
|
||||
kind: forecast
|
||||
mode: poll
|
||||
kinds: ["forecast"]
|
||||
driver: nws_forecast
|
||||
every: 45m
|
||||
params:
|
||||
@@ -49,7 +55,8 @@ sources:
|
||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
- name: OpenMeteoHourlyForecastSTL
|
||||
kind: forecast
|
||||
mode: poll
|
||||
kinds: ["forecast"]
|
||||
driver: openmeteo_forecast
|
||||
every: 60m
|
||||
params:
|
||||
@@ -57,7 +64,8 @@ sources:
|
||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||
|
||||
- name: NWSAlertsSTL
|
||||
kind: alert
|
||||
mode: poll
|
||||
kinds: ["alert"]
|
||||
driver: nws_alerts
|
||||
every: 1m
|
||||
params:
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
@@ -16,6 +17,7 @@ import (
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
||||
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||
@@ -51,30 +53,30 @@ func main() {
|
||||
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
||||
return fksinks.NewNATSSinkFromConfig(cfg)
|
||||
})
|
||||
|
||||
// --- Build sources into scheduler jobs ---
|
||||
var jobs []fkscheduler.Job
|
||||
for i, sc := range cfg.Sources {
|
||||
src, err := srcReg.Build(sc)
|
||||
in, err := srcReg.BuildInput(sc) // may be polling or streaming
|
||||
if err != nil {
|
||||
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||
}
|
||||
|
||||
// Optional safety: if config.kind is set, ensure it matches the source.Kind().
|
||||
if strings.TrimSpace(sc.Kind) != "" {
|
||||
expectedKind, err := fkevent.ParseKind(sc.Kind)
|
||||
if err != nil {
|
||||
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
|
||||
}
|
||||
if src.Kind() != expectedKind {
|
||||
log.Fatalf(
|
||||
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
|
||||
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
|
||||
)
|
||||
}
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||
}
|
||||
|
||||
// If this is a polling source, every is required.
|
||||
if _, ok := in.(fksources.PollSource); ok && sc.Every.Duration <= 0 {
|
||||
log.Fatalf(
|
||||
"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
|
||||
i, sc.Name, sc.Driver,
|
||||
)
|
||||
}
|
||||
|
||||
// For stream sources, Every is ignored; it is fine if omitted/zero.
|
||||
jobs = append(jobs, fkscheduler.Job{
|
||||
Source: src,
|
||||
Source: in,
|
||||
Every: sc.Every.Duration,
|
||||
})
|
||||
}
|
||||
@@ -100,17 +102,24 @@ func main() {
|
||||
// --- Normalization (optional) ---
|
||||
//
|
||||
// We install feedkit's normalize.Processor even before any normalizers exist.
|
||||
// With an empty registry and RequireMatch=false, this is a no-op passthrough.
|
||||
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
|
||||
// It will begin transforming events as soon as:
|
||||
// 1) sources emit raw schemas (raw.*), and
|
||||
// 2) matching normalizers are registered.
|
||||
normReg := &fknormalize.Registry{}
|
||||
wfnormalizers.RegisterBuiltins(normReg)
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
if err != nil {
|
||||
log.Fatalf("build processor chain failed: %v", err)
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{
|
||||
Processors: []fkpipeline.Processor{
|
||||
fknormalize.Processor{Registry: normReg},
|
||||
},
|
||||
Processors: chain,
|
||||
}
|
||||
|
||||
s := &fkscheduler.Scheduler{
|
||||
@@ -193,5 +202,74 @@ func isContextShutdown(err error) bool {
|
||||
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
|
||||
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(expectedKinds) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
advertisedKinds := advertisedSourceKinds(in)
|
||||
if len(advertisedKinds) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for kind := range expectedKinds {
|
||||
if !advertisedKinds[kind] {
|
||||
return fmt.Errorf(
|
||||
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
|
||||
kind,
|
||||
sortedKinds(expectedKinds),
|
||||
sortedKinds(advertisedKinds),
|
||||
)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
|
||||
kinds := map[fkevent.Kind]bool{}
|
||||
for i, k := range raw {
|
||||
kind, err := fkevent.ParseKind(k)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
|
||||
}
|
||||
kinds[kind] = true
|
||||
}
|
||||
return kinds, nil
|
||||
}
|
||||
|
||||
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
kinds := map[fkevent.Kind]bool{}
|
||||
if ks, ok := in.(fksources.KindsSource); ok {
|
||||
for _, kind := range ks.Kinds() {
|
||||
kinds[kind] = true
|
||||
}
|
||||
return kinds
|
||||
}
|
||||
|
||||
if ks, ok := in.(fksources.KindSource); ok {
|
||||
kinds[ks.Kind()] = true
|
||||
return kinds
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
|
||||
out := make([]string, 0, len(kindSet))
|
||||
for kind := range kindSet {
|
||||
out = append(out, string(kind))
|
||||
}
|
||||
sort.Strings(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// keep time imported (mirrors your previous main.go defensive trick)
|
||||
var _ = time.Second
|
||||
|
||||
159
cmd/weatherfeeder/main_test.go
Normal file
159
cmd/weatherfeeder/main_test.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
|
||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||
)
|
||||
|
||||
type testInput struct {
|
||||
name string
|
||||
}
|
||||
|
||||
func (s testInput) Name() string { return s.name }
|
||||
|
||||
type testKindSource struct {
|
||||
testInput
|
||||
kind fkevent.Kind
|
||||
}
|
||||
|
||||
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
|
||||
|
||||
type testKindsSource struct {
|
||||
testInput
|
||||
kinds []fkevent.Kind
|
||||
}
|
||||
|
||||
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
|
||||
|
||||
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
|
||||
sc := config.SourceConfig{Kind: "observation"}
|
||||
in := testKindSource{
|
||||
testInput: testInput{name: "test"},
|
||||
kind: fkevent.Kind("observation"),
|
||||
}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
|
||||
sc := config.SourceConfig{Kinds: []string{"observation"}}
|
||||
in := testKindsSource{
|
||||
testInput: testInput{name: "test"},
|
||||
kinds: []fkevent.Kind{"observation", "forecast"},
|
||||
}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
|
||||
sc := config.SourceConfig{Kinds: []string{"alert"}}
|
||||
in := testKindsSource{
|
||||
testInput: testInput{name: "test"},
|
||||
kinds: []fkevent.Kind{"observation", "forecast"},
|
||||
}
|
||||
|
||||
err := validateSourceExpectedKinds(sc, in)
|
||||
if err == nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "configured expected kind") {
|
||||
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
|
||||
sc := config.SourceConfig{Kinds: []string{"alert"}}
|
||||
in := testInput{name: "test"}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
|
||||
if _, err := parseExpectedKinds([]string{""}); err == nil {
|
||||
t.Fatalf("parseExpectedKinds() expected error for empty kind")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExampleConfigLoads(t *testing.T) {
|
||||
if _, err := config.Load("config.yml"); err != nil {
|
||||
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessorRegistryBuildsNormalizeChain(t *testing.T) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
if len(normalizers) == 0 {
|
||||
t.Fatalf("RegisterBuiltins() returned no normalizers")
|
||||
}
|
||||
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
if len(chain) != 1 {
|
||||
t.Fatalf("BuildChain() expected 1 processor, got %d", len(chain))
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||
if len(pl.Processors) != 1 {
|
||||
t.Fatalf("pipeline expected 1 processor, got %d", len(pl.Processors))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeNoMatchPassThrough(t *testing.T) {
|
||||
normalizers := wfnormalizers.RegisterBuiltins(nil)
|
||||
procReg := fkprocessors.NewRegistry()
|
||||
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
|
||||
return fknormalize.NewProcessor(normalizers, false), nil
|
||||
})
|
||||
|
||||
chain, err := procReg.BuildChain([]string{"normalize"})
|
||||
if err != nil {
|
||||
t.Fatalf("BuildChain() unexpected error: %v", err)
|
||||
}
|
||||
|
||||
pl := &fkpipeline.Pipeline{Processors: chain}
|
||||
in := fkevent.Event{
|
||||
ID: "evt-no-match",
|
||||
Kind: fkevent.Kind("observation"),
|
||||
Source: "test",
|
||||
EmittedAt: time.Now().UTC(),
|
||||
Schema: "raw.weatherfeeder.unknown.v1",
|
||||
Payload: map[string]any{
|
||||
"ok": true,
|
||||
},
|
||||
}
|
||||
|
||||
out, err := pl.Process(context.Background(), in)
|
||||
if err != nil {
|
||||
t.Fatalf("Pipeline.Process() unexpected error: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("Pipeline.Process() returned nil output")
|
||||
}
|
||||
if !reflect.DeepEqual(*out, in) {
|
||||
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
|
||||
}
|
||||
}
|
||||
2
go.mod
2
go.mod
@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
||||
|
||||
go 1.25
|
||||
|
||||
require gitea.maximumdirect.net/ejr/feedkit v0.4.1
|
||||
require gitea.maximumdirect.net/ejr/feedkit v0.6.0
|
||||
|
||||
require (
|
||||
github.com/klauspost/compress v1.17.2 // indirect
|
||||
|
||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.4.1 h1:mMFtPCBKp2LXV3euPH21WzjHku/HHx31KQqYW+w1aqU=
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.4.1/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.6.0 h1:GXwyNKvPp1sWN8TS5E5NDGFgimpyHlzerO5E+/qoTXg=
|
||||
gitea.maximumdirect.net/ejr/feedkit v0.6.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
||||
|
||||
@@ -16,22 +16,20 @@ import (
|
||||
// - sources are built by name (cfg.Driver -> factory)
|
||||
// - normalizers are selected by Match() (event.Schema -> first match wins)
|
||||
//
|
||||
// Registration order matters because feedkit normalize.Registry is “first match wins”.
|
||||
// Registration order matters because feedkit normalize.Processor is "first match wins".
|
||||
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
|
||||
// we still keep ordering stable as a best practice.
|
||||
//
|
||||
// If reg is nil, this function is a no-op.
|
||||
func RegisterBuiltins(reg *fknormalize.Registry) {
|
||||
if reg == nil {
|
||||
return
|
||||
}
|
||||
func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||
out := in
|
||||
|
||||
// Keep this intentionally boring: delegate registration to provider subpackages
|
||||
// so main.go stays clean and each provider owns its own mapping logic.
|
||||
//
|
||||
// Order here should be stable across releases to reduce surprises when adding
|
||||
// new normalizers.
|
||||
nws.Register(reg)
|
||||
openmeteo.Register(reg)
|
||||
openweather.Register(reg)
|
||||
out = nws.Register(out)
|
||||
out = openmeteo.Register(out)
|
||||
out = openweather.Register(out)
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
42
internal/normalizers/builtins_test.go
Normal file
42
internal/normalizers/builtins_test.go
Normal file
@@ -0,0 +1,42 @@
|
||||
package normalizers
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
|
||||
)
|
||||
|
||||
func TestRegisterBuiltinsOrder(t *testing.T) {
|
||||
got := RegisterBuiltins(nil)
|
||||
if len(got) == 0 {
|
||||
t.Fatalf("RegisterBuiltins() returned no normalizers")
|
||||
}
|
||||
|
||||
want := []fknormalize.Normalizer{
|
||||
nws.ObservationNormalizer{},
|
||||
nws.ForecastNormalizer{},
|
||||
nws.AlertsNormalizer{},
|
||||
openmeteo.ObservationNormalizer{},
|
||||
openmeteo.ForecastNormalizer{},
|
||||
openweather.ObservationNormalizer{},
|
||||
}
|
||||
|
||||
if len(got) != len(want) {
|
||||
t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got))
|
||||
}
|
||||
|
||||
for i := range want {
|
||||
if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) {
|
||||
t.Fatalf(
|
||||
"RegisterBuiltins() order mismatch at index %d: got %T, want %T",
|
||||
i,
|
||||
got[i],
|
||||
want[i],
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -8,7 +8,7 @@
|
||||
// transforming provider-specific raw payloads into canonical internal models.
|
||||
//
|
||||
// This package is domain code (weatherfeeder). feedkit’s normalize package is
|
||||
// infrastructure (registry + processor).
|
||||
// infrastructure (normalizer contracts + processor).
|
||||
//
|
||||
// Directory layout (required)
|
||||
// ---------------------------
|
||||
@@ -136,21 +136,22 @@
|
||||
//
|
||||
// Registration pattern
|
||||
// --------------------
|
||||
// feedkit normalization uses a match-driven registry (“first match wins”).
|
||||
// feedkit normalization uses an ordered normalizer list ("first match wins").
|
||||
//
|
||||
// Provider subpackages should expose:
|
||||
//
|
||||
// func Register(reg *normalize.Registry)
|
||||
// func Register(in []normalize.Normalizer) []normalize.Normalizer
|
||||
//
|
||||
// And internal/normalizers/builtins.go should provide one entrypoint:
|
||||
//
|
||||
// func RegisterBuiltins(reg *normalize.Registry)
|
||||
// func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer
|
||||
//
|
||||
// which calls each provider’s Register() in a stable order.
|
||||
// which appends each provider's normalizers in a stable order and is then passed
|
||||
// to normalize.NewProcessor(...).
|
||||
//
|
||||
// Registry ordering
|
||||
// Normalizer ordering
|
||||
// -----------------------------
|
||||
// feedkit normalization uses a match-driven registry (“first match wins”).
|
||||
// feedkit normalization is "first match wins" by list order.
|
||||
// Therefore order matters:
|
||||
//
|
||||
// - Register more specific normalizers before more general ones.
|
||||
|
||||
@@ -11,8 +11,8 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// AlertsNormalizer converts:
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ForecastNormalizer converts:
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ObservationNormalizer converts:
|
||||
|
||||
@@ -5,18 +5,18 @@ import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
)
|
||||
|
||||
// Register registers NWS normalizers into the provided registry.
|
||||
func Register(reg *fknormalize.Registry) {
|
||||
if reg == nil {
|
||||
return
|
||||
}
|
||||
// Register appends NWS normalizers in stable order.
|
||||
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||
out := in
|
||||
|
||||
// Observations
|
||||
reg.Register(ObservationNormalizer{})
|
||||
out = append(out, ObservationNormalizer{})
|
||||
|
||||
// Forecasts
|
||||
reg.Register(ForecastNormalizer{})
|
||||
out = append(out, ForecastNormalizer{})
|
||||
|
||||
// Alerts
|
||||
reg.Register(AlertsNormalizer{})
|
||||
out = append(out, AlertsNormalizer{})
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -9,8 +9,8 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ForecastNormalizer converts:
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ObservationNormalizer converts:
|
||||
|
||||
@@ -5,14 +5,14 @@ import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
)
|
||||
|
||||
// Register registers Open-Meteo normalizers into the provided registry.
|
||||
func Register(reg *fknormalize.Registry) {
|
||||
if reg == nil {
|
||||
return
|
||||
}
|
||||
// Register appends Open-Meteo normalizers in stable order.
|
||||
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||
out := in
|
||||
|
||||
// Observations
|
||||
reg.Register(ObservationNormalizer{})
|
||||
out = append(out, ObservationNormalizer{})
|
||||
// Forecasts
|
||||
reg.Register(ForecastNormalizer{})
|
||||
out = append(out, ForecastNormalizer{})
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ObservationNormalizer converts:
|
||||
|
||||
@@ -5,12 +5,12 @@ import (
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
|
||||
)
|
||||
|
||||
// Register registers OpenWeather normalizers into the provided registry.
|
||||
func Register(reg *fknormalize.Registry) {
|
||||
if reg == nil {
|
||||
return
|
||||
}
|
||||
// Register appends OpenWeather normalizers in stable order.
|
||||
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
|
||||
out := in
|
||||
|
||||
// Observations
|
||||
reg.Register(ObservationNormalizer{})
|
||||
out = append(out, ObservationNormalizer{})
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
@@ -13,26 +13,26 @@ import (
|
||||
// Keeping this in one place makes main.go very readable.
|
||||
func RegisterBuiltins(r *fksource.Registry) {
|
||||
// NWS drivers
|
||||
r.Register("nws_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
|
||||
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return nws.NewObservationSource(cfg)
|
||||
})
|
||||
r.Register("nws_alerts", func(cfg config.SourceConfig) (fksource.Source, error) {
|
||||
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return nws.NewAlertsSource(cfg)
|
||||
})
|
||||
r.Register("nws_forecast", func(cfg config.SourceConfig) (fksource.Source, error) {
|
||||
r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return nws.NewForecastSource(cfg)
|
||||
})
|
||||
|
||||
// Open-Meteo drivers
|
||||
r.Register("openmeteo_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
|
||||
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return openmeteo.NewObservationSource(cfg)
|
||||
})
|
||||
r.Register("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.Source, error) {
|
||||
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return openmeteo.NewForecastSource(cfg)
|
||||
})
|
||||
|
||||
// OpenWeatherMap drivers
|
||||
r.Register("openweather_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
|
||||
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||
return openweather.NewObservationSource(cfg)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event.
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event.
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.
|
||||
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
"gitea.maximumdirect.net/ejr/feedkit/event"
|
||||
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
|
||||
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
|
||||
)
|
||||
|
||||
type ObservationSource struct {
|
||||
|
||||
@@ -1,127 +0,0 @@
|
||||
package standards
|
||||
|
||||
import "gitea.maximumdirect.net/ejr/weatherfeeder/model"
|
||||
|
||||
// This file provides small, shared helper functions for reasoning about WMO codes.
|
||||
// These are intentionally "coarse" categories that are useful for business logic,
|
||||
// dashboards, and alerting decisions.
|
||||
//
|
||||
// Example uses:
|
||||
// - jogging suitability: precipitation? thunderstorm? freezing precip?
|
||||
// - quick glance: "is it cloudy?" "is there any precip?"
|
||||
// - downstream normalizers / aggregators
|
||||
|
||||
func IsThunderstorm(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 95, 96, 99:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsHail(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 96, 99:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsFog(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 45, 48:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsPrecipitation returns true if the code represents any precipitation
|
||||
// (drizzle, rain, snow, showers, etc.).
|
||||
func IsPrecipitation(code model.WMOCode) bool {
|
||||
switch code {
|
||||
// Drizzle
|
||||
case 51, 53, 55, 56, 57:
|
||||
return true
|
||||
|
||||
// Rain
|
||||
case 61, 63, 65, 66, 67:
|
||||
return true
|
||||
|
||||
// Snow
|
||||
case 71, 73, 75, 77:
|
||||
return true
|
||||
|
||||
// Showers
|
||||
case 80, 81, 82, 85, 86:
|
||||
return true
|
||||
|
||||
// Thunderstorm (often includes rain/hail)
|
||||
case 95, 96, 99:
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsRainFamily(code model.WMOCode) bool {
|
||||
switch code {
|
||||
// Drizzle + freezing drizzle
|
||||
case 51, 53, 55, 56, 57:
|
||||
return true
|
||||
|
||||
// Rain + freezing rain
|
||||
case 61, 63, 65, 66, 67:
|
||||
return true
|
||||
|
||||
// Rain showers
|
||||
case 80, 81, 82:
|
||||
return true
|
||||
|
||||
// Thunderstorm often implies rain
|
||||
case 95, 96, 99:
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsSnowFamily(code model.WMOCode) bool {
|
||||
switch code {
|
||||
// Snow and related
|
||||
case 71, 73, 75, 77:
|
||||
return true
|
||||
|
||||
// Snow showers
|
||||
case 85, 86:
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsFreezingPrecip returns true if the code represents freezing drizzle/rain.
|
||||
func IsFreezingPrecip(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 56, 57, 66, 67:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsSkyOnly returns true for codes that represent "sky condition only"
|
||||
// (clear/mostly/partly/cloudy) rather than fog/precip/etc.
|
||||
func IsSkyOnly(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 0, 1, 2, 3:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,7 @@
|
||||
// -----------------------
|
||||
// For readability and stability, canonical payloads (weather.* schemas) should not emit
|
||||
// noisy floating-point representations. weatherfeeder enforces this by rounding float
|
||||
// values in canonical payloads to 2 digits after the decimal point at normalization
|
||||
// values in canonical payloads to 4 digits after the decimal point at normalization
|
||||
// finalization time.
|
||||
//
|
||||
// Provider-specific decoding helpers and quirks live in internal/providers/<provider>.
|
||||
@@ -1,5 +1,14 @@
|
||||
package standards
|
||||
|
||||
// This file provides small, shared helper functions for reasoning about WMO codes.
|
||||
// These are intentionally "coarse" categories that are useful for business logic,
|
||||
// dashboards, and alerting decisions.
|
||||
//
|
||||
// Example uses:
|
||||
// - jogging suitability: precipitation? thunderstorm? freezing precip?
|
||||
// - quick glance: "is it cloudy?" "is there any precip?"
|
||||
// - downstream normalizers / aggregators
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
@@ -100,3 +109,118 @@ func IsKnownWMO(code model.WMOCode) bool {
|
||||
_, ok := WMODescriptions[code]
|
||||
return ok
|
||||
}
|
||||
|
||||
func IsThunderstorm(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 95, 96, 99:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsHail(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 96, 99:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsFog(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 45, 48:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsPrecipitation returns true if the code represents any precipitation
|
||||
// (drizzle, rain, snow, showers, etc.).
|
||||
func IsPrecipitation(code model.WMOCode) bool {
|
||||
switch code {
|
||||
// Drizzle
|
||||
case 51, 53, 55, 56, 57:
|
||||
return true
|
||||
|
||||
// Rain
|
||||
case 61, 63, 65, 66, 67:
|
||||
return true
|
||||
|
||||
// Snow
|
||||
case 71, 73, 75, 77:
|
||||
return true
|
||||
|
||||
// Showers
|
||||
case 80, 81, 82, 85, 86:
|
||||
return true
|
||||
|
||||
// Thunderstorm (often includes rain/hail)
|
||||
case 95, 96, 99:
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsRainFamily(code model.WMOCode) bool {
|
||||
switch code {
|
||||
// Drizzle + freezing drizzle
|
||||
case 51, 53, 55, 56, 57:
|
||||
return true
|
||||
|
||||
// Rain + freezing rain
|
||||
case 61, 63, 65, 66, 67:
|
||||
return true
|
||||
|
||||
// Rain showers
|
||||
case 80, 81, 82:
|
||||
return true
|
||||
|
||||
// Thunderstorm often implies rain
|
||||
case 95, 96, 99:
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func IsSnowFamily(code model.WMOCode) bool {
|
||||
switch code {
|
||||
// Snow and related
|
||||
case 71, 73, 75, 77:
|
||||
return true
|
||||
|
||||
// Snow showers
|
||||
case 85, 86:
|
||||
return true
|
||||
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsFreezingPrecip returns true if the code represents freezing drizzle/rain.
|
||||
func IsFreezingPrecip(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 56, 57, 66, 67:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// IsSkyOnly returns true for codes that represent "sky condition only"
|
||||
// (clear/mostly/partly/cloudy) rather than fog/precip/etc.
|
||||
func IsSkyOnly(code model.WMOCode) bool {
|
||||
switch code {
|
||||
case 0, 1, 2, 3:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user