diff --git a/cmd/weatherfeeder/config.yml b/cmd/weatherfeeder/config.yml index 6b307e7..2bc1e7f 100644 --- a/cmd/weatherfeeder/config.yml +++ b/cmd/weatherfeeder/config.yml @@ -90,7 +90,7 @@ sinks: driver: nats params: url: nats://nats:4222 - exchange: weatherfeeder + subject: weatherfeeder # - name: pg_weatherfeeder # driver: postgres diff --git a/cmd/weatherfeeder/main.go b/cmd/weatherfeeder/main.go index f8f77c5..9405691 100644 --- a/cmd/weatherfeeder/main.go +++ b/cmd/weatherfeeder/main.go @@ -37,9 +37,6 @@ func main() { if err != nil { log.Fatalf("config load failed: %v", err) } - if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, wfpgsink.PostgresSchema()); err != nil { - log.Fatalf("postgres schema registration failed: %v", err) - } // --- Registries --- srcReg := fksources.NewRegistry() @@ -48,6 +45,7 @@ func main() { // Compile stdout, Postgres, and NATS sinks for weatherfeeder. The former is useful for debugging and the latter are the main intended outputs. sinkReg := fksinks.NewRegistry() fksinks.RegisterBuiltins(sinkReg) + sinkReg.Register("postgres", fksinks.PostgresFactory(wfpgsink.PostgresSchema())) // --- Build sources into scheduler jobs --- var jobs []fkscheduler.Job diff --git a/cmd/weatherfeeder/main_test.go b/cmd/weatherfeeder/main_test.go index 2cfbef9..5aef5cb 100644 --- a/cmd/weatherfeeder/main_test.go +++ b/cmd/weatherfeeder/main_test.go @@ -24,13 +24,6 @@ type testInput struct { func (s testInput) Name() string { return s.name } -type testKindSource struct { - testInput - kind fkevent.Kind -} - -func (s testKindSource) Kind() fkevent.Kind { return s.kind } - type testKindsSource struct { testInput kinds []fkevent.Kind @@ -38,18 +31,6 @@ type testKindsSource struct { func (s testKindsSource) Kinds() []fkevent.Kind { return s.kinds } -func TestValidateSourceExpectedKindsLegacyKindFallback(t *testing.T) { - sc := config.SourceConfig{Kind: "observation"} - in := testKindSource{ - testInput: testInput{name: "test"}, - kind: fkevent.Kind("observation"), - } - - if err := fksources.ValidateExpectedKinds(sc, in); err != nil { - t.Fatalf("ValidateExpectedKinds() unexpected error: %v", err) - } -} - func TestValidateSourceExpectedKindsSubsetAllowed(t *testing.T) { sc := config.SourceConfig{Kinds: []string{"observation"}} in := testKindsSource{ diff --git a/go.mod b/go.mod index f5bb809..4ef4814 100644 --- a/go.mod +++ b/go.mod @@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder go 1.25 -require gitea.maximumdirect.net/ejr/feedkit v0.8.1 +require gitea.maximumdirect.net/ejr/feedkit v0.8.2 require ( github.com/klauspost/compress v1.17.2 // indirect diff --git a/go.sum b/go.sum index 6a836dc..7d05c8c 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -gitea.maximumdirect.net/ejr/feedkit v0.8.1 h1:gSZ5J/mYHfNZ6dp27TS0V9RQ0JuP5ZAHXhSeCJaBu60= -gitea.maximumdirect.net/ejr/feedkit v0.8.1/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= +gitea.maximumdirect.net/ejr/feedkit v0.8.2 h1:6AMxNacfqJ8SXQhFAUMW3LgiVxixs50tf+S8Q9Ivm+Y= +gitea.maximumdirect.net/ejr/feedkit v0.8.2/go.mod h1:U6xC9xZLN3cL4yi7YBVyzGoHYRLJXusFCAKlj2kdYYQ= github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= diff --git a/internal/sinks/postgres/schema_test.go b/internal/sinks/postgres/schema_test.go index 70763b0..65779e4 100644 --- a/internal/sinks/postgres/schema_test.go +++ b/internal/sinks/postgres/schema_test.go @@ -1,58 +1,6 @@ package postgres -import ( - "fmt" - "strings" - "testing" - "time" - - "gitea.maximumdirect.net/ejr/feedkit/config" - fksinks "gitea.maximumdirect.net/ejr/feedkit/sinks" -) - -func TestRegisterPostgresSchemasNilConfig(t *testing.T) { - err := fksinks.RegisterPostgresSchemaForConfiguredSinks(nil, PostgresSchema()) - if err == nil { - t.Fatalf("RegisterPostgresSchemas(nil) expected error") - } - if !strings.Contains(err.Error(), "config is nil") { - t.Fatalf("error = %q, want config is nil", err) - } -} - -func TestRegisterPostgresSchemasNonPostgresNoOp(t *testing.T) { - cfg := &config.Config{ - Sinks: []config.SinkConfig{ - {Name: "stdout_only", Driver: "stdout"}, - {Name: "nats_only", Driver: "nats"}, - }, - } - - if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil { - t.Fatalf("RegisterPostgresSchemas(non-postgres) error = %v", err) - } -} - -func TestRegisterPostgresSchemasDuplicateRegistrationFails(t *testing.T) { - sinkName := uniqueSinkName("pg_test") - cfg := &config.Config{ - Sinks: []config.SinkConfig{ - {Name: sinkName, Driver: "postgres"}, - }, - } - - if err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()); err != nil { - t.Fatalf("first RegisterPostgresSchemas() error = %v", err) - } - - err := fksinks.RegisterPostgresSchemaForConfiguredSinks(cfg, PostgresSchema()) - if err == nil { - t.Fatalf("second RegisterPostgresSchemas() expected duplicate error") - } - if !strings.Contains(err.Error(), "already registered") { - t.Fatalf("error = %q, want already registered", err) - } -} +import "testing" func TestWeatherPostgresSchemaShape(t *testing.T) { s := PostgresSchema() @@ -90,7 +38,3 @@ func TestWeatherPostgresSchemaShape(t *testing.T) { } } } - -func uniqueSinkName(prefix string) string { - return fmt.Sprintf("%s_%d", prefix, time.Now().UnixNano()) -} diff --git a/internal/sources/nws/alerts.go b/internal/sources/nws/alerts.go index 10804cf..f244c46 100644 --- a/internal/sources/nws/alerts.go +++ b/internal/sources/nws/alerts.go @@ -39,8 +39,8 @@ func NewAlertsSource(cfg config.SourceConfig) (*AlertsSource, error) { func (s *AlertsSource) Name() string { return s.http.Name } -// Kind is used for routing/policy. -func (s *AlertsSource) Kind() event.Kind { return event.Kind("alert") } +// Kinds is used for routing/policy. +func (s *AlertsSource) Kinds() []event.Kind { return []event.Kind{event.Kind("alert")} } func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, changed, err := s.fetchRaw(ctx) @@ -71,7 +71,7 @@ func (s *AlertsSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) return fksources.SingleEvent( - s.Kind(), + event.Kind("alert"), s.http.Name, standards.SchemaRawNWSAlertsV1, eventID, diff --git a/internal/sources/nws/forecast_common.go b/internal/sources/nws/forecast_common.go index c17372a..3d67b03 100644 --- a/internal/sources/nws/forecast_common.go +++ b/internal/sources/nws/forecast_common.go @@ -44,7 +44,7 @@ func newForecastSource(cfg config.SourceConfig, driver, rawSchema string) (*fore func (s *forecastSource) Name() string { return s.http.Name } -func (s *forecastSource) Kind() event.Kind { return event.Kind("forecast") } +func (s *forecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} } func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, changed, err := s.fetchRaw(ctx) @@ -69,7 +69,7 @@ func (s *forecastSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) return fksources.SingleEvent( - s.Kind(), + event.Kind("forecast"), s.http.Name, s.rawSchema, eventID, diff --git a/internal/sources/nws/forecast_test.go b/internal/sources/nws/forecast_test.go index 0dfc8bc..89ca44f 100644 --- a/internal/sources/nws/forecast_test.go +++ b/internal/sources/nws/forecast_test.go @@ -53,6 +53,11 @@ func TestForecastSourcesEmitExpectedSchemaAndPreferGeneratedAt(t *testing.T) { if err != nil { t.Fatalf("newSource() error = %v", err) } + if ks, ok := src.(interface{ Kinds() []event.Kind }); !ok { + t.Fatalf("source does not implement Kinds()") + } else if gotKinds := ks.Kinds(); len(gotKinds) != 1 || gotKinds[0] != event.Kind("forecast") { + t.Fatalf("Kinds() = %#v, want [forecast]", gotKinds) + } got, err := src.Poll(context.Background()) if err != nil { diff --git a/internal/sources/nws/observation.go b/internal/sources/nws/observation.go index 4cd0ce8..e139102 100644 --- a/internal/sources/nws/observation.go +++ b/internal/sources/nws/observation.go @@ -32,7 +32,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { func (s *ObservationSource) Name() string { return s.http.Name } -func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } +func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, changed, err := s.fetchRaw(ctx) @@ -54,7 +54,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := fksources.DefaultEventID(meta.ID, s.http.Name, effectiveAt, emittedAt) return fksources.SingleEvent( - s.Kind(), + event.Kind("observation"), s.http.Name, standards.SchemaRawNWSObservationV1, eventID, diff --git a/internal/sources/nws/observation_test.go b/internal/sources/nws/observation_test.go index 8104fa1..a6e20d7 100644 --- a/internal/sources/nws/observation_test.go +++ b/internal/sources/nws/observation_test.go @@ -41,6 +41,9 @@ func TestObservationSourcePollReturnsNoEventsOn304(t *testing.T) { if err != nil { t.Fatalf("NewObservationSource() error = %v", err) } + if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") { + t.Fatalf("Kinds() = %#v, want [observation]", got) + } first, err := src.Poll(context.Background()) if err != nil { diff --git a/internal/sources/openmeteo/forecast.go b/internal/sources/openmeteo/forecast.go index d01980a..7749099 100644 --- a/internal/sources/openmeteo/forecast.go +++ b/internal/sources/openmeteo/forecast.go @@ -31,7 +31,7 @@ func NewForecastSource(cfg config.SourceConfig) (*ForecastSource, error) { func (s *ForecastSource) Name() string { return s.http.Name } -func (s *ForecastSource) Kind() event.Kind { return event.Kind("forecast") } +func (s *ForecastSource) Kinds() []event.Kind { return []event.Kind{event.Kind("forecast")} } func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, changed, err := s.fetchRaw(ctx) @@ -55,7 +55,7 @@ func (s *ForecastSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) return fksources.SingleEvent( - s.Kind(), + event.Kind("forecast"), s.http.Name, standards.SchemaRawOpenMeteoHourlyForecastV1, eventID, diff --git a/internal/sources/openmeteo/observation.go b/internal/sources/openmeteo/observation.go index 15639b8..6eeb276 100644 --- a/internal/sources/openmeteo/observation.go +++ b/internal/sources/openmeteo/observation.go @@ -31,7 +31,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { func (s *ObservationSource) Name() string { return s.http.Name } -func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } +func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { raw, meta, changed, err := s.fetchRaw(ctx) @@ -52,7 +52,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) return fksources.SingleEvent( - s.Kind(), + event.Kind("observation"), s.http.Name, standards.SchemaRawOpenMeteoCurrentV1, eventID, diff --git a/internal/sources/openmeteo/source_test.go b/internal/sources/openmeteo/source_test.go new file mode 100644 index 0000000..b010678 --- /dev/null +++ b/internal/sources/openmeteo/source_test.go @@ -0,0 +1,44 @@ +package openmeteo + +import ( + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestObservationSourceAdvertisesKinds(t *testing.T) { + src, err := NewObservationSource(config.SourceConfig{ + Name: "openmeteo-observation-test", + Driver: "openmeteo_observation", + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": "https://example.invalid", + "user_agent": "test-agent", + }, + }) + if err != nil { + t.Fatalf("NewObservationSource() error = %v", err) + } + if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") { + t.Fatalf("Kinds() = %#v, want [observation]", got) + } +} + +func TestForecastSourceAdvertisesKinds(t *testing.T) { + src, err := NewForecastSource(config.SourceConfig{ + Name: "openmeteo-forecast-test", + Driver: "openmeteo_forecast", + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": "https://example.invalid", + "user_agent": "test-agent", + }, + }) + if err != nil { + t.Fatalf("NewForecastSource() error = %v", err) + } + if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("forecast") { + t.Fatalf("Kinds() = %#v, want [forecast]", got) + } +} diff --git a/internal/sources/openweather/observation.go b/internal/sources/openweather/observation.go index 25b6264..cb99c42 100644 --- a/internal/sources/openweather/observation.go +++ b/internal/sources/openweather/observation.go @@ -35,7 +35,7 @@ func NewObservationSource(cfg config.SourceConfig) (*ObservationSource, error) { func (s *ObservationSource) Name() string { return s.http.Name } -func (s *ObservationSource) Kind() event.Kind { return event.Kind("observation") } +func (s *ObservationSource) Kinds() []event.Kind { return []event.Kind{event.Kind("observation")} } func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { if err := owcommon.RequireMetricUnits(s.http.URL); err != nil { @@ -60,7 +60,7 @@ func (s *ObservationSource) Poll(ctx context.Context) ([]event.Event, error) { eventID := fksources.DefaultEventID("", s.http.Name, effectiveAt, emittedAt) return fksources.SingleEvent( - s.Kind(), + event.Kind("observation"), s.http.Name, standards.SchemaRawOpenWeatherCurrentV1, eventID, diff --git a/internal/sources/openweather/source_test.go b/internal/sources/openweather/source_test.go new file mode 100644 index 0000000..170c6e6 --- /dev/null +++ b/internal/sources/openweather/source_test.go @@ -0,0 +1,26 @@ +package openweather + +import ( + "testing" + + "gitea.maximumdirect.net/ejr/feedkit/config" + "gitea.maximumdirect.net/ejr/feedkit/event" +) + +func TestObservationSourceAdvertisesKinds(t *testing.T) { + src, err := NewObservationSource(config.SourceConfig{ + Name: "openweather-observation-test", + Driver: "openweather_observation", + Mode: config.SourceModePoll, + Params: map[string]any{ + "url": "https://example.invalid?units=metric", + "user_agent": "test-agent", + }, + }) + if err != nil { + t.Fatalf("NewObservationSource() error = %v", err) + } + if got := src.Kinds(); len(got) != 1 || got[0] != event.Kind("observation") { + t.Fatalf("Kinds() = %#v, want [observation]", got) + } +}