Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f464592c56 |
@@ -51,10 +51,11 @@ func main() {
|
|||||||
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
sinkReg.Register("nats", func(cfg config.SinkConfig) (fksinks.Sink, error) {
|
||||||
return fksinks.NewNATSSinkFromConfig(cfg)
|
return fksinks.NewNATSSinkFromConfig(cfg)
|
||||||
})
|
})
|
||||||
|
|
||||||
// --- Build sources into scheduler jobs ---
|
// --- Build sources into scheduler jobs ---
|
||||||
var jobs []fkscheduler.Job
|
var jobs []fkscheduler.Job
|
||||||
for i, sc := range cfg.Sources {
|
for i, sc := range cfg.Sources {
|
||||||
src, err := srcReg.Build(sc)
|
in, err := srcReg.BuildInput(sc) // may be polling or streaming
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
log.Fatalf("build source failed (sources[%d] name=%q driver=%q): %v", i, sc.Name, sc.Driver, err)
|
||||||
}
|
}
|
||||||
@@ -65,16 +66,25 @@ func main() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
|
log.Fatalf("invalid kind in config (sources[%d] name=%q kind=%q): %v", i, sc.Name, sc.Kind, err)
|
||||||
}
|
}
|
||||||
if src.Kind() != expectedKind {
|
if in.Kind() != expectedKind {
|
||||||
log.Fatalf(
|
log.Fatalf(
|
||||||
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
|
"source kind mismatch (sources[%d] name=%q driver=%q): config kind=%q but driver emits kind=%q",
|
||||||
i, sc.Name, sc.Driver, expectedKind, src.Kind(),
|
i, sc.Name, sc.Driver, expectedKind, in.Kind(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If this is a polling source, every is required.
|
||||||
|
if _, ok := in.(fksources.Source); ok && sc.Every.Duration <= 0 {
|
||||||
|
log.Fatalf(
|
||||||
|
"polling source missing/invalid interval (sources[%d] name=%q driver=%q): sources[].every must be > 0",
|
||||||
|
i, sc.Name, sc.Driver,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// For stream sources, Every is ignored; it is fine if omitted/zero.
|
||||||
jobs = append(jobs, fkscheduler.Job{
|
jobs = append(jobs, fkscheduler.Job{
|
||||||
Source: src,
|
Source: in,
|
||||||
Every: sc.Every.Duration,
|
Every: sc.Every.Duration,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -2,7 +2,7 @@ module gitea.maximumdirect.net/ejr/weatherfeeder
|
|||||||
|
|
||||||
go 1.25
|
go 1.25
|
||||||
|
|
||||||
require gitea.maximumdirect.net/ejr/feedkit v0.4.1
|
require gitea.maximumdirect.net/ejr/feedkit v0.5.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/klauspost/compress v1.17.2 // indirect
|
github.com/klauspost/compress v1.17.2 // indirect
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
|||||||
gitea.maximumdirect.net/ejr/feedkit v0.4.1 h1:mMFtPCBKp2LXV3euPH21WzjHku/HHx31KQqYW+w1aqU=
|
gitea.maximumdirect.net/ejr/feedkit v0.5.0 h1:T4pRTo9Tj/o7TbZYUbp8UE7cQVLmIucUrYmD6G8E8ZQ=
|
||||||
gitea.maximumdirect.net/ejr/feedkit v0.4.1/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
gitea.maximumdirect.net/ejr/feedkit v0.5.0/go.mod h1:wYtA10GouvSe7L/8e1UEC+tqcp32HJofExIo1k+Wjls=
|
||||||
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
|
||||||
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||||
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
github.com/nats-io/nats.go v1.34.0 h1:fnxnPCNiwIG5w08rlMcEKTUw4AV/nKyGCOJE8TdhSPk=
|
||||||
|
|||||||
Reference in New Issue
Block a user