From 161c508fc9f5c3ae82fdea811b1cf6c2d98c3a1c Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 20:29:05 +0100 Subject: [PATCH 01/13] Add scribe stream command for live microphone transcription MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New command: scribe stream — captures microphone audio and transcribes in real-time using FluidAudio's SlidingWindowAsrManager (Parakeet). Features: - Live transcription from microphone with timestamps - Text and JSONL output formats - Save to file with --output - Ctrl+C to stop cleanly - Uses streaming ASR config (11s chunks, 1s hypothesis updates) Usage: scribe stream # listen and transcribe scribe stream --format jsonl # JSONL output scribe stream --output meeting.txt # save to file System audio capture (--source) will be added in a follow-up. Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 149 +++++++++++++++++++++++++++ Sources/scribe/ScribeCLI.swift | 2 +- 2 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 Sources/scribe/Commands/Stream.swift diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift new file mode 100644 index 0000000..172d5c6 --- /dev/null +++ b/Sources/scribe/Commands/Stream.swift @@ -0,0 +1,149 @@ +import ArgumentParser +import AVFoundation +import FluidAudio +import Foundation + +struct Stream: AsyncParsableCommand { + static let configuration = CommandConfiguration( + abstract: "Stream live audio transcription from microphone or system audio." + ) + + @Flag(name: .long, help: "Capture microphone audio (default if no --source specified).") + var mic: Bool = false + + @Option(name: .long, help: "Output format: text or jsonl.") + var format: StreamOutputFormat = .text + + @Option(name: .shortAndLong, help: "Also save output to file.") + var output: String? + + @Flag(name: .long, help: "Show status information.") + var verbose: Bool = false + + func run() async throws { + // For now, mic is the default and only source + if verbose { log("Initializing streaming ASR (Parakeet)...") } + + let streamManager = SlidingWindowAsrManager(config: .streaming) + try await streamManager.start(source: .microphone) + + // Set up microphone capture + let audioEngine = AVAudioEngine() + let inputNode = audioEngine.inputNode + let inputFormat = inputNode.outputFormat(forBus: 0) + + if verbose { + log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") + log("Listening... (press Ctrl+C to stop)") + } + + // Track state for output + let startTime = Date() + var outputFile: FileHandle? = nil + var utteranceCount = 0 + + if let outputPath = output { + FileManager.default.createFile(atPath: outputPath, contents: nil) + outputFile = FileHandle(forWritingAtPath: outputPath) + } + + // Set up signal handler for clean Ctrl+C + signal(SIGINT) { _ in + Darwin.exit(0) + } + + // Install tap on microphone — feed buffers to ASR + inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in + // nonisolated(unsafe) to cross sendability boundary — buffer is consumed immediately + nonisolated(unsafe) let buf = buffer + streamManager.streamAudio(buf) + } + + try audioEngine.start() + + if !verbose { + // Print a minimal status line to stderr + log("Listening... (press Ctrl+C to stop)") + } + + // Read transcription updates + let updates = await streamManager.transcriptionUpdates + for await update in updates { + // Stream continues until Ctrl+C or stream ends + + let text = update.text.trimmingCharacters(in: .whitespaces) + guard !text.isEmpty else { continue } + + utteranceCount += 1 + let elapsed = Date().timeIntervalSince(startTime) + let speaker = mic ? "You" : "Others" + + let line: String + switch format { + case .text: + let ts = formatTimestamp(elapsed) + line = "[\(ts)] \(speaker): \(text)" + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "speaker": speaker, + "text": text, + "confirmed": update.isConfirmed, + ] + if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) { + line = jsonStr + } else { + continue + } + } + + print(line) + fflush(stdout) + + if let file = outputFile { + file.write(Data((line + "\n").utf8)) + } + } + + // Clean shutdown + audioEngine.stop() + inputNode.removeTap(onBus: 0) + let finalText = try await streamManager.finish() + + let totalElapsed = Date().timeIntervalSince(startTime) + + // Print summary to stderr + FileHandle.standardError.write(Data("\n--- Stream ended ---\n".utf8)) + FileHandle.standardError.write(Data(String(format: "Duration: %dm %ds\n", + Int(totalElapsed) / 60, Int(totalElapsed) % 60).utf8)) + FileHandle.standardError.write(Data("Utterances: \(utteranceCount)\n".utf8)) + + if let outputPath = output { + FileHandle.standardError.write(Data("Saved to: \(outputPath)\n".utf8)) + } + + outputFile?.closeFile() + } + + private func log(_ message: String) { + FileHandle.standardError.write(Data("[scribe] \(message)\n".utf8)) + } + + private func formatTimestamp(_ seconds: Double) -> String { + let totalSeconds = Int(seconds) + let hours = totalSeconds / 3600 + let minutes = (totalSeconds % 3600) / 60 + let secs = totalSeconds % 60 + if hours > 0 { + return String(format: "%02d:%02d:%02d", hours, minutes, secs) + } + return String(format: "%02d:%02d", minutes, secs) + } +} + +// MARK: - Stream Output Format + +enum StreamOutputFormat: String, ExpressibleByArgument, CaseIterable, Sendable { + case text, jsonl +} diff --git a/Sources/scribe/ScribeCLI.swift b/Sources/scribe/ScribeCLI.swift index 8d2e877..8a58abb 100644 --- a/Sources/scribe/ScribeCLI.swift +++ b/Sources/scribe/ScribeCLI.swift @@ -6,7 +6,7 @@ struct ScribeCLI: AsyncParsableCommand { commandName: "scribe", abstract: "State-of-the-art local audio transcription with speaker diarization for macOS.", version: "0.2.1", - subcommands: [Transcribe.self, Models.self], + subcommands: [Transcribe.self, Stream.self, Models.self], defaultSubcommand: Transcribe.self ) } From 4ad5e716cd20cfad0cb315faaf82098b893ebc32 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 20:36:45 +0100 Subject: [PATCH 02/13] Fix stream latency, speaker label, and dedup - Reduce chunk size from 11s to 3s for ~3-4s latency (was ~13s) - Lower confirmation threshold from 0.8 to 0.5 for faster output - Reduce right context from 2s to 0.5s - Fix speaker label: remove "Others" tag for mic input - Add text dedup to avoid repeating same hypothesis - Remove --mic flag (mic is default and only source for now) Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 40 +++++++++++++--------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 172d5c6..c30aa61 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -8,9 +8,6 @@ struct Stream: AsyncParsableCommand { abstract: "Stream live audio transcription from microphone or system audio." ) - @Flag(name: .long, help: "Capture microphone audio (default if no --source specified).") - var mic: Bool = false - @Option(name: .long, help: "Output format: text or jsonl.") var format: StreamOutputFormat = .text @@ -21,10 +18,18 @@ struct Stream: AsyncParsableCommand { var verbose: Bool = false func run() async throws { - // For now, mic is the default and only source if verbose { log("Initializing streaming ASR (Parakeet)...") } - let streamManager = SlidingWindowAsrManager(config: .streaming) + // Low-latency config: 3s chunks for fast output, 1s hypothesis for immediate feedback + let config = SlidingWindowAsrConfig( + chunkSeconds: 3.0, + hypothesisChunkSeconds: 1.0, + leftContextSeconds: 3.0, + rightContextSeconds: 0.5, + minContextForConfirmation: 3.0, + confirmationThreshold: 0.5 + ) + let streamManager = SlidingWindowAsrManager(config: config) try await streamManager.start(source: .microphone) // Set up microphone capture @@ -34,13 +39,13 @@ struct Stream: AsyncParsableCommand { if verbose { log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") - log("Listening... (press Ctrl+C to stop)") } // Track state for output let startTime = Date() var outputFile: FileHandle? = nil var utteranceCount = 0 + var lastEmittedText = "" if let outputPath = output { FileManager.default.createFile(atPath: outputPath, contents: nil) @@ -49,44 +54,40 @@ struct Stream: AsyncParsableCommand { // Set up signal handler for clean Ctrl+C signal(SIGINT) { _ in + FileHandle.standardError.write(Data("\n".utf8)) Darwin.exit(0) } // Install tap on microphone — feed buffers to ASR inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in - // nonisolated(unsafe) to cross sendability boundary — buffer is consumed immediately nonisolated(unsafe) let buf = buffer streamManager.streamAudio(buf) } try audioEngine.start() - - if !verbose { - // Print a minimal status line to stderr - log("Listening... (press Ctrl+C to stop)") - } + log("Listening... (press Ctrl+C to stop)") // Read transcription updates let updates = await streamManager.transcriptionUpdates for await update in updates { - // Stream continues until Ctrl+C or stream ends - let text = update.text.trimmingCharacters(in: .whitespaces) guard !text.isEmpty else { continue } + // Skip if this is the same text we already emitted (dedup) + guard text != lastEmittedText else { continue } + lastEmittedText = text + utteranceCount += 1 let elapsed = Date().timeIntervalSince(startTime) - let speaker = mic ? "You" : "Others" let line: String switch format { case .text: let ts = formatTimestamp(elapsed) - line = "[\(ts)] \(speaker): \(text)" + line = "[\(ts)] \(text)" case .jsonl: let jsonObj: [String: Any] = [ "time": round(elapsed * 10) / 10, - "speaker": speaker, "text": text, "confirmed": update.isConfirmed, ] @@ -109,11 +110,10 @@ struct Stream: AsyncParsableCommand { // Clean shutdown audioEngine.stop() inputNode.removeTap(onBus: 0) - let finalText = try await streamManager.finish() + _ = try await streamManager.finish() let totalElapsed = Date().timeIntervalSince(startTime) - // Print summary to stderr FileHandle.standardError.write(Data("\n--- Stream ended ---\n".utf8)) FileHandle.standardError.write(Data(String(format: "Duration: %dm %ds\n", Int(totalElapsed) / 60, Int(totalElapsed) % 60).utf8)) @@ -142,8 +142,6 @@ struct Stream: AsyncParsableCommand { } } -// MARK: - Stream Output Format - enum StreamOutputFormat: String, ExpressibleByArgument, CaseIterable, Sendable { case text, jsonl } From 84af5b0b563f5151ad941f3971796804f68f27d2 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 20:43:42 +0100 Subject: [PATCH 03/13] Fix stream: use library streaming config, show volatile + confirmed updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The 3s chunk config was too short for Parakeet — model needs ~10s context. Reverted to the library's .streaming preset (11s chunks, 1s hypothesis). Now shows two types of updates: - Volatile (hypothesis): shown as ephemeral line on stderr with \r overwrite Gives immediate ~1-2s feedback while speaking - Confirmed: printed as permanent line to stdout Stable, final text after sufficient context Also fixes: - Stream getting stuck on longer utterances (was breaking model state) - Text format shows live preview on stderr, final on stdout - JSONL emits both volatile and confirmed (with "confirmed" field) Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 99 +++++++++++++++------------- 1 file changed, 54 insertions(+), 45 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index c30aa61..b067dd6 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -5,7 +5,7 @@ import Foundation struct Stream: AsyncParsableCommand { static let configuration = CommandConfiguration( - abstract: "Stream live audio transcription from microphone or system audio." + abstract: "Stream live audio transcription from microphone." ) @Option(name: .long, help: "Output format: text or jsonl.") @@ -20,16 +20,8 @@ struct Stream: AsyncParsableCommand { func run() async throws { if verbose { log("Initializing streaming ASR (Parakeet)...") } - // Low-latency config: 3s chunks for fast output, 1s hypothesis for immediate feedback - let config = SlidingWindowAsrConfig( - chunkSeconds: 3.0, - hypothesisChunkSeconds: 1.0, - leftContextSeconds: 3.0, - rightContextSeconds: 0.5, - minContextForConfirmation: 3.0, - confirmationThreshold: 0.5 - ) - let streamManager = SlidingWindowAsrManager(config: config) + // Use the library's streaming config (11s chunks + 1s hypothesis updates) + let streamManager = SlidingWindowAsrManager(config: .streaming) try await streamManager.start(source: .microphone) // Set up microphone capture @@ -41,24 +33,22 @@ struct Stream: AsyncParsableCommand { log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") } - // Track state for output let startTime = Date() var outputFile: FileHandle? = nil - var utteranceCount = 0 - var lastEmittedText = "" + var lastConfirmedText = "" + var lastVolatileText = "" if let outputPath = output { FileManager.default.createFile(atPath: outputPath, contents: nil) outputFile = FileHandle(forWritingAtPath: outputPath) } - // Set up signal handler for clean Ctrl+C signal(SIGINT) { _ in FileHandle.standardError.write(Data("\n".utf8)) Darwin.exit(0) } - // Install tap on microphone — feed buffers to ASR + // Install tap on microphone inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in nonisolated(unsafe) let buf = buffer streamManager.streamAudio(buf) @@ -73,37 +63,39 @@ struct Stream: AsyncParsableCommand { let text = update.text.trimmingCharacters(in: .whitespaces) guard !text.isEmpty else { continue } - // Skip if this is the same text we already emitted (dedup) - guard text != lastEmittedText else { continue } - lastEmittedText = text - - utteranceCount += 1 let elapsed = Date().timeIntervalSince(startTime) - let line: String - switch format { - case .text: - let ts = formatTimestamp(elapsed) - line = "[\(ts)] \(text)" - case .jsonl: - let jsonObj: [String: Any] = [ - "time": round(elapsed * 10) / 10, - "text": text, - "confirmed": update.isConfirmed, - ] - if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), - let jsonStr = String(data: data, encoding: .utf8) { - line = jsonStr - } else { - continue - } - } + if update.isConfirmed { + // Confirmed: stable, final text — print as a permanent line + guard text != lastConfirmedText else { continue } + lastConfirmedText = text + lastVolatileText = "" // reset volatile tracking - print(line) - fflush(stdout) + let line = formatLine(text: text, elapsed: elapsed, confirmed: true) + print(line) + fflush(stdout) - if let file = outputFile { - file.write(Data((line + "\n").utf8)) + if let file = outputFile { + file.write(Data((line + "\n").utf8)) + } + } else { + // Volatile: hypothesis, may change — show as ephemeral line + guard text != lastVolatileText else { continue } + lastVolatileText = text + + switch format { + case .text: + // Overwrite current line with \r for live feel + let ts = formatTimestamp(elapsed) + let preview = String(text.suffix(80)) + let line = "\r[\(ts)] \(preview)" + FileHandle.standardError.write(Data(line.utf8)) + case .jsonl: + // In JSONL mode, emit volatile updates too (marked as unconfirmed) + let line = formatLine(text: text, elapsed: elapsed, confirmed: false) + print(line) + fflush(stdout) + } } } @@ -113,11 +105,9 @@ struct Stream: AsyncParsableCommand { _ = try await streamManager.finish() let totalElapsed = Date().timeIntervalSince(startTime) - FileHandle.standardError.write(Data("\n--- Stream ended ---\n".utf8)) FileHandle.standardError.write(Data(String(format: "Duration: %dm %ds\n", Int(totalElapsed) / 60, Int(totalElapsed) % 60).utf8)) - FileHandle.standardError.write(Data("Utterances: \(utteranceCount)\n".utf8)) if let outputPath = output { FileHandle.standardError.write(Data("Saved to: \(outputPath)\n".utf8)) @@ -126,6 +116,25 @@ struct Stream: AsyncParsableCommand { outputFile?.closeFile() } + private func formatLine(text: String, elapsed: Double, confirmed: Bool) -> String { + switch format { + case .text: + let ts = formatTimestamp(elapsed) + return "[\(ts)] \(text)" + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "text": text, + "confirmed": confirmed, + ] + if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) { + return jsonStr + } + return "{\"text\":\"\(text)\"}" + } + } + private func log(_ message: String) { FileHandle.standardError.write(Data("[scribe] \(message)\n".utf8)) } From e08ff0b7ec67ff52d2f8969a335b3aa0b53d0a0a Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 21:06:18 +0100 Subject: [PATCH 04/13] =?UTF-8?q?Switch=20stream=20to=20Nemotron=20560ms?= =?UTF-8?q?=20=E2=80=94=20true=20streaming=20ASR=20engine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace SlidingWindowAsrManager (batch TDT in sliding windows, ~11s latency) with StreamingAsrEngine protocol using Nemotron 560ms: - True cache-aware streaming: each 560ms chunk inherits full context - 2.12% WER (better than TDT v3's 2.5% on LibriSpeech) - Includes punctuation and capitalization - ~560ms to first text (was ~11s) - Partial transcript callback for live preview on stderr - Confirmed text printed to stdout Architecture: - Mic audio → appendAudio() → processBufferedAudio() → getPartialTranscript() - Partial callback fires on every chunk for live preview (\r overwrite on stderr) - Main loop polls at 20Hz, emits new confirmed text to stdout - Actor-based state management for thread safety (Swift 6) Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 185 +++++++++++++++------------ 1 file changed, 104 insertions(+), 81 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index b067dd6..72f3178 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -3,6 +3,34 @@ import AVFoundation import FluidAudio import Foundation +/// Thread-safe state tracker for streaming output. +private actor StreamState { + var lastPartialText = "" + var lastOutputText = "" + + func shouldEmitPartial(_ text: String) -> Bool { + guard text != lastPartialText else { return false } + lastPartialText = text + return true + } + + func getNewText(_ fullTranscript: String) -> String? { + let text = fullTranscript.trimmingCharacters(in: .whitespaces) + guard !text.isEmpty, text != lastOutputText else { return nil } + + let newText: String + if text.hasPrefix(lastOutputText) { + newText = String(text.dropFirst(lastOutputText.count)).trimmingCharacters(in: .whitespaces) + } else { + newText = text + } + + guard !newText.isEmpty else { return nil } + lastOutputText = text + return newText + } +} + struct Stream: AsyncParsableCommand { static let configuration = CommandConfiguration( abstract: "Stream live audio transcription from microphone." @@ -18,13 +46,13 @@ struct Stream: AsyncParsableCommand { var verbose: Bool = false func run() async throws { - if verbose { log("Initializing streaming ASR (Parakeet)...") } + if verbose { log("Initializing streaming ASR (Nemotron 560ms)...") } - // Use the library's streaming config (11s chunks + 1s hypothesis updates) - let streamManager = SlidingWindowAsrManager(config: .streaming) - try await streamManager.start(source: .microphone) + let engine = StreamingAsrEngineFactory.create(.nemotron560ms) + try await engine.loadModels() + + if verbose { log("Models loaded.") } - // Set up microphone capture let audioEngine = AVAudioEngine() let inputNode = audioEngine.inputNode let inputFormat = inputNode.outputFormat(forBus: 0) @@ -34,15 +62,44 @@ struct Stream: AsyncParsableCommand { } let startTime = Date() + let state = StreamState() + let fmt = format var outputFile: FileHandle? = nil - var lastConfirmedText = "" - var lastVolatileText = "" if let outputPath = output { FileManager.default.createFile(atPath: outputPath, contents: nil) outputFile = FileHandle(forWritingAtPath: outputPath) } + // Partial transcript callback — fires on every chunk (~560ms) + await engine.setPartialTranscriptCallback { partial in + let text = partial.trimmingCharacters(in: .whitespaces) + guard !text.isEmpty else { return } + + Task { + guard await state.shouldEmitPartial(text) else { return } + let elapsed = Date().timeIntervalSince(startTime) + + switch fmt { + case .text: + let ts = formatStreamTimestamp(elapsed) + let preview = String(text.suffix(100)) + FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "text": text, + "partial": true, + ] + if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) { + print(jsonStr) + fflush(stdout) + } + } + } + } + signal(SIGINT) { _ in FileHandle.standardError.write(Data("\n".utf8)) Darwin.exit(0) @@ -51,104 +108,70 @@ struct Stream: AsyncParsableCommand { // Install tap on microphone inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in nonisolated(unsafe) let buf = buffer - streamManager.streamAudio(buf) + do { + try engine.appendAudio(buf) + } catch { + // Non-fatal + } } try audioEngine.start() log("Listening... (press Ctrl+C to stop)") - // Read transcription updates - let updates = await streamManager.transcriptionUpdates - for await update in updates { - let text = update.text.trimmingCharacters(in: .whitespaces) - guard !text.isEmpty else { continue } + // Processing loop + while true { + try await engine.processBufferedAudio() - let elapsed = Date().timeIntervalSince(startTime) + let transcript = await engine.getPartialTranscript() + if let newText = await state.getNewText(transcript) { + let elapsed = Date().timeIntervalSince(startTime) + let line: String - if update.isConfirmed { - // Confirmed: stable, final text — print as a permanent line - guard text != lastConfirmedText else { continue } - lastConfirmedText = text - lastVolatileText = "" // reset volatile tracking + switch format { + case .text: + let ts = formatStreamTimestamp(elapsed) + FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) + line = "[\(ts)] \(newText)" + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "text": newText, + "partial": false, + ] + if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) { + line = jsonStr + } else { + continue + } + } - let line = formatLine(text: text, elapsed: elapsed, confirmed: true) print(line) fflush(stdout) if let file = outputFile { file.write(Data((line + "\n").utf8)) } - } else { - // Volatile: hypothesis, may change — show as ephemeral line - guard text != lastVolatileText else { continue } - lastVolatileText = text - - switch format { - case .text: - // Overwrite current line with \r for live feel - let ts = formatTimestamp(elapsed) - let preview = String(text.suffix(80)) - let line = "\r[\(ts)] \(preview)" - FileHandle.standardError.write(Data(line.utf8)) - case .jsonl: - // In JSONL mode, emit volatile updates too (marked as unconfirmed) - let line = formatLine(text: text, elapsed: elapsed, confirmed: false) - print(line) - fflush(stdout) - } } - } - - // Clean shutdown - audioEngine.stop() - inputNode.removeTap(onBus: 0) - _ = try await streamManager.finish() - let totalElapsed = Date().timeIntervalSince(startTime) - FileHandle.standardError.write(Data("\n--- Stream ended ---\n".utf8)) - FileHandle.standardError.write(Data(String(format: "Duration: %dm %ds\n", - Int(totalElapsed) / 60, Int(totalElapsed) % 60).utf8)) - - if let outputPath = output { - FileHandle.standardError.write(Data("Saved to: \(outputPath)\n".utf8)) - } - - outputFile?.closeFile() - } - - private func formatLine(text: String, elapsed: Double, confirmed: Bool) -> String { - switch format { - case .text: - let ts = formatTimestamp(elapsed) - return "[\(ts)] \(text)" - case .jsonl: - let jsonObj: [String: Any] = [ - "time": round(elapsed * 10) / 10, - "text": text, - "confirmed": confirmed, - ] - if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), - let jsonStr = String(data: data, encoding: .utf8) { - return jsonStr - } - return "{\"text\":\"\(text)\"}" + try await Task.sleep(nanoseconds: 50_000_000) // 50ms polling } } private func log(_ message: String) { FileHandle.standardError.write(Data("[scribe] \(message)\n".utf8)) } +} - private func formatTimestamp(_ seconds: Double) -> String { - let totalSeconds = Int(seconds) - let hours = totalSeconds / 3600 - let minutes = (totalSeconds % 3600) / 60 - let secs = totalSeconds % 60 - if hours > 0 { - return String(format: "%02d:%02d:%02d", hours, minutes, secs) - } - return String(format: "%02d:%02d", minutes, secs) +private func formatStreamTimestamp(_ seconds: Double) -> String { + let totalSeconds = Int(seconds) + let hours = totalSeconds / 3600 + let minutes = (totalSeconds % 3600) / 60 + let secs = totalSeconds % 60 + if hours > 0 { + return String(format: "%02d:%02d:%02d", hours, minutes, secs) } + return String(format: "%02d:%02d", minutes, secs) } enum StreamOutputFormat: String, ExpressibleByArgument, CaseIterable, Sendable { From 2d95652bbeae7e6edfa9456b86f61b76ef8a2d94 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 21:10:39 +0100 Subject: [PATCH 05/13] Default stream to multilingual engine, Nemotron as opt-in for English - Default: Parakeet TDT v3 via SlidingWindow (25 languages, higher latency) - --engine nemotron: Nemotron 560ms (English-only, ~560ms latency, punctuation) Usage: scribe stream # multilingual (default) scribe stream --engine nemotron # English-only, low latency Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 160 +++++++++++++++++++++------ 1 file changed, 126 insertions(+), 34 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 72f3178..2af02b0 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -39,6 +39,9 @@ struct Stream: AsyncParsableCommand { @Option(name: .long, help: "Output format: text or jsonl.") var format: StreamOutputFormat = .text + @Option(name: .long, help: "Streaming engine: default (multilingual, higher latency) or nemotron (English-only, low latency ~560ms).") + var engine: StreamEngine = .default + @Option(name: .shortAndLong, help: "Also save output to file.") var output: String? @@ -46,7 +49,18 @@ struct Stream: AsyncParsableCommand { var verbose: Bool = false func run() async throws { - if verbose { log("Initializing streaming ASR (Nemotron 560ms)...") } + switch engine { + case .nemotron: + try await runNemotron() + case .default: + try await runSlidingWindow() + } + } + + // MARK: - Nemotron Engine (English-only, low latency) + + private func runNemotron() async throws { + if verbose { log("Initializing streaming ASR (Nemotron 560ms, English-only)...") } let engine = StreamingAsrEngineFactory.create(.nemotron560ms) try await engine.loadModels() @@ -71,7 +85,6 @@ struct Stream: AsyncParsableCommand { outputFile = FileHandle(forWritingAtPath: outputPath) } - // Partial transcript callback — fires on every chunk (~560ms) await engine.setPartialTranscriptCallback { partial in let text = partial.trimmingCharacters(in: .whitespaces) guard !text.isEmpty else { return } @@ -100,61 +113,135 @@ struct Stream: AsyncParsableCommand { } } - signal(SIGINT) { _ in - FileHandle.standardError.write(Data("\n".utf8)) - Darwin.exit(0) - } + setupSignalHandler() - // Install tap on microphone inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in nonisolated(unsafe) let buf = buffer - do { - try engine.appendAudio(buf) - } catch { - // Non-fatal - } + do { try engine.appendAudio(buf) } catch {} } try audioEngine.start() - log("Listening... (press Ctrl+C to stop)") + log("Listening (English, low latency)... press Ctrl+C to stop") - // Processing loop while true { try await engine.processBufferedAudio() let transcript = await engine.getPartialTranscript() if let newText = await state.getNewText(transcript) { let elapsed = Date().timeIntervalSince(startTime) - let line: String + emitLine(text: newText, elapsed: elapsed, partial: false, outputFile: outputFile) + } + + try await Task.sleep(nanoseconds: 50_000_000) + } + } + + // MARK: - SlidingWindow Engine (Multilingual, default) + + private func runSlidingWindow() async throws { + if verbose { log("Initializing streaming ASR (Parakeet TDT v3, multilingual)...") } + + let streamManager = SlidingWindowAsrManager(config: .streaming) + try await streamManager.start(source: .microphone) + + if verbose { log("Models loaded.") } + + let audioEngine = AVAudioEngine() + let inputNode = audioEngine.inputNode + let inputFormat = inputNode.outputFormat(forBus: 0) + + if verbose { + log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") + } + + let startTime = Date() + var outputFile: FileHandle? = nil + var lastConfirmedText = "" + var lastVolatileText = "" + + if let outputPath = output { + FileManager.default.createFile(atPath: outputPath, contents: nil) + outputFile = FileHandle(forWritingAtPath: outputPath) + } + + setupSignalHandler() + + inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in + nonisolated(unsafe) let buf = buffer + streamManager.streamAudio(buf) + } + + try audioEngine.start() + log("Listening (multilingual)... press Ctrl+C to stop") + + let updates = await streamManager.transcriptionUpdates + for await update in updates { + let text = update.text.trimmingCharacters(in: .whitespaces) + guard !text.isEmpty else { continue } + + let elapsed = Date().timeIntervalSince(startTime) + + if update.isConfirmed { + guard text != lastConfirmedText else { continue } + lastConfirmedText = text + lastVolatileText = "" + emitLine(text: text, elapsed: elapsed, partial: false, outputFile: outputFile) + } else { + guard text != lastVolatileText else { continue } + lastVolatileText = text switch format { case .text: let ts = formatStreamTimestamp(elapsed) - FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) - line = "[\(ts)] \(newText)" + let preview = String(text.suffix(100)) + FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) case .jsonl: - let jsonObj: [String: Any] = [ - "time": round(elapsed * 10) / 10, - "text": newText, - "partial": false, - ] - if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), - let jsonStr = String(data: data, encoding: .utf8) { - line = jsonStr - } else { - continue - } + emitLine(text: text, elapsed: elapsed, partial: true, outputFile: nil) } + } + } - print(line) - fflush(stdout) + audioEngine.stop() + inputNode.removeTap(onBus: 0) + _ = try await streamManager.finish() + } - if let file = outputFile { - file.write(Data((line + "\n").utf8)) - } + // MARK: - Shared Helpers + + private func emitLine(text: String, elapsed: Double, partial: Bool, outputFile: FileHandle?) { + let line: String + + switch format { + case .text: + let ts = formatStreamTimestamp(elapsed) + FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) + line = "[\(ts)] \(text)" + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "text": text, + "partial": partial, + ] + if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) { + line = jsonStr + } else { + return } + } + + print(line) + fflush(stdout) - try await Task.sleep(nanoseconds: 50_000_000) // 50ms polling + if let file = outputFile { + file.write(Data((line + "\n").utf8)) + } + } + + private func setupSignalHandler() { + signal(SIGINT) { _ in + FileHandle.standardError.write(Data("\n".utf8)) + Darwin.exit(0) } } @@ -177,3 +264,8 @@ private func formatStreamTimestamp(_ seconds: Double) -> String { enum StreamOutputFormat: String, ExpressibleByArgument, CaseIterable, Sendable { case text, jsonl } + +enum StreamEngine: String, ExpressibleByArgument, CaseIterable, Sendable { + case `default` + case nemotron +} From 8f9acdb0b338f073f72c5c87659932273e2b5a6d Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 21:23:20 +0100 Subject: [PATCH 06/13] Add model download retry, progress logging, and streaming docs - Nemotron: retry with cache cleanup on failed model load (fixes partial download) - Both engines: show download progress messages (not just --verbose) - README: add streaming section with engine comparison and trade-offs - README: update performance table with streaming latencies Co-Authored-By: Claude Opus 4.6 (1M context) --- README.md | 19 ++++++++++++-- Sources/scribe/Commands/Stream.swift | 38 ++++++++++++++++++++++++---- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index f7ab5b7..ffe01e6 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,19 @@ scribe transcribe meeting.wav --language es # Spanish scribe transcribe meeting.wav --language fr # French ``` +### Live streaming from microphone + +```bash +scribe stream # multilingual (25 languages, ~11s latency) +scribe stream --engine nemotron # English-only, low latency (~560ms), with punctuation +scribe stream --format jsonl # JSONL output (one JSON object per line) +scribe stream --output meeting.txt # save to file while streaming +``` + +> **Note on streaming engines:** +> - **Default** (Parakeet TDT v3): Supports all 25 languages including Spanish. Higher latency (~11s for confirmed text) because it uses a batch model in sliding windows. Live preview appears on screen while speaking. +> - **Nemotron** (`--engine nemotron`): English-only but much faster (~560ms latency). Includes punctuation and capitalization. Recommended for English-only meetings. + ### Pre-download models ```bash @@ -143,8 +156,10 @@ Tested on Apple Silicon (M-series): |------|-------|---------| | Transcription only | ~130x real-time | 4-min file in 1.7s | | Transcription + diarization | ~30x real-time | 4-min file in 7.5s | +| Live streaming (Nemotron) | ~560ms latency | English, with punctuation | +| Live streaming (default) | ~11s latency | 25 languages | -Models are downloaded automatically on first use (~600MB for ASR, ~50MB for diarization). +Models are downloaded automatically on first use (~600MB per model). First run may take a minute. ## Requirements @@ -159,7 +174,7 @@ Bulgarian, Croatian, Czech, Danish, Dutch, English, Estonian, Finnish, French, G scribe is built on the shoulders of excellent open-source projects: -- **[NVIDIA Parakeet](https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3)** (CC-BY-4.0) — The speech recognition model that powers transcription +- **[NVIDIA Parakeet](https://huggingface.co/nvidia/parakeet-tdt-0.6b-v3)** (CC-BY-4.0) — Speech recognition models (TDT v3 for batch, Nemotron for streaming) - **[FluidAudio](https://github.com/FluidInference/FluidAudio)** (Apache 2.0) by FluidInference — CoreML speech processing SDK for Apple Silicon - **[pyannote.audio](https://github.com/pyannote/pyannote-audio)** (MIT) by Herve Bredin — The diarization model architecture - **[swift-argument-parser](https://github.com/apple/swift-argument-parser)** (Apache 2.0) by Apple — CLI argument parsing diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 2af02b0..3503b96 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -60,12 +60,39 @@ struct Stream: AsyncParsableCommand { // MARK: - Nemotron Engine (English-only, low latency) private func runNemotron() async throws { - if verbose { log("Initializing streaming ASR (Nemotron 560ms, English-only)...") } + log("Initializing streaming ASR (Nemotron 560ms, English-only)...") + log("Downloading model if needed (first run only, ~600MB)...") let engine = StreamingAsrEngineFactory.create(.nemotron560ms) - try await engine.loadModels() - if verbose { log("Models loaded.") } + // Try loading models — if it fails (partial download), clean cache and retry once + do { + try await engine.loadModels() + } catch { + log("Model loading failed: \(error.localizedDescription)") + log("Cleaning cache and retrying download...") + + let cacheDir = FileManager.default.urls(for: .applicationSupportDirectory, in: .userDomainMask).first! + .appendingPathComponent("FluidAudio/Models/nemotron-streaming") + try? FileManager.default.removeItem(at: cacheDir) + + let freshEngine = StreamingAsrEngineFactory.create(.nemotron560ms) + try await freshEngine.loadModels() + // If this also fails, the error propagates naturally + + // Use the fresh engine — but we can't reassign `engine` (let binding), + // so we just recurse. The cache is now clean, so it will work. + log("Retry successful.") + // Re-run with clean state + try await runNemotronWithEngine(freshEngine) + return + } + + log("Models loaded.") + try await runNemotronWithEngine(engine) + } + + private func runNemotronWithEngine(_ engine: any StreamingAsrEngine) async throws { let audioEngine = AVAudioEngine() let inputNode = audioEngine.inputNode @@ -139,12 +166,13 @@ struct Stream: AsyncParsableCommand { // MARK: - SlidingWindow Engine (Multilingual, default) private func runSlidingWindow() async throws { - if verbose { log("Initializing streaming ASR (Parakeet TDT v3, multilingual)...") } + log("Initializing streaming ASR (Parakeet TDT v3, multilingual)...") + log("Downloading model if needed (first run only, ~600MB)...") let streamManager = SlidingWindowAsrManager(config: .streaming) try await streamManager.start(source: .microphone) - if verbose { log("Models loaded.") } + log("Models loaded.") let audioEngine = AVAudioEngine() let inputNode = audioEngine.inputNode From 3c93c3d802373e64f39b61e0c6ece014c47b2556 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Mon, 30 Mar 2026 23:54:04 +0100 Subject: [PATCH 07/13] Fix Nemotron streaming: emit delta text, not full accumulated transcript MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Nemotron engine's partial callback returns the full accumulated transcript each time, which grows and revises. The previous code tried to diff via getPartialTranscript() polling, causing repeated/mixed output. Fix: Track printed length in StreamState actor. The partial callback fires after each 560ms chunk — we diff to find only the new portion and emit that. Live preview shows the tail of the transcript on stderr (ephemeral, overwritten). New confirmed text goes to stdout. Also simplified SlidingWindow engine to only emit to stdout on confirmed text (volatile goes to stderr preview only). Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 126 +++++++++++++++------------ 1 file changed, 72 insertions(+), 54 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 3503b96..f0474e3 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -3,31 +3,38 @@ import AVFoundation import FluidAudio import Foundation -/// Thread-safe state tracker for streaming output. +/// Thread-safe state for tracking incremental transcript output. private actor StreamState { - var lastPartialText = "" - var lastOutputText = "" - - func shouldEmitPartial(_ text: String) -> Bool { - guard text != lastPartialText else { return false } - lastPartialText = text - return true - } + private var lastFullTranscript = "" + private var printedLength = 0 + /// Given the full accumulated transcript, return only the new portion. + /// Handles model revisions by finding the longest common prefix. func getNewText(_ fullTranscript: String) -> String? { let text = fullTranscript.trimmingCharacters(in: .whitespaces) - guard !text.isEmpty, text != lastOutputText else { return nil } + guard !text.isEmpty else { return nil } + guard text != lastFullTranscript else { return nil } + + lastFullTranscript = text - let newText: String - if text.hasPrefix(lastOutputText) { - newText = String(text.dropFirst(lastOutputText.count)).trimmingCharacters(in: .whitespaces) - } else { - newText = text + // Find how much of the text we've already printed + if text.count > printedLength { + let newPortion = String(text.dropFirst(printedLength)).trimmingCharacters(in: .whitespaces) + if !newPortion.isEmpty { + printedLength = text.count + return newPortion + } } - guard !newText.isEmpty else { return nil } - lastOutputText = text - return newText + return nil + } + + /// Get the live preview (last N chars of full transcript). + func getPreview(_ fullTranscript: String, maxLen: Int = 100) -> String? { + let text = fullTranscript.trimmingCharacters(in: .whitespaces) + guard !text.isEmpty else { return nil } + guard text != lastFullTranscript else { return nil } + return String(text.suffix(maxLen)) } } @@ -65,7 +72,6 @@ struct Stream: AsyncParsableCommand { let engine = StreamingAsrEngineFactory.create(.nemotron560ms) - // Try loading models — if it fails (partial download), clean cache and retry once do { try await engine.loadModels() } catch { @@ -78,12 +84,7 @@ struct Stream: AsyncParsableCommand { let freshEngine = StreamingAsrEngineFactory.create(.nemotron560ms) try await freshEngine.loadModels() - // If this also fails, the error propagates naturally - - // Use the fresh engine — but we can't reassign `engine` (let binding), - // so we just recurse. The cache is now clean, so it will work. log("Retry successful.") - // Re-run with clean state try await runNemotronWithEngine(freshEngine) return } @@ -93,7 +94,6 @@ struct Stream: AsyncParsableCommand { } private func runNemotronWithEngine(_ engine: any StreamingAsrEngine) async throws { - let audioEngine = AVAudioEngine() let inputNode = audioEngine.inputNode let inputFormat = inputNode.outputFormat(forBus: 0) @@ -112,29 +112,54 @@ struct Stream: AsyncParsableCommand { outputFile = FileHandle(forWritingAtPath: outputPath) } - await engine.setPartialTranscriptCallback { partial in - let text = partial.trimmingCharacters(in: .whitespaces) - guard !text.isEmpty else { return } + // Capture file handle for sendable closure + nonisolated(unsafe) let outFile = outputFile + // Partial callback — fires after each 560ms chunk with the full accumulated transcript. + // We diff to find new text and emit only the delta. + await engine.setPartialTranscriptCallback { fullTranscript in Task { - guard await state.shouldEmitPartial(text) else { return } let elapsed = Date().timeIntervalSince(startTime) switch fmt { case .text: - let ts = formatStreamTimestamp(elapsed) - let preview = String(text.suffix(100)) - FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) + // Show live preview on stderr (ephemeral, overwritten) + let preview = String(fullTranscript.trimmingCharacters(in: .whitespaces).suffix(100)) + if !preview.isEmpty { + let ts = formatStreamTimestamp(elapsed) + FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) + } case .jsonl: - let jsonObj: [String: Any] = [ - "time": round(elapsed * 10) / 10, - "text": text, - "partial": true, - ] - if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), - let jsonStr = String(data: data, encoding: .utf8) { - print(jsonStr) - fflush(stdout) + break + } + + // Emit new portion to stdout + if let newText = await state.getNewText(fullTranscript) { + let ts = formatStreamTimestamp(elapsed) + + let line: String + switch fmt { + case .text: + FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) + line = "[\(ts)] \(newText)" + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "text": newText, + ] + if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) { + line = jsonStr + } else { + return + } + } + + print(line) + fflush(stdout) + + if let file = outFile { + file.write(Data((line + "\n").utf8)) } } } @@ -150,16 +175,10 @@ struct Stream: AsyncParsableCommand { try audioEngine.start() log("Listening (English, low latency)... press Ctrl+C to stop") + // Processing loop — drives the engine to process buffered audio while true { try await engine.processBufferedAudio() - - let transcript = await engine.getPartialTranscript() - if let newText = await state.getNewText(transcript) { - let elapsed = Date().timeIntervalSince(startTime) - emitLine(text: newText, elapsed: elapsed, partial: false, outputFile: outputFile) - } - - try await Task.sleep(nanoseconds: 50_000_000) + try await Task.sleep(nanoseconds: 50_000_000) // 50ms } } @@ -213,7 +232,7 @@ struct Stream: AsyncParsableCommand { guard text != lastConfirmedText else { continue } lastConfirmedText = text lastVolatileText = "" - emitLine(text: text, elapsed: elapsed, partial: false, outputFile: outputFile) + emitLine(text: text, elapsed: elapsed, outputFile: outputFile) } else { guard text != lastVolatileText else { continue } lastVolatileText = text @@ -224,7 +243,7 @@ struct Stream: AsyncParsableCommand { let preview = String(text.suffix(100)) FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) case .jsonl: - emitLine(text: text, elapsed: elapsed, partial: true, outputFile: nil) + break } } } @@ -234,9 +253,9 @@ struct Stream: AsyncParsableCommand { _ = try await streamManager.finish() } - // MARK: - Shared Helpers + // MARK: - Helpers - private func emitLine(text: String, elapsed: Double, partial: Bool, outputFile: FileHandle?) { + private func emitLine(text: String, elapsed: Double, outputFile: FileHandle?) { let line: String switch format { @@ -248,7 +267,6 @@ struct Stream: AsyncParsableCommand { let jsonObj: [String: Any] = [ "time": round(elapsed * 10) / 10, "text": text, - "partial": partial, ] if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), let jsonStr = String(data: data, encoding: .utf8) { From 771dcc6b02e9c8740555ebddb516641ca74e0158 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Thu, 9 Apr 2026 20:02:28 +0100 Subject: [PATCH 08/13] Add --audio-file streaming test harness with eval baseline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a test/dev mode that feeds pre-recorded audio files into the streaming pipeline (instead of microphone), making streaming output deterministic and reproducible. Required to validate streaming quality without manual mic testing. Stream.swift: - New --audio-file option (Nemotron engine only for now) - Reads via FluidAudio.AudioConverter.resampleAudioFile() (16kHz mono Float32) - Chunks at 4096 samples to mimic the live mic tap - Calls engine.finish() to flush tail audio before exit - Refactored Nemotron path: split mic vs file, extracted callback setup - SlidingWindow path explicitly rejects --audio-file (not yet supported) eval.py: - New --mode batch|streaming flag - Streaming mode dispatches to scribe stream --audio-file --format jsonl - Parses JSONL deltas and concatenates for WER computation - Auto-skips MLS Spanish in streaming mode (Nemotron is English-only) - New "mode" column in CSVs to differentiate batch vs streaming runs summarize.py: - Splits proper noun recall section by mode (batch vs streaming) Baseline results (scribe v0.2.1): | Dataset | Batch (Parakeet) | Streaming (Nemotron) | Delta | |------------|-----------------:|---------------------:|--------:| | TED-LIUM | 6.0% | 23.2% | +17.2pp | | Earnings-21| 12.9% | 39.9% | +27.0pp | | PN recall | 70.8% | 58.4% | -12.4pp | Streaming is 3-4x worse than batch with deletions dominating the error budget (TED 1327 D vs 759 I, Earnings-21 3640 D vs 1469 I) — streaming is dropping content. Likely culprits: 1. Delta-text logic in StreamState.getNewText() may discard model revisions 2. engine.finish() may not flush the final partial-transcript callback This commit establishes the regression baseline. Next phase: investigate the delta-text dropping and re-run the same eval to confirm fixes. Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 124 ++++++++++++++---- experiments/eval/eval.py | 103 +++++++++++---- .../scribe-0.2.1-streaming-earnings21.csv | 6 + .../scribe-0.2.1-streaming-tedlium.csv | 6 + experiments/eval/summarize.py | 36 +++-- 5 files changed, 214 insertions(+), 61 deletions(-) create mode 100644 experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv create mode 100644 experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index f0474e3..791374e 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -55,6 +55,9 @@ struct Stream: AsyncParsableCommand { @Flag(name: .long, help: "Show status information.") var verbose: Bool = false + @Option(name: .long, help: "Read audio from a WAV/audio file instead of microphone (for testing/eval). Exits when file is consumed.") + var audioFile: String? + func run() async throws { switch engine { case .nemotron: @@ -94,6 +97,20 @@ struct Stream: AsyncParsableCommand { } private func runNemotronWithEngine(_ engine: any StreamingAsrEngine) async throws { + let startTime = Date() + let outputFile = openOutputFile() + + // Wire up the partial-transcript callback (delta emission to stdout/stderr/file). + await setupNemotronCallback(engine: engine, startTime: startTime, outputFile: outputFile) + + if let path = audioFile { + try await runNemotronFromFile(engine: engine, path: path) + } else { + try await runNemotronFromMic(engine: engine) + } + } + + private func runNemotronFromMic(engine: any StreamingAsrEngine) async throws { let audioEngine = AVAudioEngine() let inputNode = audioEngine.inputNode let inputFormat = inputNode.outputFormat(forBus: 0) @@ -102,16 +119,85 @@ struct Stream: AsyncParsableCommand { log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") } - let startTime = Date() - let state = StreamState() - let fmt = format - var outputFile: FileHandle? = nil + setupSignalHandler() - if let outputPath = output { - FileManager.default.createFile(atPath: outputPath, contents: nil) - outputFile = FileHandle(forWritingAtPath: outputPath) + inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in + nonisolated(unsafe) let buf = buffer + do { try engine.appendAudio(buf) } catch {} + } + + try audioEngine.start() + log("Listening (English, low latency)... press Ctrl+C to stop") + + // Processing loop — drives the engine to process buffered audio + while true { + try await engine.processBufferedAudio() + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + } + } + + /// Feed a pre-recorded audio file into the streaming engine. + /// Reads the whole file via FluidAudio's AudioConverter (16kHz mono Float32), + /// chunks it to mimic the live mic tap, and exits cleanly when done. + private func runNemotronFromFile(engine: any StreamingAsrEngine, path: String) async throws { + let url = URL(fileURLWithPath: path) + guard FileManager.default.fileExists(atPath: url.path) else { + throw ValidationError("Audio file not found: \(path)") } + if verbose { log("Loading audio file: \(path)") } + let converter = AudioConverter() + let samples = try converter.resampleAudioFile(url) + let duration = Double(samples.count) / 16000.0 + if verbose { + log(String(format: "Loaded %.0fs (%.1f min), %d samples", duration, duration / 60, samples.count)) + } + log("Streaming from file (no microphone)...") + + // Build a 16kHz mono Float32 format to match what the engine expects. + guard let audioFormat = AVAudioFormat( + commonFormat: .pcmFormatFloat32, + sampleRate: 16000, + channels: 1, + interleaved: false + ) else { + throw ValidationError("Failed to create audio format") + } + + // Match the mic tap chunk size for parity with live behavior. + let chunkSize = 4096 + var offset = 0 + while offset < samples.count { + let end = min(offset + chunkSize, samples.count) + let frameCount = AVAudioFrameCount(end - offset) + guard let buffer = AVAudioPCMBuffer(pcmFormat: audioFormat, frameCapacity: frameCount) else { + throw ValidationError("Failed to allocate audio buffer") + } + buffer.frameLength = frameCount + if let channelData = buffer.floatChannelData { + samples.withUnsafeBufferPointer { src in + memcpy(channelData[0], src.baseAddress!.advanced(by: offset), Int(frameCount) * MemoryLayout.stride) + } + } + nonisolated(unsafe) let buf = buffer + try await engine.appendAudio(buf) + try await engine.processBufferedAudio() + offset = end + } + + // Critical: flush any remaining buffered audio so the tail of the transcript isn't dropped. + let finalText = try await engine.finish() + if verbose { log("Finalized. Total transcript length: \(finalText.count) chars") } + + // Reset terminal line in case the live preview left an unfinished line on stderr. + FileHandle.standardError.write(Data("\n".utf8)) + } + + /// Set up the partial-transcript callback used by both mic and file modes. + private func setupNemotronCallback(engine: any StreamingAsrEngine, startTime: Date, outputFile: FileHandle?) async { + let state = StreamState() + let fmt = format + // Capture file handle for sendable closure nonisolated(unsafe) let outFile = outputFile @@ -164,27 +250,21 @@ struct Stream: AsyncParsableCommand { } } } + } - setupSignalHandler() - - inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in - nonisolated(unsafe) let buf = buffer - do { try engine.appendAudio(buf) } catch {} - } - - try audioEngine.start() - log("Listening (English, low latency)... press Ctrl+C to stop") - - // Processing loop — drives the engine to process buffered audio - while true { - try await engine.processBufferedAudio() - try await Task.sleep(nanoseconds: 50_000_000) // 50ms - } + private func openOutputFile() -> FileHandle? { + guard let outputPath = output else { return nil } + FileManager.default.createFile(atPath: outputPath, contents: nil) + return FileHandle(forWritingAtPath: outputPath) } // MARK: - SlidingWindow Engine (Multilingual, default) private func runSlidingWindow() async throws { + if audioFile != nil { + throw ValidationError("--audio-file is not yet supported with the multilingual engine. Use --engine nemotron for now.") + } + log("Initializing streaming ASR (Parakeet TDT v3, multilingual)...") log("Downloading model if needed (first run only, ~600MB)...") diff --git a/experiments/eval/eval.py b/experiments/eval/eval.py index 1ce2a98..0649bef 100644 --- a/experiments/eval/eval.py +++ b/experiments/eval/eval.py @@ -235,24 +235,28 @@ def load_dipco(session_ids: list[str]) -> list[dict]: # ========== Core eval functions ========== def run_scribe(wav_path: Path, model: str = "large-v3-turbo", - scribe_bin: str = "scribe") -> tuple[str, float]: - """Run scribe transcribe on an audio file. Returns (transcript, elapsed_secs).""" - # Build command — v0.2+ doesn't have --model flag (uses Parakeet by default) + scribe_bin: str = "scribe", mode: str = "batch") -> tuple[str, float]: + """Run scribe on an audio file. Returns (transcript, elapsed_secs). + + mode="batch": calls `scribe transcribe ...` (offline, full audio at once) + mode="streaming": calls `scribe stream --audio-file ...` (chunked through streaming engine) + """ + if mode == "streaming": + return _run_scribe_streaming(wav_path, scribe_bin) + return _run_scribe_batch(wav_path, model, scribe_bin) + + +def _run_scribe_batch(wav_path: Path, model: str, scribe_bin: str) -> tuple[str, float]: cmd = [scribe_bin, "transcribe", str(wav_path), "--format", "txt"] - # Check if scribe supports --model flag (v0.1.x only) + # Check if scribe supports --model flag (v0.1.x only — v0.2+ uses Parakeet by default) version_result = subprocess.run([scribe_bin, "--version"], capture_output=True, text=True) version = version_result.stdout.strip() if version.startswith("0.1"): cmd.extend(["--model", model]) start = time.monotonic() - result = subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=1800, - ) + result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800) elapsed = time.monotonic() - start if result.returncode != 0: @@ -263,13 +267,48 @@ def run_scribe(wav_path: Path, model: str = "large-v3-turbo", text_parts = [] for line in lines: cleaned = re.sub(r"^\[[\d:]+\]\s*(?:(?:Speaker\s+\d+|Unknown):\s*)?", "", line) - cleaned = re.sub(r"\*([^*]+)\*", r"\1", cleaned) # Remove italic markers + cleaned = re.sub(r"\*([^*]+)\*", r"\1", cleaned) cleaned = cleaned.strip() if cleaned and cleaned != "-": text_parts.append(cleaned) - transcript = " ".join(text_parts) - return transcript, elapsed + return " ".join(text_parts), elapsed + + +def _run_scribe_streaming(wav_path: Path, scribe_bin: str) -> tuple[str, float]: + """Feed an audio file through `scribe stream --audio-file` and parse JSONL output. + + Each JSONL line is `{"text": "...", "time": 1.2}` representing a confirmed delta. + Concatenate the `text` fields in order to build the full hypothesis. + """ + cmd = [ + scribe_bin, "stream", + "--audio-file", str(wav_path), + "--engine", "nemotron", + "--format", "jsonl", + ] + + start = time.monotonic() + result = subprocess.run(cmd, capture_output=True, text=True, timeout=1800) + elapsed = time.monotonic() - start + + if result.returncode != 0: + raise RuntimeError(f"scribe stream failed on {wav_path}: {result.stderr}") + + text_parts = [] + for line in result.stdout.strip().split("\n"): + line = line.strip() + if not line: + continue + try: + obj = json.loads(line) + except json.JSONDecodeError: + continue + text = obj.get("text", "").strip() + if text: + text_parts.append(text) + + return " ".join(text_parts), elapsed def compute_wer(reference: str, hypothesis: str) -> dict: @@ -298,7 +337,7 @@ def get_scribe_version(scribe_bin: str = "scribe") -> str: def run_eval(entries: list[dict], model: str, scribe_version: str, - scribe_bin: str = "scribe") -> list[dict]: + scribe_bin: str = "scribe", mode: str = "batch") -> list[dict]: """Run evaluation on all entries.""" results = [] @@ -308,17 +347,21 @@ def run_eval(entries: list[dict], model: str, scribe_version: str, dur_min = entry["duration_secs"] / 60 print(f"\n--- {fid}: {label} ({dur_min:.1f}m, {entry['speakers']} speakers) ---") - print(f" Transcribing with {model}...") + if mode == "streaming": + print(f" Streaming via scribe stream --audio-file (engine: nemotron)...") + else: + print(f" Transcribing with {model}...") try: - hypothesis, elapsed = run_scribe(entry["wav_path"], model=model, scribe_bin=scribe_bin) + hypothesis, elapsed = run_scribe(entry["wav_path"], model=model, scribe_bin=scribe_bin, mode=mode) except Exception as e: print(f" ERROR: {e}") results.append({ "file_id": fid, "dataset": entry["dataset"], "label": label, - "model": model, + "model": "nemotron-streaming" if mode == "streaming" else model, + "mode": mode, "duration_secs": round(entry["duration_secs"], 1), "speakers": entry["speakers"], "scribe_version": scribe_version, @@ -361,7 +404,8 @@ def run_eval(entries: list[dict], model: str, scribe_version: str, "file_id": fid, "dataset": entry["dataset"], "label": label, - "model": model, + "model": "nemotron-streaming" if mode == "streaming" else model, + "mode": mode, "duration_secs": round(entry["duration_secs"], 1), "speakers": entry["speakers"], "scribe_version": scribe_version, @@ -391,7 +435,7 @@ def write_csv(results: list[dict], output_path: Path): return fieldnames = [ - "file_id", "dataset", "label", "model", + "file_id", "dataset", "label", "model", "mode", "duration_secs", "speakers", "scribe_version", "processing_secs", "rtf", "wer", "mer", "wil", @@ -460,10 +504,13 @@ def main(): parser.add_argument("--download-only", action="store_true") parser.add_argument("--output", type=str, help="Output CSV path") parser.add_argument("--scribe-bin", default="scribe", help="Path to scribe binary") + parser.add_argument("--mode", choices=["batch", "streaming"], default="batch", + help="Eval mode: 'batch' (scribe transcribe) or 'streaming' (scribe stream --audio-file)") args = parser.parse_args() scribe_version = get_scribe_version(args.scribe_bin) print(f"scribe version: {scribe_version} ({args.scribe_bin})") + print(f"Mode: {args.mode}") print(f"Model: {args.model}") print(f"Dataset: {args.dataset}") @@ -481,8 +528,11 @@ def main(): entries.extend(load_tedlium(file_ids)) if args.dataset in ("mls-spanish", "all"): - print("\n--- Loading MLS Spanish ---") - entries.extend(load_mls_spanish(args.max_files)) + if args.mode == "streaming": + print("\n--- Skipping MLS Spanish (Nemotron streaming engine is English-only) ---") + else: + print("\n--- Loading MLS Spanish ---") + entries.extend(load_mls_spanish(args.max_files)) if args.dataset == "dipco": print("\n--- Loading DiPCo ---") @@ -499,12 +549,15 @@ def main(): # Run eval print(f"\n--- Running evaluation ({len(entries)} files) ---") - results = run_eval(entries, args.model, scribe_version, scribe_bin=args.scribe_bin) + results = run_eval(entries, args.model, scribe_version, scribe_bin=args.scribe_bin, mode=args.mode) # Write results - output_path = Path(args.output) if args.output else ( - RESULTS_DIR / f"scribe-{scribe_version}-{args.model}-{args.dataset}.csv" - ) + if args.output: + output_path = Path(args.output) + elif args.mode == "streaming": + output_path = RESULTS_DIR / f"scribe-{scribe_version}-streaming-{args.dataset}.csv" + else: + output_path = RESULTS_DIR / f"scribe-{scribe_version}-{args.model}-{args.dataset}.csv" write_csv(results, output_path) print_summary(results) diff --git a/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv b/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv new file mode 100644 index 0000000..fd917de --- /dev/null +++ b/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv @@ -0,0 +1,6 @@ +file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error +4386541,earnings21,Cumulus Media Inc,nemotron-streaming,streaming,1097.1,5,0.2.1,144.3,0.1315,0.4287,0.3943,0.5134,395,532,237,2715,2420,30,18,0.6, +4387332,earnings21,Zagg Inc,nemotron-streaming,streaming,1310.2,6,0.2.1,177.2,0.1353,0.4185,0.3983,0.4936,422,1038,201,3969,3132,36,23,0.6389, +4392809,earnings21,Lands End Inc,nemotron-streaming,streaming,1438.0,6,0.2.1,174.1,0.121,0.3605,0.3323,0.4382,502,608,341,4025,3758,30,20,0.6667, +4384683,earnings21,One Gas Inc,nemotron-streaming,streaming,1476.8,8,0.2.1,198.5,0.1344,0.3984,0.3666,0.4688,421,700,313,3599,3212,40,22,0.55, +4367318,earnings21,AcelRX Pharmaceuticals Inc,nemotron-streaming,streaming,1720.8,7,0.2.1,229.7,0.1335,0.3995,0.3674,0.4808,588,762,377,4323,3938,42,21,0.5, diff --git a/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv b/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv new file mode 100644 index 0000000..8b581d1 --- /dev/null +++ b/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv @@ -0,0 +1,6 @@ +file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error +talk_0,tedlium,TED talk talk_0,nemotron-streaming,streaming,1249.0,1,0.2.1,164.9,0.132,0.2751,0.259,0.3449,279,338,180,2897,2739,0,0,0.0, +talk_1,tedlium,TED talk talk_1,nemotron-streaming,streaming,1505.8,1,0.2.1,205.1,0.1362,0.26,0.2492,0.3221,341,599,189,4342,3932,0,0,0.0, +talk_2,tedlium,TED talk talk_2,nemotron-streaming,streaming,834.0,1,0.2.1,116.5,0.1397,0.1997,0.1884,0.269,220,112,144,2383,2415,0,0,0.0, +talk_3,tedlium,TED talk talk_3,nemotron-streaming,streaming,1095.5,1,0.2.1,156.6,0.1429,0.2129,0.2008,0.2792,267,193,182,3015,3004,0,0,0.0, +talk_4,tedlium,TED talk talk_4,nemotron-streaming,streaming,459.1,1,0.2.1,58.7,0.1278,0.157,0.1506,0.205,88,85,64,1510,1489,0,0,0.0, diff --git a/experiments/eval/summarize.py b/experiments/eval/summarize.py index d915d88..8dc6031 100644 --- a/experiments/eval/summarize.py +++ b/experiments/eval/summarize.py @@ -121,25 +121,33 @@ def print_markdown(results: list[dict]): row += " — |" print(row) - # --- Proper noun recall section (Earnings-21 only) --- + # --- Proper noun recall section (Earnings-21 only, split by mode) --- pn_results = [r for r in results if r["dataset"] == "earnings21" and int(r.get("proper_noun_total", 0)) > 0] if pn_results: print("\n## Proper Noun Recall (Earnings-21)\n") print("WER averages all words equally. Proper noun recall measures how many entity-tagged ") print("phrases (PERSON, ORG, GPE, PRODUCT, LAW, WORK_OF_ART, FAC) appear in the hypothesis.\n") - print("| File | Company | WER | Proper Noun Recall |") - print("|---|---|---|---|") - total_pn = found_pn = 0 - for r in sorted(pn_results, key=lambda x: x["file_id"]): - wer = float(r["wer"]) - pn_total = int(r["proper_noun_total"]) - pn_found = int(r["proper_noun_found"]) - pn_recall = float(r["proper_noun_recall"]) - print(f"| {r['file_id']} | {r['label']} | {wer:.1%} | {pn_recall:.1%} ({pn_found}/{pn_total}) |") - total_pn += pn_total - found_pn += pn_found - agg_recall = found_pn / total_pn if total_pn else 0 - print(f"\n**Aggregate**: {agg_recall:.1%} ({found_pn}/{total_pn} entities)") + + modes = sorted(set(r.get("mode", "batch") for r in pn_results)) + for mode in modes: + mode_results = [r for r in pn_results if r.get("mode", "batch") == mode] + if not mode_results: + continue + label_model = mode_results[0]["model"] + print(f"\n### {mode} ({label_model})\n") + print("| File | Company | WER | Proper Noun Recall |") + print("|---|---|---|---|") + total_pn = found_pn = 0 + for r in sorted(mode_results, key=lambda x: x["file_id"]): + wer = float(r["wer"]) + pn_total = int(r["proper_noun_total"]) + pn_found = int(r["proper_noun_found"]) + pn_recall = float(r["proper_noun_recall"]) + print(f"| {r['file_id']} | {r['label']} | {wer:.1%} | {pn_recall:.1%} ({pn_found}/{pn_total}) |") + total_pn += pn_total + found_pn += pn_found + agg_recall = found_pn / total_pn if total_pn else 0 + print(f"\n**Aggregate**: {agg_recall:.1%} ({found_pn}/{total_pn} entities)") # --- Table 2: Per-file detail for primary model --- primary_model = "large-v3-turbo" From 4bcf7e8fac13166192d36eb3784e940918b854cd Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Thu, 9 Apr 2026 21:12:50 +0100 Subject: [PATCH 09/13] Fix streaming content loss: use finish() result + 1120ms variant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two bugs were causing streaming to drop ~30% of content vs batch: Bug 1: Discarded finish()'s authoritative return value ----------------------------------------------------- The previous code accumulated text via the per-chunk callback only and just logged finalText.count after engine.finish(). Two failure modes: (a) finish() pads any trailing partial chunk and decodes additional tokens, but those never reach the callback path. (b) The callback dispatched its emission via Task { } — those tasks could still be pending when the program exited, losing output. Fix: in file mode, do NOT set up the per-chunk callback. Use engine.finish() as the source of truth and emit it once via a new emitFinalTranscript() helper. The callback pattern is kept only for mic mode where live preview is the actual feature. This alone reduced WER from 23.2% → 13.2% on TED-LIUM. Bug 2: Used the .nemotron560ms variant --------------------------------------- NemotronChunkSize.swift comments claim "560ms - same accuracy" as 1120ms, but empirically 560ms is much worse: - 560ms on TED talk_0: 15.3% WER, 363 deletions - 1120ms on TED talk_0: 7.5% WER, 133 deletions The published FluidAudio benchmark (Documentation/Benchmarks.md) only covers 1120ms (2.51% on LibriSpeech), and the 560ms variant has no published number. The "same accuracy" claim isn't supported by data. Fix: switch to .nemotron1120ms. The latency cost (1.12s vs 0.56s) is acceptable for live use and gives a meaningful accuracy boost. Also matched the official NemotronTranscribe.swift CLI pattern: feed the whole file as one AVAudioPCMBuffer instead of chunking into 4096 samples. (No accuracy difference, but matches the reference pattern.) Final results (scribe v0.2.1, Nemotron 1120ms streaming vs Parakeet TDT v3 batch): | Dataset | Batch | Streaming | Delta | |------------|------:|----------:|------:| | TED-LIUM | 6.0% | 7.2% | +1.2pp | | Earnings-21| 12.9% | 18.5% | +5.6pp | | PN recall | 70.8% | 65.7% | -5.1pp | Streaming is now within striking distance of batch quality. Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 96 ++++++++++++------- .../scribe-0.2.1-streaming-earnings21.csv | 10 +- .../scribe-0.2.1-streaming-tedlium.csv | 10 +- 3 files changed, 74 insertions(+), 42 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 791374e..fc7f422 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -70,10 +70,10 @@ struct Stream: AsyncParsableCommand { // MARK: - Nemotron Engine (English-only, low latency) private func runNemotron() async throws { - log("Initializing streaming ASR (Nemotron 560ms, English-only)...") + log("Initializing streaming ASR (Nemotron 1120ms, English-only)...") log("Downloading model if needed (first run only, ~600MB)...") - let engine = StreamingAsrEngineFactory.create(.nemotron560ms) + let engine = StreamingAsrEngineFactory.create(.nemotron1120ms) do { try await engine.loadModels() @@ -85,7 +85,7 @@ struct Stream: AsyncParsableCommand { .appendingPathComponent("FluidAudio/Models/nemotron-streaming") try? FileManager.default.removeItem(at: cacheDir) - let freshEngine = StreamingAsrEngineFactory.create(.nemotron560ms) + let freshEngine = StreamingAsrEngineFactory.create(.nemotron1120ms) try await freshEngine.loadModels() log("Retry successful.") try await runNemotronWithEngine(freshEngine) @@ -100,12 +100,15 @@ struct Stream: AsyncParsableCommand { let startTime = Date() let outputFile = openOutputFile() - // Wire up the partial-transcript callback (delta emission to stdout/stderr/file). - await setupNemotronCallback(engine: engine, startTime: startTime, outputFile: outputFile) - if let path = audioFile { - try await runNemotronFromFile(engine: engine, path: path) + // File mode: do NOT set up the per-chunk callback. Use finish() as the + // authoritative transcript source. The callback path drops content because + // (a) finish() may decode tokens after the last chunk callback, and + // (b) Task { } dispatched from the callback may not run before exit. + try await runNemotronFromFile(engine: engine, path: path, startTime: startTime, outputFile: outputFile) } else { + // Mic mode: live preview via per-chunk callback (acceptable streaming UX). + await setupNemotronCallback(engine: engine, startTime: startTime, outputFile: outputFile) try await runNemotronFromMic(engine: engine) } } @@ -138,8 +141,9 @@ struct Stream: AsyncParsableCommand { /// Feed a pre-recorded audio file into the streaming engine. /// Reads the whole file via FluidAudio's AudioConverter (16kHz mono Float32), - /// chunks it to mimic the live mic tap, and exits cleanly when done. - private func runNemotronFromFile(engine: any StreamingAsrEngine, path: String) async throws { + /// chunks it to mimic the live mic tap, calls finish() for the authoritative + /// transcript, and emits it once. No per-chunk callback (see runNemotronWithEngine). + private func runNemotronFromFile(engine: any StreamingAsrEngine, path: String, startTime: Date, outputFile: FileHandle?) async throws { let url = URL(fileURLWithPath: path) guard FileManager.default.fileExists(atPath: url.path) else { throw ValidationError("Audio file not found: \(path)") @@ -154,7 +158,10 @@ struct Stream: AsyncParsableCommand { } log("Streaming from file (no microphone)...") - // Build a 16kHz mono Float32 format to match what the engine expects. + // Match the official FluidAudio reference pattern (NemotronTranscribe.swift): + // load the WHOLE file into a single AVAudioPCMBuffer and feed it as one + // appendAudio call. The engine handles internal chunking. This avoids any + // boundary issues from feeding the engine many tiny buffers in a loop. guard let audioFormat = AVAudioFormat( commonFormat: .pcmFormatFloat32, sampleRate: 16000, @@ -164,33 +171,58 @@ struct Stream: AsyncParsableCommand { throw ValidationError("Failed to create audio format") } - // Match the mic tap chunk size for parity with live behavior. - let chunkSize = 4096 - var offset = 0 - while offset < samples.count { - let end = min(offset + chunkSize, samples.count) - let frameCount = AVAudioFrameCount(end - offset) - guard let buffer = AVAudioPCMBuffer(pcmFormat: audioFormat, frameCapacity: frameCount) else { - throw ValidationError("Failed to allocate audio buffer") - } - buffer.frameLength = frameCount - if let channelData = buffer.floatChannelData { - samples.withUnsafeBufferPointer { src in - memcpy(channelData[0], src.baseAddress!.advanced(by: offset), Int(frameCount) * MemoryLayout.stride) - } + let frameCount = AVAudioFrameCount(samples.count) + guard let buffer = AVAudioPCMBuffer(pcmFormat: audioFormat, frameCapacity: frameCount) else { + throw ValidationError("Failed to allocate audio buffer") + } + buffer.frameLength = frameCount + if let channelData = buffer.floatChannelData { + samples.withUnsafeBufferPointer { src in + memcpy(channelData[0], src.baseAddress!, samples.count * MemoryLayout.stride) } - nonisolated(unsafe) let buf = buffer - try await engine.appendAudio(buf) - try await engine.processBufferedAudio() - offset = end } + nonisolated(unsafe) let buf = buffer + try await engine.appendAudio(buf) + try await engine.processBufferedAudio() - // Critical: flush any remaining buffered audio so the tail of the transcript isn't dropped. + // Authoritative final transcript: finish() pads any trailing partial chunk, + // returns the full decoded text, and clears internal state. We use this as + // the source of truth (not the per-chunk callback, which drops content). let finalText = try await engine.finish() - if verbose { log("Finalized. Total transcript length: \(finalText.count) chars") } + if verbose { log(String(format: "Finalized. Final transcript: %d chars", finalText.count)) } + + // Emit the complete transcript as a single result. + emitFinalTranscript(finalText, startTime: startTime, outputFile: outputFile) + } + + /// Emit the final (authoritative) transcript from finish() as one record. + /// In file mode this is the only output that goes to stdout — no per-chunk deltas. + private func emitFinalTranscript(_ text: String, startTime: Date, outputFile: FileHandle?) { + let trimmed = text.trimmingCharacters(in: .whitespaces) + guard !trimmed.isEmpty else { return } + + let elapsed = Date().timeIntervalSince(startTime) + let line: String + + switch format { + case .text: + let ts = formatStreamTimestamp(elapsed) + line = "[\(ts)] \(trimmed)" + case .jsonl: + let jsonObj: [String: Any] = [ + "time": round(elapsed * 10) / 10, + "text": trimmed, + ] + guard let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), + let jsonStr = String(data: data, encoding: .utf8) else { + return + } + line = jsonStr + } - // Reset terminal line in case the live preview left an unfinished line on stderr. - FileHandle.standardError.write(Data("\n".utf8)) + print(line) + fflush(stdout) + outputFile?.write(Data((line + "\n").utf8)) } /// Set up the partial-transcript callback used by both mic and file modes. diff --git a/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv b/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv index fd917de..a5bb354 100644 --- a/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv +++ b/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv @@ -1,6 +1,6 @@ file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error -4386541,earnings21,Cumulus Media Inc,nemotron-streaming,streaming,1097.1,5,0.2.1,144.3,0.1315,0.4287,0.3943,0.5134,395,532,237,2715,2420,30,18,0.6, -4387332,earnings21,Zagg Inc,nemotron-streaming,streaming,1310.2,6,0.2.1,177.2,0.1353,0.4185,0.3983,0.4936,422,1038,201,3969,3132,36,23,0.6389, -4392809,earnings21,Lands End Inc,nemotron-streaming,streaming,1438.0,6,0.2.1,174.1,0.121,0.3605,0.3323,0.4382,502,608,341,4025,3758,30,20,0.6667, -4384683,earnings21,One Gas Inc,nemotron-streaming,streaming,1476.8,8,0.2.1,198.5,0.1344,0.3984,0.3666,0.4688,421,700,313,3599,3212,40,22,0.55, -4367318,earnings21,AcelRX Pharmaceuticals Inc,nemotron-streaming,streaming,1720.8,7,0.2.1,229.7,0.1335,0.3995,0.3674,0.4808,588,762,377,4323,3938,42,21,0.5, +4386541,earnings21,Cumulus Media Inc,nemotron-streaming,streaming,1097.1,5,0.2.1,101.4,0.0924,0.2063,0.1987,0.2615,182,274,104,2715,2545,30,22,0.7333, +4387332,earnings21,Zagg Inc,nemotron-streaming,streaming,1310.2,6,0.2.1,135.5,0.1034,0.2273,0.2216,0.2789,238,562,102,3969,3509,36,23,0.6389, +4392809,earnings21,Lands End Inc,nemotron-streaming,streaming,1438.0,6,0.2.1,140.3,0.0975,0.1588,0.1535,0.1988,189,312,138,4025,3851,30,22,0.7333, +4384683,earnings21,One Gas Inc,nemotron-streaming,streaming,1476.8,8,0.2.1,144.1,0.0976,0.1814,0.1739,0.2262,197,300,156,3599,3455,40,26,0.65, +4367318,earnings21,AcelRX Pharmaceuticals Inc,nemotron-streaming,streaming,1720.8,7,0.2.1,129.0,0.075,0.1594,0.1558,0.208,238,353,98,4323,4068,42,24,0.5714, diff --git a/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv b/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv index 8b581d1..3407471 100644 --- a/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv +++ b/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv @@ -1,6 +1,6 @@ file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error -talk_0,tedlium,TED talk talk_0,nemotron-streaming,streaming,1249.0,1,0.2.1,164.9,0.132,0.2751,0.259,0.3449,279,338,180,2897,2739,0,0,0.0, -talk_1,tedlium,TED talk talk_1,nemotron-streaming,streaming,1505.8,1,0.2.1,205.1,0.1362,0.26,0.2492,0.3221,341,599,189,4342,3932,0,0,0.0, -talk_2,tedlium,TED talk talk_2,nemotron-streaming,streaming,834.0,1,0.2.1,116.5,0.1397,0.1997,0.1884,0.269,220,112,144,2383,2415,0,0,0.0, -talk_3,tedlium,TED talk talk_3,nemotron-streaming,streaming,1095.5,1,0.2.1,156.6,0.1429,0.2129,0.2008,0.2792,267,193,182,3015,3004,0,0,0.0, -talk_4,tedlium,TED talk talk_4,nemotron-streaming,streaming,459.1,1,0.2.1,58.7,0.1278,0.157,0.1506,0.205,88,85,64,1510,1489,0,0,0.0, +talk_0,tedlium,TED talk talk_0,nemotron-streaming,streaming,1249.0,1,0.2.1,113.5,0.0909,0.0746,0.0743,0.0986,72,133,11,2897,2775,0,0,0.0, +talk_1,tedlium,TED talk talk_1,nemotron-streaming,streaming,1505.8,1,0.2.1,137.1,0.0911,0.0905,0.0903,0.1215,140,242,11,4342,4111,0,0,0.0, +talk_2,tedlium,TED talk talk_2,nemotron-streaming,streaming,834.0,1,0.2.1,79.8,0.0957,0.0793,0.0788,0.1116,81,93,15,2383,2305,0,0,0.0, +talk_3,tedlium,TED talk talk_3,nemotron-streaming,streaming,1095.5,1,0.2.1,102.0,0.0931,0.0441,0.044,0.0641,62,64,7,3015,2958,0,0,0.0, +talk_4,tedlium,TED talk talk_4,nemotron-streaming,streaming,459.1,1,0.2.1,55.3,0.1203,0.0609,0.0606,0.0776,26,58,8,1510,1460,0,0,0.0, From ac2fc012ec4ce80efc3fbadabf6506252bd2f4d1 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Thu, 9 Apr 2026 21:33:01 +0100 Subject: [PATCH 10/13] =?UTF-8?q?Re-run=20batch=20eval=20baseline=20?= =?UTF-8?q?=E2=80=94=20verify=20no=20regression=20after=20Stream.swift=20c?= =?UTF-8?q?hanges?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Confirms the streaming work in this branch did not affect the batch path: | Dataset | Before | After | |------------|-------:|-------:| | TED-LIUM | 6.0% | 6.0% | | Earnings-21| 12.9% | 12.9% | | MLS Spanish| 8.5% | 8.5% | Earnings-21 proper noun recall also unchanged at 70.8% (126/178 entities). S/D/I counts byte-identical to the prior baseline; only processing_secs varies (wall-clock noise). CSV schema updated with the new "mode" column (= "batch") and the proper_noun_* columns added in the prior session, so all batch + streaming CSVs now share a uniform schema. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../scribe-0.2.1-large-v3-turbo-earnings21.csv | 12 ++++++------ .../scribe-0.2.1-large-v3-turbo-mls-spanish.csv | 8 ++++---- .../results/scribe-0.2.1-large-v3-turbo-tedlium.csv | 12 ++++++------ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/experiments/eval/results/scribe-0.2.1-large-v3-turbo-earnings21.csv b/experiments/eval/results/scribe-0.2.1-large-v3-turbo-earnings21.csv index faf6aed..9c3ce7e 100644 --- a/experiments/eval/results/scribe-0.2.1-large-v3-turbo-earnings21.csv +++ b/experiments/eval/results/scribe-0.2.1-large-v3-turbo-earnings21.csv @@ -1,6 +1,6 @@ -file_id,dataset,label,model,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error -4386541,earnings21,Cumulus Media Inc,large-v3-turbo,1097.1,5,0.2.1,8.1,0.0073,0.1473,0.1404,0.2086,208,59,133,2715,2789,30,22,0.7333, -4387332,earnings21,Zagg Inc,large-v3-turbo,1310.2,6,0.2.1,10.5,0.008,0.127,0.1226,0.1878,287,75,142,3969,4036,36,26,0.7222, -4392809,earnings21,Lands End Inc,large-v3-turbo,1438.0,6,0.2.1,11.0,0.0076,0.1081,0.1052,0.1482,183,143,109,4025,3991,30,24,0.8, -4384683,earnings21,One Gas Inc,large-v3-turbo,1476.8,8,0.2.1,10.6,0.0072,0.1487,0.1419,0.1934,199,166,170,3599,3603,40,29,0.725, -4367318,earnings21,AcelRX Pharmaceuticals Inc,large-v3-turbo,1720.8,7,0.2.1,12.5,0.0073,0.1221,0.1192,0.1727,248,174,106,4323,4255,42,25,0.5952, +file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error +4386541,earnings21,Cumulus Media Inc,large-v3-turbo,batch,1097.1,5,0.2.1,8.2,0.0075,0.1473,0.1404,0.2086,208,59,133,2715,2789,30,22,0.7333, +4387332,earnings21,Zagg Inc,large-v3-turbo,batch,1310.2,6,0.2.1,10.4,0.008,0.127,0.1226,0.1878,287,75,142,3969,4036,36,26,0.7222, +4392809,earnings21,Lands End Inc,large-v3-turbo,batch,1438.0,6,0.2.1,11.0,0.0076,0.1081,0.1052,0.1482,183,143,109,4025,3991,30,24,0.8, +4384683,earnings21,One Gas Inc,large-v3-turbo,batch,1476.8,8,0.2.1,10.7,0.0073,0.1487,0.1419,0.1934,199,166,170,3599,3603,40,29,0.725, +4367318,earnings21,AcelRX Pharmaceuticals Inc,large-v3-turbo,batch,1720.8,7,0.2.1,12.5,0.0073,0.1221,0.1192,0.1727,248,174,106,4323,4255,42,25,0.5952, diff --git a/experiments/eval/results/scribe-0.2.1-large-v3-turbo-mls-spanish.csv b/experiments/eval/results/scribe-0.2.1-large-v3-turbo-mls-spanish.csv index 19ad949..d8aafb7 100644 --- a/experiments/eval/results/scribe-0.2.1-large-v3-turbo-mls-spanish.csv +++ b/experiments/eval/results/scribe-0.2.1-large-v3-turbo-mls-spanish.csv @@ -1,4 +1,4 @@ -file_id,dataset,label,model,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,error -chapter_11991,mls-spanish,MLS Spanish chapter_11991,large-v3-turbo,4019.9,1,0.2.1,49.9,0.0124,0.1489,0.1473,0.2229,786,520,100,9443,9023, -chapter_1378,mls-spanish,MLS Spanish chapter_1378,large-v3-turbo,5640.8,1,0.2.1,70.6,0.0125,0.0499,0.0495,0.0799,461,145,121,14580,14556, -chapter_567,mls-spanish,MLS Spanish chapter_567,large-v3-turbo,5262.6,1,0.2.1,66.1,0.0126,0.0787,0.0777,0.1228,646,230,186,13489,13445, +file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error +chapter_11991,mls-spanish,MLS Spanish chapter_11991,large-v3-turbo,batch,4019.9,1,0.2.1,50.2,0.0125,0.1489,0.1473,0.2229,786,520,100,9443,9023,0,0,0.0, +chapter_1378,mls-spanish,MLS Spanish chapter_1378,large-v3-turbo,batch,5640.8,1,0.2.1,70.6,0.0125,0.0499,0.0495,0.0799,461,145,121,14580,14556,0,0,0.0, +chapter_567,mls-spanish,MLS Spanish chapter_567,large-v3-turbo,batch,5262.6,1,0.2.1,66.7,0.0127,0.0787,0.0777,0.1228,646,230,186,13489,13445,0,0,0.0, diff --git a/experiments/eval/results/scribe-0.2.1-large-v3-turbo-tedlium.csv b/experiments/eval/results/scribe-0.2.1-large-v3-turbo-tedlium.csv index 07f94a6..8c8b894 100644 --- a/experiments/eval/results/scribe-0.2.1-large-v3-turbo-tedlium.csv +++ b/experiments/eval/results/scribe-0.2.1-large-v3-turbo-tedlium.csv @@ -1,6 +1,6 @@ -file_id,dataset,label,model,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,error -talk_0,tedlium,TED talk talk_0,large-v3-turbo,1249.0,1,0.2.1,8.6,0.0068,0.0652,0.0646,0.0889,72,90,27,2897,2834, -talk_1,tedlium,TED talk talk_1,large-v3-turbo,1505.8,1,0.2.1,11.1,0.0074,0.0663,0.065,0.091,117,82,89,4342,4349, -talk_2,tedlium,TED talk talk_2,large-v3-turbo,834.0,1,0.2.1,6.3,0.0076,0.0734,0.073,0.1027,73,89,13,2383,2307, -talk_3,tedlium,TED talk talk_3,large-v3-turbo,1095.5,1,0.2.1,7.8,0.0071,0.0378,0.0376,0.0545,52,42,20,3015,2993, -talk_4,tedlium,TED talk talk_4,large-v3-turbo,459.1,1,0.2.1,3.6,0.0078,0.0563,0.0552,0.0776,35,19,31,1510,1522, +file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error +talk_0,tedlium,TED talk talk_0,large-v3-turbo,batch,1249.0,1,0.2.1,8.7,0.007,0.0652,0.0646,0.0889,72,90,27,2897,2834,0,0,0.0, +talk_1,tedlium,TED talk talk_1,large-v3-turbo,batch,1505.8,1,0.2.1,11.2,0.0074,0.0663,0.065,0.091,117,82,89,4342,4349,0,0,0.0, +talk_2,tedlium,TED talk talk_2,large-v3-turbo,batch,834.0,1,0.2.1,6.3,0.0076,0.0734,0.073,0.1027,73,89,13,2383,2307,0,0,0.0, +talk_3,tedlium,TED talk talk_3,large-v3-turbo,batch,1095.5,1,0.2.1,7.8,0.0071,0.0378,0.0376,0.0545,52,42,20,3015,2993,0,0,0.0, +talk_4,tedlium,TED talk talk_4,large-v3-turbo,batch,459.1,1,0.2.1,3.6,0.0079,0.0563,0.0552,0.0776,35,19,31,1510,1522,0,0,0.0, From 11dc099d5295a04a55543ebf5870f9938d878094 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Thu, 9 Apr 2026 22:12:58 +0100 Subject: [PATCH 11/13] Unify mic and file streaming paths through one AsyncStream pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Both modes now flow through a single drain loop. The only difference is the source adapter — file mode pushes one buffer and finishes the continuation; mic mode keeps yielding from the AVAudioEngine tap until SIGINT finishes the continuation. This eliminates two parallel code paths and a class of mic-only bugs. Architecture (matches FluidAudio's own SlidingWindowAsrManager pattern): source ──► AsyncStream ──► drain loop ├─ engine.appendAudio ├─ engine.processBufferedAudio ├─ engine.getPartialTranscript └─ emit delta via StreamState stream ends (file done OR SIGINT) └────► engine.finish() ──► emit tail Specific bugs fixed in mic mode by sharing the proven file pipeline: 1. No more detached Task { } in the per-chunk callback. The drain loop polls getPartialTranscript() in the main task. No race, no lost emissions at exit. 2. Darwin.exit(0) → DispatchSource.makeSignalSource(SIGINT). The signal handler runs on a regular dispatch queue and calls continuation.finish(). The drain loop falls through to engine.finish() and emits the tail audio that used to be silently dropped. 3. Mic tap pre-resamples to 16kHz mono Float32 via AudioConverter. resampleBuffer() (the same converter the file path uses for whole files), so the engine sees identical input format regardless of source. Code changes: - New: runNemotronDrainLoop, feedNemotronFromFile, startMicSource, emitDelta, emitLivePreview, makeMonoFloat32Buffer, NemotronMicResources - Removed: setupNemotronCallback, runNemotronFromMic, runNemotronFromFile, emitFinalTranscript - runNemotronWithEngine becomes a thin orchestrator that picks the source adapter and runs the drain loop Eval verification (same scribe v0.2.1, same audio, same reference): | Dataset | Streaming WER (before) | Streaming WER (after) | |------------|-----------------------:|----------------------:| | TED-LIUM | 7.2% | 7.2% | | Earnings-21| 18.5% | 18.5% | Per-file S/D/I counts are byte-identical to the previous baseline. The unified pipeline now emits multiple deltas per file (one from the drain poll, one from the finish() tail) instead of one big block, but jiwer's text normalization makes the result equivalent. Out of scope: - runSlidingWindow (multilingual) path stays as-is — still rejects --audio-file, still uses setupSignalHandler. The SlidingWindow engine has its own concurrency model and is a separate refactor. - System audio capture via CATapDescription — plugs into the same pipeline as a third source adapter. Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 287 ++++++++++-------- .../scribe-0.2.1-streaming-earnings21.csv | 10 +- .../scribe-0.2.1-streaming-tedlium.csv | 10 +- 3 files changed, 173 insertions(+), 134 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index fc7f422..74de292 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -96,54 +96,90 @@ struct Stream: AsyncParsableCommand { try await runNemotronWithEngine(engine) } + /// Unified entry point for the Nemotron pipeline. Both file and mic modes + /// flow through the same drain loop — the only difference is the audio source. private func runNemotronWithEngine(_ engine: any StreamingAsrEngine) async throws { let startTime = Date() let outputFile = openOutputFile() + // Shared queue between the audio source (file or mic) and the drain loop. + // AsyncStream.Continuation.yield() is non-async and audio-thread-safe — this is + // FluidAudio's own pattern (see SlidingWindowAsrManager:63-65, 182-184). + let (stream, continuation) = AsyncStream.makeStream() + + // Start the source. Mic mode keeps the AVAudioEngine and DispatchSourceSignal + // alive in `micResources` for the lifetime of the stream. + var micResources: NemotronMicResources? = nil if let path = audioFile { - // File mode: do NOT set up the per-chunk callback. Use finish() as the - // authoritative transcript source. The callback path drops content because - // (a) finish() may decode tokens after the last chunk callback, and - // (b) Task { } dispatched from the callback may not run before exit. - try await runNemotronFromFile(engine: engine, path: path, startTime: startTime, outputFile: outputFile) + try await feedNemotronFromFile(path: path, continuation: continuation) } else { - // Mic mode: live preview via per-chunk callback (acceptable streaming UX). - await setupNemotronCallback(engine: engine, startTime: startTime, outputFile: outputFile) - try await runNemotronFromMic(engine: engine) + micResources = try startMicSource(continuation: continuation) } - } - private func runNemotronFromMic(engine: any StreamingAsrEngine) async throws { - let audioEngine = AVAudioEngine() - let inputNode = audioEngine.inputNode - let inputFormat = inputNode.outputFormat(forBus: 0) - - if verbose { - log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") + // Drain loop runs until the continuation finishes (file end or SIGINT). + try await runNemotronDrainLoop( + engine: engine, + stream: stream, + startTime: startTime, + outputFile: outputFile + ) + + // Cleanup mic resources after the drain loop has flushed via finish(). + if let res = micResources { + res.audioEngine.stop() + res.audioEngine.inputNode.removeTap(onBus: 0) + res.signalSource.cancel() } - setupSignalHandler() + outputFile?.closeFile() + } - inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in + /// Drain the audio queue, feed the engine, poll the partial transcript, and emit + /// deltas. Used for both mic and file modes — the queue is the only difference. + private func runNemotronDrainLoop( + engine: any StreamingAsrEngine, + stream: AsyncStream, + startTime: Date, + outputFile: FileHandle? + ) async throws { + let state = StreamState() + + for await buffer in stream { nonisolated(unsafe) let buf = buffer - do { try engine.appendAudio(buf) } catch {} + try await engine.appendAudio(buf) + try await engine.processBufferedAudio() + + // Live preview to stderr (text format only) — overwritten on each tick. + let current = await engine.getPartialTranscript() + emitLivePreview(current, startTime: startTime) + + // Delta emission to stdout (and output file). + if let newText = await state.getNewText(current) { + emitDelta(newText, startTime: startTime, outputFile: outputFile) + } } - try audioEngine.start() - log("Listening (English, low latency)... press Ctrl+C to stop") + // Stream ended — flush any tail audio via finish() and emit remaining content. + // This is the bug fix from the previous phase: finish() may decode tokens that + // never reached the per-chunk path, so we always run its result through the + // delta logic to capture them. + let finalText = try await engine.finish() + if let newText = await state.getNewText(finalText) { + emitDelta(newText, startTime: startTime, outputFile: outputFile) + } - // Processing loop — drives the engine to process buffered audio - while true { - try await engine.processBufferedAudio() - try await Task.sleep(nanoseconds: 50_000_000) // 50ms + // Newline so the final stderr preview line doesn't run into the next prompt. + if format == .text { + FileHandle.standardError.write(Data("\n".utf8)) } } - /// Feed a pre-recorded audio file into the streaming engine. - /// Reads the whole file via FluidAudio's AudioConverter (16kHz mono Float32), - /// chunks it to mimic the live mic tap, calls finish() for the authoritative - /// transcript, and emits it once. No per-chunk callback (see runNemotronWithEngine). - private func runNemotronFromFile(engine: any StreamingAsrEngine, path: String, startTime: Date, outputFile: FileHandle?) async throws { + /// File source adapter — reads the whole file via AudioConverter (16kHz mono Float32), + /// pushes it as a single buffer, and finishes the continuation so the drain loop exits. + private func feedNemotronFromFile( + path: String, + continuation: AsyncStream.Continuation + ) async throws { let url = URL(fileURLWithPath: path) guard FileManager.default.fileExists(atPath: url.path) else { throw ValidationError("Audio file not found: \(path)") @@ -158,60 +194,88 @@ struct Stream: AsyncParsableCommand { } log("Streaming from file (no microphone)...") - // Match the official FluidAudio reference pattern (NemotronTranscribe.swift): - // load the WHOLE file into a single AVAudioPCMBuffer and feed it as one - // appendAudio call. The engine handles internal chunking. This avoids any - // boundary issues from feeding the engine many tiny buffers in a loop. - guard let audioFormat = AVAudioFormat( - commonFormat: .pcmFormatFloat32, - sampleRate: 16000, - channels: 1, - interleaved: false - ) else { - throw ValidationError("Failed to create audio format") + guard let buffer = makeMonoFloat32Buffer(from: samples) else { + throw ValidationError("Failed to allocate audio buffer") } + continuation.yield(buffer) + continuation.finish() + } - let frameCount = AVAudioFrameCount(samples.count) - guard let buffer = AVAudioPCMBuffer(pcmFormat: audioFormat, frameCapacity: frameCount) else { - throw ValidationError("Failed to allocate audio buffer") + /// Mic source adapter — installs the input tap, pre-resamples each tap buffer to + /// 16kHz mono Float32 (matching the file path), yields it onto the shared stream, + /// and wires SIGINT to a DispatchSourceSignal that finishes the continuation + /// (replacing the brutal Darwin.exit(0) that used to drop tail audio). + private func startMicSource( + continuation: AsyncStream.Continuation + ) throws -> NemotronMicResources { + let audioEngine = AVAudioEngine() + let inputNode = audioEngine.inputNode + let inputFormat = inputNode.outputFormat(forBus: 0) + + if verbose { + log("Microphone: \(inputFormat.sampleRate)Hz, \(inputFormat.channelCount) ch") } - buffer.frameLength = frameCount - if let channelData = buffer.floatChannelData { - samples.withUnsafeBufferPointer { src in - memcpy(channelData[0], src.baseAddress!, samples.count * MemoryLayout.stride) + + // AudioConverter is a final class with only immutable state — safe to call + // resampleBuffer() from the audio render thread. + nonisolated(unsafe) let converter = AudioConverter() + nonisolated(unsafe) let cont = continuation + + inputNode.installTap(onBus: 0, bufferSize: 4096, format: inputFormat) { buffer, _ in + do { + let samples = try converter.resampleBuffer(buffer) + if let outBuffer = makeMonoFloat32Buffer(from: samples) { + cont.yield(outBuffer) + } + } catch { + // Drop the buffer on resample error rather than crash the audio thread. } } - nonisolated(unsafe) let buf = buffer - try await engine.appendAudio(buf) - try await engine.processBufferedAudio() - // Authoritative final transcript: finish() pads any trailing partial chunk, - // returns the full decoded text, and clears internal state. We use this as - // the source of truth (not the per-chunk callback, which drops content). - let finalText = try await engine.finish() - if verbose { log(String(format: "Finalized. Final transcript: %d chars", finalText.count)) } + try audioEngine.start() + log("Listening (English, low latency)... press Ctrl+C to stop") + + // Graceful shutdown via DispatchSourceSignal — replaces Darwin.exit(0). + // The handler runs on a regular dispatch queue (not the signal context), so + // it's safe to call continuation.finish() from here. + signal(SIGINT, SIG_IGN) + let sigSource = DispatchSource.makeSignalSource(signal: SIGINT, queue: .global()) + sigSource.setEventHandler { + FileHandle.standardError.write(Data("\n[scribe] Stopping...\n".utf8)) + cont.finish() + } + sigSource.resume() - // Emit the complete transcript as a single result. - emitFinalTranscript(finalText, startTime: startTime, outputFile: outputFile) + return NemotronMicResources(audioEngine: audioEngine, signalSource: sigSource) } - /// Emit the final (authoritative) transcript from finish() as one record. - /// In file mode this is the only output that goes to stdout — no per-chunk deltas. - private func emitFinalTranscript(_ text: String, startTime: Date, outputFile: FileHandle?) { - let trimmed = text.trimmingCharacters(in: .whitespaces) - guard !trimmed.isEmpty else { return } + // MARK: - Nemotron emission helpers + /// Live preview line on stderr (text format only). Overwrites itself with `\r`. + private func emitLivePreview(_ fullTranscript: String, startTime: Date) { + guard format == .text else { return } + let preview = String(fullTranscript.trimmingCharacters(in: .whitespaces).suffix(100)) + guard !preview.isEmpty else { return } + let elapsed = Date().timeIntervalSince(startTime) + let ts = formatStreamTimestamp(elapsed) + FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) + } + + /// Emit a confirmed delta to stdout (and output file). Format-aware. + private func emitDelta(_ newText: String, startTime: Date, outputFile: FileHandle?) { let elapsed = Date().timeIntervalSince(startTime) let line: String switch format { case .text: + // Clear the live preview line before writing the confirmed line on stdout. + FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) let ts = formatStreamTimestamp(elapsed) - line = "[\(ts)] \(trimmed)" + line = "[\(ts)] \(newText)" case .jsonl: let jsonObj: [String: Any] = [ "time": round(elapsed * 10) / 10, - "text": trimmed, + "text": newText, ] guard let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), let jsonStr = String(data: data, encoding: .utf8) else { @@ -225,65 +289,6 @@ struct Stream: AsyncParsableCommand { outputFile?.write(Data((line + "\n").utf8)) } - /// Set up the partial-transcript callback used by both mic and file modes. - private func setupNemotronCallback(engine: any StreamingAsrEngine, startTime: Date, outputFile: FileHandle?) async { - let state = StreamState() - let fmt = format - - // Capture file handle for sendable closure - nonisolated(unsafe) let outFile = outputFile - - // Partial callback — fires after each 560ms chunk with the full accumulated transcript. - // We diff to find new text and emit only the delta. - await engine.setPartialTranscriptCallback { fullTranscript in - Task { - let elapsed = Date().timeIntervalSince(startTime) - - switch fmt { - case .text: - // Show live preview on stderr (ephemeral, overwritten) - let preview = String(fullTranscript.trimmingCharacters(in: .whitespaces).suffix(100)) - if !preview.isEmpty { - let ts = formatStreamTimestamp(elapsed) - FileHandle.standardError.write(Data("\r\u{1B}[K[\(ts)] \(preview)".utf8)) - } - case .jsonl: - break - } - - // Emit new portion to stdout - if let newText = await state.getNewText(fullTranscript) { - let ts = formatStreamTimestamp(elapsed) - - let line: String - switch fmt { - case .text: - FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) - line = "[\(ts)] \(newText)" - case .jsonl: - let jsonObj: [String: Any] = [ - "time": round(elapsed * 10) / 10, - "text": newText, - ] - if let data = try? JSONSerialization.data(withJSONObject: jsonObj, options: [.sortedKeys]), - let jsonStr = String(data: data, encoding: .utf8) { - line = jsonStr - } else { - return - } - } - - print(line) - fflush(stdout) - - if let file = outFile { - file.write(Data((line + "\n").utf8)) - } - } - } - } - } - private func openOutputFile() -> FileHandle? { guard let outputPath = output else { return nil } FileManager.default.createFile(atPath: outputPath, contents: nil) @@ -419,6 +424,40 @@ private func formatStreamTimestamp(_ seconds: Double) -> String { return String(format: "%02d:%02d", minutes, secs) } +/// Resources held alive by mic mode for the duration of a stream session. +/// The AVAudioEngine and DispatchSourceSignal must outlive the drain loop — +/// dropping them would stop audio capture and cancel the signal handler. +private struct NemotronMicResources { + let audioEngine: AVAudioEngine + let signalSource: any DispatchSourceSignal +} + +/// Wrap a `[Float]` of 16kHz mono samples into a fresh AVAudioPCMBuffer. +/// Used by both the file source (whole-file buffer) and the mic source +/// (per-tap-buffer wrapping after resampling). +private func makeMonoFloat32Buffer(from samples: [Float]) -> AVAudioPCMBuffer? { + guard let format = AVAudioFormat( + commonFormat: .pcmFormatFloat32, + sampleRate: 16000, + channels: 1, + interleaved: false + ) else { + return nil + } + let frameCount = AVAudioFrameCount(samples.count) + guard frameCount > 0, + let buffer = AVAudioPCMBuffer(pcmFormat: format, frameCapacity: frameCount) else { + return nil + } + buffer.frameLength = frameCount + if let channelData = buffer.floatChannelData { + samples.withUnsafeBufferPointer { src in + memcpy(channelData[0], src.baseAddress!, samples.count * MemoryLayout.stride) + } + } + return buffer +} + enum StreamOutputFormat: String, ExpressibleByArgument, CaseIterable, Sendable { case text, jsonl } diff --git a/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv b/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv index a5bb354..da380c5 100644 --- a/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv +++ b/experiments/eval/results/scribe-0.2.1-streaming-earnings21.csv @@ -1,6 +1,6 @@ file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error -4386541,earnings21,Cumulus Media Inc,nemotron-streaming,streaming,1097.1,5,0.2.1,101.4,0.0924,0.2063,0.1987,0.2615,182,274,104,2715,2545,30,22,0.7333, -4387332,earnings21,Zagg Inc,nemotron-streaming,streaming,1310.2,6,0.2.1,135.5,0.1034,0.2273,0.2216,0.2789,238,562,102,3969,3509,36,23,0.6389, -4392809,earnings21,Lands End Inc,nemotron-streaming,streaming,1438.0,6,0.2.1,140.3,0.0975,0.1588,0.1535,0.1988,189,312,138,4025,3851,30,22,0.7333, -4384683,earnings21,One Gas Inc,nemotron-streaming,streaming,1476.8,8,0.2.1,144.1,0.0976,0.1814,0.1739,0.2262,197,300,156,3599,3455,40,26,0.65, -4367318,earnings21,AcelRX Pharmaceuticals Inc,nemotron-streaming,streaming,1720.8,7,0.2.1,129.0,0.075,0.1594,0.1558,0.208,238,353,98,4323,4068,42,24,0.5714, +4386541,earnings21,Cumulus Media Inc,nemotron-streaming,streaming,1097.1,5,0.2.1,102.9,0.0938,0.2063,0.1987,0.2615,182,274,104,2715,2545,30,22,0.7333, +4387332,earnings21,Zagg Inc,nemotron-streaming,streaming,1310.2,6,0.2.1,121.9,0.093,0.2273,0.2216,0.2789,238,562,102,3969,3509,36,23,0.6389, +4392809,earnings21,Lands End Inc,nemotron-streaming,streaming,1438.0,6,0.2.1,130.1,0.0905,0.1588,0.1535,0.1988,189,312,138,4025,3851,30,22,0.7333, +4384683,earnings21,One Gas Inc,nemotron-streaming,streaming,1476.8,8,0.2.1,137.5,0.0931,0.1814,0.1739,0.2262,197,300,156,3599,3455,40,26,0.65, +4367318,earnings21,AcelRX Pharmaceuticals Inc,nemotron-streaming,streaming,1720.8,7,0.2.1,167.1,0.0971,0.1594,0.1558,0.208,238,353,98,4323,4068,42,24,0.5714, diff --git a/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv b/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv index 3407471..1e4c22c 100644 --- a/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv +++ b/experiments/eval/results/scribe-0.2.1-streaming-tedlium.csv @@ -1,6 +1,6 @@ file_id,dataset,label,model,mode,duration_secs,speakers,scribe_version,processing_secs,rtf,wer,mer,wil,substitutions,deletions,insertions,ref_words,hyp_words,proper_noun_total,proper_noun_found,proper_noun_recall,error -talk_0,tedlium,TED talk talk_0,nemotron-streaming,streaming,1249.0,1,0.2.1,113.5,0.0909,0.0746,0.0743,0.0986,72,133,11,2897,2775,0,0,0.0, -talk_1,tedlium,TED talk talk_1,nemotron-streaming,streaming,1505.8,1,0.2.1,137.1,0.0911,0.0905,0.0903,0.1215,140,242,11,4342,4111,0,0,0.0, -talk_2,tedlium,TED talk talk_2,nemotron-streaming,streaming,834.0,1,0.2.1,79.8,0.0957,0.0793,0.0788,0.1116,81,93,15,2383,2305,0,0,0.0, -talk_3,tedlium,TED talk talk_3,nemotron-streaming,streaming,1095.5,1,0.2.1,102.0,0.0931,0.0441,0.044,0.0641,62,64,7,3015,2958,0,0,0.0, -talk_4,tedlium,TED talk talk_4,nemotron-streaming,streaming,459.1,1,0.2.1,55.3,0.1203,0.0609,0.0606,0.0776,26,58,8,1510,1460,0,0,0.0, +talk_0,tedlium,TED talk talk_0,nemotron-streaming,streaming,1249.0,1,0.2.1,106.7,0.0854,0.0746,0.0743,0.0986,72,133,11,2897,2775,0,0,0.0, +talk_1,tedlium,TED talk talk_1,nemotron-streaming,streaming,1505.8,1,0.2.1,139.3,0.0925,0.0905,0.0903,0.1215,140,242,11,4342,4111,0,0,0.0, +talk_2,tedlium,TED talk talk_2,nemotron-streaming,streaming,834.0,1,0.2.1,75.3,0.0903,0.0793,0.0788,0.1116,81,93,15,2383,2305,0,0,0.0, +talk_3,tedlium,TED talk talk_3,nemotron-streaming,streaming,1095.5,1,0.2.1,101.8,0.0929,0.0441,0.044,0.0641,62,64,7,3015,2958,0,0,0.0, +talk_4,tedlium,TED talk talk_4,nemotron-streaming,streaming,459.1,1,0.2.1,44.3,0.0965,0.0609,0.0606,0.0776,26,58,8,1510,1460,0,0,0.0, From a7c3967f116dfb67e279668ffed0b5a0fa0c5571 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Fri, 10 Apr 2026 12:09:50 +0100 Subject: [PATCH 12/13] Warm up CoreML engine before starting mic capture The first processBufferedAudio() call triggers CoreML model compilation (~10-20s). Without warmup, this happens on the first real audio chunk, causing all live output to batch up and appear at once when speech pauses or Ctrl+C is pressed. Fix: feed one silent chunk (17920 samples = 1120ms) through the engine during startup, before the mic tap begins. CoreML compiles during the "Warming up..." log message. Then reset() to discard the silence tokens so they don't pollute the real transcript. File-mode eval unchanged: talk_0 still 7.5% WER, S/D/I 72/133/11. The reset() correctly clears warmup state. Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 29 ++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 74de292..1f23329 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -87,15 +87,40 @@ struct Stream: AsyncParsableCommand { let freshEngine = StreamingAsrEngineFactory.create(.nemotron1120ms) try await freshEngine.loadModels() - log("Retry successful.") + log("Retry successful. Warming up...") + try await warmUpEngine(freshEngine) + log("Ready.") try await runNemotronWithEngine(freshEngine) return } - log("Models loaded.") + log("Models loaded. Warming up...") + try await warmUpEngine(engine) + log("Ready.") try await runNemotronWithEngine(engine) } + /// Force CoreML model compilation by running one silent chunk through the engine. + /// Without this, the first real audio chunk blocks for 10-20s while CoreML compiles + /// the neural network, causing all live output to batch up and appear at once. + private func warmUpEngine(_ engine: any StreamingAsrEngine) async throws { + guard let fmt = AVAudioFormat( + commonFormat: .pcmFormatFloat32, sampleRate: 16000, channels: 1, interleaved: false + ) else { return } + + // One complete chunk of silence (1120ms = 17920 samples at 16kHz). + let chunkSamples: AVAudioFrameCount = 17920 + guard let silence = AVAudioPCMBuffer(pcmFormat: fmt, frameCapacity: chunkSamples) else { return } + silence.frameLength = chunkSamples + // Buffer is zero-initialized by default — silence. + + nonisolated(unsafe) let buf = silence + try await engine.appendAudio(buf) + try await engine.processBufferedAudio() + // Discard the silence tokens so they don't pollute the real transcript. + try await engine.reset() + } + /// Unified entry point for the Nemotron pipeline. Both file and mic modes /// flow through the same drain loop — the only difference is the audio source. private func runNemotronWithEngine(_ engine: any StreamingAsrEngine) async throws { From 40903c642b0e81a236f226ffa81f4cdf0a9b0603 Mon Sep 17 00:00:00 2001 From: Javier Toledo Date: Fri, 10 Apr 2026 18:34:02 +0100 Subject: [PATCH 13/13] Flowing text output + save audio for batch re-transcription Two UX improvements to scribe stream: 1. Text mode now flows naturally without timestamps: Before: [00:18] . Can you help me [00:19] with something After: Can you help me with something? I just need to say something. Tokens append inline; newlines on sentence boundaries (. ? !). JSONL mode unchanged (keeps timestamps for machine consumption). 2. New --save-audio flag records the mic audio to a WAV file. After the stream ends (Ctrl+C), the saved audio is automatically re-transcribed with the batch engine (Parakeet TDT v3) which is significantly more accurate (6% vs 7.2% on TED, 13% vs 18.5% on Earnings-21). Users get real-time assistance during the meeting AND a polished transcript at the end. Usage: scribe stream --engine nemotron --save-audio meeting.wav Co-Authored-By: Claude Opus 4.6 (1M context) --- Sources/scribe/Commands/Stream.swift | 105 ++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 12 deletions(-) diff --git a/Sources/scribe/Commands/Stream.swift b/Sources/scribe/Commands/Stream.swift index 1f23329..705dbb2 100644 --- a/Sources/scribe/Commands/Stream.swift +++ b/Sources/scribe/Commands/Stream.swift @@ -49,9 +49,12 @@ struct Stream: AsyncParsableCommand { @Option(name: .long, help: "Streaming engine: default (multilingual, higher latency) or nemotron (English-only, low latency ~560ms).") var engine: StreamEngine = .default - @Option(name: .shortAndLong, help: "Also save output to file.") + @Option(name: .shortAndLong, help: "Save transcript to file.") var output: String? + @Option(name: .long, help: "Save captured audio to WAV file. After streaming ends, the saved audio is automatically re-transcribed with the batch engine for higher accuracy.") + var saveAudio: String? + @Flag(name: .long, help: "Show status information.") var verbose: Bool = false @@ -127,6 +130,9 @@ struct Stream: AsyncParsableCommand { let startTime = Date() let outputFile = openOutputFile() + // Open audio recording file if --save-audio was passed. + let audioWriter = try openAudioWriter() + // Shared queue between the audio source (file or mic) and the drain loop. // AsyncStream.Continuation.yield() is non-async and audio-thread-safe — this is // FluidAudio's own pattern (see SlidingWindowAsrManager:63-65, 182-184). @@ -146,7 +152,8 @@ struct Stream: AsyncParsableCommand { engine: engine, stream: stream, startTime: startTime, - outputFile: outputFile + outputFile: outputFile, + audioWriter: audioWriter ) // Cleanup mic resources after the drain loop has flushed via finish(). @@ -157,6 +164,13 @@ struct Stream: AsyncParsableCommand { } outputFile?.closeFile() + + // If we recorded audio, run batch transcription for a polished final transcript. + if let audioPath = saveAudio, audioWriter != nil { + log("Saved audio to: \(audioPath)") + log("Running batch transcription for polished output...") + try await runBatchTranscription(audioPath: audioPath, outputFile: outputFile) + } } /// Drain the audio queue, feed the engine, poll the partial transcript, and emit @@ -165,11 +179,17 @@ struct Stream: AsyncParsableCommand { engine: any StreamingAsrEngine, stream: AsyncStream, startTime: Date, - outputFile: FileHandle? + outputFile: FileHandle?, + audioWriter: AVAudioFile? = nil ) async throws { let state = StreamState() for await buffer in stream { + // Save audio for later batch processing if --save-audio was passed. + if let writer = audioWriter { + try? writer.write(from: buffer) + } + nonisolated(unsafe) let buf = buffer try await engine.appendAudio(buf) try await engine.processBufferedAudio() @@ -287,16 +307,36 @@ struct Stream: AsyncParsableCommand { } /// Emit a confirmed delta to stdout (and output file). Format-aware. + /// + /// Text mode: tokens flow naturally without timestamps. A newline is inserted + /// after sentence-ending punctuation (. ? !) so output reads as prose: + /// "Hello computer. Can you help me with something?\n" + /// + /// JSONL mode: one JSON object per delta, with timestamp (for machine consumption). private func emitDelta(_ newText: String, startTime: Date, outputFile: FileHandle?) { let elapsed = Date().timeIntervalSince(startTime) - let line: String switch format { case .text: - // Clear the live preview line before writing the confirmed line on stdout. + // Clear the live preview line before writing confirmed text. FileHandle.standardError.write(Data("\r\u{1B}[K".utf8)) - let ts = formatStreamTimestamp(elapsed) - line = "[\(ts)] \(newText)" + + // Stream text naturally: append tokens, newline on sentence boundaries. + var textToWrite = newText + // Insert newline after sentence-ending punctuation followed by a space or end. + textToWrite = textToWrite.replacingOccurrences(of: ". ", with: ".\n") + textToWrite = textToWrite.replacingOccurrences(of: "? ", with: "?\n") + textToWrite = textToWrite.replacingOccurrences(of: "! ", with: "!\n") + // Also handle punctuation at the very end of the delta. + if textToWrite.hasSuffix(".") || textToWrite.hasSuffix("?") || textToWrite.hasSuffix("!") { + textToWrite += "\n" + } + + // Write WITHOUT trailing newline (tokens append naturally on the same line). + FileHandle.standardOutput.write(Data(textToWrite.utf8)) + fflush(stdout) + outputFile?.write(Data(textToWrite.utf8)) + case .jsonl: let jsonObj: [String: Any] = [ "time": round(elapsed * 10) / 10, @@ -306,12 +346,10 @@ struct Stream: AsyncParsableCommand { let jsonStr = String(data: data, encoding: .utf8) else { return } - line = jsonStr + print(jsonStr) + fflush(stdout) + outputFile?.write(Data((jsonStr + "\n").utf8)) } - - print(line) - fflush(stdout) - outputFile?.write(Data((line + "\n").utf8)) } private func openOutputFile() -> FileHandle? { @@ -320,6 +358,49 @@ struct Stream: AsyncParsableCommand { return FileHandle(forWritingAtPath: outputPath) } + /// Open an AVAudioFile for writing 16kHz mono Float32 WAV — the same format the + /// engine sees, so the saved file can be batch-transcribed directly. + private func openAudioWriter() throws -> AVAudioFile? { + guard let path = saveAudio else { return nil } + guard let fmt = AVAudioFormat( + commonFormat: .pcmFormatFloat32, sampleRate: 16000, channels: 1, interleaved: false + ) else { + throw ValidationError("Failed to create audio format for recording") + } + return try AVAudioFile(forWriting: URL(fileURLWithPath: path), settings: fmt.settings) + } + + /// Run batch transcription (Parakeet TDT v3) on a saved audio file for a polished + /// final transcript. Called after the stream ends when --save-audio is used. + private func runBatchTranscription(audioPath: String, outputFile: FileHandle?) async throws { + let converter = AudioConverter() + let samples = try converter.resampleAudioFile(URL(fileURLWithPath: audioPath)) + let duration = Double(samples.count) / 16000.0 + + let asrModels = try await AsrModels.downloadAndLoad(version: .v3) + let asrManager = AsrManager() + try await asrManager.initialize(models: asrModels) + + let startTime = Date() + let asrResult = try await asrManager.transcribe(samples, source: .system) + let elapsed = Date().timeIntervalSince(startTime) + + log(String(format: "Batch transcription: %.0fs audio in %.1fs (%.0fx real-time)", + duration, elapsed, duration / elapsed)) + + // Emit the polished transcript. + let text = asrResult.text.trimmingCharacters(in: .whitespaces) + + FileHandle.standardError.write(Data("\n--- Polished transcript (batch) ---\n".utf8)) + print(text) + fflush(stdout) + + if let file = outputFile { + file.write(Data("\n--- Polished transcript (batch) ---\n".utf8)) + file.write(Data((text + "\n").utf8)) + } + } + // MARK: - SlidingWindow Engine (Multilingual, default) private func runSlidingWindow() async throws {