Files
weatherfeeder/cmd/weatherfeeder/main.go
Eric Rakestraw 9663fdfc43
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
Updated main.go to register the postgres and NATS sinks in addition to the stdout sink.
2026-02-07 11:57:54 -06:00

198 lines
5.3 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/normalize"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
)
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
cfgPath := "config.yml"
cfg, err := config.Load(cfgPath)
if err != nil {
log.Fatalf("config load failed: %v", err)
}
// --- Registries ---
srcReg := fksources.NewRegistry()
wfsources.RegisterBuiltins(srcReg)
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
sinkReg := fksinks.NewRegistry()
sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewStdoutSink(cfg.Name), nil
})
sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewPostgresSinkFromConfig(cfg)
})
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
return fksinks.NewNATSSinkFromConfig(cfg)
})
// --- Build sources into scheduler jobs ---
var jobs []fkscheduler.Job
for i, sc := range cfg.Sources {
src, err := srcReg.Build(sc)
if err != nil {
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
}
// Optional safety: if config.kind is set, ensure it matches the source.Kind().
if strings.TrimSpace(sc.Kind) != "" {
expectedKind, err := fkevent.ParseKind(sc.Kind)
if err != nil {
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
}
if src.Kind() != expectedKind {
log.Fatalf(
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
)
}
}
jobs = append(jobs, fkscheduler.Job{
Source: src,
Every: sc.Every.Duration,
})
}
// --- Build sinks ---
builtSinks := map[string]fksinks.Sink{}
for i, sk := range cfg.Sinks {
s, err := sinkReg.Build(sk)
if err != nil {
log.Fatalf("build sink failed (sinks[%d] name=%q driver=%q): %v", i, sk.Name, sk.Driver, err)
}
builtSinks[sk.Name] = s
}
// --- Compile routes ---
routes, err := compileRoutes(cfg, builtSinks)
if err != nil {
log.Fatalf("compile routes failed: %v", err)
}
events := make(chan fkevent.Event, 256)
// --- Normalization (optional) ---
//
// We install feedkit's normalize.Processor even before any normalizers exist.
// With an empty registry and RequireMatch=false, this is a no-op passthrough.
// It will begin transforming events as soon as:
// 1) sources emit raw schemas (raw.*), and
// 2) matching normalizers are registered.
normReg := &fknormalize.Registry{}
wfnormalizers.RegisterBuiltins(normReg)
pl := &fkpipeline.Pipeline{
Processors: []fkpipeline.Processor{
fknormalize.Processor{Registry: normReg},
},
}
s := &fkscheduler.Scheduler{
Jobs: jobs,
Out: events,
Logf: log.Printf,
}
d := &fkdispatch.Dispatcher{
In: events,
Pipeline: pl,
Sinks: builtSinks,
Routes: routes,
}
errCh := make(chan error, 2)
go func() { errCh <- s.Run(ctx) }()
go func() { errCh <- d.Run(ctx, log.Printf) }()
for i := 0; i < 2; i++ {
err := <-errCh
if err == nil || isContextShutdown(err) {
continue
}
log.Printf("fatal error: %v", err)
cancel()
}
log.Printf("shutdown complete")
}
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
if len(cfg.Routes) == 0 {
return defaultRoutes(builtSinks), nil
}
var routes []fkdispatch.Route
for i, r := range cfg.Routes {
if strings.TrimSpace(r.Sink) == "" {
return nil, fmt.Errorf("routes[%d].sink is empty", i)
}
if _, ok := builtSinks[r.Sink]; !ok {
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
}
kinds := map[fkevent.Kind]bool{}
for j, k := range r.Kinds {
kind, err := fkevent.ParseKind(k)
if err != nil {
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
}
kinds[kind] = true
}
routes = append(routes, fkdispatch.Route{
SinkName: r.Sink,
Kinds: kinds,
})
}
return routes, nil
}
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
// nil Kinds means "match all kinds" by convention
var allKinds map[fkevent.Kind]bool = nil
routes := make([]fkdispatch.Route, 0, len(builtSinks))
for name := range builtSinks {
routes = append(routes, fkdispatch.Route{
SinkName: name,
Kinds: allKinds,
})
}
return routes
}
func isContextShutdown(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}
// keep time imported (mirrors your previous main.go defensive trick)
var _ = time.Second