package sinks

import (
	"context"
	"fmt"
	"strings"
	"time"

	"gitea.maximumdirect.net/ejr/feedkit/event"
)

// PostgresMapFunc maps one event into zero or more table writes.
//
// Returning zero writes means "this event is not mapped for this sink" and is
// treated as a non-error no-op.
type PostgresMapFunc func(ctx context.Context, e event.Event) ([]PostgresWrite, error)

// PostgresSchema describes the downstream-provided relational model and mapper
// for one configured postgres sink.
//
// Schema compilation requires MapEvent to be non-nil and Tables to contain at
// least one table.
type PostgresSchema struct {
	Tables   []PostgresTable
	MapEvent PostgresMapFunc
}

// PostgresWrite is a set of column values destined for a single table.
//
// Write validation requires Table (after trimming whitespace) to name a table
// declared in the schema, and Values to be keyed by exactly the columns that
// table declares. A nil value is accepted only for a column declared Nullable.
type PostgresWrite struct {
	Table  string
	Values map[string]any
}

// PostgresTable declares one table of a PostgresSchema.
//
// Schema compilation trims surrounding whitespace from every name and enforces:
//   - Name is non-empty and unique among the schema's tables;
//   - Columns is non-empty, with unique, non-empty column names;
//   - PrimaryKey may be empty; otherwise each entry must be a distinct declared column;
//   - PruneColumn is required and must be a declared column
//     (presumably the column retention helpers act on; confirm against the sink);
//   - Indexes have non-empty names that are unique across the whole schema.
type PostgresTable struct {
	Name        string
	Columns     []PostgresColumn
	PrimaryKey  []string
	PruneColumn string
	Indexes     []PostgresIndex
}

// PostgresColumn declares one column of a PostgresTable.
//
// Name and Type must both be non-empty after trimming. Type is stored as an
// opaque string here (presumably a Postgres type name; confirm against the
// code that generates DDL). Nullable controls whether a write may carry a nil
// value for this column.
type PostgresColumn struct {
	Name     string
	Type     string
	Nullable bool
}

// PostgresIndex declares one index on a PostgresTable.
//
// Name must be non-empty and Columns must list at least one distinct declared
// column of the owning table. Unique is carried through schema compilation
// unchanged (presumably it selects a UNIQUE index; confirm against the code
// that generates DDL).
type PostgresIndex struct {
	Name    string
	Columns []string
	Unique  bool
}

// PostgresPruner is an optional interface exposed by PostgresSink so downstream
// applications can call retention helpers via type assertion.
type PostgresPruner interface { PruneKeepLatest(ctx context.Context, table string, keep int) (int64, error) PruneOlderThan(ctx context.Context, table string, cutoff time.Time) (int64, error) PruneAllKeepLatest(ctx context.Context, keep int) (map[string]int64, error) PruneAllOlderThan(ctx context.Context, cutoff time.Time) (map[string]int64, error) } type postgresSchemaCompiled struct { tableOrder []string tables map[string]postgresTableCompiled mapEvent PostgresMapFunc } type postgresTableCompiled struct { name string columns map[string]PostgresColumn columnOrder []string primaryKey []string pruneColumn string indexes []PostgresIndex } func compilePostgresSchema(schema PostgresSchema) (postgresSchemaCompiled, error) { if schema.MapEvent == nil { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: map function is required") } if len(schema.Tables) == 0 { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: at least one table is required") } compiled := postgresSchemaCompiled{ tableOrder: make([]string, 0, len(schema.Tables)), tables: make(map[string]postgresTableCompiled, len(schema.Tables)), mapEvent: schema.MapEvent, } seenTables := map[string]bool{} seenIndexes := map[string]bool{} for i, tbl := range schema.Tables { tableName := strings.TrimSpace(tbl.Name) if tableName == "" { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: tables[%d].name is required", i) } if seenTables[tableName] { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: duplicate table name %q", tableName) } seenTables[tableName] = true if len(tbl.Columns) == 0 { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q must define at least one column", tableName) } colOrder := make([]string, 0, len(tbl.Columns)) colMap := make(map[string]PostgresColumn, len(tbl.Columns)) for j, col := range tbl.Columns { colName := strings.TrimSpace(col.Name) if colName == "" { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q 
columns[%d].name is required", tableName, j) } if _, exists := colMap[colName]; exists { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q duplicate column %q", tableName, colName) } if strings.TrimSpace(col.Type) == "" { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q column %q type is required", tableName, colName) } colOrder = append(colOrder, colName) colMap[colName] = PostgresColumn{ Name: colName, Type: strings.TrimSpace(col.Type), Nullable: col.Nullable, } } pk, err := validatePostgresColumnSet(tableName, "primary key", tbl.PrimaryKey, colMap) if err != nil { return postgresSchemaCompiled{}, err } pruneCol := strings.TrimSpace(tbl.PruneColumn) if pruneCol == "" { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q prune column is required", tableName) } if _, ok := colMap[pruneCol]; !ok { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q prune column %q not found in columns", tableName, pruneCol) } indexes := make([]PostgresIndex, 0, len(tbl.Indexes)) for j, idx := range tbl.Indexes { idxName := strings.TrimSpace(idx.Name) if idxName == "" { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q indexes[%d].name is required", tableName, j) } if len(idx.Columns) == 0 { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: table %q index %q must include at least one column", tableName, idxName) } if seenIndexes[idxName] { return postgresSchemaCompiled{}, fmt.Errorf("postgres schema: duplicate index name %q", idxName) } seenIndexes[idxName] = true idxCols, err := validatePostgresColumnSet(tableName, fmt.Sprintf("index %q columns", idxName), idx.Columns, colMap) if err != nil { return postgresSchemaCompiled{}, err } indexes = append(indexes, PostgresIndex{ Name: idxName, Columns: idxCols, Unique: idx.Unique, }) } compiled.tableOrder = append(compiled.tableOrder, tableName) compiled.tables[tableName] = postgresTableCompiled{ name: tableName, columns: 
colMap, columnOrder: colOrder, primaryKey: pk, pruneColumn: pruneCol, indexes: indexes, } } return compiled, nil } func validatePostgresColumnSet(tableName, label string, cols []string, colMap map[string]PostgresColumn) ([]string, error) { if len(cols) == 0 { return nil, nil } out := make([]string, 0, len(cols)) seen := map[string]bool{} for _, c := range cols { name := strings.TrimSpace(c) if name == "" { return nil, fmt.Errorf("postgres schema: table %q %s contains empty column name", tableName, label) } if seen[name] { return nil, fmt.Errorf("postgres schema: table %q %s contains duplicate column %q", tableName, label, name) } if _, ok := colMap[name]; !ok { return nil, fmt.Errorf("postgres schema: table %q %s references unknown column %q", tableName, label, name) } seen[name] = true out = append(out, name) } return out, nil } func (s postgresSchemaCompiled) validateWrite(w PostgresWrite) (postgresTableCompiled, error) { tableName := strings.TrimSpace(w.Table) if tableName == "" { return postgresTableCompiled{}, fmt.Errorf("write table is required") } t, ok := s.tables[tableName] if !ok { return postgresTableCompiled{}, fmt.Errorf("table %q is not defined in postgres schema", tableName) } if len(w.Values) == 0 { return postgresTableCompiled{}, fmt.Errorf("write for table %q must include values", tableName) } for k := range w.Values { if _, ok := t.columns[k]; !ok { return postgresTableCompiled{}, fmt.Errorf("write for table %q includes unknown column %q", tableName, k) } } if len(w.Values) != len(t.columnOrder) { return postgresTableCompiled{}, fmt.Errorf("write for table %q must include all declared columns", tableName) } for _, col := range t.columnOrder { v, ok := w.Values[col] if !ok { return postgresTableCompiled{}, fmt.Errorf("write for table %q is missing column %q", tableName, col) } if v == nil { if c := t.columns[col]; !c.Nullable { return postgresTableCompiled{}, fmt.Errorf("write for table %q has nil value for non-null column %q", tableName, col) } } 
} return t, nil }