// Package sinks defines the feedkit sink interface, sink driver registry, and
// built-in infrastructure sinks.
//
// External API surface:
//   - Sink: adapter interface that consumes event.Event values
//   - Registry / NewRegistry: named sink factory registry
//   - RegisterBuiltins: registers the schema-free built-in sink drivers
//
// Built-in sink implementations:
//   - stdout
//   - nats
//   - postgres
//
// Optional helpers from helpers.go:
//   - PostgresFactory: returns a sink factory for the built-in Postgres sink
//     using a provided downstream schema
//
// # NATS built-in overview
//
// The NATS sink publishes each event as JSON to a configured subject.
//
// Required params:
//   - url: NATS server URL (for example, nats://localhost:4222)
//   - subject: NATS subject to publish to
//
// Example config:
//
//	sinks:
//	  - name: nats_main
//	    driver: nats
//	    params:
//	      url: nats://localhost:4222
//	      subject: feedkit.events
//
// # Postgres built-in overview
//
// The postgres sink is intentionally split between downstream daemon ownership
// and feedkit ownership:
//   - downstream code registers table schema + event mapping functions
//   - feedkit manages DB connection, create-if-missing DDL, transactional
//     inserts, optional automatic retention pruning, and manual prune helpers
//
// Example config:
//
//	sinks:
//	  - name: pg_main
//	    driver: postgres
//	    params:
//	      uri: postgres://localhost:5432/feedkit?sslmode=disable
//	      username: feedkit_user
//	      password: feedkit_pass
//	      prune: 3d # optional: prune rows older than now-3d on each write tx
//
// params.prune supports:
//   - Go duration strings (72h, 90m, 30s, ...)
//   - day/week suffixes (3d, 2w)
//
// If params.prune is omitted, automatic pruning is disabled.
//
// Example downstream wiring:
//
//	sinkReg := sinks.NewRegistry()
//	sinks.RegisterBuiltins(sinkReg)
//	sinkReg.Register("postgres", sinks.PostgresFactory(sinks.PostgresSchema{
//		Tables: []sinks.PostgresTable{
//			{
//				Name: "events",
//				Columns: []sinks.PostgresColumn{
//					{Name: "event_id", Type: "TEXT", Nullable: false},
//					{Name: "emitted_at", Type: "TIMESTAMPTZ", Nullable: false},
//					{Name: "payload_json", Type: "JSONB", Nullable: false},
//				},
//				PrimaryKey: []string{"event_id"},
//				PruneColumn: "emitted_at", // required for retention pruning
//			},
//		},
//		MapEvent: func(ctx context.Context, e event.Event) ([]sinks.PostgresWrite, error) {
//			b, err := json.Marshal(e.Payload)
//			if err != nil {
//				return nil, err
//			}
//			return []sinks.PostgresWrite{
//				{
//					Table: "events",
//					Values: map[string]any{
//						"event_id": e.ID,
//						"emitted_at": e.EmittedAt,
//						"payload_json": string(b),
//					},
//				},
//			}, nil
//		},
//	}))
//
// Manual pruning via type assertion (administrative helpers):
//
//	if p, ok := sink.(sinks.PostgresPruner); ok {
//		_, _ = p.PruneKeepLatest(ctx, "events", 10000)
//		_, _ = p.PruneOlderThan(ctx, "events", time.Now().UTC().AddDate(0, -1, 0))
//	}
package sinks