// File: seriatim/internal/pipeline/registry.go
// Listing metadata: 99 lines, 2.7 KiB, Go.

package pipeline
import "fmt"
// Registry is the lookup table for the built-in pipeline modules. Input
// readers, preprocessors, postprocessors and output writers live in maps keyed
// by module name; the merger occupies a single, unnamed slot.
type Registry struct {
	// Name-keyed module tables, one per module kind.
	inputReaders   map[string]InputReader
	preprocessors  map[string]Preprocessor
	postprocessors map[string]Postprocessor
	outputWriters  map[string]OutputWriter

	// merger is the single Merger held by the registry; nil when none is set.
	merger Merger
}
// NewRegistry returns a fresh Registry whose four name-keyed module tables are
// allocated and empty. No merger is set on the returned value.
func NewRegistry() *Registry {
	reg := new(Registry)
	reg.inputReaders = map[string]InputReader{}
	reg.preprocessors = map[string]Preprocessor{}
	reg.postprocessors = map[string]Postprocessor{}
	reg.outputWriters = map[string]OutputWriter{}
	return reg
}
// RegisterInputReader stores module in the input-reader table under the name
// returned by module.Name(). Registering a second module with the same name
// replaces the first.
//
// The table is allocated on first use, so a zero-value Registry (one not built
// with NewRegistry) accepts registrations instead of panicking on a write to a
// nil map. module must be non-nil, since its Name method is called.
func (r *Registry) RegisterInputReader(module InputReader) {
	if r.inputReaders == nil {
		r.inputReaders = make(map[string]InputReader)
	}
	r.inputReaders[module.Name()] = module
}
// RegisterPreprocessor stores module in the preprocessor table under the name
// returned by module.Name(). Registering a second module with the same name
// replaces the first.
//
// The table is allocated on first use, so a zero-value Registry (one not built
// with NewRegistry) accepts registrations instead of panicking on a write to a
// nil map. module must be non-nil, since its Name method is called.
func (r *Registry) RegisterPreprocessor(module Preprocessor) {
	if r.preprocessors == nil {
		r.preprocessors = make(map[string]Preprocessor)
	}
	r.preprocessors[module.Name()] = module
}
// RegisterPostprocessor stores module in the postprocessor table under the
// name returned by module.Name(). Registering a second module with the same
// name replaces the first.
//
// The table is allocated on first use, so a zero-value Registry (one not built
// with NewRegistry) accepts registrations instead of panicking on a write to a
// nil map. module must be non-nil, since its Name method is called.
func (r *Registry) RegisterPostprocessor(module Postprocessor) {
	if r.postprocessors == nil {
		r.postprocessors = make(map[string]Postprocessor)
	}
	r.postprocessors[module.Name()] = module
}
// RegisterOutputWriter stores module in the output-writer table under the name
// returned by module.Name(). Registering a second module with the same name
// replaces the first.
//
// The table is allocated on first use, so a zero-value Registry (one not built
// with NewRegistry) accepts registrations instead of panicking on a write to a
// nil map. module must be non-nil, since its Name method is called.
func (r *Registry) RegisterOutputWriter(module OutputWriter) {
	if r.outputWriters == nil {
		r.outputWriters = make(map[string]OutputWriter)
	}
	r.outputWriters[module.Name()] = module
}
// RegisterMerger installs module as the registry's merger. The registry keeps
// exactly one merger, so every call overwrites whatever was set before.
func (r *Registry) RegisterMerger(module Merger) {
	r.merger = module
}
// resolveInputReader looks up the input reader registered under name. When no
// reader has that name it returns a nil reader and an error quoting the
// requested name.
func (r *Registry) resolveInputReader(name string) (InputReader, error) {
	if reader, found := r.inputReaders[name]; found {
		return reader, nil
	}
	return nil, fmt.Errorf("unknown input reader %q", name)
}
// resolvePreprocessors translates names into the preprocessors registered
// under them, keeping the order of names (a repeated name yields a repeated
// module). The first name without a registration aborts the lookup: the result
// is a nil slice and an error quoting that name.
func (r *Registry) resolvePreprocessors(names []string) ([]Preprocessor, error) {
	resolved := make([]Preprocessor, len(names))
	for i, name := range names {
		found, ok := r.preprocessors[name]
		if !ok {
			return nil, fmt.Errorf("unknown preprocessing module %q", name)
		}
		resolved[i] = found
	}
	return resolved, nil
}
// resolvePostprocessors translates names into the postprocessors registered
// under them, keeping the order of names (a repeated name yields a repeated
// module). The first name without a registration aborts the lookup: the result
// is a nil slice and an error quoting that name.
func (r *Registry) resolvePostprocessors(names []string) ([]Postprocessor, error) {
	resolved := make([]Postprocessor, len(names))
	for i, name := range names {
		found, ok := r.postprocessors[name]
		if !ok {
			return nil, fmt.Errorf("unknown postprocessing module %q", name)
		}
		resolved[i] = found
	}
	return resolved, nil
}
// resolveOutputWriters translates names into the output writers registered
// under them, keeping the order of names (a repeated name yields a repeated
// module). The first name without a registration aborts the lookup: the result
// is a nil slice and an error quoting that name.
func (r *Registry) resolveOutputWriters(names []string) ([]OutputWriter, error) {
	resolved := make([]OutputWriter, len(names))
	for i, name := range names {
		found, ok := r.outputWriters[name]
		if !ok {
			return nil, fmt.Errorf("unknown output module %q", name)
		}
		resolved[i] = found
	}
	return resolved, nil
}
// resolveMerger returns the merger currently held by the registry, or an
// error when the merger slot is nil.
func (r *Registry) resolveMerger() (Merger, error) {
	if current := r.merger; current != nil {
		return current, nil
	}
	return nil, fmt.Errorf("no merger registered")
}