Files
feedkit/sources/http.go
Eric Rakestraw eb9a7cb349 Refactor feedkit boundaries ahead of v1
Remove global Postgres schema registration in favor of explicit schema-aware sink factory wiring, and update weatherfeeder to register the Postgres sink explicitly. Add optional per-source HTTP timeout and response body limit overrides while keeping feedkit defaults. Remove remaining legacy source/config compatibility surfaces, including singular kind support and old source registry/type aliases, and migrate weatherfeeder sources to plural `Kinds()` metadata. Clean up related docs, tests, and sample config to match the new Postgres, HTTP, and NATS configuration model.
2026-03-28 13:52:48 -05:00

153 lines
4.3 KiB
Go

package sources
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"gitea.maximumdirect.net/ejr/feedkit/config"
"gitea.maximumdirect.net/ejr/feedkit/transport"
)
// HTTPSource is a reusable helper for polling HTTP-backed sources.
//
// It centralizes generic source config parsing (`params.url`,
// `params.user_agent`, and the optional `params.conditional`,
// `params.http_timeout`, and `params.http_response_body_limit_bytes`), default
// HTTP client setup, and conditional GET validator handling. Concrete daemon
// sources remain responsible for decoding the response body and constructing
// events.
//
// An HTTPSource embeds a sync.Mutex and therefore must not be copied after
// first use; pass it around as *HTTPSource.
type HTTPSource struct {
// Driver names the concrete source driver; it prefixes error messages.
Driver string
// Name is the source name reported in error messages. NewHTTPSource stores
// the whitespace-trimmed config name here.
Name string
// URL is the endpoint handed to the transport layer on every fetch.
URL string
// UserAgent is handed to the transport layer on every fetch (presumably
// sent as the User-Agent request header; confirm in package transport).
UserAgent string
// Accept is handed to the transport layer on every fetch (presumably sent
// as the Accept request header; confirm in package transport).
Accept string
// Conditional enables conditional GET. When true, the validators returned
// by each successful fetch are cached and supplied to the next one.
Conditional bool
// ResponseBodyLimitBytes is the response body limit passed to the
// transport layer. Values <= 0 are replaced at fetch time by
// transport.DefaultHTTPResponseBodyLimitBytes.
ResponseBodyLimitBytes int64
// Client performs the requests. When nil, each fetch builds a client with
// transport.DefaultHTTPTimeout.
Client *http.Client
// mu guards validators.
mu sync.Mutex
// validators holds the validators returned by the most recent successful
// fetch; it is only updated when Conditional is true.
validators transport.HTTPValidators
}
// NewHTTPSource builds a generic HTTP polling helper from SourceConfig.
//
// driver prefixes every error message, and accept is stored unchanged as the
// helper's Accept value. cfg.Name must be non-blank after trimming; the
// trimmed value becomes the helper's Name.
//
// Required params:
//   - params.url
//   - params.user_agent
//
// Optional params:
//   - params.conditional (default true): enable conditional GET using cached
//     ETag / Last-Modified validators
//   - params.http_timeout (default transport.DefaultHTTPTimeout): positive
//     duration used to build the HTTP client
//   - params.http_response_body_limit_bytes (default
//     transport.DefaultHTTPResponseBodyLimitBytes): positive integer passed
//     to the transport layer as the response body limit
//
// An optional param that is present but invalid is reported as an error; it
// never silently falls back to its default.
func NewHTTPSource(driver string, cfg config.SourceConfig, accept string) (*HTTPSource, error) {
	name := strings.TrimSpace(cfg.Name)
	if name == "" {
		return nil, fmt.Errorf("%s: name is required", driver)
	}
	if cfg.Params == nil {
		return nil, fmt.Errorf("%s %q: params are required (need params.url and params.user_agent)", driver, cfg.Name)
	}

	rawURL, ok := cfg.ParamString("url", "URL")
	if !ok {
		return nil, fmt.Errorf("%s %q: params.url is required", driver, cfg.Name)
	}
	userAgent, ok := cfg.ParamString("user_agent", "userAgent")
	if !ok {
		return nil, fmt.Errorf("%s %q: params.user_agent is required", driver, cfg.Name)
	}

	// Each optional param is checked for presence first so that an absent key
	// keeps the default while a present-but-invalid value is rejected.
	conditional := true
	if _, exists := cfg.Params["conditional"]; exists {
		if conditional, ok = cfg.ParamBool("conditional"); !ok {
			return nil, fmt.Errorf("%s %q: params.conditional must be a boolean", driver, cfg.Name)
		}
	}

	timeout := transport.DefaultHTTPTimeout
	if _, exists := cfg.Params["http_timeout"]; exists {
		if timeout, ok = cfg.ParamDuration("http_timeout"); !ok || timeout <= 0 {
			return nil, fmt.Errorf("%s %q: params.http_timeout must be a positive duration", driver, cfg.Name)
		}
	}

	bodyLimit := transport.DefaultHTTPResponseBodyLimitBytes
	if _, exists := cfg.Params["http_response_body_limit_bytes"]; exists {
		rawLimit, ok := cfg.ParamInt("http_response_body_limit_bytes")
		if !ok || rawLimit <= 0 {
			return nil, fmt.Errorf("%s %q: params.http_response_body_limit_bytes must be a positive integer", driver, cfg.Name)
		}
		bodyLimit = int64(rawLimit)
	}

	return &HTTPSource{
		Driver:                 driver,
		Name:                   name,
		URL:                    rawURL,
		UserAgent:              userAgent,
		Accept:                 accept,
		Conditional:            conditional,
		ResponseBodyLimitBytes: bodyLimit,
		Client:                 transport.NewHTTPClient(timeout),
	}, nil
}
// FetchBytesIfChanged polls the configured URL once and reports whether the
// upstream content changed. An unchanged 304 response yields changed=false
// with no body and no error.
//
// A nil Client is replaced by one built with transport.DefaultHTTPTimeout, and
// a non-positive ResponseBodyLimitBytes by
// transport.DefaultHTTPResponseBodyLimitBytes. Any transport error is wrapped
// with the source's driver and name. When Conditional is set, the validators
// returned by a successful fetch are cached for the next call.
func (s *HTTPSource) FetchBytesIfChanged(ctx context.Context) ([]byte, bool, error) {
	httpClient := s.Client
	if httpClient == nil {
		httpClient = transport.NewHTTPClient(transport.DefaultHTTPTimeout)
	}

	limit := s.ResponseBodyLimitBytes
	if limit <= 0 {
		limit = transport.DefaultHTTPResponseBodyLimitBytes
	}

	// Snapshot the cached validators so the lock is not held across the
	// network round trip.
	s.mu.Lock()
	cached := s.validators
	s.mu.Unlock()

	payload, changed, fresh, err := transport.FetchBodyIfChangedWithLimit(
		ctx, httpClient,
		s.URL, s.UserAgent, s.Accept,
		s.Conditional, cached, limit,
	)
	if err != nil {
		return nil, false, fmt.Errorf("%s %q: %w", s.Driver, s.Name, err)
	}

	if !s.Conditional {
		return payload, changed, nil
	}

	s.mu.Lock()
	s.validators = fresh
	s.mu.Unlock()
	return payload, changed, nil
}
// FetchJSONIfChanged is FetchBytesIfChanged with the body typed as
// json.RawMessage. The bytes are returned as received; they are not parsed or
// validated here. On error, or when the content is unchanged (304), the
// returned body is nil.
func (s *HTTPSource) FetchJSONIfChanged(ctx context.Context) (json.RawMessage, bool, error) {
	raw, changed, err := s.FetchBytesIfChanged(ctx)
	switch {
	case err != nil:
		return nil, changed, err
	case !changed:
		return nil, false, nil
	}
	return json.RawMessage(raw), true, nil
}