99 lines
2.7 KiB
Go
99 lines
2.7 KiB
Go
package pipeline
|
|
|
|
import "fmt"
|
|
|
|
// Registry stores all built-in modules addressable by canonical name.
|
|
type Registry struct {
|
|
inputReaders map[string]InputReader
|
|
preprocessors map[string]Preprocessor
|
|
postprocessors map[string]Postprocessor
|
|
outputWriters map[string]OutputWriter
|
|
merger Merger
|
|
}
|
|
|
|
// NewRegistry creates an empty module registry.
|
|
func NewRegistry() *Registry {
|
|
return &Registry{
|
|
inputReaders: make(map[string]InputReader),
|
|
preprocessors: make(map[string]Preprocessor),
|
|
postprocessors: make(map[string]Postprocessor),
|
|
outputWriters: make(map[string]OutputWriter),
|
|
}
|
|
}
|
|
|
|
// RegisterInputReader registers an input reader by name.
|
|
func (r *Registry) RegisterInputReader(module InputReader) {
|
|
r.inputReaders[module.Name()] = module
|
|
}
|
|
|
|
// RegisterPreprocessor registers a preprocessing module by name.
|
|
func (r *Registry) RegisterPreprocessor(module Preprocessor) {
|
|
r.preprocessors[module.Name()] = module
|
|
}
|
|
|
|
// RegisterPostprocessor registers a postprocessing module by name.
|
|
func (r *Registry) RegisterPostprocessor(module Postprocessor) {
|
|
r.postprocessors[module.Name()] = module
|
|
}
|
|
|
|
// RegisterOutputWriter registers an output writer by name.
|
|
func (r *Registry) RegisterOutputWriter(module OutputWriter) {
|
|
r.outputWriters[module.Name()] = module
|
|
}
|
|
|
|
// RegisterMerger registers the merger implementation.
|
|
func (r *Registry) RegisterMerger(module Merger) {
|
|
r.merger = module
|
|
}
|
|
|
|
func (r *Registry) resolveInputReader(name string) (InputReader, error) {
|
|
module, ok := r.inputReaders[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown input reader %q", name)
|
|
}
|
|
return module, nil
|
|
}
|
|
|
|
func (r *Registry) resolvePreprocessors(names []string) ([]Preprocessor, error) {
|
|
modules := make([]Preprocessor, 0, len(names))
|
|
for _, name := range names {
|
|
module, ok := r.preprocessors[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown preprocessing module %q", name)
|
|
}
|
|
modules = append(modules, module)
|
|
}
|
|
return modules, nil
|
|
}
|
|
|
|
func (r *Registry) resolvePostprocessors(names []string) ([]Postprocessor, error) {
|
|
modules := make([]Postprocessor, 0, len(names))
|
|
for _, name := range names {
|
|
module, ok := r.postprocessors[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown postprocessing module %q", name)
|
|
}
|
|
modules = append(modules, module)
|
|
}
|
|
return modules, nil
|
|
}
|
|
|
|
func (r *Registry) resolveOutputWriters(names []string) ([]OutputWriter, error) {
|
|
modules := make([]OutputWriter, 0, len(names))
|
|
for _, name := range names {
|
|
module, ok := r.outputWriters[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("unknown output module %q", name)
|
|
}
|
|
modules = append(modules, module)
|
|
}
|
|
return modules, nil
|
|
}
|
|
|
|
func (r *Registry) resolveMerger() (Merger, error) {
|
|
if r.merger == nil {
|
|
return nil, fmt.Errorf("no merger registered")
|
|
}
|
|
return r.merger, nil
|
|
}
|