Files
weatherfeeder/cmd/weatherfeeder/main.go
Eric Rakestraw 6712c16167
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
Updated to feedkit v0.9.0
2026-03-29 08:35:56 -05:00

144 lines
4.2 KiB
Go

package main
import (
"context"
"errors"
"log"
"os"
"os/signal"
"syscall"
"gitea.maximumdirect.net/ejr/feedkit/config"
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
wfpgsink "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sinks/postgres"
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
)
// dedupeMaxEntries is the argument passed to fkdedupe.Factory when the
// "dedupe" processor is registered in main.
// NOTE(review): presumably the maximum number of entries the dedupe processor
// remembers — confirm against the feedkit dedupe documentation.
const dedupeMaxEntries = 2048
// main wires the weatherfeeder daemon together and runs it until shutdown.
//
// Startup order: load config.yml, build the source and sink registries, turn
// each configured source into a scheduler job, build each configured sink,
// compile routes, assemble the processor chain, then run the scheduler and the
// dispatcher concurrently. Any startup failure terminates the process via
// log.Fatalf.
//
// Exit status: 0 when both components stop cleanly or because the context was
// canceled (SIGINT/SIGTERM); 1 when either component returns any other error,
// so that supervisors and container runtimes can detect the failure.
func main() {
	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

	// ctx is canceled on SIGINT or SIGTERM; cancel is also called below when a
	// component fails so that the other one is asked to stop.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	cfgPath := "config.yml"
	cfg, err := config.Load(cfgPath)
	if err != nil {
		log.Fatalf("config load failed: %v", err)
	}

	// --- Registries ---
	srcReg := fksources.NewRegistry()
	wfsources.RegisterBuiltins(srcReg)

	// Register feedkit's built-in sinks, then add the Postgres sink configured
	// with weatherfeeder's schema. Per the original note, stdout is intended
	// for debugging while Postgres and NATS are the main intended outputs.
	sinkReg := fksinks.NewRegistry()
	fksinks.RegisterBuiltins(sinkReg)
	sinkReg.Register("postgres", fksinks.PostgresFactory(wfpgsink.PostgresSchema()))

	// --- Build sources into scheduler jobs ---
	var jobs []fkscheduler.Job
	for i, sc := range cfg.Sources {
		in, err := srcReg.BuildInput(sc) // may be polling or streaming
		if err != nil {
			log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
		}

		if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
			log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
		}

		job, err := fkscheduler.JobFromSourceConfig(in, sc)
		if err != nil {
			log.Fatalf("build scheduler job failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
		}
		jobs = append(jobs, job)
	}

	// --- Build sinks ---
	// Sinks are keyed by their configured name; the dispatcher receives this map.
	builtSinks := map[string]fksinks.Sink{}
	for i, sk := range cfg.Sinks {
		s, err := sinkReg.Build(sk)
		if err != nil {
			log.Fatalf("build sink failed (sinks[%d] name=%q driver=%q): %v", i, sk.Name, sk.Driver, err)
		}
		builtSinks[sk.Name] = s
	}

	// --- Compile routes ---
	routes, err := fkdispatch.CompileRoutes(cfg)
	if err != nil {
		log.Fatalf("compile routes failed: %v", err)
	}

	// events connects the scheduler (producer) to the dispatcher (consumer).
	events := make(chan fkevent.Event, 256)

	// --- Processors ---
	//
	// The chain runs "normalize" first, then "dedupe".
	// The normalize processor is built from weatherfeeder's built-in
	// normalizers with its second argument set to false (RequireMatch=false per
	// the original note).
	// NOTE(review): presumably events that match no normalizer pass through
	// unchanged — confirm against feedkit's processors/normalize documentation.
	normalizers := wfnormalizers.RegisterBuiltins(nil)

	procReg := fkprocessors.NewRegistry()
	procReg.Register("normalize", func() (fkprocessors.Processor, error) {
		return fknormalize.NewProcessor(normalizers, false), nil
	})
	procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))

	chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
	if err != nil {
		log.Fatalf("build processor chain failed: %v", err)
	}

	pl := &fkpipeline.Pipeline{
		Processors: chain,
	}

	s := &fkscheduler.Scheduler{
		Jobs: jobs,
		Out:  events,
		Logf: log.Printf,
	}

	d := &fkdispatch.Dispatcher{
		In:       events,
		Pipeline: pl,
		Sinks:    builtSinks,
		Routes:   routes,
	}

	// Buffered to 2 so neither goroutine can block on its final send.
	errCh := make(chan error, 2)
	go func() { errCh <- s.Run(ctx) }()
	go func() { errCh <- d.Run(ctx, log.Printf) }()

	// Wait for both components. A nil error or a context shutdown error is a
	// normal stop; anything else is recorded as a failure and triggers
	// cancellation so the remaining component is asked to stop too.
	failed := false
	for i := 0; i < 2; i++ {
		err := <-errCh
		if err == nil || isContextShutdown(err) {
			continue
		}
		log.Printf("fatal error: %v", err)
		failed = true
		cancel()
	}

	log.Printf("shutdown complete")

	// Report the failure through the exit status. os.Exit skips deferred
	// calls, which is safe here: cancel has already been invoked on this path.
	if failed {
		os.Exit(1)
	}
}
// isContextShutdown reports whether err is, or wraps, one of the context
// package's termination errors (context.Canceled or
// context.DeadlineExceeded). A nil err yields false.
func isContextShutdown(err error) bool {
	switch {
	case errors.Is(err, context.Canceled):
		return true
	case errors.Is(err, context.DeadlineExceeded):
		return true
	default:
		return false
	}
}