11 Commits

Author SHA1 Message Date
d0b58a4734 Updates to track feedkit v0.7.2 and to add a dedupe processor
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-16 18:35:44 -05:00
6cd823f528 Update go.mod
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-16 15:37:31 -05:00
84da2bb689 Added a postgres sink implementation.
Some checks failed
ci/woodpecker/push/build-image Pipeline failed
2026-03-16 15:32:46 -05:00
859ee9dd5c Updated go.sum
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-16 13:37:49 -05:00
ea113e2dcc Updated processor/normalizer wiring to track Feedkit v0.7.0
Some checks failed
ci/woodpecker/push/build-image Pipeline failed
2026-03-16 13:35:51 -05:00
38bc162918 Updated go.sum
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-03-15 19:26:51 -05:00
eae9568afe Updated source configuration to track Feedkit v0.6.0
Some checks failed
ci/woodpecker/push/build-image Pipeline failed
2026-03-15 19:22:57 -05:00
f464592c56 Updates to accommodate the new upstream version of feedkit (v0.5.0), which now supports both polling sources and streaming sources.
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-02-08 15:05:53 -06:00
123e8ff763 Moved the standards package out of internal/ so it can be imported by downstream consumers.
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-02-08 09:15:07 -06:00
5923592b53 Moved the weatherfeeder model out of internal/ so that downstream consumers can import it directly.
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
2026-02-08 08:56:16 -06:00
c96a6bb78b Remove Go test files from .dockerignore; this will enable the CI system to run tests before building. 2026-02-08 08:55:35 -06:00
42 changed files with 1520 additions and 236 deletions

View File

@@ -1,6 +1,5 @@
.git
.gitignore
**/*_test.go
**/*.md
dist/
tmp/

View File

@@ -1,7 +1,8 @@
---
sources:
- name: NWSObservationKSTL
kind: observation
mode: poll
kinds: ["observation"]
driver: nws_observation
every: 10m
params:
@@ -9,7 +10,8 @@ sources:
user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: OpenMeteoObservation
# kind: observation
# mode: poll
# kinds: ["observation"]
# driver: openmeteo_observation
# every: 10m
# params:
@@ -17,7 +19,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: OpenWeatherObservation
# kind: observation
# mode: poll
# kinds: ["observation"]
# driver: openweather_observation
# every: 10m
# params:
@@ -25,7 +28,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSObservationKSUS
# kind: observation
# mode: poll
# kinds: ["observation"]
# driver: nws_observation
# every: 10m
# params:
@@ -33,7 +37,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSObservationKCPS
# kind: observation
# mode: poll
# kinds: ["observation"]
# driver: nws_observation
# every: 10m
# params:
@@ -41,7 +46,8 @@ sources:
# user_agent: "HomeOps (eric@maximumdirect.net)"
- name: NWSHourlyForecastSTL
kind: forecast
mode: poll
kinds: ["forecast"]
driver: nws_forecast
every: 45m
params:
@@ -49,7 +55,8 @@ sources:
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: OpenMeteoHourlyForecastSTL
kind: forecast
mode: poll
kinds: ["forecast"]
driver: openmeteo_forecast
every: 60m
params:
@@ -57,7 +64,8 @@ sources:
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: NWSAlertsSTL
kind: alert
mode: poll
kinds: ["alert"]
driver: nws_alerts
every: 1m
params:

View File

@@ -7,6 +7,7 @@ import (
"log"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"
@@ -14,16 +15,21 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
wfpgsink "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sinks/postgres"
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
)
const dedupeMaxEntries = 2048
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
@@ -35,6 +41,9 @@ func main() {
if err != nil {
log.Fatalf("config load failed: %v", err)
}
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
log.Fatalf("postgres schema registration failed: %v", err)
}
// --- Registries ---
srcReg := fksources.NewRegistry()
@@ -51,30 +60,30 @@ func main() {
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewNATSSinkFromConfig(cfg)
})
// --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job
for i, sc := range cfg.Sources {
src, err := srcReg.Build(sc)
in, err := srcReg.BuildInput(sc) // may be polling or streaming
if err != nil {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
}
// Optional safety: if config.kind is set, ensure it matches the source.Kind().
if strings.TrimSpace(sc.Kind) != "" {
expectedKind, err := fkevent.ParseKind(sc.Kind)
if err != nil {
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
}
if src.Kind() != expectedKind {
log.Fatalf(
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
)
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
}
// If this is a polling source, every is required.
if _, ok := in.(fksources.PollSource); ok && sc.Every.Duration <= 0 {
log.Fatalf(
"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
i, sc.Name, sc.Driver,
)
}
// For stream sources, Every is ignored; it is fine if omitted/zero.
jobs = append(jobs, fkscheduler.Job{
Source: src,
Source: in,
Every: sc.Every.Duration,
})
}
@@ -97,20 +106,28 @@ func main() {
events := make(chan fkevent.Event, 256)
// --- Normalization (optional) ---
// --- Processors ---
//
// We install feedkit's normalize.Processor even before any normalizers exist.
// With an empty registry and RequireMatch=false, this is a no-op passthrough.
// We install feedkit's processors/normalize.Processor even before any normalizers exist.
// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
// It will begin transforming events as soon as:
// 1) sources emit raw schemas (raw.*), and
// 2) matching normalizers are registered.
normReg := &fknormalize.Registry{}
wfnormalizers.RegisterBuiltins(normReg)
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
if err != nil {
log.Fatalf("build processor chain failed: %v", err)
}
pl := &fkpipeline.Pipeline{
Processors: []fkpipeline.Processor{
fknormalize.Processor{Registry: normReg},
},
Processors: chain,
}
s := &fkscheduler.Scheduler{
@@ -193,5 +210,74 @@ func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
if err != nil {
return err
}
if len(expectedKinds) == 0 {
return nil
}
advertisedKinds := advertisedSourceKinds(in)
if len(advertisedKinds) == 0 {
return nil
}
for kind := range expectedKinds {
if !advertisedKinds[kind] {
return fmt.Errorf(
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
kind,
sortedKinds(expectedKinds),
sortedKinds(advertisedKinds),
)
}
}
return nil
}
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
kinds := map[fkevent.Kind]bool{}
for i, k := range raw {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
}
kinds[kind] = true
}
return kinds, nil
}
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
if in == nil {
return nil
}
kinds := map[fkevent.Kind]bool{}
if ks, ok := in.(fksources.KindsSource); ok {
for _, kind := range ks.Kinds() {
kinds[kind] = true
}
return kinds
}
if ks, ok := in.(fksources.KindSource); ok {
kinds[ks.Kind()] = true
return kinds
}
return nil
}
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
out := make([]string, 0, len(kindSet))
for kind := range kindSet {
out = append(out, string(kind))
}
sort.Strings(out)
return out
}
// keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second

View File

@@ -0,0 +1,191 @@
package main
import (
"context"
"reflect"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
)
type testInput struct {
name string
}
func (s testInput) Name() string { return s.name }
type testKindSource struct {
testInput
kind fkevent.Kind
}
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
type testKindsSource struct {
testInput
kinds []fkevent.Kind
}
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
sc := config.SourceConfig{Kind: "observation"}
in := testKindSource{
testInput: testInput{name: "test"},
kind: fkevent.Kind("observation"),
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"observation"}}
in := testKindsSource{
testInput: testInput{name: "test"},
kinds: []fkevent.Kind{"observation", "forecast"},
}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testKindsSource{
testInput: testInput{name: "test"},
kinds: []fkevent.Kind{"observation", "forecast"},
}
err := validateSourceExpectedKinds(sc, in)
if err == nil {
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil")
}
if !strings.Contains(err.Error(), "configured expected kind") {
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err)
}
}
func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
sc := config.SourceConfig{Kinds: []string{"alert"}}
in := testInput{name: "test"}
if err := validateSourceExpectedKinds(sc, in); err != nil {
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
}
}
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
if _, err := parseExpectedKinds([]string{""}); err == nil {
t.Fatalf("parseExpectedKinds() expected error for empty kind")
}
}
func TestExampleConfigLoads(t *testing.T) {
if _, err := config.Load("config.yml"); err != nil {
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
}
}
func TestProcessorRegistryBuildsNormalizeThenDedupeChain(t *testing.T) {
chain, err := buildProcessorChainForTests()
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
if len(chain) != 2 {
t.Fatalf("BuildChain() expected 2 processors, got %d", len(chain))
}
pl := &fkpipeline.Pipeline{Processors: chain}
if len(pl.Processors) != 2 {
t.Fatalf("pipeline expected 2 processors, got %d", len(pl.Processors))
}
}
func TestNormalizeNoMatchPassThrough(t *testing.T) {
chain, err := buildProcessorChainForTests()
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
pl := &fkpipeline.Pipeline{Processors: chain}
in := fkevent.Event{
ID: "evt-no-match",
Kind: fkevent.Kind("observation"),
Source: "test",
EmittedAt: time.Now().UTC(),
Schema: "raw.weatherfeeder.unknown.v1",
Payload: map[string]any{
"ok": true,
},
}
out, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("Pipeline.Process() unexpected error: %v", err)
}
if out == nil {
t.Fatalf("Pipeline.Process() returned nil output")
}
if !reflect.DeepEqual(*out, in) {
t.Fatalf("Pipeline.Process() expected passthrough output, got %#v", *out)
}
}
func TestDedupeDropsSecondEventWithSameID(t *testing.T) {
chain, err := buildProcessorChainForTests()
if err != nil {
t.Fatalf("BuildChain() unexpected error: %v", err)
}
pl := &fkpipeline.Pipeline{Processors: chain}
in := fkevent.Event{
ID: "evt-dedupe-1",
Kind: fkevent.Kind("observation"),
Source: "test",
EmittedAt: time.Now().UTC(),
Schema: "raw.weatherfeeder.unknown.v1",
Payload: map[string]any{
"ok": true,
},
}
first, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("first Pipeline.Process() unexpected error: %v", err)
}
if first == nil {
t.Fatalf("first Pipeline.Process() unexpectedly dropped event")
}
second, err := pl.Process(context.Background(), in)
if err != nil {
t.Fatalf("second Pipeline.Process() unexpected error: %v", err)
}
if second != nil {
t.Fatalf("second Pipeline.Process() expected dedupe drop, got %#v", *second)
}
}
func buildProcessorChainForTests() ([]fkprocessors.Processor, error) {
normalizers := wfnormalizers.RegisterBuiltins(nil)
procReg := fkprocessors.NewRegistry()
procReg.Register("normalize", func() (fkprocessors.Processor, error) {
return fknormalize.NewProcessor(normalizers, false), nil
})
procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))
return procReg.BuildChain([]string{"normalize", "dedupe"})
}

3
go.mod
View File

@@ -2,10 +2,11 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
go 1.25
require gitea.maximumdirect.net/ejr/feedkit v0.4.1
require gitea.maximumdirect.net/ejr/feedkit v0.7.2
require (
github.com/klauspost/compress v1.17.2 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/nats-io/nats.go v1.34.0 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect

6
go.sum
View File

@@ -1,7 +1,9 @@
gitea.maximumdirect.net/ejr/feedkit v0.4.1 h1:mMFtPCBKp2LXV3euPH21WzjHku/HHx31KQqYW+w1aqU=
gitea.maximumdirect.net/ejr/feedkit v0.4.1/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
gitea.maximumdirect.net/ejr/feedkit v0.7.2 h1:hTg302SgSi7tw11lNzuc+3g7MvHT6jQQziuo2NoARt8=
gitea.maximumdirect.net/ejr/feedkit v0.7.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
github.com/nats-io/nats.go v1.34.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=

View File

@@ -2,7 +2,7 @@
package normalizers
import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
@@ -16,22 +16,20 @@ import (
// - sources are built by name (cfg.Driver -> factory)
// - normalizers are selected by Match() (event.Schema -> first match wins)
//
// Registration order matters because feedkit normalize.Registry is first match wins.
// Registration order matters because feedkit normalize.Processor is "first match wins".
// In weatherfeeder we avoid ambiguity by matching strictly on schema constants, but
// we still keep ordering stable as a best practice.
//
// If reg is nil, this function is a no-op.
func RegisterBuiltins(reg *fknormalize.Registry) {
if reg == nil {
return
}
func RegisterBuiltins(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Keep this intentionally boring: delegate registration to provider subpackages
// so main.go stays clean and each provider owns its own mapping logic.
//
// Order here should be stable across releases to reduce surprises when adding
// new normalizers.
nws.Register(reg)
openmeteo.Register(reg)
openweather.Register(reg)
out = nws.Register(out)
out = openmeteo.Register(out)
out = openweather.Register(out)
return out
}

View File

@@ -0,0 +1,42 @@
package normalizers
import (
"reflect"
"testing"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/openweather"
)
func TestRegisterBuiltinsOrder(t *testing.T) {
got := RegisterBuiltins(nil)
if len(got) == 0 {
t.Fatalf("RegisterBuiltins() returned no normalizers")
}
want := []fknormalize.Normalizer{
nws.ObservationNormalizer{},
nws.ForecastNormalizer{},
nws.AlertsNormalizer{},
openmeteo.ObservationNormalizer{},
openmeteo.ForecastNormalizer{},
openweather.ObservationNormalizer{},
}
if len(got) != len(want) {
t.Fatalf("RegisterBuiltins() expected %d normalizers, got %d", len(want), len(got))
}
for i := range want {
if reflect.TypeOf(got[i]) != reflect.TypeOf(want[i]) {
t.Fatalf(
"RegisterBuiltins() order mismatch at index %d: got %T, want %T",
i,
got[i],
want[i],
)
}
}
}

View File

@@ -4,7 +4,7 @@ package common
import (
"strings"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
)
// WMOFromTextDescription is a cross-provider fallback that tries to infer a WMO code

View File

@@ -8,7 +8,7 @@
// transforming provider-specific raw payloads into canonical internal models.
//
// This package is domain code (weatherfeeder). feedkits normalize package is
// infrastructure (registry + processor).
// infrastructure (normalizer contracts + processor).
//
// Directory layout (required)
// ---------------------------
@@ -29,7 +29,7 @@
//
// 1. One normalizer per file.
// Each file contains exactly one Normalizer implementation (one type that
// satisfies feedkit/normalize.Normalizer).
// satisfies feedkit/processors/normalize.Normalizer).
// Helper files are encouraged (types.go, common.go, mapping.go, etc.) as long
// as they do not define additional Normalizer types.
//
@@ -136,21 +136,22 @@
//
// Registration pattern
// --------------------
// feedkit normalization uses a match-driven registry (first match wins).
// feedkit normalization uses an ordered normalizer list ("first match wins").
//
// Provider subpackages should expose:
//
// func Register(reg *normalize.Registry)
// func Register(in []normalize.Normalizer) []normalize.Normalizer
//
// And internal/normalizers/builtins.go should provide one entrypoint:
//
// func RegisterBuiltins(reg *normalize.Registry)
// func RegisterBuiltins(in []normalize.Normalizer) []normalize.Normalizer
//
// which calls each providers Register() in a stable order.
// which appends each provider's normalizers in a stable order and is then passed
// to normalize.NewProcessor(...).
//
// Registry ordering
// Normalizer ordering
// -----------------------------
// feedkit normalization uses a match-driven registry (“first match wins”).
// feedkit normalization is "first match wins" by list order.
// Therefore order matters:
//
// - Register more specific normalizers before more general ones.

View File

@@ -9,10 +9,10 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// AlertsNormalizer converts:

View File

@@ -8,10 +8,10 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastNormalizer converts:

View File

@@ -6,8 +6,8 @@ import (
"strings"
"unicode"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
)
// centroidLatLon returns a best-effort centroid (lat, lon) from a GeoJSON polygon.

View File

@@ -8,10 +8,10 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationNormalizer converts:

View File

@@ -2,21 +2,21 @@
package nws
import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
)
// Register registers NWS normalizers into the provided registry.
func Register(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Register appends NWS normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Observations
reg.Register(ObservationNormalizer{})
out = append(out, ObservationNormalizer{})
// Forecasts
reg.Register(ForecastNormalizer{})
out = append(out, ForecastNormalizer{})
// Alerts
reg.Register(AlertsNormalizer{})
out = append(out, AlertsNormalizer{})
return out
}

View File

@@ -4,8 +4,8 @@ package nws
import (
"strings"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
)
// mapNWSToWMO maps NWS signals into a canonical WMO code.

View File

@@ -7,10 +7,10 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastNormalizer converts:

View File

@@ -8,10 +8,10 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
omcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationNormalizer converts:

View File

@@ -2,17 +2,17 @@
package openmeteo
import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
)
// Register registers Open-Meteo normalizers into the provided registry.
func Register(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Register appends Open-Meteo normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Observations
reg.Register(ObservationNormalizer{})
out = append(out, ObservationNormalizer{})
// Forecasts
reg.Register(ForecastNormalizer{})
out = append(out, ForecastNormalizer{})
return out
}

View File

@@ -7,9 +7,9 @@ import (
"time"
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
normcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationNormalizer converts:
@@ -121,7 +121,7 @@ func buildObservation(parsed owmResponse) (model.WeatherObservation, time.Time,
TextDescription: canonicalText,
IconURL: iconURL,
TemperatureC: &tempC,
TemperatureC: &tempC,
ApparentTemperatureC: apparentC,
WindDirectionDegrees: parsed.Wind.Deg,

View File

@@ -2,15 +2,15 @@
package openweather
import (
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
)
// Register registers OpenWeather normalizers into the provided registry.
func Register(reg *fknormalize.Registry) {
if reg == nil {
return
}
// Register appends OpenWeather normalizers in stable order.
func Register(in []fknormalize.Normalizer) []fknormalize.Normalizer {
out := in
// Observations
reg.Register(ObservationNormalizer{})
out = append(out, ObservationNormalizer{})
return out
}

View File

@@ -1,7 +1,7 @@
// FILE: ./internal/normalizers/openweather/wmo_map.go
package openweather
import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
import "gitea.maximumdirect.net/ejr/weatherfeeder/model"
// mapOpenWeatherToWMO maps OpenWeather weather condition IDs into weatherfeeder's
// canonical WMO code vocabulary.

View File

@@ -0,0 +1,343 @@
package postgres
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func mapPostgresEvent(_ context.Context, e fkevent.Event) ([]fksinks.PostgresWrite, error) {
schema := strings.TrimSpace(e.Schema)
switch schema {
case standards.SchemaWeatherObservationV1:
return mapObservationEvent(e)
case standards.SchemaWeatherForecastV1:
return mapForecastEvent(e)
case standards.SchemaWeatherAlertV1:
return mapAlertEvent(e)
default:
return nil, nil
}
}
func mapObservationEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
obs, err := decodePayload[model.WeatherObservation](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode observation payload: %w", err)
}
if obs.Timestamp.IsZero() {
return nil, fmt.Errorf("decode observation payload: timestamp is required")
}
observedAt := obs.Timestamp.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(obs.CloudLayers)+len(obs.PresentWeather))
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservations,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"station_id": nullableString(obs.StationID),
"station_name": nullableString(obs.StationName),
"observed_at": observedAt,
"condition_code": int(obs.ConditionCode),
"condition_text": nullableString(obs.ConditionText),
"is_day": nullableBool(obs.IsDay),
"provider_raw_description": nullableString(obs.ProviderRawDescription),
"text_description": nullableString(obs.TextDescription),
"icon_url": nullableString(obs.IconURL),
"temperature_c": nullableFloat64(obs.TemperatureC),
"dewpoint_c": nullableFloat64(obs.DewpointC),
"wind_direction_degrees": nullableFloat64(obs.WindDirectionDegrees),
"wind_speed_kmh": nullableFloat64(obs.WindSpeedKmh),
"wind_gust_kmh": nullableFloat64(obs.WindGustKmh),
"barometric_pressure_pa": nullableFloat64(obs.BarometricPressurePa),
"sea_level_pressure_pa": nullableFloat64(obs.SeaLevelPressurePa),
"visibility_meters": nullableFloat64(obs.VisibilityMeters),
"relative_humidity_percent": nullableFloat64(obs.RelativeHumidityPercent),
"apparent_temperature_c": nullableFloat64(obs.ApparentTemperatureC),
"elevation_meters": nullableFloat64(obs.ElevationMeters),
"raw_message": nullableString(obs.RawMessage),
},
})
for i, cl := range obs.CloudLayers {
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservationCloudLayers,
Values: map[string]any{
"event_id": e.ID,
"layer_index": i,
"observed_at": observedAt,
"base_meters": nullableFloat64(cl.BaseMeters),
"amount": nullableString(cl.Amount),
},
})
}
for i, pw := range obs.PresentWeather {
rawText, err := compactJSONText(pw.Raw)
if err != nil {
return nil, fmt.Errorf("observation presentWeather[%d].raw: %w", i, err)
}
writes = append(writes, fksinks.PostgresWrite{
Table: tableObservationPresentWeather,
Values: map[string]any{
"event_id": e.ID,
"weather_index": i,
"observed_at": observedAt,
"raw_text": rawText,
},
})
}
return writes, nil
}
func mapForecastEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherForecastRun](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode forecast payload: %w", err)
}
if run.IssuedAt.IsZero() {
return nil, fmt.Errorf("decode forecast payload: issuedAt is required")
}
if strings.TrimSpace(string(run.Product)) == "" {
return nil, fmt.Errorf("decode forecast payload: product is required")
}
issuedAt := run.IssuedAt.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Periods))
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecasts,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"location_id": nullableString(run.LocationID),
"location_name": nullableString(run.LocationName),
"issued_at": issuedAt,
"updated_at": nullableTime(run.UpdatedAt),
"product": string(run.Product),
"latitude": nullableFloat64(run.Latitude),
"longitude": nullableFloat64(run.Longitude),
"elevation_meters": nullableFloat64(run.ElevationMeters),
"period_count": len(run.Periods),
},
})
for i, p := range run.Periods {
if p.StartTime.IsZero() || p.EndTime.IsZero() {
return nil, fmt.Errorf("decode forecast payload: periods[%d] startTime/endTime are required", i)
}
writes = append(writes, fksinks.PostgresWrite{
Table: tableForecastPeriods,
Values: map[string]any{
"run_event_id": e.ID,
"period_index": i,
"issued_at": issuedAt,
"start_time": p.StartTime.UTC(),
"end_time": p.EndTime.UTC(),
"name": nullableString(p.Name),
"is_day": nullableBool(p.IsDay),
"condition_code": int(p.ConditionCode),
"condition_text": nullableString(p.ConditionText),
"provider_raw_description": nullableString(p.ProviderRawDescription),
"text_description": nullableString(p.TextDescription),
"detailed_text": nullableString(p.DetailedText),
"icon_url": nullableString(p.IconURL),
"temperature_c": nullableFloat64(p.TemperatureC),
"temperature_c_min": nullableFloat64(p.TemperatureCMin),
"temperature_c_max": nullableFloat64(p.TemperatureCMax),
"dewpoint_c": nullableFloat64(p.DewpointC),
"relative_humidity_percent": nullableFloat64(p.RelativeHumidityPercent),
"wind_direction_degrees": nullableFloat64(p.WindDirectionDegrees),
"wind_speed_kmh": nullableFloat64(p.WindSpeedKmh),
"wind_gust_kmh": nullableFloat64(p.WindGustKmh),
"barometric_pressure_pa": nullableFloat64(p.BarometricPressurePa),
"visibility_meters": nullableFloat64(p.VisibilityMeters),
"apparent_temperature_c": nullableFloat64(p.ApparentTemperatureC),
"cloud_cover_percent": nullableFloat64(p.CloudCoverPercent),
"probability_of_precipitation_percent": nullableFloat64(p.ProbabilityOfPrecipitationPercent),
"precipitation_amount_mm": nullableFloat64(p.PrecipitationAmountMm),
"snowfall_depth_mm": nullableFloat64(p.SnowfallDepthMM),
"uv_index": nullableFloat64(p.UVIndex),
},
})
}
return writes, nil
}
func mapAlertEvent(e fkevent.Event) ([]fksinks.PostgresWrite, error) {
run, err := decodePayload[model.WeatherAlertRun](e.Payload)
if err != nil {
return nil, fmt.Errorf("decode alert payload: %w", err)
}
if run.AsOf.IsZero() {
return nil, fmt.Errorf("decode alert payload: asOf is required")
}
asOf := run.AsOf.UTC()
writes := make([]fksinks.PostgresWrite, 0, 1+len(run.Alerts)+countAlertReferences(run.Alerts))
writes = append(writes, fksinks.PostgresWrite{
Table: tableAlertRuns,
Values: map[string]any{
"event_id": e.ID,
"event_kind": string(e.Kind),
"event_source": e.Source,
"event_schema": e.Schema,
"event_emitted_at": e.EmittedAt.UTC(),
"event_effective_at": nullableTime(e.EffectiveAt),
"location_id": nullableString(run.LocationID),
"location_name": nullableString(run.LocationName),
"as_of": asOf,
"latitude": nullableFloat64(run.Latitude),
"longitude": nullableFloat64(run.Longitude),
"alert_count": len(run.Alerts),
},
})
for i, a := range run.Alerts {
if strings.TrimSpace(a.ID) == "" {
return nil, fmt.Errorf("decode alert payload: alerts[%d].id is required", i)
}
writes = append(writes, fksinks.PostgresWrite{
Table: tableAlerts,
Values: map[string]any{
"run_event_id": e.ID,
"alert_index": i,
"as_of": asOf,
"alert_id": a.ID,
"event": nullableString(a.Event),
"headline": nullableString(a.Headline),
"severity": nullableString(a.Severity),
"urgency": nullableString(a.Urgency),
"certainty": nullableString(a.Certainty),
"status": nullableString(a.Status),
"message_type": nullableString(a.MessageType),
"category": nullableString(a.Category),
"response": nullableString(a.Response),
"description": nullableString(a.Description),
"instruction": nullableString(a.Instruction),
"sent": nullableTime(a.Sent),
"effective": nullableTime(a.Effective),
"onset": nullableTime(a.Onset),
"expires": nullableTime(a.Expires),
"area_description": nullableString(a.AreaDescription),
"sender_name": nullableString(a.SenderName),
"reference_count": len(a.References),
},
})
for j, ref := range a.References {
writes = append(writes, fksinks.PostgresWrite{
Table: tableAlertReferences,
Values: map[string]any{
"run_event_id": e.ID,
"alert_index": i,
"reference_index": j,
"as_of": asOf,
"id": nullableString(ref.ID),
"identifier": nullableString(ref.Identifier),
"sender": nullableString(ref.Sender),
"sent": nullableTime(ref.Sent),
},
})
}
}
return writes, nil
}
func decodePayload[T any](payload any) (T, error) {
var out T
if payload == nil {
return out, fmt.Errorf("payload is nil")
}
if typed, ok := payload.(T); ok {
return typed, nil
}
if ptr, ok := payload.(*T); ok {
if ptr == nil {
return out, fmt.Errorf("payload pointer is nil")
}
return *ptr, nil
}
b, err := json.Marshal(payload)
if err != nil {
return out, fmt.Errorf("marshal payload: %w", err)
}
if err := json.Unmarshal(b, &out); err != nil {
return out, fmt.Errorf("unmarshal payload: %w", err)
}
return out, nil
}
func countAlertReferences(alerts []model.WeatherAlert) int {
total := 0
for _, a := range alerts {
total += len(a.References)
}
return total
}
func nullableString(s string) any {
if strings.TrimSpace(s) == "" {
return nil
}
return s
}
func nullableFloat64(v *float64) any {
if v == nil {
return nil
}
return *v
}
func nullableBool(v *bool) any {
if v == nil {
return nil
}
return *v
}
func nullableTime(v *time.Time) any {
if v == nil || v.IsZero() {
return nil
}
return v.UTC()
}
func compactJSONText(v any) (any, error) {
if v == nil {
return nil, nil
}
b, err := json.Marshal(v)
if err != nil {
return nil, err
}
if string(b) == "null" {
return nil, nil
}
return string(b), nil
}

View File

@@ -0,0 +1,256 @@
package postgres
import (
"context"
"encoding/json"
"strings"
"testing"
"time"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
func TestMapPostgresEventObservationStructPayload(t *testing.T) {
isDay := true
temp := 21.5
base := 1200.0
obs := model.WeatherObservation{
StationID: "KSTL",
StationName: "St. Louis",
Timestamp: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
ConditionCode: model.WMOCode(1),
ConditionText: "Mainly Sunny",
IsDay: &isDay,
ProviderRawDescription: "few clouds",
TextDescription: "Mainly Sunny",
IconURL: "https://example/icon.png",
TemperatureC: &temp,
CloudLayers: []model.CloudLayer{{BaseMeters: &base, Amount: "FEW"}},
PresentWeather: []model.PresentWeather{{Raw: map[string]any{"a": 1, "b": "x"}}},
}
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherObservationV1, "observation", obs))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 3 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
}
if writes[0].Table != tableObservations {
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableObservations)
}
if got := writes[0].Values["station_id"]; got != "KSTL" {
t.Fatalf("observations station_id = %#v, want KSTL", got)
}
if writes[1].Table != tableObservationCloudLayers {
t.Fatalf("writes[1].Table = %q, want %q", writes[1].Table, tableObservationCloudLayers)
}
if writes[2].Table != tableObservationPresentWeather {
t.Fatalf("writes[2].Table = %q, want %q", writes[2].Table, tableObservationPresentWeather)
}
if got := writes[2].Values["raw_text"]; got != `{"a":1,"b":"x"}` {
t.Fatalf("present_weather raw_text = %#v, want compact JSON", got)
}
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventForecastStructPayload(t *testing.T) {
isDay := true
temp := 10.5
run := model.WeatherForecastRun{
LocationID: "LOC-1",
LocationName: "St. Louis",
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
Product: model.ForecastProductHourly,
Periods: []model.WeatherForecastPeriod{
{
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
IsDay: &isDay,
ConditionCode: model.WMOCode(2),
ConditionText: "Partly Cloudy",
TemperatureC: &temp,
},
{
StartTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
EndTime: time.Date(2026, 3, 16, 21, 0, 0, 0, time.UTC),
ConditionCode: model.WMOCode(3),
},
},
}
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", run))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 3 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 3", len(writes))
}
if writes[0].Table != tableForecasts {
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
}
if got := writes[0].Values["period_count"]; got != 2 {
t.Fatalf("forecasts period_count = %#v, want 2", got)
}
if writes[1].Table != tableForecastPeriods || writes[2].Table != tableForecastPeriods {
t.Fatalf("forecast period writes not in expected order")
}
if got := writes[1].Values["period_index"]; got != 0 {
t.Fatalf("first period index = %#v, want 0", got)
}
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventAlertStructPayload(t *testing.T) {
sent := time.Date(2026, 3, 16, 17, 0, 0, 0, time.UTC)
run := model.WeatherAlertRun{
AsOf: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
Alerts: []model.WeatherAlert{
{
ID: "urn:alert:1",
Headline: "Winter Weather Advisory",
Severity: "Moderate",
References: []model.AlertReference{
{ID: "urn:ref:1", Sent: &sent},
{Identifier: "ref-two"},
},
},
{
ID: "urn:alert:2",
Headline: "Second alert",
},
},
}
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherAlertV1, "alert", run))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 5 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 5", len(writes))
}
counts := map[string]int{}
for _, w := range writes {
counts[w.Table]++
}
if counts[tableAlertRuns] != 1 || counts[tableAlerts] != 2 || counts[tableAlertReferences] != 2 {
t.Fatalf("unexpected table write counts: %#v", counts)
}
firstAlert, ok := firstWriteForTable(writes, tableAlerts)
if !ok {
t.Fatalf("missing alerts write")
}
if got := firstAlert.Values["reference_count"]; got != 2 {
t.Fatalf("alerts reference_count = %#v, want 2", got)
}
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventMapPayload(t *testing.T) {
run := model.WeatherForecastRun{
IssuedAt: time.Date(2026, 3, 16, 18, 0, 0, 0, time.UTC),
Product: model.ForecastProductHourly,
Periods: []model.WeatherForecastPeriod{
{
StartTime: time.Date(2026, 3, 16, 19, 0, 0, 0, time.UTC),
EndTime: time.Date(2026, 3, 16, 20, 0, 0, 0, time.UTC),
ConditionCode: model.WMOCode(2),
},
},
}
b, err := json.Marshal(run)
if err != nil {
t.Fatalf("json.Marshal() error = %v", err)
}
var payload map[string]any
if err := json.Unmarshal(b, &payload); err != nil {
t.Fatalf("json.Unmarshal() error = %v", err)
}
writes, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", payload))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 2 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 2", len(writes))
}
if writes[0].Table != tableForecasts {
t.Fatalf("writes[0].Table = %q, want %q", writes[0].Table, tableForecasts)
}
assertAllWritesIncludeAllColumns(t, writes)
}
func TestMapPostgresEventUnknownSchemaNoOp(t *testing.T) {
writes, err := mapPostgresEvent(context.Background(), testEvent("weather.unknown.v1", "observation", map[string]any{"x": 1}))
if err != nil {
t.Fatalf("mapPostgresEvent() error = %v", err)
}
if len(writes) != 0 {
t.Fatalf("mapPostgresEvent() writes len = %d, want 0", len(writes))
}
}
func TestMapPostgresEventMalformedPayload(t *testing.T) {
_, err := mapPostgresEvent(context.Background(), testEvent(standards.SchemaWeatherForecastV1, "forecast", "bad"))
if err == nil {
t.Fatalf("mapPostgresEvent() expected error for malformed payload")
}
if !strings.Contains(err.Error(), "decode forecast payload") {
t.Fatalf("error = %q, want decode forecast payload context", err)
}
}
func testEvent(schema string, kind fkevent.Kind, payload any) fkevent.Event {
effectiveAt := time.Date(2026, 3, 16, 18, 30, 0, 0, time.UTC)
return fkevent.Event{
ID: "evt-1",
Kind: kind,
Source: "test-source",
Schema: schema,
EmittedAt: time.Date(2026, 3, 16, 18, 31, 0, 0, time.UTC),
EffectiveAt: &effectiveAt,
Payload: payload,
}
}
func firstWriteForTable(writes []fksinks.PostgresWrite, table string) (fksinks.PostgresWrite, bool) {
for _, w := range writes {
if w.Table == table {
return w, true
}
}
return fksinks.PostgresWrite{}, false
}
func assertAllWritesIncludeAllColumns(t *testing.T, writes []fksinks.PostgresWrite) {
t.Helper()
colCounts := tableColumnCounts()
for i, w := range writes {
expectedCount, ok := colCounts[w.Table]
if !ok {
t.Fatalf("writes[%d] references unknown table %q", i, w.Table)
}
if len(w.Values) != expectedCount {
t.Fatalf("writes[%d] table=%q has %d values, want %d", i, w.Table, len(w.Values), expectedCount)
}
}
}
func tableColumnCounts() map[string]int {
s := weatherPostgresSchema()
m := make(map[string]int, len(s.Tables))
for _, tbl := range s.Tables {
m[tbl.Name] = len(tbl.Columns)
}
return m
}

View File

@@ -0,0 +1,264 @@
package postgres
import (
"fmt"
"strings"
"gitea.maximumdirect.net/ejr/feedkit/config"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
)
const (
tableObservations = "observations"
tableObservationCloudLayers = "observation_cloud_layers"
tableObservationPresentWeather = "observation_present_weather"
tableForecasts = "forecasts"
tableForecastPeriods = "forecast_periods"
tableAlertRuns = "alert_runs"
tableAlerts = "alerts"
tableAlertReferences = "alert_references"
)
// RegisterPostgresSchemas registers weatherfeeder's Postgres schema for each
// configured sink using driver=postgres.
func RegisterPostgresSchemas(cfg *config.Config) error {
if cfg == nil {
return fmt.Errorf("register postgres schemas: config is nil")
}
schema := weatherPostgresSchema()
for i, sk := range cfg.Sinks {
if !isPostgresDriver(sk.Driver) {
continue
}
if err := fksinks.RegisterPostgresSchema(sk.Name, schema); err != nil {
return fmt.Errorf("register postgres schema for sinks[%d] name=%q: %w", i, sk.Name, err)
}
}
return nil
}
func isPostgresDriver(driver string) bool {
return strings.EqualFold(strings.TrimSpace(driver), "postgres")
}
func weatherPostgresSchema() fksinks.PostgresSchema {
return fksinks.PostgresSchema{
Tables: []fksinks.PostgresTable{
{
Name: tableObservations,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT", Nullable: false},
{Name: "event_kind", Type: "TEXT", Nullable: false},
{Name: "event_source", Type: "TEXT", Nullable: false},
{Name: "event_schema", Type: "TEXT", Nullable: false},
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "station_id", Type: "TEXT", Nullable: true},
{Name: "station_name", Type: "TEXT", Nullable: true},
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "condition_code", Type: "INTEGER", Nullable: false},
{Name: "condition_text", Type: "TEXT", Nullable: true},
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
{Name: "text_description", Type: "TEXT", Nullable: true},
{Name: "icon_url", Type: "TEXT", Nullable: true},
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "sea_level_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "raw_message", Type: "TEXT", Nullable: true},
},
PrimaryKey: []string{"event_id"},
PruneColumn: "observed_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_obs_station_observed_at", Columns: []string{"station_id", "observed_at"}},
{Name: "idx_wf_obs_observed_at", Columns: []string{"observed_at"}},
{Name: "idx_wf_obs_condition_code", Columns: []string{"condition_code"}},
},
},
{
Name: tableObservationCloudLayers,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "layer_index", Type: "INTEGER", Nullable: false},
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "base_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "amount", Type: "TEXT", Nullable: true},
},
PrimaryKey: []string{"event_id", "layer_index"},
PruneColumn: "observed_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_obs_cloud_observed_at", Columns: []string{"observed_at"}},
},
},
{
Name: tableObservationPresentWeather,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT REFERENCES observations(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "weather_index", Type: "INTEGER", Nullable: false},
{Name: "observed_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "raw_text", Type: "TEXT", Nullable: true},
},
PrimaryKey: []string{"event_id", "weather_index"},
PruneColumn: "observed_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_obs_present_observed_at", Columns: []string{"observed_at"}},
},
},
{
Name: tableForecasts,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT", Nullable: false},
{Name: "event_kind", Type: "TEXT", Nullable: false},
{Name: "event_source", Type: "TEXT", Nullable: false},
{Name: "event_schema", Type: "TEXT", Nullable: false},
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "location_id", Type: "TEXT", Nullable: true},
{Name: "location_name", Type: "TEXT", Nullable: true},
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "updated_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "product", Type: "TEXT", Nullable: false},
{Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "elevation_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "period_count", Type: "INTEGER", Nullable: false},
},
PrimaryKey: []string{"event_id"},
PruneColumn: "issued_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_fc_location_product_issued_at", Columns: []string{"location_id", "product", "issued_at"}},
{Name: "idx_wf_fc_issued_at", Columns: []string{"issued_at"}},
{Name: "idx_wf_fc_product_issued_at", Columns: []string{"product", "issued_at"}},
},
},
{
Name: tableForecastPeriods,
Columns: []fksinks.PostgresColumn{
{Name: "run_event_id", Type: "TEXT REFERENCES forecasts(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "period_index", Type: "INTEGER", Nullable: false},
{Name: "issued_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "start_time", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "end_time", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "name", Type: "TEXT", Nullable: true},
{Name: "is_day", Type: "BOOLEAN", Nullable: true},
{Name: "condition_code", Type: "INTEGER", Nullable: false},
{Name: "condition_text", Type: "TEXT", Nullable: true},
{Name: "provider_raw_description", Type: "TEXT", Nullable: true},
{Name: "text_description", Type: "TEXT", Nullable: true},
{Name: "detailed_text", Type: "TEXT", Nullable: true},
{Name: "icon_url", Type: "TEXT", Nullable: true},
{Name: "temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "temperature_c_min", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "temperature_c_max", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "dewpoint_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "relative_humidity_percent", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_direction_degrees", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_speed_kmh", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "wind_gust_kmh", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "barometric_pressure_pa", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "visibility_meters", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "apparent_temperature_c", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "cloud_cover_percent", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "probability_of_precipitation_percent", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "precipitation_amount_mm", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "snowfall_depth_mm", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "uv_index", Type: "DOUBLE PRECISION", Nullable: true},
},
PrimaryKey: []string{"run_event_id", "period_index"},
PruneColumn: "issued_at",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_fc_period_start_time", Columns: []string{"start_time"}},
{Name: "idx_wf_fc_period_end_time", Columns: []string{"end_time"}},
{Name: "idx_wf_fc_period_run_start", Columns: []string{"run_event_id", "start_time"}},
},
},
{
Name: tableAlertRuns,
Columns: []fksinks.PostgresColumn{
{Name: "event_id", Type: "TEXT", Nullable: false},
{Name: "event_kind", Type: "TEXT", Nullable: false},
{Name: "event_source", Type: "TEXT", Nullable: false},
{Name: "event_schema", Type: "TEXT", Nullable: false},
{Name: "event_emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "event_effective_at", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "location_id", Type: "TEXT", Nullable: true},
{Name: "location_name", Type: "TEXT", Nullable: true},
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "latitude", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "longitude", Type: "DOUBLE PRECISION", Nullable: true},
{Name: "alert_count", Type: "INTEGER", Nullable: false},
},
PrimaryKey: []string{"event_id"},
PruneColumn: "as_of",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_alert_run_location_as_of", Columns: []string{"location_id", "as_of"}},
{Name: "idx_wf_alert_run_as_of", Columns: []string{"as_of"}},
},
},
{
Name: tableAlerts,
Columns: []fksinks.PostgresColumn{
{Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "alert_index", Type: "INTEGER", Nullable: false},
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "alert_id", Type: "TEXT", Nullable: false},
{Name: "event", Type: "TEXT", Nullable: true},
{Name: "headline", Type: "TEXT", Nullable: true},
{Name: "severity", Type: "TEXT", Nullable: true},
{Name: "urgency", Type: "TEXT", Nullable: true},
{Name: "certainty", Type: "TEXT", Nullable: true},
{Name: "status", Type: "TEXT", Nullable: true},
{Name: "message_type", Type: "TEXT", Nullable: true},
{Name: "category", Type: "TEXT", Nullable: true},
{Name: "response", Type: "TEXT", Nullable: true},
{Name: "description", Type: "TEXT", Nullable: true},
{Name: "instruction", Type: "TEXT", Nullable: true},
{Name: "sent", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "effective", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "onset", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "expires", Type: "TIMESTAMPTZ", Nullable: true},
{Name: "area_description", Type: "TEXT", Nullable: true},
{Name: "sender_name", Type: "TEXT", Nullable: true},
{Name: "reference_count", Type: "INTEGER", Nullable: false},
},
PrimaryKey: []string{"run_event_id", "alert_index"},
PruneColumn: "as_of",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_alerts_alert_id", Columns: []string{"alert_id"}},
{Name: "idx_wf_alerts_severity_expires", Columns: []string{"severity", "expires"}},
{Name: "idx_wf_alerts_as_of", Columns: []string{"as_of"}},
},
},
{
Name: tableAlertReferences,
Columns: []fksinks.PostgresColumn{
{Name: "run_event_id", Type: "TEXT REFERENCES alert_runs(event_id) ON DELETE CASCADE", Nullable: false},
{Name: "alert_index", Type: "INTEGER", Nullable: false},
{Name: "reference_index", Type: "INTEGER", Nullable: false},
{Name: "as_of", Type: "TIMESTAMPTZ", Nullable: false},
{Name: "id", Type: "TEXT", Nullable: true},
{Name: "identifier", Type: "TEXT", Nullable: true},
{Name: "sender", Type: "TEXT", Nullable: true},
{Name: "sent", Type: "TIMESTAMPTZ", Nullable: true},
},
PrimaryKey: []string{"run_event_id", "alert_index", "reference_index"},
PruneColumn: "as_of",
Indexes: []fksinks.PostgresIndex{
{Name: "idx_wf_alert_refs_as_of", Columns: []string{"as_of"}},
{Name: "idx_wf_alert_refs_sent", Columns: []string{"sent"}},
},
},
},
MapEvent: mapPostgresEvent,
}
}

View File

@@ -0,0 +1,96 @@
package postgres
import (
"fmt"
"strings"
"testing"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
)
func TestRegisterPostgresSchemasNilConfig(t *testing.T) {
err := RegisterPostgresSchemas(nil)
if err == nil {
t.Fatalf("RegisterPostgresSchemas(nil) expected error")
}
if !strings.Contains(err.Error(), "config is nil") {
t.Fatalf("error = %q, want config is nil", err)
}
}
func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) {
cfg := &config.Config{
Sinks: []config.SinkConfig{
{Name: "stdout_only", Driver: "stdout"},
{Name: "nats_only", Driver: "nats"},
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err)
}
}
func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) {
sinkName := uniqueSinkName("pg_test")
cfg := &config.Config{
Sinks: []config.SinkConfig{
{Name: sinkName, Driver: "postgres"},
},
}
if err := RegisterPostgresSchemas(cfg); err != nil {
t.Fatalf("first RegisterPostgresSchemas() error = %v", err)
}
err := RegisterPostgresSchemas(cfg)
if err == nil {
t.Fatalf("second RegisterPostgresSchemas() expected duplicate error")
}
if !strings.Contains(err.Error(), "already registered") {
t.Fatalf("error = %q, want already registered", err)
}
}
func TestWeatherPostgresSchemaShape(t *testing.T) {
s := weatherPostgresSchema()
if s.MapEvent == nil {
t.Fatalf("weatherPostgresSchema().MapEvent is nil")
}
wantTables := map[string]bool{
tableObservations: true,
tableObservationCloudLayers: true,
tableObservationPresentWeather: true,
tableForecasts: true,
tableForecastPeriods: true,
tableAlertRuns: true,
tableAlerts: true,
tableAlertReferences: true,
}
if len(s.Tables) != len(wantTables) {
t.Fatalf("weatherPostgresSchema().Tables len = %d, want %d", len(s.Tables), len(wantTables))
}
seenIndexes := map[string]bool{}
for _, tbl := range s.Tables {
if !wantTables[tbl.Name] {
t.Fatalf("unexpected table %q in schema", tbl.Name)
}
if tbl.PruneColumn == "" {
t.Fatalf("table %q missing prune column", tbl.Name)
}
for _, idx := range tbl.Indexes {
if seenIndexes[idx.Name] {
t.Fatalf("duplicate index name %q", idx.Name)
}
seenIndexes[idx.Name] = true
}
}
}
func uniqueSinkName(prefix string) string {
return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano())
}

View File

@@ -13,26 +13,26 @@ import (
// Keeping this in one place makes main.go very readable.
func RegisterBuiltins(r *fksource.Registry) {
// NWS drivers
r.Register("nws_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewObservationSource(cfg)
})
r.Register("nws_alerts", func(cfg config.SourceConfig) (fksource.Source, error) {
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewAlertsSource(cfg)
})
r.Register("nws_forecast", func(cfg config.SourceConfig) (fksource.Source, error) {
r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return nws.NewForecastSource(cfg)
})
// Open-Meteo drivers
r.Register("openmeteo_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewObservationSource(cfg)
})
r.Register("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.Source, error) {
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openmeteo.NewForecastSource(cfg)
})
// OpenWeatherMap drivers
r.Register("openweather_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
return openweather.NewObservationSource(cfg)
})
}

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// AlertsSource polls an NWS alerts endpoint and emits a RAW alerts Event.

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastSource polls an NWS forecast endpoint (narrative or hourly) and emits a RAW forecast Event.

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event"
nwscommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/nws"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationSource polls an NWS station observation endpoint and emits a RAW observation Event.

View File

@@ -10,7 +10,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ForecastSource polls an Open-Meteo hourly forecast endpoint and emits one RAW Forecast Event.

View File

@@ -10,7 +10,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openmeteo"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
// ObservationSource polls an Open-Meteo endpoint and emits one RAW Observation Event.

View File

@@ -11,7 +11,7 @@ import (
"gitea.maximumdirect.net/ejr/feedkit/event"
owcommon "gitea.maximumdirect.net/ejr/weatherfeeder/internal/providers/openweather"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources/common"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/standards"
"gitea.maximumdirect.net/ejr/weatherfeeder/standards"
)
type ObservationSource struct {

View File

@@ -1,127 +0,0 @@
package standards
import "gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
// This file provides small, shared helper functions for reasoning about WMO codes.
// These are intentionally "coarse" categories that are useful for business logic,
// dashboards, and alerting decisions.
//
// Example uses:
// - jogging suitability: precipitation? thunderstorm? freezing precip?
// - quick glance: "is it cloudy?" "is there any precip?"
// - downstream normalizers / aggregators
func IsThunderstorm(code model.WMOCode) bool {
switch code {
case 95, 96, 99:
return true
default:
return false
}
}
func IsHail(code model.WMOCode) bool {
switch code {
case 96, 99:
return true
default:
return false
}
}
func IsFog(code model.WMOCode) bool {
switch code {
case 45, 48:
return true
default:
return false
}
}
// IsPrecipitation returns true if the code represents any precipitation
// (drizzle, rain, snow, showers, etc.).
func IsPrecipitation(code model.WMOCode) bool {
switch code {
// Drizzle
case 51, 53, 55, 56, 57:
return true
// Rain
case 61, 63, 65, 66, 67:
return true
// Snow
case 71, 73, 75, 77:
return true
// Showers
case 80, 81, 82, 85, 86:
return true
// Thunderstorm (often includes rain/hail)
case 95, 96, 99:
return true
default:
return false
}
}
func IsRainFamily(code model.WMOCode) bool {
switch code {
// Drizzle + freezing drizzle
case 51, 53, 55, 56, 57:
return true
// Rain + freezing rain
case 61, 63, 65, 66, 67:
return true
// Rain showers
case 80, 81, 82:
return true
// Thunderstorm often implies rain
case 95, 96, 99:
return true
default:
return false
}
}
func IsSnowFamily(code model.WMOCode) bool {
switch code {
// Snow and related
case 71, 73, 75, 77:
return true
// Snow showers
case 85, 86:
return true
default:
return false
}
}
// IsFreezingPrecip returns true if the code represents freezing drizzle/rain.
func IsFreezingPrecip(code model.WMOCode) bool {
switch code {
case 56, 57, 66, 67:
return true
default:
return false
}
}
// IsSkyOnly returns true for codes that represent "sky condition only"
// (clear/mostly/partly/cloudy) rather than fog/precip/etc.
func IsSkyOnly(code model.WMOCode) bool {
switch code {
case 0, 1, 2, 3:
return true
default:
return false
}
}

View File

@@ -15,7 +15,7 @@
// -----------------------
// For readability and stability, canonical payloads (weather.* schemas) should not emit
// noisy floating-point representations. weatherfeeder enforces this by rounding float
// values in canonical payloads to 2 digits after the decimal point at normalization
// values in canonical payloads to 4 digits after the decimal point at normalization
// finalization time.
//
// Provider-specific decoding helpers and quirks live in internal/providers/<provider>.

View File

@@ -1,9 +1,18 @@
package standards
// This file provides small, shared helper functions for reasoning about WMO codes.
// These are intentionally "coarse" categories that are useful for business logic,
// dashboards, and alerting decisions.
//
// Example uses:
// - jogging suitability: precipitation? thunderstorm? freezing precip?
// - quick glance: "is it cloudy?" "is there any precip?"
// - downstream normalizers / aggregators
import (
"fmt"
"gitea.maximumdirect.net/ejr/weatherfeeder/internal/model"
"gitea.maximumdirect.net/ejr/weatherfeeder/model"
)
type WMODescription struct {
@@ -100,3 +109,118 @@ func IsKnownWMO(code model.WMOCode) bool {
_, ok := WMODescriptions[code]
return ok
}
func IsThunderstorm(code model.WMOCode) bool {
switch code {
case 95, 96, 99:
return true
default:
return false
}
}
func IsHail(code model.WMOCode) bool {
switch code {
case 96, 99:
return true
default:
return false
}
}
func IsFog(code model.WMOCode) bool {
switch code {
case 45, 48:
return true
default:
return false
}
}
// IsPrecipitation returns true if the code represents any precipitation
// (drizzle, rain, snow, showers, etc.).
func IsPrecipitation(code model.WMOCode) bool {
switch code {
// Drizzle
case 51, 53, 55, 56, 57:
return true
// Rain
case 61, 63, 65, 66, 67:
return true
// Snow
case 71, 73, 75, 77:
return true
// Showers
case 80, 81, 82, 85, 86:
return true
// Thunderstorm (often includes rain/hail)
case 95, 96, 99:
return true
default:
return false
}
}
func IsRainFamily(code model.WMOCode) bool {
switch code {
// Drizzle + freezing drizzle
case 51, 53, 55, 56, 57:
return true
// Rain + freezing rain
case 61, 63, 65, 66, 67:
return true
// Rain showers
case 80, 81, 82:
return true
// Thunderstorm often implies rain
case 95, 96, 99:
return true
default:
return false
}
}
func IsSnowFamily(code model.WMOCode) bool {
switch code {
// Snow and related
case 71, 73, 75, 77:
return true
// Snow showers
case 85, 86:
return true
default:
return false
}
}
// IsFreezingPrecip returns true if the code represents freezing drizzle/rain.
func IsFreezingPrecip(code model.WMOCode) bool {
switch code {
case 56, 57, 66, 67:
return true
default:
return false
}
}
// IsSkyOnly returns true for codes that represent "sky condition only"
// (clear/mostly/partly/cloudy) rather than fog/precip/etc.
func IsSkyOnly(code model.WMOCode) bool {
switch code {
case 0, 1, 2, 3:
return true
default:
return false
}
}