package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"sort"
|
|
"strings"
|
|
"syscall"
|
|
"time"
|
|
|
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
|
fkevent "gitea.maximumdirect.net/ejr/feedkit/event"
|
|
fkpipeline "gitea.maximumdirect.net/ejr/feedkit/pipeline"
|
|
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
|
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
|
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
|
fkscheduler "gitea.maximumdirect.net/ejr/feedkit/scheduler"
|
|
fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks"
|
|
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
|
|
|
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
|
wfpgsink "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sinks/postgres"
|
|
wfsources "gitea.maximumdirect.net/ejr/weatherfeeder/internal/sources"
|
|
)
|
|
|
|
// dedupeMaxEntries caps how many entries the dedupe processor retains;
// it is passed to fkdedupe.Factory when the processor chain is built.
const dedupeMaxEntries = 2048
// main wires the weatherfeeder binary together: load config, register
// source/sink/processor implementations, compile routes, then run the
// scheduler and dispatcher until interrupted or a fatal error occurs.
func main() {
	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

	// ctx is canceled on SIGINT/SIGTERM so both goroutines below unwind cleanly.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer cancel()

	// NOTE(review): config path is hard-coded and resolved relative to the
	// working directory — confirm whether a flag/env override is wanted.
	cfgPath := "config.yml"
	cfg, err := config.Load(cfgPath)
	if err != nil {
		log.Fatalf("config load failed: %v", err)
	}
	// Schemas are registered before any postgres sink is built further down.
	if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
		log.Fatalf("postgres schema registration failed: %v", err)
	}

	// --- Registries ---
	srcReg := fksources.NewRegistry()
	wfsources.RegisterBuiltins(srcReg)

	// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is
	// useful for debugging and the latter are the main intended outputs.
	sinkReg := fksinks.NewRegistry()
	sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) {
		return fksinks.NewStdoutSink(cfg.Name), nil
	})
	sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) {
		return fksinks.NewPostgresSinkFromConfig(cfg)
	})
	sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
		return fksinks.NewNATSSinkFromConfig(cfg)
	})

	// --- Build sources into scheduler jobs ---
	var jobs []fkscheduler.Job
	for i, sc := range cfg.Sources {
		in, err := srcReg.BuildInput(sc) // may be polling or streaming
		if err != nil {
			log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
		}

		// Fail fast if the config promises kinds the source does not advertise.
		if err := validateSourceExpectedKinds(sc, in); err != nil {
			log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
		}

		// If this is a polling source, every is required.
		if _, ok := in.(fksources.PollSource); ok && sc.Every.Duration <= 0 {
			log.Fatalf(
				"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
				i, sc.Name, sc.Driver,
			)
		}

		// For stream sources, Every is ignored; it is fine if omitted/zero.
		jobs = append(jobs, fkscheduler.Job{
			Source: in,
			Every:  sc.Every.Duration,
		})
	}

	// --- Build sinks ---
	builtSinks := map[string]fksinks.Sink{}
	for i, sk := range cfg.Sinks {
		s, err := sinkReg.Build(sk)
		if err != nil {
			log.Fatalf("build sink failed (sinks[%d] name=%q driver=%q): %v", i, sk.Name, sk.Driver, err)
		}
		builtSinks[sk.Name] = s
	}

	// --- Compile routes ---
	routes, err := compileRoutes(cfg, builtSinks)
	if err != nil {
		log.Fatalf("compile routes failed: %v", err)
	}

	// Buffered so bursty sources do not immediately block on the dispatcher.
	events := make(chan fkevent.Event, 256)

	// --- Processors ---
	//
	// We install feedkit's processors/normalize.Processor even before any normalizers exist.
	// With an empty normalizer list and RequireMatch=false, this is a no-op passthrough.
	// It will begin transforming events as soon as:
	// 1) sources emit raw schemas (raw.*), and
	// 2) matching normalizers are registered.
	normalizers := wfnormalizers.RegisterBuiltins(nil)

	procReg := fkprocessors.NewRegistry()
	procReg.Register("normalize", func() (fkprocessors.Processor, error) {
		return fknormalize.NewProcessor(normalizers, false), nil
	})
	procReg.Register("dedupe", fkdedupe.Factory(dedupeMaxEntries))

	// Chain order: normalize runs before dedupe.
	chain, err := procReg.BuildChain([]string{"normalize", "dedupe"})
	if err != nil {
		log.Fatalf("build processor chain failed: %v", err)
	}

	pl := &fkpipeline.Pipeline{
		Processors: chain,
	}

	s := &fkscheduler.Scheduler{
		Jobs: jobs,
		Out:  events,
		Logf: log.Printf,
	}

	d := &fkdispatch.Dispatcher{
		In:       events,
		Pipeline: pl,
		Sinks:    builtSinks,
		Routes:   routes,
	}

	// Capacity 2: one slot per goroutine so neither send blocks at exit.
	errCh := make(chan error, 2)

	go func() { errCh <- s.Run(ctx) }()
	go func() { errCh <- d.Run(ctx, log.Printf) }()

	// Wait for both goroutines. A genuine error cancels ctx so the other side
	// also unwinds; context cancellation/deadline errors count as clean shutdown.
	for i := 0; i < 2; i++ {
		err := <-errCh
		if err == nil || isContextShutdown(err) {
			continue
		}
		log.Printf("fatal error: %v", err)
		cancel()
	}

	log.Printf("shutdown complete")
}
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
|
|
if len(cfg.Routes) == 0 {
|
|
return defaultRoutes(builtSinks), nil
|
|
}
|
|
|
|
var routes []fkdispatch.Route
|
|
for i, r := range cfg.Routes {
|
|
if strings.TrimSpace(r.Sink) == "" {
|
|
return nil, fmt.Errorf("routes[%d].sink is empty", i)
|
|
}
|
|
if _, ok := builtSinks[r.Sink]; !ok {
|
|
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
|
|
}
|
|
|
|
kinds := map[fkevent.Kind]bool{}
|
|
for j, k := range r.Kinds {
|
|
kind, err := fkevent.ParseKind(k)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
|
|
}
|
|
kinds[kind] = true
|
|
}
|
|
|
|
routes = append(routes, fkdispatch.Route{
|
|
SinkName: r.Sink,
|
|
Kinds: kinds,
|
|
})
|
|
}
|
|
|
|
return routes, nil
|
|
}
|
|
|
|
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
|
|
// nil Kinds means "match all kinds" by convention
|
|
var allKinds map[fkevent.Kind]bool = nil
|
|
|
|
routes := make([]fkdispatch.Route, 0, len(builtSinks))
|
|
for name := range builtSinks {
|
|
routes = append(routes, fkdispatch.Route{
|
|
SinkName: name,
|
|
Kinds: allKinds,
|
|
})
|
|
}
|
|
return routes
|
|
}
|
|
|
|
func isContextShutdown(err error) bool {
|
|
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
|
}
|
|
|
|
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
|
|
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(expectedKinds) == 0 {
|
|
return nil
|
|
}
|
|
|
|
advertisedKinds := advertisedSourceKinds(in)
|
|
if len(advertisedKinds) == 0 {
|
|
return nil
|
|
}
|
|
|
|
for kind := range expectedKinds {
|
|
if !advertisedKinds[kind] {
|
|
return fmt.Errorf(
|
|
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
|
|
kind,
|
|
sortedKinds(expectedKinds),
|
|
sortedKinds(advertisedKinds),
|
|
)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
|
|
kinds := map[fkevent.Kind]bool{}
|
|
for i, k := range raw {
|
|
kind, err := fkevent.ParseKind(k)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
|
|
}
|
|
kinds[kind] = true
|
|
}
|
|
return kinds, nil
|
|
}
|
|
|
|
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
|
|
if in == nil {
|
|
return nil
|
|
}
|
|
|
|
kinds := map[fkevent.Kind]bool{}
|
|
if ks, ok := in.(fksources.KindsSource); ok {
|
|
for _, kind := range ks.Kinds() {
|
|
kinds[kind] = true
|
|
}
|
|
return kinds
|
|
}
|
|
|
|
if ks, ok := in.(fksources.KindSource); ok {
|
|
kinds[ks.Kind()] = true
|
|
return kinds
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
|
|
out := make([]string, 0, len(kindSet))
|
|
for kind := range kindSet {
|
|
out = append(out, string(kind))
|
|
}
|
|
sort.Strings(out)
|
|
return out
|
|
}
|
|
|
|
// Package-level reference that pins the "time" import while no runtime code
// uses it yet (defensive trick carried over from the previous main.go).
var _ time.Duration