Moved generic and broadly useful helper functions upstream into feedkit
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
All checks were successful
ci/woodpecker/push/build-image Pipeline was successful
This commit is contained in:
@@ -3,14 +3,10 @@ package main
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sort"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"gitea.maximumdirect.net/ejr/feedkit/config"
|
||||
fkdispatch "gitea.maximumdirect.net/ejr/feedkit/dispatch"
|
||||
@@ -41,7 +37,7 @@ func main() {
|
||||
if err != nil {
|
||||
log.Fatalf("config load failed: %v", err)
|
||||
}
|
||||
if err := wfpgsink.RegisterPostgresSchemas(cfg); err != nil {
|
||||
if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, wfpgsink.PostgresSchema()); err != nil {
|
||||
log.Fatalf("postgres schema registration failed: %v", err)
|
||||
}
|
||||
|
||||
@@ -51,15 +47,7 @@ func main() {
|
||||
|
||||
// Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs.
|
||||
sinkReg := fksinks.NewRegistry()
|
||||
sinkReg.Register("stdout", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
||||
return fksinks.NewStdoutSink(cfg.Name), nil
|
||||
})
|
||||
sinkReg.Register("postgres", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
||||
return fksinks.NewPostgresSinkFromConfig(cfg)
|
||||
})
|
||||
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
||||
return fksinks.NewNATSSinkFromConfig(cfg)
|
||||
})
|
||||
fksinks.RegisterBuiltins(sinkReg)
|
||||
|
||||
// --- Build sources into scheduler jobs ---
|
||||
var jobs []fkscheduler.Job
|
||||
@@ -69,7 +57,7 @@ func main() {
|
||||
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||
}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
|
||||
log.Fatalf("source expected kinds validation failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||
}
|
||||
|
||||
@@ -99,7 +87,7 @@ func main() {
|
||||
}
|
||||
|
||||
// --- Compile routes ---
|
||||
routes, err := compileRoutes(cfg, builtSinks)
|
||||
routes, err := fkdispatch.CompileRoutes(cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("compile routes failed: %v", err)
|
||||
}
|
||||
@@ -160,124 +148,6 @@ func main() {
|
||||
log.Printf("shutdown complete")
|
||||
}
|
||||
|
||||
func compileRoutes(cfg *config.Config, builtSinks map[string]fksinks.Sink) ([]fkdispatch.Route, error) {
|
||||
if len(cfg.Routes) == 0 {
|
||||
return defaultRoutes(builtSinks), nil
|
||||
}
|
||||
|
||||
var routes []fkdispatch.Route
|
||||
for i, r := range cfg.Routes {
|
||||
if strings.TrimSpace(r.Sink) == "" {
|
||||
return nil, fmt.Errorf("routes[%d].sink is empty", i)
|
||||
}
|
||||
if _, ok := builtSinks[r.Sink]; !ok {
|
||||
return nil, fmt.Errorf("routes[%d].sink references unknown sink %q", i, r.Sink)
|
||||
}
|
||||
|
||||
kinds := map[fkevent.Kind]bool{}
|
||||
for j, k := range r.Kinds {
|
||||
kind, err := fkevent.ParseKind(k)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("routes[%d].kinds[%d]: %w", i, j, err)
|
||||
}
|
||||
kinds[kind] = true
|
||||
}
|
||||
|
||||
routes = append(routes, fkdispatch.Route{
|
||||
SinkName: r.Sink,
|
||||
Kinds: kinds,
|
||||
})
|
||||
}
|
||||
|
||||
return routes, nil
|
||||
}
|
||||
|
||||
func defaultRoutes(builtSinks map[string]fksinks.Sink) []fkdispatch.Route {
|
||||
// nil Kinds means "match all kinds" by convention
|
||||
var allKinds map[fkevent.Kind]bool = nil
|
||||
|
||||
routes := make([]fkdispatch.Route, 0, len(builtSinks))
|
||||
for name := range builtSinks {
|
||||
routes = append(routes, fkdispatch.Route{
|
||||
SinkName: name,
|
||||
Kinds: allKinds,
|
||||
})
|
||||
}
|
||||
return routes
|
||||
}
|
||||
|
||||
func isContextShutdown(err error) bool {
|
||||
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
|
||||
}
|
||||
|
||||
func validateSourceExpectedKinds(sc config.SourceConfig, in fksources.Input) error {
|
||||
expectedKinds, err := parseExpectedKinds(sc.ExpectedKinds())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(expectedKinds) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
advertisedKinds := advertisedSourceKinds(in)
|
||||
if len(advertisedKinds) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for kind := range expectedKinds {
|
||||
if !advertisedKinds[kind] {
|
||||
return fmt.Errorf(
|
||||
"configured expected kind %q not advertised by source (configured=%v advertised=%v)",
|
||||
kind,
|
||||
sortedKinds(expectedKinds),
|
||||
sortedKinds(advertisedKinds),
|
||||
)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseExpectedKinds(raw []string) (map[fkevent.Kind]bool, error) {
|
||||
kinds := map[fkevent.Kind]bool{}
|
||||
for i, k := range raw {
|
||||
kind, err := fkevent.ParseKind(k)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid expected kind at index %d (%q): %w", i, k, err)
|
||||
}
|
||||
kinds[kind] = true
|
||||
}
|
||||
return kinds, nil
|
||||
}
|
||||
|
||||
func advertisedSourceKinds(in fksources.Input) map[fkevent.Kind]bool {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
kinds := map[fkevent.Kind]bool{}
|
||||
if ks, ok := in.(fksources.KindsSource); ok {
|
||||
for _, kind := range ks.Kinds() {
|
||||
kinds[kind] = true
|
||||
}
|
||||
return kinds
|
||||
}
|
||||
|
||||
if ks, ok := in.(fksources.KindSource); ok {
|
||||
kinds[ks.Kind()] = true
|
||||
return kinds
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func sortedKinds(kindSet map[fkevent.Kind]bool) []string {
|
||||
out := make([]string, 0, len(kindSet))
|
||||
for kind := range kindSet {
|
||||
out = append(out, string(kind))
|
||||
}
|
||||
sort.Strings(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// keep time imported (mirrors your previous main.go defensive trick)
|
||||
var _ = time.Second
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
fkprocessors "gitea.maximumdirect.net/ejr/feedkit/processors"
|
||||
fkdedupe "gitea.maximumdirect.net/ejr/feedkit/processors/dedupe"
|
||||
fknormalize "gitea.maximumdirect.net/ejr/feedkit/processors/normalize"
|
||||
fksources "gitea.maximumdirect.net/ejr/feedkit/sources"
|
||||
|
||||
wfnormalizers "gitea.maximumdirect.net/ejr/weatherfeeder/internal/normalizers"
|
||||
)
|
||||
@@ -44,8 +45,8 @@ func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) {
|
||||
kind: fkevent.Kind("observation"),
|
||||
}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,8 +57,8 @@ func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) {
|
||||
kinds: []fkevent.Kind{"observation", "forecast"},
|
||||
}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,12 +69,12 @@ func TestValidateSourceExpectedKindsMismatchFails(t *testing.T) {
|
||||
kinds: []fkevent.Kind{"observation", "forecast"},
|
||||
}
|
||||
|
||||
err := validateSourceExpectedKinds(sc, in)
|
||||
err := fksources.ValidateExpectedKinds(sc, in)
|
||||
if err == nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() expected mismatch error, got nil")
|
||||
t.Fatalf("ValidateExpectedKinds() expected mismatch error, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "configured expected kind") {
|
||||
t.Fatalf("validateSourceExpectedKinds() error %q does not include expected message", err)
|
||||
t.Fatalf("ValidateExpectedKinds() error %q does not include expected message", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,14 +82,8 @@ func TestValidateSourceExpectedKindsNoMetadataSkipsCheck(t *testing.T) {
|
||||
sc := config.SourceConfig{Kinds: []string{"alert"}}
|
||||
in := testInput{name: "test"}
|
||||
|
||||
if err := validateSourceExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("validateSourceExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseExpectedKindsRejectsEmptyValues(t *testing.T) {
|
||||
if _, err := parseExpectedKinds([]string{""}); err == nil {
|
||||
t.Fatalf("parseExpectedKinds() expected error for empty kind")
|
||||
if err := fksources.ValidateExpectedKinds(sc, in); err != nil {
|
||||
t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user