Files
feedkit/sources/postgres.go

118 lines
3.3 KiB
Go

package sources
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"gitea.maximumdirect.net/ejr/feedkit/config"
pgconn "gitea.maximumdirect.net/ejr/feedkit/internal/postgres"
)
const defaultPostgresQueryTimeout = 30 * time.Second
type postgresQueryDB interface {
QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
}
var openPostgresQueryDB = func(ctx context.Context, cfg pgconn.ConnConfig) (postgresQueryDB, error) {
return pgconn.Open(ctx, cfg)
}
// PostgresQuerySource is a reusable helper for polling Postgres-backed sources.
//
// It centralizes generic source config parsing and query execution. Concrete
// daemon sources remain responsible for SQL semantics, row scanning, cursoring,
// and event construction.
type PostgresQuerySource struct {
Driver string
Name string
SQL string
QueryTimeout time.Duration
db postgresQueryDB
}
// NewPostgresQuerySource builds a generic Postgres polling helper from
// SourceConfig.
//
// Required params:
// - params.uri
// - params.username
// - params.password
// - params.query
//
// Optional params:
// - params.query_timeout (default 30s)
func NewPostgresQuerySource(driver string, cfg config.SourceConfig) (*PostgresQuerySource, error) {
name := strings.TrimSpace(cfg.Name)
if name == "" {
return nil, fmt.Errorf("%s: name is required", driver)
}
if cfg.Params == nil {
return nil, fmt.Errorf("%s %q: params are required (need params.uri, params.username, params.password, and params.query)", driver, cfg.Name)
}
uri, ok := cfg.ParamString("uri")
if !ok {
return nil, fmt.Errorf("%s %q: params.uri is required", driver, cfg.Name)
}
username, ok := cfg.ParamString("username")
if !ok {
return nil, fmt.Errorf("%s %q: params.username is required", driver, cfg.Name)
}
password, ok := cfg.ParamString("password")
if !ok {
return nil, fmt.Errorf("%s %q: params.password is required", driver, cfg.Name)
}
query, ok := cfg.ParamString("query")
if !ok {
return nil, fmt.Errorf("%s %q: params.query is required", driver, cfg.Name)
}
queryTimeout := defaultPostgresQueryTimeout
if _, exists := cfg.Params["query_timeout"]; exists {
var ok bool
queryTimeout, ok = cfg.ParamDuration("query_timeout")
if !ok || queryTimeout <= 0 {
return nil, fmt.Errorf("source %q: params.query_timeout must be a positive duration", cfg.Name)
}
}
db, err := openPostgresQueryDB(context.Background(), pgconn.ConnConfig{
URI: uri,
Username: username,
Password: password,
})
if err != nil {
return nil, fmt.Errorf("%s %q: open db: %w", driver, cfg.Name, err)
}
return &PostgresQuerySource{
Driver: driver,
Name: name,
SQL: query,
QueryTimeout: queryTimeout,
db: db,
}, nil
}
func (s *PostgresQuerySource) Query(ctx context.Context, args ...any) (*sql.Rows, error) {
queryCtx := ctx
if s.QueryTimeout > 0 {
if deadline, ok := ctx.Deadline(); !ok || time.Until(deadline) > s.QueryTimeout {
// We intentionally do not cancel this derived context here because the
// returned rows may still be reading from the database.
queryCtx, _ = context.WithTimeout(ctx, s.QueryTimeout)
}
}
rows, err := s.db.QueryContext(queryCtx, s.SQL, args...)
if err != nil {
return nil, fmt.Errorf("%s %q: query: %w", s.Driver, s.Name, err)
}
return rows, nil
}