Files
songyi/transcribe_media.py
2025-04-20 08:44:06 +08:00

109 lines
3.2 KiB
Python

import os
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
from moviepy.video.io.VideoFileClip import VideoFileClip
from pydub import AudioSegment
from logging_config import setup_logging
logger = setup_logging()
def extract_or_convert_audio(file_path, output_audio_path="processed_audio.wav"):
ext = os.path.splitext(file_path)[1].lower()
if ext in [".mp4", ".mov", ".avi", ".mkv"]:
logger.info("🎬 Extracting audio from video...")
video = VideoFileClip(file_path)
video.audio.write_audiofile(output_audio_path)
elif ext in [".mp3", ".wav", ".flac", ".m4a", ".aac"]:
logger.info("🎧 Converting audio format...")
sound = AudioSegment.from_file(file_path)
sound.export(output_audio_path, format="wav")
else:
raise ValueError("Unsupported file type.")
return output_audio_path
def transcribe_audio_funasr(audio_path, device="cuda:0"):
logger.info("🧠 Loading FunASR model...")
model = AutoModel(
model="iic/SenseVoiceSmall",
trust_remote_code=True,
remote_code="./model.py", # Make sure this file is accessible
vad_model="fsmn-vad",
vad_kwargs={"max_single_segment_time": 30000},
device=device,
disable_update=True
)
logger.info("📤 Transcribing with FunASR...")
res = model.generate(
input=audio_path,
cache={},
language="auto",
use_itn=True,
batch_size_s=60,
merge_vad=True,
merge_length_s=15,
)
text = rich_transcription_postprocess(res[0]["text"])
return text
def transcribe_audio_funasr_batch(audio_path):
model = AutoModel(model="iic/SenseVoiceSmall", trust_remote_code=True, device="cuda:0", disable_update=True)
res = model.generate(
input=audio_path,
cache={},
language="auto", # "zh", "en", "yue", "ja", "ko", "nospeech"
use_itn=True,
batch_size=64,
)
text = rich_transcription_postprocess(res[0]["text"])
return text
def convert_media(file_path, is_batch=False, save_to_disk=True):
try:
audio_file = extract_or_convert_audio(file_path)
if is_batch:
transcript = transcribe_audio_funasr_batch(audio_file)
else:
transcript = transcribe_audio_funasr(audio_file)
logger.info("\n📜 Transcript:")
logger.info(transcript)
# ✅ Save transcript to disk
if save_to_disk:
output_path = os.path.splitext(file_path)[0] + ".md"
with open(output_path, "w", encoding="utf-8") as f:
f.write(transcript)
logger.info(f"✅ Transcript saved to: {output_path}")
return transcript
finally:
if os.path.exists("processed_audio.wav"):
os.remove("processed_audio.wav")
def main():
audio_files = []
for root, dirs, files in os.walk('media'):
for file in files:
audio_files.append(os.path.join(root, file))
logger.info("scan files: " + audio_files)
for audio_file in audio_files:
audio_file_path = os.path.join(audio_file)
convert_media(audio_file_path)
if __name__ == '__main__':
main()