Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| eae9568afe | |||
| f464592c56 |
@@ -1,7 +1,8 @@
|
|||||||
---
|
---
|
||||||
sources:
|
sources:
|
||||||
- name: NWSObservationKSTL
|
- name: NWSObservationKSTL
|
||||||
kind: observation
|
mode: poll
|
||||||
|
kinds: ["observation"]
|
||||||
driver: nws_observation
|
driver: nws_observation
|
||||||
every: 10m
|
every: 10m
|
||||||
params:
|
params:
|
||||||
@@ -9,7 +10,8 @@ sources:
|
|||||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
# - name: OpenMeteoObservation
|
# - name: OpenMeteoObservation
|
||||||
# kind: observation
|
# mode: poll
|
||||||
|
# kinds: ["observation"]
|
||||||
# driver: openmeteo_observation
|
# driver: openmeteo_observation
|
||||||
# every: 10m
|
# every: 10m
|
||||||
# params:
|
# params:
|
||||||
@@ -17,7 +19,8 @@ sources:
|
|||||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
# - name: OpenWeatherObservation
|
# - name: OpenWeatherObservation
|
||||||
# kind: observation
|
# mode: poll
|
||||||
|
# kinds: ["observation"]
|
||||||
# driver: openweather_observation
|
# driver: openweather_observation
|
||||||
# every: 10m
|
# every: 10m
|
||||||
# params:
|
# params:
|
||||||
@@ -25,7 +28,8 @@ sources:
|
|||||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
# - name: NWSObservationKSUS
|
# - name: NWSObservationKSUS
|
||||||
# kind: observation
|
# mode: poll
|
||||||
|
# kinds: ["observation"]
|
||||||
# driver: nws_observation
|
# driver: nws_observation
|
||||||
# every: 10m
|
# every: 10m
|
||||||
# params:
|
# params:
|
||||||
@@ -33,7 +37,8 @@ sources:
|
|||||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
# - name: NWSObservationKCPS
|
# - name: NWSObservationKCPS
|
||||||
# kind: observation
|
# mode: poll
|
||||||
|
# kinds: ["observation"]
|
||||||
# driver: nws_observation
|
# driver: nws_observation
|
||||||
# every: 10m
|
# every: 10m
|
||||||
# params:
|
# params:
|
||||||
@@ -41,7 +46,8 @@ sources:
|
|||||||
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
# user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
- name: NWSHourlyForecastSTL
|
- name: NWSHourlyForecastSTL
|
||||||
kind: forecast
|
mode: poll
|
||||||
|
kinds: ["forecast"]
|
||||||
driver: nws_forecast
|
driver: nws_forecast
|
||||||
every: 45m
|
every: 45m
|
||||||
params:
|
params:
|
||||||
@@ -49,7 +55,8 @@ sources:
|
|||||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
- name: OpenMeteoHourlyForecastSTL
|
- name: OpenMeteoHourlyForecastSTL
|
||||||
kind: forecast
|
mode: poll
|
||||||
|
kinds: ["forecast"]
|
||||||
driver: openmeteo_forecast
|
driver: openmeteo_forecast
|
||||||
every: 60m
|
every: 60m
|
||||||
params:
|
params:
|
||||||
@@ -57,7 +64,8 @@ sources:
|
|||||||
user_agent: "HomeOps (eric@maximumdirect.net)"
|
user_agent: "HomeOps (eric@maximumdirect.net)"
|
||||||
|
|
||||||
- name: NWSAlertsSTL
|
- name: NWSAlertsSTL
|
||||||
kind: alert
|
mode: poll
|
||||||
|
kinds: ["alert"]
|
||||||
driver: nws_alerts
|
driver: nws_alerts
|
||||||
every: 1m
|
every: 1m
|
||||||
params:
|
params:
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
@@ -51,30 +52,30 @@ func main() {
|
|||||||
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
||||||
return fksinks.NewNATSSinkFromConfig(cfg)
|
return fksinks.NewNATSSinkFromConfig(cfg)
|
||||||
})
|
})
|
||||||
|
|
||||||
// --- Build sources into scheduler jobs ---
|
// --- Build sources into scheduler jobs ---
|
||||||
var jobs []fkscheduler.Job
|
var jobs []fkscheduler.Job
|
||||||
for i, sc := range cfg.Sources {
|
for i, sc := range cfg.Sources {
|
||||||
src, err := srcReg.Build(sc)
|
in, err := srcReg.BuildInput(sc) // may be polling or streaming
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optional safety: if config.kind is set, ensure it matches the source.Kind().
|
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||||
if strings.TrimSpace(sc.Kind) != "" {
|
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||||
expectedKind, err := fkevent.ParseKind(sc.Kind)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
|
|
||||||
}
|
|
||||||
if src.Kind() != expectedKind {
|
|
||||||
log.Fatalf(
|
|
||||||
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
|
|
||||||
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this is a polling source, every is required.
|
||||||
|
if _, ok := in.(fksources.PollSource); ok && sc.Every.Duration <= 0 {
|
||||||
|
log.Fatalf(
|
||||||
|
"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
|
||||||
|
i, sc.Name, sc.Driver,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For stream sources, Every is ignored; it is fine if omitted/zero.
|
||||||
jobs = append(jobs, fkscheduler.Job{
|
jobs = append(jobs, fkscheduler.Job{
|
||||||
Source: src,
|
Source: in,
|
||||||
Every: sc.Every.Duration,
|
Every: sc.Every.Duration,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -193,5 +194,74 @@ func isContextShutdown(err error) bool {
|
|||||||
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
|
||||||
|
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if len(expectedKinds) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
advertisedKinds := advertisedSourceKinds(in)
|
||||||
|
if len(advertisedKinds) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for kind := range expectedKinds {
|
||||||
|
if !advertisedKinds[kind] {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
|
||||||
|
kind,
|
||||||
|
sortedKinds(expectedKinds),
|
||||||
|
sortedKinds(advertisedKinds),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
|
||||||
|
kinds := map[fkevent.Kind]bool{}
|
||||||
|
for i, k := range raw {
|
||||||
|
kind, err := fkevent.ParseKind(k)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
|
||||||
|
}
|
||||||
|
kinds[kind] = true
|
||||||
|
}
|
||||||
|
return kinds, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
|
||||||
|
if in == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
kinds := map[fkevent.Kind]bool{}
|
||||||
|
if ks, ok := in.(fksources.KindsSource); ok {
|
||||||
|
for _, kind := range ks.Kinds() {
|
||||||
|
kinds[kind] = true
|
||||||
|
}
|
||||||
|
return kinds
|
||||||
|
}
|
||||||
|
|
||||||
|
if ks, ok := in.(fksources.KindSource); ok {
|
||||||
|
kinds[ks.Kind()] = true
|
||||||
|
return kinds
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
|
||||||
|
out := make([]string, 0, len(kindSet))
|
||||||
|
for kind := range kindSet {
|
||||||
|
out = append(out, string(kind))
|
||||||
|
}
|
||||||
|
sort.Strings(out)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
// keep time imported (mirrors your previous main.go defensive trick)
|
// keep time imported (mirrors your previous main.go defensive trick)
|
||||||
var _ = time.Second
|
var _ = time.Second
|
||||||
|
|||||||
90
cmd/weatherfeeder/main_test.go
Normal file
90
cmd/weatherfeeder/main_test.go
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||||
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testInput struct {
|
||||||
|
name string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s testInput) Name() string { return s.name }
|
||||||
|
|
||||||
|
type testKindSource struct {
|
||||||
|
testInput
|
||||||
|
kind fkevent.Kind
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s testKindSource) Kind() fkevent.Kind { return s.kind }
|
||||||
|
|
||||||
|
type testKindsSource struct {
|
||||||
|
testInput
|
||||||
|
kinds []fkevent.Kind
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds }
|
||||||
|
|
||||||
|
func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
|
||||||
|
sc := config.SourceConfig{Kind: "observation"}
|
||||||
|
in := testKindSource{
|
||||||
|
testInput: testInput{name: "test"},
|
||||||
|
kind: fkevent.Kind("observation"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||||
|
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
|
||||||
|
sc := config.SourceConfig{Kinds: []string{"observation"}}
|
||||||
|
in := testKindsSource{
|
||||||
|
testInput: testInput{name: "test"},
|
||||||
|
kinds: []fkevent.Kind{"observation", "forecast"},
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||||
|
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
|
||||||
|
sc := config.SourceConfig{Kinds: []string{"alert"}}
|
||||||
|
in := testKindsSource{
|
||||||
|
testInput: testInput{name: "test"},
|
||||||
|
kinds: []fkevent.Kind{"observation", "forecast"},
|
||||||
|
}
|
||||||
|
|
||||||
|
err := validateSourceExpectedKinds(sc, in)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), "configured expected kind") {
|
||||||
|
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
|
||||||
|
sc := config.SourceConfig{Kinds: []string{"alert"}}
|
||||||
|
in := testInput{name: "test"}
|
||||||
|
|
||||||
|
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||||
|
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
|
||||||
|
if _, err := parseExpectedKinds([]string{""}); err == nil {
|
||||||
|
t.Fatalf("parseExpectedKinds() expected error for empty kind")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExampleConfigLoads(t *testing.T) {
|
||||||
|
if _, err := config.Load("config.yml"); err != nil {
|
||||||
|
t.Fatalf("config.Load(config.yml) unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
2
go.mod
2
go.mod
@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.4.1
|
require gitea.maximumdirect.net/ejr/feedkit v0.6.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.4.1 h1:mMFtPCBKp2LXV3euPH21WzjHku/HHx31KQqYW+w1aqU=
|
gitea.maximumdirect.net/ejr/feedkit v0.5.0 h1:T4pRTo9Tj/o7TbZYUbp8UE7cQVLmIucUrYmD6G8E8ZQ=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.4.1/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
gitea.maximumdirect.net/ejr/feedkit v0.5.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
||||||
|
|||||||
@@ -13,26 +13,26 @@ import (
|
|||||||
// Keeping this in one place makes main.go very readable.
|
// Keeping this in one place makes main.go very readable.
|
||||||
func RegisterBuiltins(r *fksource.Registry) {
|
func RegisterBuiltins(r *fksource.Registry) {
|
||||||
// NWS drivers
|
// NWS drivers
|
||||||
r.Register("nws_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
|
r.RegisterPoll("nws_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return nws.NewObservationSource(cfg)
|
return nws.NewObservationSource(cfg)
|
||||||
})
|
})
|
||||||
r.Register("nws_alerts", func(cfg config.SourceConfig) (fksource.Source, error) {
|
r.RegisterPoll("nws_alerts", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return nws.NewAlertsSource(cfg)
|
return nws.NewAlertsSource(cfg)
|
||||||
})
|
})
|
||||||
r.Register("nws_forecast", func(cfg config.SourceConfig) (fksource.Source, error) {
|
r.RegisterPoll("nws_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return nws.NewForecastSource(cfg)
|
return nws.NewForecastSource(cfg)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Open-Meteo drivers
|
// Open-Meteo drivers
|
||||||
r.Register("openmeteo_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
|
r.RegisterPoll("openmeteo_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return openmeteo.NewObservationSource(cfg)
|
return openmeteo.NewObservationSource(cfg)
|
||||||
})
|
})
|
||||||
r.Register("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.Source, error) {
|
r.RegisterPoll("openmeteo_forecast", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return openmeteo.NewForecastSource(cfg)
|
return openmeteo.NewForecastSource(cfg)
|
||||||
})
|
})
|
||||||
|
|
||||||
// OpenWeatherMap drivers
|
// OpenWeatherMap drivers
|
||||||
r.Register("openweather_observation", func(cfg config.SourceConfig) (fksource.Source, error) {
|
r.RegisterPoll("openweather_observation", func(cfg config.SourceConfig) (fksource.PollSource, error) {
|
||||||
return openweather.NewObservationSource(cfg)
|
return openweather.NewObservationSource(cfg)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user