89 lines
2.6 KiB
Python
89 lines
2.6 KiB
Python
import os
|
|
|
|
from funasr import AutoModel
|
|
from funasr.utils.postprocess_utils import rich_transcription_postprocess
|
|
from moviepy.video.io.VideoFileClip import VideoFileClip
|
|
from pydub import AudioSegment
|
|
|
|
from logging_config import setup_logging
|
|
|
|
# Module-level logger used by every function in this file; it is whatever
# the project's logging_config.setup_logging() returns.
logger = setup_logging()
|
|
|
|
|
|
def extract_or_convert_audio(file_path, output_audio_path="processed_audio.wav"):
    """Write the audio of a media file to ``output_audio_path``.

    Video inputs (.mp4, .mov, .avi, .mkv) have their audio track written out
    through ``VideoFileClip``; audio inputs (.mp3, .wav, .flac, .m4a, .aac)
    are loaded with ``AudioSegment`` and exported as WAV. The extension check
    is case-insensitive.

    Args:
        file_path: Path of the media file to read.
        output_audio_path: Path the resulting audio file is written to.

    Returns:
        ``output_audio_path``, unchanged.

    Raises:
        ValueError: If the extension is not one of the supported ones, or a
            video input has no audio track.
    """
    ext = os.path.splitext(file_path)[1].lower()

    if ext in (".mp4", ".mov", ".avi", ".mkv"):
        logger.info("🎬 Extracting audio from video...")
        video = VideoFileClip(file_path)
        try:
            # A silent video has ``audio is None``; fail with a clear message
            # instead of an AttributeError on ``None.write_audiofile``.
            if video.audio is None:
                raise ValueError("Video file has no audio track.")
            video.audio.write_audiofile(output_audio_path)
        finally:
            # Release the reader held by the clip even when extraction fails.
            video.close()
    elif ext in (".mp3", ".wav", ".flac", ".m4a", ".aac"):
        logger.info("🎧 Converting audio format...")
        sound = AudioSegment.from_file(file_path)
        sound.export(output_audio_path, format="wav")
    else:
        raise ValueError("Unsupported file type.")

    return output_audio_path
|
|
|
|
|
|
# FunASR models that have already been built, keyed by the device they were
# built for, so a batch run constructs each model only once.
_FUNASR_MODELS = {}


def _get_funasr_model(device):
    """Return the SenseVoiceSmall model for ``device``, building it on first use."""
    model = _FUNASR_MODELS.get(device)
    if model is None:
        logger.info("🧠 Loading FunASR model...")
        model = AutoModel(
            model="iic/SenseVoiceSmall",
            trust_remote_code=True,
            remote_code="./model.py",  # Make sure this file is accessible
            vad_model="fsmn-vad",
            vad_kwargs={"max_single_segment_time": 30000},
            device=device,
        )
        _FUNASR_MODELS[device] = model
    return model


def transcribe_audio_funasr(audio_path, device="cuda:0"):
    """Transcribe an audio file with FunASR and return the post-processed text.

    The model is built on the first call for a given ``device`` and reused on
    later calls, so transcribing many files does not rebuild it per file. The
    built model stays referenced for the lifetime of the process.

    Args:
        audio_path: Path of the audio file handed to ``model.generate``.
        device: Device string forwarded to ``AutoModel``.

    Returns:
        The text of the first result entry after
        ``rich_transcription_postprocess``.
    """
    model = _get_funasr_model(device)

    logger.info("📤 Transcribing with FunASR...")
    res = model.generate(
        input=audio_path,
        cache={},
        language="auto",
        use_itn=True,
        batch_size_s=60,
        merge_vad=True,
        merge_length_s=15,
    )

    return rich_transcription_postprocess(res[0]["text"])
|
|
|
|
|
|
def convert_media(file_path):
    """Transcribe one media file and save the transcript next to it.

    The intermediate WAV is written inside a private temporary directory
    rather than under a fixed name in the working directory. That way it
    cannot overwrite (and then delete) an input that happens to be called
    ``processed_audio.wav``, and it is removed whether or not transcription
    succeeds.

    Args:
        file_path: Path of the audio or video file to transcribe.

    Returns:
        The transcript text. It is also written, UTF-8 encoded, to
        ``<file_path without extension>_transcript.md``.

    Raises:
        Whatever ``extract_or_convert_audio`` or ``transcribe_audio_funasr``
        raise; nothing is caught here.
    """
    import tempfile  # function-scope import: only this function needs it

    with tempfile.TemporaryDirectory() as work_dir:
        wav_path = os.path.join(work_dir, "processed_audio.wav")
        audio_file = extract_or_convert_audio(file_path, wav_path)
        transcript = transcribe_audio_funasr(audio_file)

    logger.info("\n📜 Transcript:")
    logger.info(transcript)

    # ✅ Save transcript to disk
    output_path = os.path.splitext(file_path)[0] + "_transcript.md"
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(transcript)

    logger.info(f"✅ Transcript saved to: {output_path}")
    return transcript
|
|
|
|
|
|
def main():
    """Transcribe every file found under the ``media`` directory tree.

    Files are collected with ``os.walk`` and passed one by one to
    ``convert_media``. A file that ``convert_media`` rejects with
    ``ValueError`` (for example an unsupported extension, such as a
    ``_transcript.md`` written by an earlier run) is logged and skipped so
    that it does not abort the rest of the batch.
    """
    media_files = [
        os.path.join(root, name)
        for root, _dirs, names in os.walk("media")
        for name in names
    ]
    # Pass the list as a logging argument: concatenating it onto a str with
    # ``+`` raises TypeError.
    logger.info("scan files: %s", media_files)

    for media_file in media_files:
        try:
            convert_media(media_file)
        except ValueError as err:
            logger.warning("Skipping %s: %s", media_file, err)
|
|
|
|
|
|
# Run the batch transcription only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|