Remove global Postgres schema registration in favor of explicit schema-aware sink factory wiring, and update weatherfeeder to register the Postgres sink explicitly. Add optional per-source HTTP timeout and response body limit overrides while keeping feedkit defaults. Remove remaining legacy source/config compatibility surfaces, including singular kind support and old source registry/type aliases, and migrate weatherfeeder sources to plural `Kinds()` metadata. Clean up related docs, tests, and sample config to match the new Postgres, HTTP, and NATS configuration model.
127 lines
3.4 KiB
Go
127 lines
3.4 KiB
Go
package sinks
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"strings"
|
|
"testing"
|
|
|
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
"gitea.maximumdirect.net/ejr/feedkit/event"
|
|
)
|
|
|
|
type testSink struct{ name string }
|
|
|
|
func (s testSink) Name() string { return s.name }
|
|
|
|
func (s testSink) Consume(context.Context, event.Event) error { return nil }
|
|
|
|
func TestRegistryRegisterPanicsOnNilRegistry(t *testing.T) {
|
|
var r *Registry
|
|
defer func() {
|
|
if recover() == nil {
|
|
t.Fatalf("Register() expected panic on nil registry")
|
|
}
|
|
}()
|
|
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "stdout"}, nil })
|
|
}
|
|
|
|
func TestRegistryRegisterPanicsOnEmptyDriver(t *testing.T) {
|
|
r := NewRegistry()
|
|
defer func() {
|
|
if recover() == nil {
|
|
t.Fatalf("Register() expected panic on empty driver")
|
|
}
|
|
}()
|
|
r.Register(" ", func(config.SinkConfig) (Sink, error) { return testSink{name: "x"}, nil })
|
|
}
|
|
|
|
func TestRegistryRegisterPanicsOnNilFactory(t *testing.T) {
|
|
r := NewRegistry()
|
|
defer func() {
|
|
if recover() == nil {
|
|
t.Fatalf("Register() expected panic on nil factory")
|
|
}
|
|
}()
|
|
r.Register("stdout", nil)
|
|
}
|
|
|
|
func TestRegistryRegisterPanicsOnDuplicateDriver(t *testing.T) {
|
|
r := NewRegistry()
|
|
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "a"}, nil })
|
|
|
|
defer func() {
|
|
if recover() == nil {
|
|
t.Fatalf("Register() expected panic on duplicate driver")
|
|
}
|
|
}()
|
|
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "b"}, nil })
|
|
}
|
|
|
|
func TestRegistryBuildNilRegistryFails(t *testing.T) {
|
|
var r *Registry
|
|
_, err := r.Build(config.SinkConfig{Driver: "stdout"})
|
|
if err == nil {
|
|
t.Fatalf("Build() expected error for nil registry")
|
|
}
|
|
if !strings.Contains(err.Error(), "registry is nil") {
|
|
t.Fatalf("Build() error = %q, want registry is nil", err)
|
|
}
|
|
}
|
|
|
|
func TestRegistryBuildTrimsDriver(t *testing.T) {
|
|
r := NewRegistry()
|
|
r.Register("stdout", func(config.SinkConfig) (Sink, error) { return testSink{name: "stdout"}, nil })
|
|
|
|
sink, err := r.Build(config.SinkConfig{Name: "sink1", Driver: " stdout "})
|
|
if err != nil {
|
|
t.Fatalf("Build() error = %v", err)
|
|
}
|
|
if sink.Name() != "stdout" {
|
|
t.Fatalf("Build() sink name = %q, want stdout", sink.Name())
|
|
}
|
|
}
|
|
|
|
func TestRegistryBuildWrapsFactoryError(t *testing.T) {
|
|
r := NewRegistry()
|
|
r.Register("broken", func(config.SinkConfig) (Sink, error) { return nil, errors.New("boom") })
|
|
|
|
_, err := r.Build(config.SinkConfig{Driver: "broken"})
|
|
if err == nil {
|
|
t.Fatalf("Build() expected error")
|
|
}
|
|
if !strings.Contains(err.Error(), `build sink "broken": boom`) {
|
|
t.Fatalf("Build() error = %q", err)
|
|
}
|
|
}
|
|
|
|
func TestRegistryBuildRejectsNilSink(t *testing.T) {
|
|
r := NewRegistry()
|
|
r.Register("nil_sink", func(config.SinkConfig) (Sink, error) { return nil, nil })
|
|
|
|
_, err := r.Build(config.SinkConfig{Driver: "nil_sink"})
|
|
if err == nil {
|
|
t.Fatalf("Build() expected nil sink error")
|
|
}
|
|
if !strings.Contains(err.Error(), "factory returned nil sink") {
|
|
t.Fatalf("Build() error = %q", err)
|
|
}
|
|
}
|
|
|
|
func TestRegisterBuiltinsExposesExpectedDrivers(t *testing.T) {
|
|
r := NewRegistry()
|
|
RegisterBuiltins(r)
|
|
|
|
if len(r.byDriver) != 2 {
|
|
t.Fatalf("len(byDriver) = %d, want 2", len(r.byDriver))
|
|
}
|
|
for _, driver := range []string{"stdout", "nats"} {
|
|
if _, ok := r.byDriver[driver]; !ok {
|
|
t.Fatalf("builtins missing driver %q", driver)
|
|
}
|
|
}
|
|
if _, ok := r.byDriver["postgres"]; ok {
|
|
t.Fatalf("builtins unexpectedly registered postgres driver")
|
|
}
|
|
}
|