From 4e027263a45e5d70478e000b47bc5010cfd52b90 Mon Sep 17 00:00:00 2001 From: Eric Rakestraw Date: Sun, 26 Apr 2026 13:03:07 -0500 Subject: [PATCH] Implemented the initial Go framework --- cmd/seriatim/main.go | 15 ++ go.mod | 10 ++ go.sum | 10 ++ internal/builtin/input.go | 31 ++++ internal/builtin/merge.go | 52 ++++++ internal/builtin/output.go | 39 +++++ internal/builtin/postprocess.go | 47 ++++++ internal/builtin/preprocess.go | 82 ++++++++++ internal/builtin/registry.go | 23 +++ internal/cli/merge.go | 39 +++++ internal/cli/merge_test.go | 282 ++++++++++++++++++++++++++++++++ internal/cli/root.go | 18 ++ internal/config/config.go | 206 +++++++++++++++++++++++ internal/config/config_test.go | 35 ++++ internal/model/model.go | 70 ++++++++ internal/pipeline/interfaces.go | 56 +++++++ internal/pipeline/registry.go | 98 +++++++++++ internal/pipeline/runner.go | 176 ++++++++++++++++++++ internal/report/report.go | 64 ++++++++ 19 files changed, 1353 insertions(+) create mode 100644 cmd/seriatim/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/builtin/input.go create mode 100644 internal/builtin/merge.go create mode 100644 internal/builtin/output.go create mode 100644 internal/builtin/postprocess.go create mode 100644 internal/builtin/preprocess.go create mode 100644 internal/builtin/registry.go create mode 100644 internal/cli/merge.go create mode 100644 internal/cli/merge_test.go create mode 100644 internal/cli/root.go create mode 100644 internal/config/config.go create mode 100644 internal/config/config_test.go create mode 100644 internal/model/model.go create mode 100644 internal/pipeline/interfaces.go create mode 100644 internal/pipeline/registry.go create mode 100644 internal/pipeline/runner.go create mode 100644 internal/report/report.go diff --git a/cmd/seriatim/main.go b/cmd/seriatim/main.go new file mode 100644 index 0000000..bdc0b1f --- /dev/null +++ b/cmd/seriatim/main.go @@ -0,0 +1,15 @@ +package main + +import ( + 
"fmt" + "os" + + "gitea.maximumdirect.net/eric/seriatim/internal/cli" +) + +func main() { + if err := cli.NewRootCommand().Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..aff73ab --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module gitea.maximumdirect.net/eric/seriatim + +go 1.25 + +require github.com/spf13/cobra v1.10.1 + +require ( + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/spf13/pflag v1.0.9 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e613680 --- /dev/null +++ b/go.sum @@ -0,0 +1,10 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.10.1 h1:lJeBwCfmrnXthfAupyUTzJ/J4Nc1RsHC/mSRU2dll/s= +github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4XaB0= +github.com/spf13/pflag v1.0.9 h1:9exaQaMOCwffKiiiYk6/BndUBv+iRViNW+4lEMi0PvY= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/builtin/input.go b/internal/builtin/input.go new file mode 100644 index 0000000..9b191c1 --- /dev/null +++ b/internal/builtin/input.go @@ -0,0 +1,31 @@ +package builtin + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +type jsonFilesReader struct{} + +func 
(jsonFilesReader) Name() string { + return "json-files" +} + +func (jsonFilesReader) Read(ctx context.Context, cfg config.Config) ([]model.RawTranscript, []report.Event, error) { + if err := ctx.Err(); err != nil { + return nil, nil, err + } + + raw := make([]model.RawTranscript, 0, len(cfg.InputFiles)) + for _, inputFile := range cfg.InputFiles { + raw = append(raw, model.RawTranscript{Source: inputFile}) + } + + return raw, []report.Event{ + report.Info("input", "json-files", fmt.Sprintf("accepted %d input file(s)", len(raw))), + }, nil +} diff --git a/internal/builtin/merge.go b/internal/builtin/merge.go new file mode 100644 index 0000000..cfc7f06 --- /dev/null +++ b/internal/builtin/merge.go @@ -0,0 +1,52 @@ +package builtin + +import ( + "context" + "sort" + + "gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +type placeholderMerger struct{} + +func (placeholderMerger) Name() string { + return "placeholder-merger" +} + +func (placeholderMerger) Merge(ctx context.Context, in []model.CanonicalTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error) { + if err := ctx.Err(); err != nil { + return model.MergedTranscript{}, nil, err + } + + segments := make([]model.Segment, 0) + for _, transcript := range in { + segments = append(segments, transcript.Segments...) 
+ } + + sort.SliceStable(segments, func(i, j int) bool { + left := segments[i] + right := segments[j] + if left.Start != right.Start { + return left.Start < right.Start + } + if left.End != right.End { + return left.End < right.End + } + if left.Source != right.Source { + return left.Source < right.Source + } + if left.SourceSegmentIndex != right.SourceSegmentIndex { + return left.SourceSegmentIndex < right.SourceSegmentIndex + } + return left.Speaker < right.Speaker + }) + + return model.MergedTranscript{ + Segments: segments, + OverlapGroups: nil, + }, []report.Event{ + report.Info("merge", "placeholder-merger", "merged placeholder canonical transcript(s)"), + }, nil +} diff --git a/internal/builtin/output.go b/internal/builtin/output.go new file mode 100644 index 0000000..4151e6c --- /dev/null +++ b/internal/builtin/output.go @@ -0,0 +1,39 @@ +package builtin + +import ( + "context" + "encoding/json" + "os" + + "gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +type jsonOutputWriter struct{} + +func (jsonOutputWriter) Name() string { + return "json" +} + +func (jsonOutputWriter) Write(ctx context.Context, out model.FinalTranscript, rpt report.Report, cfg config.Config) ([]report.Event, error) { + if err := ctx.Err(); err != nil { + return nil, err + } + + file, err := os.Create(cfg.OutputFile) + if err != nil { + return nil, err + } + defer file.Close() + + enc := json.NewEncoder(file) + enc.SetIndent("", " ") + if err := enc.Encode(out); err != nil { + return nil, err + } + + return []report.Event{ + report.Info("output", "json", "wrote placeholder transcript JSON"), + }, nil +} diff --git a/internal/builtin/postprocess.go b/internal/builtin/postprocess.go new file mode 100644 index 0000000..b0678e0 --- /dev/null +++ b/internal/builtin/postprocess.go @@ -0,0 +1,47 @@ +package builtin + +import ( + "context" + + 
"gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +type noopPostprocessor struct { + name string +} + +func (p noopPostprocessor) Name() string { + return p.name +} + +func (p noopPostprocessor) Process(ctx context.Context, in model.MergedTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error) { + if err := ctx.Err(); err != nil { + return model.MergedTranscript{}, nil, err + } + + return in, []report.Event{ + report.Info("postprocessing", p.name, "completed no-op postprocessing module"), + }, nil +} + +type assignIDs struct{} + +func (assignIDs) Name() string { + return "assign-ids" +} + +func (assignIDs) Process(ctx context.Context, in model.MergedTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error) { + if err := ctx.Err(); err != nil { + return model.MergedTranscript{}, nil, err + } + + for index := range in.Segments { + in.Segments[index].ID = index + 1 + } + + return in, []report.Event{ + report.Info("postprocessing", "assign-ids", "assigned final segment IDs"), + }, nil +} diff --git a/internal/builtin/preprocess.go b/internal/builtin/preprocess.go new file mode 100644 index 0000000..d705585 --- /dev/null +++ b/internal/builtin/preprocess.go @@ -0,0 +1,82 @@ +package builtin + +import ( + "context" + "fmt" + + "gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/pipeline" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +type noopPreprocessor struct { + name string + requires pipeline.ModelState + produces pipeline.ModelState +} + +func (p noopPreprocessor) Name() string { + return p.name +} + +func (p noopPreprocessor) Requires() pipeline.ModelState { + return p.requires +} + +func (p noopPreprocessor) Produces() pipeline.ModelState { + return p.produces +} + +func 
(p noopPreprocessor) Process(ctx context.Context, in pipeline.PreprocessState, cfg config.Config) (pipeline.PreprocessState, []report.Event, error) { + if err := ctx.Err(); err != nil { + return pipeline.PreprocessState{}, nil, err + } + if in.State != p.requires { + return pipeline.PreprocessState{}, nil, fmt.Errorf("preprocessing module %q requires state %q but received %q", p.name, p.requires, in.State) + } + + in.State = p.produces + return in, []report.Event{ + report.Info("preprocessing", p.name, "completed no-op preprocessing module"), + }, nil +} + +type normalizeSpeakers struct{} + +func (normalizeSpeakers) Name() string { + return "normalize-speakers" +} + +func (normalizeSpeakers) Requires() pipeline.ModelState { + return pipeline.StateRaw +} + +func (normalizeSpeakers) Produces() pipeline.ModelState { + return pipeline.StateCanonical +} + +func (normalizeSpeakers) Process(ctx context.Context, in pipeline.PreprocessState, cfg config.Config) (pipeline.PreprocessState, []report.Event, error) { + if err := ctx.Err(); err != nil { + return pipeline.PreprocessState{}, nil, err + } + if in.State != pipeline.StateRaw { + return pipeline.PreprocessState{}, nil, fmt.Errorf("preprocessing module %q requires state %q but received %q", "normalize-speakers", pipeline.StateRaw, in.State) + } + + canonical := make([]model.CanonicalTranscript, 0, len(in.Raw)) + for _, raw := range in.Raw { + canonical = append(canonical, model.CanonicalTranscript{ + Source: raw.Source, + Segments: nil, + }) + } + + return pipeline.PreprocessState{ + State: pipeline.StateCanonical, + Raw: append([]model.RawTranscript(nil), in.Raw...), + Canonical: canonical, + }, []report.Event{ + report.Info("preprocessing", "normalize-speakers", "created placeholder canonical transcript(s)"), + }, nil +} diff --git a/internal/builtin/registry.go b/internal/builtin/registry.go new file mode 100644 index 0000000..470967f --- /dev/null +++ b/internal/builtin/registry.go @@ -0,0 +1,23 @@ +package builtin + 
+import "gitea.maximumdirect.net/eric/seriatim/internal/pipeline" + +// NewRegistry registers the MVP built-in modules. +func NewRegistry() *pipeline.Registry { + registry := pipeline.NewRegistry() + + registry.RegisterInputReader(jsonFilesReader{}) + registry.RegisterPreprocessor(noopPreprocessor{name: "validate-raw", requires: pipeline.StateRaw, produces: pipeline.StateRaw}) + registry.RegisterPreprocessor(normalizeSpeakers{}) + registry.RegisterPreprocessor(noopPreprocessor{name: "trim-text", requires: pipeline.StateCanonical, produces: pipeline.StateCanonical}) + registry.RegisterPreprocessor(noopPreprocessor{name: "autocorrect", requires: pipeline.StateCanonical, produces: pipeline.StateCanonical}) + registry.RegisterMerger(placeholderMerger{}) + registry.RegisterPostprocessor(noopPostprocessor{name: "detect-overlaps"}) + registry.RegisterPostprocessor(noopPostprocessor{name: "resolve-overlaps"}) + registry.RegisterPostprocessor(assignIDs{}) + registry.RegisterPostprocessor(noopPostprocessor{name: "validate-output"}) + registry.RegisterPostprocessor(noopPostprocessor{name: "autocorrect"}) + registry.RegisterOutputWriter(jsonOutputWriter{}) + + return registry +} diff --git a/internal/cli/merge.go b/internal/cli/merge.go new file mode 100644 index 0000000..e89c3a8 --- /dev/null +++ b/internal/cli/merge.go @@ -0,0 +1,39 @@ +package cli + +import ( + "github.com/spf13/cobra" + + "gitea.maximumdirect.net/eric/seriatim/internal/builtin" + "gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/pipeline" +) + +func newMergeCommand() *cobra.Command { + var opts config.MergeOptions + + cmd := &cobra.Command{ + Use: "merge", + Short: "Run the transcript merge pipeline", + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := config.NewMergeConfig(opts) + if err != nil { + return err + } + + return pipeline.Run(cmd.Context(), cfg, builtin.NewRegistry()) + }, + } + + flags := cmd.Flags() + 
flags.StringArrayVar(&opts.InputFiles, "input-file", nil, "input transcript file; may be repeated") + flags.StringVar(&opts.OutputFile, "output-file", "", "output transcript JSON file") + flags.StringVar(&opts.ReportFile, "report-file", "", "optional report JSON file") + flags.StringVar(&opts.SpeakersFile, "speakers", "", "speaker map file") + flags.StringVar(&opts.AutocorrectFile, "autocorrect", "", "autocorrect rules file") + flags.StringVar(&opts.InputReader, "input-reader", config.DefaultInputReader, "input reader module") + flags.StringVar(&opts.OutputModules, "output-modules", config.DefaultOutputModules, "comma-separated output modules") + flags.StringVar(&opts.PreprocessingModules, "preprocessing-modules", config.DefaultPreprocessingModules, "comma-separated preprocessing modules") + flags.StringVar(&opts.PostprocessingModules, "postprocessing-modules", config.DefaultPostprocessingModules, "comma-separated postprocessing modules") + + return cmd +} diff --git a/internal/cli/merge_test.go b/internal/cli/merge_test.go new file mode 100644 index 0000000..42802d8 --- /dev/null +++ b/internal/cli/merge_test.go @@ -0,0 +1,282 @@ +package cli + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +func TestMergeWritesPlaceholderOutputAndReport(t *testing.T) { + dir := t.TempDir() + inputA := writeFile(t, dir, "a.json") + inputB := writeFile(t, dir, "b.json") + speakers := writeFile(t, dir, "speakers.yml") + output := filepath.Join(dir, "merged.json") + reportPath := filepath.Join(dir, "report.json") + + err := executeMerge( + "--input-file", inputB, + "--input-file", inputA, + "--speakers", speakers, + "--output-file", output, + "--report-file", reportPath, + ) + if err != nil { + t.Fatalf("merge failed: %v", err) + } + + var transcript model.FinalTranscript + readJSON(t, output, &transcript) + outputBytes, err := 
os.ReadFile(output) + if err != nil { + t.Fatalf("read output bytes: %v", err) + } + outputJSON := string(outputBytes) + if !strings.Contains(outputJSON, `"segments": []`) { + t.Fatalf("expected segments to serialize as an empty array, got:\n%s", outputJSON) + } + if !strings.Contains(outputJSON, `"overlap_groups": []`) { + t.Fatalf("expected overlap_groups to serialize as an empty array, got:\n%s", outputJSON) + } + if transcript.Metadata.Application != "seriatim" { + t.Fatalf("unexpected application metadata: %q", transcript.Metadata.Application) + } + if got, want := transcript.Metadata.InputFiles, []string{inputA, inputB}; !equalStrings(got, want) { + t.Fatalf("input files not sorted deterministically: got %v want %v", got, want) + } + if len(transcript.Segments) != 0 { + t.Fatalf("expected placeholder output to contain no segments, got %d", len(transcript.Segments)) + } + if len(transcript.OverlapGroups) != 0 { + t.Fatalf("expected placeholder output to contain no overlap groups, got %d", len(transcript.OverlapGroups)) + } + + var rpt report.Report + readJSON(t, reportPath, &rpt) + gotModules := make([]string, 0, len(rpt.Events)) + for _, event := range rpt.Events { + gotModules = append(gotModules, event.Module) + } + wantModules := []string{ + "json-files", + "validate-raw", + "normalize-speakers", + "trim-text", + "placeholder-merger", + "detect-overlaps", + "resolve-overlaps", + "assign-ids", + "validate-output", + "json", + } + if !equalStrings(gotModules, wantModules) { + t.Fatalf("report event order mismatch:\ngot %v\nwant %v", gotModules, wantModules) + } +} + +func TestUnknownModulesFailDuringValidation(t *testing.T) { + dir := t.TempDir() + input := writeFile(t, dir, "input.json") + speakers := writeFile(t, dir, "speakers.yml") + output := filepath.Join(dir, "merged.json") + + tests := []struct { + name string + args []string + want string + }{ + { + name: "input reader", + args: []string{"--input-reader", "missing-reader"}, + want: `unknown input 
reader "missing-reader"`, + }, + { + name: "preprocessing", + args: []string{"--preprocessing-modules", "validate-raw,missing-module"}, + want: `unknown preprocessing module "missing-module"`, + }, + { + name: "postprocessing", + args: []string{"--postprocessing-modules", "missing-module"}, + want: `unknown postprocessing module "missing-module"`, + }, + { + name: "output", + args: []string{"--output-modules", "missing-module"}, + want: `unknown output module "missing-module"`, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + args := []string{ + "--input-file", input, + "--speakers", speakers, + "--output-file", output, + } + args = append(args, test.args...) + + err := executeMerge(args...) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), test.want) { + t.Fatalf("expected error to contain %q, got %q", test.want, err.Error()) + } + }) + } +} + +func TestInvalidPreprocessingOrderFails(t *testing.T) { + dir := t.TempDir() + input := writeFile(t, dir, "input.json") + output := filepath.Join(dir, "merged.json") + + err := executeMerge( + "--input-file", input, + "--output-file", output, + "--preprocessing-modules", "trim-text,validate-raw", + ) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), `requires state "canonical"`) { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestMissingInputFileFailsBeforePipelineExecution(t *testing.T) { + dir := t.TempDir() + speakers := writeFile(t, dir, "speakers.yml") + output := filepath.Join(dir, "merged.json") + + err := executeMerge( + "--input-file", filepath.Join(dir, "missing.json"), + "--speakers", speakers, + "--output-file", output, + ) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "--input-file") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestNormalizeSpeakersRequiresSpeakersFile(t *testing.T) { + dir := t.TempDir() + input := writeFile(t, dir, 
"input.json") + output := filepath.Join(dir, "merged.json") + + err := executeMerge( + "--input-file", input, + "--output-file", output, + ) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "--speakers is required") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestAutocorrectRequiresAutocorrectFile(t *testing.T) { + dir := t.TempDir() + input := writeFile(t, dir, "input.json") + speakers := writeFile(t, dir, "speakers.yml") + output := filepath.Join(dir, "merged.json") + + err := executeMerge( + "--input-file", input, + "--speakers", speakers, + "--output-file", output, + "--preprocessing-modules", "validate-raw,normalize-speakers,autocorrect", + ) + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "--autocorrect is required") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestOutputJSONIsByteStable(t *testing.T) { + dir := t.TempDir() + inputA := writeFile(t, dir, "a.json") + inputB := writeFile(t, dir, "b.json") + speakers := writeFile(t, dir, "speakers.yml") + outputA := filepath.Join(dir, "merged-a.json") + outputB := filepath.Join(dir, "merged-b.json") + + args := []string{ + "--input-file", inputB, + "--input-file", inputA, + "--speakers", speakers, + } + + err := executeMerge(append(append([]string(nil), args...), "--output-file", outputA)...) + if err != nil { + t.Fatalf("first merge failed: %v", err) + } + err = executeMerge(append(append([]string(nil), args...), "--output-file", outputB)...) 
+ if err != nil { + t.Fatalf("second merge failed: %v", err) + } + + first, err := os.ReadFile(outputA) + if err != nil { + t.Fatalf("read first output: %v", err) + } + second, err := os.ReadFile(outputB) + if err != nil { + t.Fatalf("read second output: %v", err) + } + if string(first) != string(second) { + t.Fatalf("expected byte-stable output\nfirst:\n%s\nsecond:\n%s", first, second) + } +} + +func executeMerge(args ...string) error { + cmd := NewRootCommand() + cmd.SetArgs(append([]string{"merge"}, args...)) + return cmd.Execute() +} + +func writeFile(t *testing.T, dir string, name string) string { + t.Helper() + + path := filepath.Join(dir, name) + if err := os.WriteFile(path, []byte("{}\n"), 0o600); err != nil { + t.Fatalf("write file: %v", err) + } + return path +} + +func readJSON(t *testing.T, path string, target any) { + t.Helper() + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("read %s: %v", path, err) + } + if err := json.Unmarshal(data, target); err != nil { + t.Fatalf("unmarshal %s: %v", path, err) + } +} + +func equalStrings(left []string, right []string) bool { + if len(left) != len(right) { + return false + } + for index := range left { + if left[index] != right[index] { + return false + } + } + return true +} diff --git a/internal/cli/root.go b/internal/cli/root.go new file mode 100644 index 0000000..684ee7b --- /dev/null +++ b/internal/cli/root.go @@ -0,0 +1,18 @@ +package cli + +import ( + "github.com/spf13/cobra" +) + +// NewRootCommand builds the seriatim command tree. 
+func NewRootCommand() *cobra.Command { + cmd := &cobra.Command{ + Use: "seriatim", + Short: "Merge per-speaker transcripts into a chronological transcript", + SilenceErrors: true, + SilenceUsage: true, + } + + cmd.AddCommand(newMergeCommand()) + return cmd +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..4651e1a --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,206 @@ +package config + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sort" + "strings" +) + +const ( + DefaultInputReader = "json-files" + DefaultOutputModules = "json" + DefaultPreprocessingModules = "validate-raw,normalize-speakers,trim-text" + DefaultPostprocessingModules = "detect-overlaps,resolve-overlaps,assign-ids,validate-output" +) + +// MergeOptions captures raw CLI option values before validation. +type MergeOptions struct { + InputFiles []string + OutputFile string + ReportFile string + SpeakersFile string + AutocorrectFile string + InputReader string + OutputModules string + PreprocessingModules string + PostprocessingModules string +} + +// Config is the validated runtime configuration for a merge invocation. +type Config struct { + InputFiles []string + OutputFile string + ReportFile string + SpeakersFile string + AutocorrectFile string + InputReader string + OutputModules []string + PreprocessingModules []string + PostprocessingModules []string +} + +// NewMergeConfig validates raw merge options and returns normalized config. 
+func NewMergeConfig(opts MergeOptions) (Config, error) { + cfg := Config{ + InputReader: strings.TrimSpace(opts.InputReader), + OutputModules: nil, + PreprocessingModules: nil, + PostprocessingModules: nil, + } + + if cfg.InputReader == "" { + return Config{}, errors.New("--input-reader is required") + } + + var err error + cfg.OutputModules, err = parseModuleList(opts.OutputModules) + if err != nil { + return Config{}, fmt.Errorf("--output-modules: %w", err) + } + cfg.PreprocessingModules, err = parseModuleList(opts.PreprocessingModules) + if err != nil { + return Config{}, fmt.Errorf("--preprocessing-modules: %w", err) + } + cfg.PostprocessingModules, err = parseModuleList(opts.PostprocessingModules) + if err != nil { + return Config{}, fmt.Errorf("--postprocessing-modules: %w", err) + } + if len(cfg.OutputModules) == 0 { + return Config{}, errors.New("--output-modules must include at least one module") + } + + cfg.InputFiles, err = normalizeInputFiles(opts.InputFiles) + if err != nil { + return Config{}, err + } + + cfg.OutputFile, err = normalizeOutputPath(opts.OutputFile, "--output-file") + if err != nil { + return Config{}, err + } + + if opts.ReportFile != "" { + cfg.ReportFile, err = normalizeOutputPath(opts.ReportFile, "--report-file") + if err != nil { + return Config{}, err + } + } + + cfg.SpeakersFile = filepath.Clean(strings.TrimSpace(opts.SpeakersFile)) + if opts.SpeakersFile == "" { + cfg.SpeakersFile = "" + } + cfg.AutocorrectFile = filepath.Clean(strings.TrimSpace(opts.AutocorrectFile)) + if opts.AutocorrectFile == "" { + cfg.AutocorrectFile = "" + } + + if contains(cfg.PreprocessingModules, "normalize-speakers") { + if cfg.SpeakersFile == "" { + return Config{}, errors.New("--speakers is required when normalize-speakers is enabled") + } + if err := requireFile(cfg.SpeakersFile, "--speakers"); err != nil { + return Config{}, err + } + } else if cfg.SpeakersFile != "" { + if err := requireFile(cfg.SpeakersFile, "--speakers"); err != nil { + return 
Config{}, err + } + } + + if contains(cfg.PreprocessingModules, "autocorrect") || contains(cfg.PostprocessingModules, "autocorrect") { + if cfg.AutocorrectFile == "" { + return Config{}, errors.New("--autocorrect is required when autocorrect is enabled") + } + if err := requireFile(cfg.AutocorrectFile, "--autocorrect"); err != nil { + return Config{}, err + } + } else if cfg.AutocorrectFile != "" { + if err := requireFile(cfg.AutocorrectFile, "--autocorrect"); err != nil { + return Config{}, err + } + } + + return cfg, nil +} + +func parseModuleList(value string) ([]string, error) { + value = strings.TrimSpace(value) + if value == "" { + return nil, nil + } + + parts := strings.Split(value, ",") + names := make([]string, 0, len(parts)) + for _, part := range parts { + name := strings.TrimSpace(part) + if name == "" { + return nil, errors.New("module names cannot be empty") + } + names = append(names, name) + } + return names, nil +} + +func normalizeInputFiles(paths []string) ([]string, error) { + if len(paths) == 0 { + return nil, errors.New("at least one --input-file is required") + } + + normalized := make([]string, 0, len(paths)) + for _, path := range paths { + path = strings.TrimSpace(path) + if path == "" { + return nil, errors.New("--input-file cannot be empty") + } + + clean := filepath.Clean(path) + if err := requireFile(clean, "--input-file"); err != nil { + return nil, err + } + normalized = append(normalized, clean) + } + sort.Strings(normalized) + return normalized, nil +} + +func normalizeOutputPath(path string, flag string) (string, error) { + path = strings.TrimSpace(path) + if path == "" { + return "", fmt.Errorf("%s is required", flag) + } + + clean := filepath.Clean(path) + parent := filepath.Dir(clean) + stat, err := os.Stat(parent) + if err != nil { + return "", fmt.Errorf("%s parent directory %q: %w", flag, parent, err) + } + if !stat.IsDir() { + return "", fmt.Errorf("%s parent path %q is not a directory", flag, parent) + } + return clean, 
nil +} + +func requireFile(path string, flag string) error { + stat, err := os.Stat(path) + if err != nil { + return fmt.Errorf("%s %q: %w", flag, path, err) + } + if stat.IsDir() { + return fmt.Errorf("%s %q is a directory, not a file", flag, path) + } + return nil +} + +func contains(values []string, target string) bool { + for _, value := range values { + if value == target { + return true + } + } + return false +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..7bf3f42 --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,35 @@ +package config + +import ( + "os" + "path/filepath" + "testing" +) + +func TestOmittingNormalizeSpeakersDoesNotRequireSpeakers(t *testing.T) { + dir := t.TempDir() + input := writeTempFile(t, dir, "input.json") + output := filepath.Join(dir, "merged.json") + + _, err := NewMergeConfig(MergeOptions{ + InputFiles: []string{input}, + OutputFile: output, + InputReader: DefaultInputReader, + OutputModules: DefaultOutputModules, + PreprocessingModules: "validate-raw", + PostprocessingModules: DefaultPostprocessingModules, + }) + if err != nil { + t.Fatalf("expected speakers file to be optional when normalize-speakers is omitted, got %v", err) + } +} + +func writeTempFile(t *testing.T, dir string, name string) string { + t.Helper() + + path := filepath.Join(dir, name) + if err := os.WriteFile(path, []byte("{}\n"), 0o600); err != nil { + t.Fatalf("write temp file: %v", err) + } + return path +} diff --git a/internal/model/model.go b/internal/model/model.go new file mode 100644 index 0000000..0e1e648 --- /dev/null +++ b/internal/model/model.go @@ -0,0 +1,70 @@ +package model + +// RawTranscript is a loaded input document before canonical normalization. +type RawTranscript struct { + Source string `json:"source"` +} + +// CanonicalTranscript is a per-speaker transcript in seriatim's internal model. 
+type CanonicalTranscript struct { + Source string `json:"source"` + Segments []Segment `json:"segments"` +} + +// MergedTranscript is the globally merged in-memory transcript. +type MergedTranscript struct { + Segments []Segment `json:"segments"` + OverlapGroups []OverlapGroup `json:"overlap_groups"` +} + +// FinalTranscript is the serialized transcript artifact. +type FinalTranscript struct { + Metadata OutputMetadata `json:"metadata"` + Segments []Segment `json:"segments"` + OverlapGroups []OverlapGroup `json:"overlap_groups"` +} + +// OutputMetadata records the pipeline configuration that produced an artifact. +type OutputMetadata struct { + Application string `json:"application"` + Version string `json:"version"` + InputReader string `json:"input_reader"` + InputFiles []string `json:"input_files"` + PreprocessingModules []string `json:"preprocessing_modules"` + PostprocessingModules []string `json:"postprocessing_modules"` + OutputModules []string `json:"output_modules"` +} + +// Segment is the canonical transcript segment shape used by the framework. +type Segment struct { + ID int `json:"id,omitempty"` + InternalRef string `json:"internal_ref,omitempty"` + Source string `json:"source"` + SourceSegmentIndex int `json:"source_segment_index"` + Speaker string `json:"speaker"` + Start float64 `json:"start"` + End float64 `json:"end"` + Text string `json:"text"` + Words []Word `json:"words,omitempty"` + OverlapGroupID int `json:"overlap_group_id,omitempty"` +} + +// Word preserves optional word-level timing data. +type Word struct { + Text string `json:"text"` + Start float64 `json:"start"` + End float64 `json:"end"` + Score float64 `json:"score,omitempty"` + Speaker string `json:"speaker,omitempty"` +} + +// OverlapGroup describes a detected overlapping speech region. 
+type OverlapGroup struct { + ID int `json:"id"` + Start float64 `json:"start"` + End float64 `json:"end"` + Segments []string `json:"segments"` + Speakers []string `json:"speakers"` + Class string `json:"class"` + Resolution string `json:"resolution"` +} diff --git a/internal/pipeline/interfaces.go b/internal/pipeline/interfaces.go new file mode 100644 index 0000000..3f34187 --- /dev/null +++ b/internal/pipeline/interfaces.go @@ -0,0 +1,56 @@ +package pipeline + +import ( + "context" + + "gitea.maximumdirect.net/eric/seriatim/internal/config" + "gitea.maximumdirect.net/eric/seriatim/internal/model" + "gitea.maximumdirect.net/eric/seriatim/internal/report" +) + +// ModelState identifies which representation a preprocessing module consumes. +type ModelState string + +const ( + StateRaw ModelState = "raw" + StateCanonical ModelState = "canonical" +) + +// PreprocessState carries transcript data as it moves from raw to canonical. +type PreprocessState struct { + State ModelState + Raw []model.RawTranscript + Canonical []model.CanonicalTranscript +} + +// InputReader loads external input specs into raw transcript documents. +type InputReader interface { + Name() string + Read(ctx context.Context, cfg config.Config) ([]model.RawTranscript, []report.Event, error) +} + +// Preprocessor transforms preprocessing state. +type Preprocessor interface { + Name() string + Requires() ModelState + Produces() ModelState + Process(ctx context.Context, in PreprocessState, cfg config.Config) (PreprocessState, []report.Event, error) +} + +// Merger combines canonical transcripts into one merged transcript. +type Merger interface { + Name() string + Merge(ctx context.Context, in []model.CanonicalTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error) +} + +// Postprocessor transforms a merged transcript. 
+type Postprocessor interface {
+	Name() string
+	Process(ctx context.Context, in model.MergedTranscript, cfg config.Config) (model.MergedTranscript, []report.Event, error)
+}
+
+// OutputWriter emits final artifacts.
+type OutputWriter interface {
+	Name() string
+	Write(ctx context.Context, out model.FinalTranscript, rpt report.Report, cfg config.Config) ([]report.Event, error)
+}
diff --git a/internal/pipeline/registry.go b/internal/pipeline/registry.go
new file mode 100644
index 0000000..985838a
--- /dev/null
+++ b/internal/pipeline/registry.go
@@ -0,0 +1,98 @@
+package pipeline
+
+import "fmt"
+
+// Registry stores all built-in modules addressable by canonical name.
+//
+// NOTE(review): the maps are unsynchronized and each Register* call silently
+// overwrites any existing entry with the same name. Presumably registration
+// happens on a single goroutine at startup — confirm, or add a mutex and a
+// duplicate-name check.
+type Registry struct {
+	inputReaders   map[string]InputReader
+	preprocessors  map[string]Preprocessor
+	postprocessors map[string]Postprocessor
+	outputWriters  map[string]OutputWriter
+	merger         Merger
+}
+
+// NewRegistry creates an empty module registry.
+func NewRegistry() *Registry {
+	return &Registry{
+		inputReaders:   make(map[string]InputReader),
+		preprocessors:  make(map[string]Preprocessor),
+		postprocessors: make(map[string]Postprocessor),
+		outputWriters:  make(map[string]OutputWriter),
+	}
+}
+
+// RegisterInputReader registers an input reader by name.
+func (r *Registry) RegisterInputReader(module InputReader) {
+	r.inputReaders[module.Name()] = module
+}
+
+// RegisterPreprocessor registers a preprocessing module by name.
+func (r *Registry) RegisterPreprocessor(module Preprocessor) {
+	r.preprocessors[module.Name()] = module
+}
+
+// RegisterPostprocessor registers a postprocessing module by name.
+func (r *Registry) RegisterPostprocessor(module Postprocessor) {
+	r.postprocessors[module.Name()] = module
+}
+
+// RegisterOutputWriter registers an output writer by name.
+func (r *Registry) RegisterOutputWriter(module OutputWriter) {
+	r.outputWriters[module.Name()] = module
+}
+
+// RegisterMerger registers the merger implementation.
+func (r *Registry) RegisterMerger(module Merger) {
+	r.merger = module
+}
+
+// resolveInputReader looks up a reader by name, erroring on unknown names.
+func (r *Registry) resolveInputReader(name string) (InputReader, error) {
+	module, ok := r.inputReaders[name]
+	if !ok {
+		return nil, fmt.Errorf("unknown input reader %q", name)
+	}
+	return module, nil
+}
+
+// resolvePreprocessors maps configured names to modules, preserving order.
+// Duplicate names are allowed: the same module may appear more than once.
+func (r *Registry) resolvePreprocessors(names []string) ([]Preprocessor, error) {
+	modules := make([]Preprocessor, 0, len(names))
+	for _, name := range names {
+		module, ok := r.preprocessors[name]
+		if !ok {
+			return nil, fmt.Errorf("unknown preprocessing module %q", name)
+		}
+		modules = append(modules, module)
+	}
+	return modules, nil
+}
+
+// resolvePostprocessors maps configured names to modules, preserving order.
+func (r *Registry) resolvePostprocessors(names []string) ([]Postprocessor, error) {
+	modules := make([]Postprocessor, 0, len(names))
+	for _, name := range names {
+		module, ok := r.postprocessors[name]
+		if !ok {
+			return nil, fmt.Errorf("unknown postprocessing module %q", name)
+		}
+		modules = append(modules, module)
+	}
+	return modules, nil
+}
+
+// resolveOutputWriters maps configured names to modules, preserving order.
+func (r *Registry) resolveOutputWriters(names []string) ([]OutputWriter, error) {
+	modules := make([]OutputWriter, 0, len(names))
+	for _, name := range names {
+		module, ok := r.outputWriters[name]
+		if !ok {
+			return nil, fmt.Errorf("unknown output module %q", name)
+		}
+		modules = append(modules, module)
+	}
+	return modules, nil
+}
+
+// resolveMerger returns the registered merger; exactly one must be set.
+func (r *Registry) resolveMerger() (Merger, error) {
+	if r.merger == nil {
+		return nil, fmt.Errorf("no merger registered")
+	}
+	return r.merger, nil
+}
diff --git a/internal/pipeline/runner.go b/internal/pipeline/runner.go
new file mode 100644
index 0000000..c7e6763
--- /dev/null
+++ b/internal/pipeline/runner.go
@@ -0,0 +1,176 @@
+package pipeline
+
+import (
+	"context"
+	"fmt"
+
+	"gitea.maximumdirect.net/eric/seriatim/internal/config"
+	"gitea.maximumdirect.net/eric/seriatim/internal/model"
+	"gitea.maximumdirect.net/eric/seriatim/internal/report"
+)
+
+const (
+	applicationName = "seriatim"
+	version         = "dev"
+)
+
+// Run validates module composition, executes the pipeline, and emits outputs.
+//
+// Stages run strictly in order: read -> preprocess -> merge -> postprocess ->
+// write. The first module error aborts the run.
+//
+// NOTE(review): module errors are returned unwrapped; consider wrapping with
+// fmt.Errorf("stage %s: %w", ...) so failures are attributable to a stage.
+func Run(ctx context.Context, cfg config.Config, registry *Registry) error {
+	plan, err := resolvePlan(cfg, registry)
+	if err != nil {
+		return err
+	}
+
+	events := make([]report.Event, 0)
+
+	raw, newEvents, err := plan.inputReader.Read(ctx, cfg)
+	if err != nil {
+		return err
+	}
+	events = append(events, newEvents...)
+
+	state := PreprocessState{
+		State: StateRaw,
+		Raw:   raw,
+	}
+	for _, module := range plan.preprocessors {
+		state, newEvents, err = module.Process(ctx, state, cfg)
+		if err != nil {
+			return err
+		}
+		events = append(events, newEvents...)
+	}
+	// Runtime guard mirroring validatePreprocessors: a module could declare
+	// one Produces() state but hand back another.
+	if state.State != StateCanonical {
+		return fmt.Errorf("preprocessing ended in state %q; expected %q", state.State, StateCanonical)
+	}
+
+	merged, newEvents, err := plan.merger.Merge(ctx, state.Canonical, cfg)
+	if err != nil {
+		return err
+	}
+	events = append(events, newEvents...)
+
+	for _, module := range plan.postprocessors {
+		merged, newEvents, err = module.Process(ctx, merged, cfg)
+		if err != nil {
+			return err
+		}
+		events = append(events, newEvents...)
+	}
+
+	final := finalizeTranscript(cfg, merged)
+	rpt := finalizeReport(cfg, events)
+
+	// NOTE(review): writers receive a report built from events gathered so
+	// far; events the writers themselves emit appear only in the report
+	// persisted below, not in any report artifact a writer embeds.
+	for _, module := range plan.outputWriters {
+		newEvents, err = module.Write(ctx, final, rpt, cfg)
+		if err != nil {
+			return err
+		}
+		events = append(events, newEvents...)
+	}
+
+	// Rebuild the report to include output-stage events, then persist it.
+	// When ReportFile is unset the full report is silently discarded.
+	rpt = finalizeReport(cfg, events)
+	if cfg.ReportFile != "" {
+		if err := report.WriteJSON(cfg.ReportFile, rpt); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// executionPlan is the fully resolved, validated module set for one run.
+type executionPlan struct {
+	inputReader    InputReader
+	preprocessors  []Preprocessor
+	merger         Merger
+	postprocessors []Postprocessor
+	outputWriters  []OutputWriter
+}
+
+// resolvePlan resolves every configured module name against the registry and
+// validates the preprocessing state chain, so Run fails fast before any I/O.
+func resolvePlan(cfg config.Config, registry *Registry) (executionPlan, error) {
+	inputReader, err := registry.resolveInputReader(cfg.InputReader)
+	if err != nil {
+		return executionPlan{}, err
+	}
+
+	preprocessors, err := registry.resolvePreprocessors(cfg.PreprocessingModules)
+	if err != nil {
+		return executionPlan{}, err
+	}
+	if err := validatePreprocessors(preprocessors); err != nil {
+		return executionPlan{}, err
+	}
+
+	merger, err := registry.resolveMerger()
+	if err != nil {
+		return executionPlan{}, err
+	}
+
+	postprocessors, err := registry.resolvePostprocessors(cfg.PostprocessingModules)
+	if err != nil {
+		return executionPlan{}, err
+	}
+
+	outputWriters, err := registry.resolveOutputWriters(cfg.OutputModules)
+	if err != nil {
+		return executionPlan{}, err
+	}
+
+	return executionPlan{
+		inputReader:    inputReader,
+		preprocessors:  preprocessors,
+		merger:         merger,
+		postprocessors: postprocessors,
+		outputWriters:  outputWriters,
+	}, nil
+}
+
+// validatePreprocessors checks that the declared Requires/Produces states
+// chain from StateRaw to StateCanonical with no gaps.
+func validatePreprocessors(modules []Preprocessor) error {
+	state := StateRaw
+	for _, module := range modules {
+		if module.Requires() != state {
+			return fmt.Errorf("preprocessing module %q requires state %q but current state is %q", module.Name(), module.Requires(), state)
+		}
+		state = module.Produces()
+	}
+	if state != StateCanonical {
+		return fmt.Errorf("preprocessing chain ends in state %q; expected %q", state, StateCanonical)
+	}
+	return nil
+}
+
+// finalizeTranscript copies the merged transcript and config slices into a
+// FinalTranscript so later mutations cannot alias the emitted artifact.
+func finalizeTranscript(cfg config.Config, merged model.MergedTranscript) model.FinalTranscript {
+	segments := make([]model.Segment, len(merged.Segments))
+	copy(segments, merged.Segments)
+	overlapGroups := make([]model.OverlapGroup, len(merged.OverlapGroups))
+	copy(overlapGroups, merged.OverlapGroups)
+
+	return model.FinalTranscript{
+		Metadata: model.OutputMetadata{
+			Application:           applicationName,
+			Version:               version,
+			InputReader:           cfg.InputReader,
+			InputFiles:            append([]string(nil), cfg.InputFiles...),
+			PreprocessingModules:  append([]string(nil), cfg.PreprocessingModules...),
+			PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...),
+			OutputModules:         append([]string(nil), cfg.OutputModules...),
+		},
+		Segments:      segments,
+		OverlapGroups: overlapGroups,
+	}
+}
+
+// finalizeReport snapshots the event list and config slices into a Report.
+func finalizeReport(cfg config.Config, events []report.Event) report.Report {
+	return report.Report{
+		Metadata: report.Metadata{
+			Application:           applicationName,
+			Version:               version,
+			InputReader:           cfg.InputReader,
+			InputFiles:            append([]string(nil), cfg.InputFiles...),
+			PreprocessingModules:  append([]string(nil), cfg.PreprocessingModules...),
+			PostprocessingModules: append([]string(nil), cfg.PostprocessingModules...),
+			OutputModules:         append([]string(nil), cfg.OutputModules...),
+		},
+		Events: append([]report.Event(nil), events...),
+	}
+}
diff --git a/internal/report/report.go b/internal/report/report.go
new file mode 100644
index 0000000..2a8a21e
--- /dev/null
+++ b/internal/report/report.go
@@ -0,0 +1,64 @@
+package report
+
+import (
+	"encoding/json"
+	"os"
+)
+
+// Severity classifies report events.
+type Severity string
+
+const (
+	SeverityInfo      Severity = "info"
+	SeverityWarning   Severity = "warning"
+	SeverityCorrected Severity = "corrected"
+	SeverityError     Severity = "error"
+)
+
+// Event records a validation finding, correction, or pipeline action.
+type Event struct {
+	Severity Severity `json:"severity"`
+	Stage    string   `json:"stage"`
+	Module   string   `json:"module"`
+	Message  string   `json:"message"`
+}
+
+// Report is the deterministic report artifact emitted by the framework.
+type Report struct { + Metadata Metadata `json:"metadata"` + Events []Event `json:"events"` +} + +// Metadata records the pipeline configuration that produced the report. +type Metadata struct { + Application string `json:"application"` + Version string `json:"version"` + InputReader string `json:"input_reader"` + InputFiles []string `json:"input_files"` + PreprocessingModules []string `json:"preprocessing_modules"` + PostprocessingModules []string `json:"postprocessing_modules"` + OutputModules []string `json:"output_modules"` +} + +// Info constructs an informational report event. +func Info(stage string, module string, message string) Event { + return Event{ + Severity: SeverityInfo, + Stage: stage, + Module: module, + Message: message, + } +} + +// WriteJSON writes a deterministic JSON report. +func WriteJSON(path string, rpt Report) error { + file, err := os.Create(path) + if err != nil { + return err + } + defer file.Close() + + enc := json.NewEncoder(file) + enc.SetIndent("", " ") + return enc.Encode(rpt) +}