118 lines
3.3 KiB
Go
118 lines
3.3 KiB
Go
package sources
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"gitea.maximumdirect.net/ejr/feedkit/config"
|
|
pgconn "gitea.maximumdirect.net/ejr/feedkit/internal/postgres"
|
|
)
|
|
|
|
// defaultPostgresQueryTimeout is the per-query timeout applied by
// NewPostgresQuerySource when params.query_timeout is not set.
const defaultPostgresQueryTimeout = 30 * time.Second
|
|
|
|
// postgresQueryDB is the minimal database surface this file depends on: one
// context-aware query method with the same signature as
// (*sql.DB).QueryContext.
type postgresQueryDB interface {
	// QueryContext runs query with args under ctx and returns the result rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}
|
|
|
|
var openPostgresQueryDB = func(ctx context.Context, cfg pgconn.ConnConfig) (postgresQueryDB, error) {
|
|
return pgconn.Open(ctx, cfg)
|
|
}
|
|
|
|
// PostgresQuerySource is a reusable helper for polling Postgres-backed sources.
|
|
//
|
|
// It centralizes generic source config parsing and query execution. Concrete
|
|
// daemon sources remain responsible for SQL semantics, row scanning, cursoring,
|
|
// and event construction.
|
|
type PostgresQuerySource struct {
|
|
Driver string
|
|
Name string
|
|
SQL string
|
|
QueryTimeout time.Duration
|
|
|
|
db postgresQueryDB
|
|
}
|
|
|
|
// NewPostgresQuerySource builds a generic Postgres polling helper from
|
|
// SourceConfig.
|
|
//
|
|
// Required params:
|
|
// - params.uri
|
|
// - params.username
|
|
// - params.password
|
|
// - params.query
|
|
//
|
|
// Optional params:
|
|
// - params.query_timeout (default 30s)
|
|
func NewPostgresQuerySource(driver string, cfg config.SourceConfig) (*PostgresQuerySource, error) {
|
|
name := strings.TrimSpace(cfg.Name)
|
|
if name == "" {
|
|
return nil, fmt.Errorf("%s: name is required", driver)
|
|
}
|
|
if cfg.Params == nil {
|
|
return nil, fmt.Errorf("%s %q: params are required (need params.uri, params.username, params.password, and params.query)", driver, cfg.Name)
|
|
}
|
|
|
|
uri, ok := cfg.ParamString("uri")
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s %q: params.uri is required", driver, cfg.Name)
|
|
}
|
|
username, ok := cfg.ParamString("username")
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s %q: params.username is required", driver, cfg.Name)
|
|
}
|
|
password, ok := cfg.ParamString("password")
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s %q: params.password is required", driver, cfg.Name)
|
|
}
|
|
query, ok := cfg.ParamString("query")
|
|
if !ok {
|
|
return nil, fmt.Errorf("%s %q: params.query is required", driver, cfg.Name)
|
|
}
|
|
|
|
queryTimeout := defaultPostgresQueryTimeout
|
|
if _, exists := cfg.Params["query_timeout"]; exists {
|
|
var ok bool
|
|
queryTimeout, ok = cfg.ParamDuration("query_timeout")
|
|
if !ok || queryTimeout <= 0 {
|
|
return nil, fmt.Errorf("source %q: params.query_timeout must be a positive duration", cfg.Name)
|
|
}
|
|
}
|
|
|
|
db, err := openPostgresQueryDB(context.Background(), pgconn.ConnConfig{
|
|
URI: uri,
|
|
Username: username,
|
|
Password: password,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s %q: open db: %w", driver, cfg.Name, err)
|
|
}
|
|
|
|
return &PostgresQuerySource{
|
|
Driver: driver,
|
|
Name: name,
|
|
SQL: query,
|
|
QueryTimeout: queryTimeout,
|
|
db: db,
|
|
}, nil
|
|
}
|
|
|
|
func (s *PostgresQuerySource) Query(ctx context.Context, args ...any) (*sql.Rows, error) {
|
|
queryCtx := ctx
|
|
if s.QueryTimeout > 0 {
|
|
if deadline, ok := ctx.Deadline(); !ok || time.Until(deadline) > s.QueryTimeout {
|
|
// We intentionally do not cancel this derived context here because the
|
|
// returned rows may still be reading from the database.
|
|
queryCtx, _ = context.WithTimeout(ctx, s.QueryTimeout)
|
|
}
|
|
}
|
|
|
|
rows, err := s.db.QueryContext(queryCtx, s.SQL, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("%s %q: query: %w", s.Driver, s.Name, err)
|
|
}
|
|
return rows, nil
|
|
}
|