Implemented the initial Go framework
This commit is contained in:
56
internal/pipeline/interfaces.go
Normal file
56
internal/pipeline/interfaces.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"gitea.maximumdirect.net/eric/seriatim/internal/config"
|
||||
"gitea.maximumdirect.net/eric/seriatim/internal/model"
|
||||
"gitea.maximumdirect.net/eric/seriatim/internal/report"
|
||||
)
|
||||
|
||||
// ModelState identifies which representation a preprocessing module consumes.
|
||||
type ModelState string
|
||||
|
||||
const (
|
||||
StateRaw ModelState = "raw"
|
||||
StateCanonical ModelState = "canonical"
|
||||
)
|
||||
|
||||
// PreprocessState carries transcript data as it moves from raw to canonical.
|
||||
type PreprocessState struct {
|
||||
State ModelState
|
||||
Raw []model.RawTranscript
|
||||
Canonical []model.CanonicalTranscript
|
||||
}
|
||||
|
||||
// InputReader loads external input specs into raw transcript documents.
|
||||
type InputReader interface {
|
||||
Name() string
|
||||
Read(ctx context.Context, cfg config.Config) ([]model.RawTranscript, []report.Event, error)
|
||||
}
|
||||
|
||||
// Preprocessor transforms preprocessing state.
|
||||
type Preprocessor interface {
|
||||
Name() string
|
||||
Requires() ModelState
|
||||
Produces() ModelState
|
||||
Process(ctx context.Context, in PreprocessState, cfg config.Config) (PreprocessState, []report.Event, error)
|
||||
}
|
||||
|
||||
// Merger combines canonical transcripts into one merged transcript.
|
||||
type Merger interface {
|
||||
Name() string
|
||||
Merge(ctx context.Context, in []model.CanonicalTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error)
|
||||
}
|
||||
|
||||
// Postprocessor transforms a merged transcript.
|
||||
type Postprocessor interface {
|
||||
Name() string
|
||||
Process(ctx context.Context, in model.MergedTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error)
|
||||
}
|
||||
|
||||
// OutputWriter emits final artifacts.
|
||||
type OutputWriter interface {
|
||||
Name() string
|
||||
Write(ctx context.Context, out model.FinalTranscript, rpt report.Report, cfg config.Config) ([]report.Event, error)
|
||||
}
|
||||
98
internal/pipeline/registry.go
Normal file
98
internal/pipeline/registry.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package pipeline
|
||||
|
||||
import "fmt"
|
||||
|
||||
// Registry stores all built-in modules addressable by canonical name.
|
||||
type Registry struct {
|
||||
inputReaders map[string]InputReader
|
||||
preprocessors map[string]Preprocessor
|
||||
postprocessors map[string]Postprocessor
|
||||
outputWriters map[string]OutputWriter
|
||||
merger Merger
|
||||
}
|
||||
|
||||
// NewRegistry creates an empty module registry.
|
||||
func NewRegistry() *Registry {
|
||||
return &Registry{
|
||||
inputReaders: make(map[string]InputReader),
|
||||
preprocessors: make(map[string]Preprocessor),
|
||||
postprocessors: make(map[string]Postprocessor),
|
||||
outputWriters: make(map[string]OutputWriter),
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterInputReader registers an input reader by name.
|
||||
func (r *Registry) RegisterInputReader(module InputReader) {
|
||||
r.inputReaders[module.Name()] = module
|
||||
}
|
||||
|
||||
// RegisterPreprocessor registers a preprocessing module by name.
|
||||
func (r *Registry) RegisterPreprocessor(module Preprocessor) {
|
||||
r.preprocessors[module.Name()] = module
|
||||
}
|
||||
|
||||
// RegisterPostprocessor registers a postprocessing module by name.
|
||||
func (r *Registry) RegisterPostprocessor(module Postprocessor) {
|
||||
r.postprocessors[module.Name()] = module
|
||||
}
|
||||
|
||||
// RegisterOutputWriter registers an output writer by name.
|
||||
func (r *Registry) RegisterOutputWriter(module OutputWriter) {
|
||||
r.outputWriters[module.Name()] = module
|
||||
}
|
||||
|
||||
// RegisterMerger registers the merger implementation.
|
||||
func (r *Registry) RegisterMerger(module Merger) {
|
||||
r.merger = module
|
||||
}
|
||||
|
||||
func (r *Registry) resolveInputReader(name string) (InputReader, error) {
|
||||
module, ok := r.inputReaders[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown input reader %q", name)
|
||||
}
|
||||
return module, nil
|
||||
}
|
||||
|
||||
func (r *Registry) resolvePreprocessors(names []string) ([]Preprocessor, error) {
|
||||
modules := make([]Preprocessor, 0, len(names))
|
||||
for _, name := range names {
|
||||
module, ok := r.preprocessors[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown preprocessing module %q", name)
|
||||
}
|
||||
modules = append(modules, module)
|
||||
}
|
||||
return modules, nil
|
||||
}
|
||||
|
||||
func (r *Registry) resolvePostprocessors(names []string) ([]Postprocessor, error) {
|
||||
modules := make([]Postprocessor, 0, len(names))
|
||||
for _, name := range names {
|
||||
module, ok := r.postprocessors[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown postprocessing module %q", name)
|
||||
}
|
||||
modules = append(modules, module)
|
||||
}
|
||||
return modules, nil
|
||||
}
|
||||
|
||||
func (r *Registry) resolveOutputWriters(names []string) ([]OutputWriter, error) {
|
||||
modules := make([]OutputWriter, 0, len(names))
|
||||
for _, name := range names {
|
||||
module, ok := r.outputWriters[name]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unknown output module %q", name)
|
||||
}
|
||||
modules = append(modules, module)
|
||||
}
|
||||
return modules, nil
|
||||
}
|
||||
|
||||
func (r *Registry) resolveMerger() (Merger, error) {
|
||||
if r.merger == nil {
|
||||
return nil, fmt.Errorf("no merger registered")
|
||||
}
|
||||
return r.merger, nil
|
||||
}
|
||||
176
internal/pipeline/runner.go
Normal file
176
internal/pipeline/runner.go
Normal file
@@ -0,0 +1,176 @@
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"gitea.maximumdirect.net/eric/seriatim/internal/config"
|
||||
"gitea.maximumdirect.net/eric/seriatim/internal/model"
|
||||
"gitea.maximumdirect.net/eric/seriatim/internal/report"
|
||||
)
|
||||
|
||||
const (
|
||||
applicationName = "seriatim"
|
||||
version = "dev"
|
||||
)
|
||||
|
||||
// Run validates module composition, executes the pipeline, and emits outputs.
|
||||
func Run(ctx context.Context, cfg config.Config, registry *Registry) error {
|
||||
plan, err := resolvePlan(cfg, registry)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
events := make([]report.Event, 0)
|
||||
|
||||
raw, newEvents, err := plan.inputReader.Read(ctx, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events = append(events, newEvents...)
|
||||
|
||||
state := PreprocessState{
|
||||
State: StateRaw,
|
||||
Raw: raw,
|
||||
}
|
||||
for _, module := range plan.preprocessors {
|
||||
state, newEvents, err = module.Process(ctx, state, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events = append(events, newEvents...)
|
||||
}
|
||||
if state.State != StateCanonical {
|
||||
return fmt.Errorf("preprocessing ended in state %q; expected %q", state.State, StateCanonical)
|
||||
}
|
||||
|
||||
merged, newEvents, err := plan.merger.Merge(ctx, state.Canonical, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events = append(events, newEvents...)
|
||||
|
||||
for _, module := range plan.postprocessors {
|
||||
merged, newEvents, err = module.Process(ctx, merged, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events = append(events, newEvents...)
|
||||
}
|
||||
|
||||
final := finalizeTranscript(cfg, merged)
|
||||
rpt := finalizeReport(cfg, events)
|
||||
|
||||
for _, module := range plan.outputWriters {
|
||||
newEvents, err = module.Write(ctx, final, rpt, cfg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
events = append(events, newEvents...)
|
||||
}
|
||||
|
||||
rpt = finalizeReport(cfg, events)
|
||||
if cfg.ReportFile != "" {
|
||||
if err := report.WriteJSON(cfg.ReportFile, rpt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type executionPlan struct {
|
||||
inputReader InputReader
|
||||
preprocessors []Preprocessor
|
||||
merger Merger
|
||||
postprocessors []Postprocessor
|
||||
outputWriters []OutputWriter
|
||||
}
|
||||
|
||||
func resolvePlan(cfg config.Config, registry *Registry) (executionPlan, error) {
|
||||
inputReader, err := registry.resolveInputReader(cfg.InputReader)
|
||||
if err != nil {
|
||||
return executionPlan{}, err
|
||||
}
|
||||
|
||||
preprocessors, err := registry.resolvePreprocessors(cfg.PreprocessingModules)
|
||||
if err != nil {
|
||||
return executionPlan{}, err
|
||||
}
|
||||
if err := validatePreprocessors(preprocessors); err != nil {
|
||||
return executionPlan{}, err
|
||||
}
|
||||
|
||||
merger, err := registry.resolveMerger()
|
||||
if err != nil {
|
||||
return executionPlan{}, err
|
||||
}
|
||||
|
||||
postprocessors, err := registry.resolvePostprocessors(cfg.PostprocessingModules)
|
||||
if err != nil {
|
||||
return executionPlan{}, err
|
||||
}
|
||||
|
||||
outputWriters, err := registry.resolveOutputWriters(cfg.OutputModules)
|
||||
if err != nil {
|
||||
return executionPlan{}, err
|
||||
}
|
||||
|
||||
return executionPlan{
|
||||
inputReader: inputReader,
|
||||
preprocessors: preprocessors,
|
||||
merger: merger,
|
||||
postprocessors: postprocessors,
|
||||
outputWriters: outputWriters,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func validatePreprocessors(modules []Preprocessor) error {
|
||||
state := StateRaw
|
||||
for _, module := range modules {
|
||||
if module.Requires() != state {
|
||||
return fmt.Errorf("preprocessing module %q requires state %q but current state is %q", module.Name(), module.Requires(), state)
|
||||
}
|
||||
state = module.Produces()
|
||||
}
|
||||
if state != StateCanonical {
|
||||
return fmt.Errorf("preprocessing chain ends in state %q; expected %q", state, StateCanonical)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func finalizeTranscript(cfg config.Config, merged model.MergedTranscript) model.FinalTranscript {
|
||||
segments := make([]model.Segment, len(merged.Segments))
|
||||
copy(segments, merged.Segments)
|
||||
overlapGroups := make([]model.OverlapGroup, len(merged.OverlapGroups))
|
||||
copy(overlapGroups, merged.OverlapGroups)
|
||||
|
||||
return model.FinalTranscript{
|
||||
Metadata: model.OutputMetadata{
|
||||
Application: applicationName,
|
||||
Version: version,
|
||||
InputReader: cfg.InputReader,
|
||||
InputFiles: append([]string(nil), cfg.InputFiles...),
|
||||
PreprocessingModules: append([]string(nil), cfg.PreprocessingModules...),
|
||||
PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...),
|
||||
OutputModules: append([]string(nil), cfg.OutputModules...),
|
||||
},
|
||||
Segments: segments,
|
||||
OverlapGroups: overlapGroups,
|
||||
}
|
||||
}
|
||||
|
||||
func finalizeReport(cfg config.Config, events []report.Event) report.Report {
|
||||
return report.Report{
|
||||
Metadata: report.Metadata{
|
||||
Application: applicationName,
|
||||
Version: version,
|
||||
InputReader: cfg.InputReader,
|
||||
InputFiles: append([]string(nil), cfg.InputFiles...),
|
||||
PreprocessingModules: append([]string(nil), cfg.PreprocessingModules...),
|
||||
PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...),
|
||||
OutputModules: append([]string(nil), cfg.OutputModules...),
|
||||
},
|
||||
Events: append([]report.Event(nil), events...),
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user