weatherfeeder: split the former maximumdirect.net/weatherd project in two.

feedkit now contains a reusable core, while weatherfeeder is a concrete implementation that includes weather-specific functions.
This commit is contained in:
2026-01-13 18:14:21 -06:00
parent 1e05b38347
commit aa4774e0dd
21 changed files with 2432 additions and 1 deletions

View File

@@ -0,0 +1,66 @@
---
sources:
- name: NWSObservationKSTL
kind: observation
driver: nws_observation
every: 12m
params:
url: "https://api.weather.gov/stations/KSTL/observations/latest"
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: OpenMeteoObservation
kind: observation
driver: openmeteo_observation
every: 12m
params:
url: "https://api.open-meteo.com/v1/forecast?latitude=38.6239&longitude=-90.3571&current=temperature_2m,relative_humidity_2m,weather_code,wind_speed_10m,wind_direction_10m,precipitation,surface_pressure,rain,showers,snowfall,cloud_cover,apparent_temperature,is_day,wind_gusts_10m,pressure_msl&forecast_days=1"
user_agent: "HomeOps (eric@maximumdirect.net)"
- name: OpenWeatherObservation
kind: observation
driver: openweather_observation
every: 12m
params:
url: "https://api.openweathermap.org/data/2.5/weather?lat=38.6239&lon=-90.3571&appid=c954f2566cb7ccb56b43737b52e88fc6&units=metric"
user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSObservationKSUS
# kind: observation
# driver: nws_observation
# every: 18s
# params:
# url: "https://api.weather.gov/stations/KSUS/observations/latest"
# user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSObservationKCPS
# kind: observation
# driver: nws_observation
# every: 12m
# params:
# url: "https://api.weather.gov/stations/KCPS/observations/latest"
# user_agent: "HomeOps (eric@maximumdirect.net)"
# - name: NWSAlertsSTL
# kind: alert
# driver: nws_alerts
# every: 1m
# params:
# url: "https://api.weather.gov/alerts?point=38.6239,-90.3571&limit=500"
# user_agent: "HomeOps (eric@maximumdirect.net)"
sinks:
- name: stdout
driver: stdout
params: {}
# - name: logfile
# driver: file
# params:
# path: "/Users/eric/weatherd.log"
routes:
- sink: stdout
kinds: ["observation"]
# - sink: logfile
# kinds: ["observation", "alert", "forecast"]

178
cmd/weatherfeeder/main.go Normal file
View File

@@ -0,0 +1,178 @@
package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
cfgPath := "config.yml"
cfg, err := config.Load(cfgPath)
if err != nil {
log.Fatalf("config load failed: %v", err)
}
// --- Registries ---
srcReg := fksources.NewRegistry()
wfsources.RegisterBuiltins(srcReg)
// Minimal sink set to compile: stdout only.
sinkReg := fksinks.NewRegistry()
sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewStdoutSink(cfg.Name), nil
})
// --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job
for i, sc := range cfg.Sources {
src, err := srcReg.Build(sc)
if err != nil {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
}
// Optional safety: if config.kind is set, ensure it matches the source.Kind().
if strings.TrimSpace(sc.Kind) != "" {
expectedKind, err := fkevent.ParseKind(sc.Kind)
if err != nil {
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
}
if src.Kind() != expectedKind {
log.Fatalf(
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
)
}
}
jobs = append(jobs, fkscheduler.Job{
Source: src,
Every: sc.Every.Duration,
})
}
// --- Build sinks ---
builtSinks := map[string]fksinks.Sink{}
for i, sk := range cfg.Sinks {
s, err := sinkReg.Build(sk)
if err != nil {
log.Fatalf("build sink failed (sinks[%d] name=%q driver=%q): %v", i, sk.Name, sk.Driver, err)
}
builtSinks[sk.Name] = s
}
// --- Compile routes ---
routes, err := compileRoutes(cfg, builtSinks)
if err != nil {
log.Fatalf("compile routes failed: %v", err)
}
events := make(chan fkevent.Event, 256)
pl := &fkpipeline.Pipeline{
Processors: nil,
}
s := &fkscheduler.Scheduler{
Jobs: jobs,
Out: events,
Logf: log.Printf,
}
d := &fkdispatch.Dispatcher{
In: events,
Pipeline: pl,
Sinks: builtSinks,
Routes: routes,
}
errCh := make(chan error, 2)
go func() { errCh <- s.Run(ctx) }()
go func() { errCh <- d.Run(ctx, log.Printf) }()
for i := 0; i < 2; i++ {
err := <-errCh
if err == nil || isContextShutdown(err) {
continue
}
log.Printf("fatal error: %v", err)
cancel()
}
log.Printf("shutdown complete")
}
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
if len(cfg.Routes) == 0 {
return defaultRoutes(builtSinks), nil
}
var routes []fkdispatch.Route
for i, r := range cfg.Routes {
if strings.TrimSpace(r.Sink) == "" {
return nil, fmt.Errorf("routes[%d].sink is empty", i)
}
if _, ok := builtSinks[r.Sink]; !ok {
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
}
kinds := map[fkevent.Kind]bool{}
for j, k := range r.Kinds {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
}
kinds[kind] = true
}
routes = append(routes, fkdispatch.Route{
SinkName: r.Sink,
Kinds: kinds,
})
}
return routes, nil
}
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
// nil Kinds means "match all kinds" by convention
var allKinds map[fkevent.Kind]bool = nil
routes := make([]fkdispatch.Route, 0, len(builtSinks))
for name := range builtSinks {
routes = append(routes, fkdispatch.Route{
SinkName: name,
Kinds: allKinds,
})
}
return routes
}
func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
// keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second