2025-01-06 18:04:01 +01:00
|
|
|
import torch
|
|
|
|
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor, pipeline
|
|
|
|
import sounddevice as sd
|
|
|
|
import numpy as np
|
|
|
|
import wave
|
|
|
|
import gradio as gr
|
2025-01-06 18:05:19 +01:00
|
|
|
import time
|
2025-01-06 18:20:43 +01:00
|
|
|
from pydub import AudioSegment
|
|
|
|
import os
|
2025-01-06 18:04:01 +01:00
|
|
|
|
|
|
|
# Setup device: run on the first CUDA GPU in half precision when one is
# available, otherwise fall back to the CPU in full precision.
if torch.cuda.is_available():
    device, torch_dtype = "cuda:0", torch.float16
else:
    device, torch_dtype = "cpu", torch.float32

# Load Whisper model
model_id = "openai/whisper-large-v3-turbo"

model = AutoModelForSpeechSeq2Seq.from_pretrained(
    model_id,
    torch_dtype=torch_dtype,
    low_cpu_mem_usage=True,
    use_safetensors=True,
)
model.to(device)

# The processor bundles the tokenizer and feature extractor handed to the
# pipeline below.
processor = AutoProcessor.from_pretrained(model_id)

# Speech-recognition pipeline used by transcribe_audio().
pipe = pipeline(
    "automatic-speech-recognition",
    model=model,
    tokenizer=processor.tokenizer,
    feature_extractor=processor.feature_extractor,
    torch_dtype=torch_dtype,
    device=device,
)
|
|
|
|
|
|
|
|
# Audio recording settings
SAMPLE_RATE = 16000  # Whisper prefers 16 kHz
FILENAME = "recorded_audio.wav"

# Mutable recording state shared by the start/stop callbacks below.
is_recording = False
start_time = None  # Track the recording start time
recorded_audio = None  # List of captured buffers for the current recording
_stream = None  # Live input stream, kept so it can be stopped and closed


def _close_stream():
    """Stop and release the active input stream, if there is one."""
    global _stream
    stream, _stream = _stream, None
    if stream is not None:
        stream.stop()
        stream.close()


def start_recording():
    """Starts recording audio.

    Opens a mono float32 input stream at SAMPLE_RATE whose callback appends
    every incoming buffer to ``recorded_audio`` while ``is_recording`` is
    true. Calling this while a recording is already running discards the
    previous stream and buffers and starts over.

    Returns:
        A status message for the UI.

    Raises:
        Whatever ``sd.InputStream`` or its ``start()`` raises; the recording
        state is left as "not recording" in that case.
    """
    global is_recording, recorded_audio, start_time, _stream

    # Release a stream left over from an earlier start so that two streams
    # never feed the buffer at once and the device is not leaked.
    _close_stream()
    is_recording = False

    chunks = []

    def callback(indata, frames, time_info, status):
        if status:
            print(f"Audio stream status: {status}")
        if is_recording:  # Append audio data only while recording
            # Copy the buffer: the array is only valid during the callback.
            chunks.append(indata.copy())

    stream = sd.InputStream(
        samplerate=SAMPLE_RATE, channels=1, dtype=np.float32, callback=callback
    )

    recorded_audio = chunks  # Start with an empty list to store audio chunks
    start_time = time.time()  # Record the start time
    is_recording = True
    print("Recording started...")
    try:
        stream.start()  # Start the stream
    except Exception:
        # Do not leave the app believing it is recording without a stream.
        is_recording = False
        stream.close()
        raise
    _stream = stream
    return "Recording... Click 'Stop Recording' to finish."


def stop_recording():
    """Stops recording audio and saves it.

    Closes the input stream, joins the captured buffers and writes them to
    FILENAME, truncated to the elapsed wall-clock duration.

    Returns:
        A status message for the UI: "Not recording!" when no recording is
        active, a notice when nothing was captured, otherwise a summary
        with the recording duration.
    """
    global is_recording
    if not is_recording:
        return "Not recording!"

    is_recording = False
    elapsed_time = time.time() - start_time  # Calculate elapsed time
    _close_stream()
    print(f"Recording stopped. Duration: {elapsed_time:.2f} seconds.")

    if not recorded_audio:
        # np.concatenate() raises on an empty list; report it instead.
        return "Recording stopped, but no audio was captured."

    # Combine all recorded chunks into a single array
    audio_data = np.concatenate(recorded_audio, axis=0)
    save_audio_to_wav(audio_data[: int(SAMPLE_RATE * elapsed_time)], FILENAME)
    return f"Recording stopped. Duration: {elapsed_time:.2f} seconds. Click 'Transcribe' to see the result."
|
|
|
|
|
2025-01-06 18:04:01 +01:00
|
|
|
|
|
|
|
|
|
|
|
def save_audio_to_wav(audio, filename, sample_rate=None):
    """Saves audio data to a mono 16-bit PCM WAV file.

    Args:
        audio: Array-like of float samples, nominally in [-1.0, 1.0].
            Values outside that range are clipped rather than allowed to
            wrap around when converted to 16-bit integers.
        filename: Path of the WAV file to write (overwritten if present).
        sample_rate: Frame rate in Hz; defaults to the module-level
            SAMPLE_RATE when omitted.
    """
    if sample_rate is None:
        sample_rate = SAMPLE_RATE

    samples = np.clip(np.asarray(audio), -1.0, 1.0)
    # "<i2" = little-endian int16, the byte order WAV PCM data requires
    # regardless of the host machine.
    pcm = (samples * 32767).astype("<i2")  # Convert to 16-bit PCM format

    with wave.open(filename, "wb") as wf:
        wf.setnchannels(1)  # Mono
        wf.setsampwidth(2)  # 2 bytes per sample
        wf.setframerate(sample_rate)
        wf.writeframes(pcm.tobytes())
|
|
|
|
|
2025-01-16 16:27:50 +01:00
|
|
|
|
2025-01-06 18:20:43 +01:00
|
|
|
def get_audio_duration(filename):
    """Return the length of the WAV file at ``filename`` in seconds."""
    # len() of the loaded segment is divided by 1000 to convert to seconds.
    duration_ms = len(AudioSegment.from_wav(filename))
    return duration_ms / 1000
|
2025-01-06 18:20:43 +01:00
|
|
|
|
2025-01-16 16:27:50 +01:00
|
|
|
|
2025-01-06 18:20:43 +01:00
|
|
|
def split_audio(filename, chunk_length_ms=30000):
    """Load a WAV file and cut it into consecutive slices.

    Each slice spans at most ``chunk_length_ms`` positions of the loaded
    segment; the last one may be shorter. Returns the slices as a list.
    """
    segment = AudioSegment.from_wav(filename)
    pieces = []
    for offset in range(0, len(segment), chunk_length_ms):
        pieces.append(segment[offset:offset + chunk_length_ms])
    return pieces
|
|
|
|
|
2025-01-06 18:04:01 +01:00
|
|
|
|
|
|
|
def transcribe_audio():
    """Transcribes the audio file using Whisper.

    This is a generator so the UI can stream partial results. A recording
    of 30 seconds or less is transcribed in one go and yielded once. A
    longer recording is split into chunks; after each chunk the text
    gathered so far is yielded, followed by the complete text once more at
    the end.

    Yields:
        The transcription text (cumulative for long recordings), or a
        notice when no recording file exists yet.
    """
    if not os.path.exists(FILENAME):
        # "Transcribe" was clicked before anything was recorded.
        yield "No recording found. Click 'Start Recording' first."
        return

    print("Checking audio duration...")
    duration = get_audio_duration(FILENAME)

    if duration <= 30:
        print(f"Audio is short enough ({duration:.2f} seconds). Transcribing directly...")
        result = pipe(FILENAME)
        print("Transcription complete.")
        yield f"{result['text']}"
        return

    print(f"Audio is too long ({duration:.2f} seconds). Splitting into chunks...")
    chunks = split_audio(FILENAME)
    transcription = []

    for i, chunk in enumerate(chunks):
        chunk_filename = f"chunk_{i}.wav"
        try:
            chunk.export(chunk_filename, format="wav")
            print(f"Transcribing chunk {i + 1}/{len(chunks)}...")

            # Transcribe the chunk
            result = pipe(chunk_filename)
        finally:
            # Clean up temporary chunk file, even when export or
            # transcription failed, so stale chunks never pile up.
            try:
                os.remove(chunk_filename)
            except FileNotFoundError:
                pass
        transcription.append(result["text"])

        # Stream intermediate transcription
        yield f"{' '.join(transcription)}"

    print("Transcription complete.")
    yield f"{' '.join(transcription)}"
|
|
|
|
|
2025-01-06 18:04:01 +01:00
|
|
|
|
|
|
|
|
|
|
|
# Gradio Interface: three buttons that all report into one shared textbox.
with gr.Blocks() as interface:
    gr.Markdown("# Voice to Text App")
    gr.Markdown(
        "Click 'Start Recording' to record your voice, 'Stop Recording' to save, and 'Transcribe' to convert speech to text."
    )

    start_button = gr.Button("Start Recording")
    stop_button = gr.Button("Stop Recording")
    transcribe_button = gr.Button("Transcribe")

    output = gr.Textbox(label="Output", show_label=True, show_copy_button=True)

    # Wire each button to its handler; every handler writes to `output`.
    for _button, _handler in (
        (start_button, start_recording),
        (stop_button, stop_recording),
        (transcribe_button, transcribe_audio),
    ):
        _button.click(_handler, outputs=output)


# Launch the web UI only when the file is run as a script.
if __name__ == "__main__":
    interface.launch()
|