"""Transcribe audio/video files to text with the whisper-cpp command-line tool."""

import argparse
import os
import subprocess
import uuid

from moviepy.video.io.VideoFileClip import VideoFileClip
from pydub import AudioSegment

from logging_config import setup_logging

logger = setup_logging()

# Extensions handled by each conversion path in extract_or_convert_audio().
_VIDEO_EXTS = frozenset({".mp4", ".mov", ".avi", ".mkv"})
_AUDIO_EXTS = frozenset({".mp3", ".wav", ".flac", ".m4a", ".aac"})


def _remove_quietly(path):
    """Delete *path* if it is set and exists; log (never raise) on failure.

    Used from cleanup paths, where raising would mask the original error.
    """
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning(f"Could not remove temporary file {path}: {exc}")


def extract_or_convert_audio(file_path, output_audio_path="processed_audio"):
    """Produce a WAV file from a video or audio input.

    Args:
        file_path: Path to the source media file. The extension (case
            insensitive) selects the conversion path.
        output_audio_path: Prefix of the output file; a random UUID and the
            ``.wav`` suffix are appended to it.

    Returns:
        The path of the WAV file that was written, or ``None`` when the input
        is a video without an audio track.

    Raises:
        ValueError: If the file extension is not a supported media type.
    """
    ext = os.path.splitext(file_path)[1].lower()
    if ext not in _VIDEO_EXTS and ext not in _AUDIO_EXTS:
        raise ValueError(f"Unsupported file type: {ext}")

    output_audio_path = output_audio_path + "_" + str(uuid.uuid4()) + ".wav"

    try:
        if ext in _VIDEO_EXTS:
            logger.info("🎬 Extracting audio from video...")
            video = VideoFileClip(file_path)
            try:
                if video.audio is None:
                    logger.warning("⚠️ 警告:该视频没有音频轨道。")
                    return None
                video.audio.write_audiofile(output_audio_path)
            finally:
                # Release the reader resources held by the clip on every path,
                # including the early "no audio track" return.
                video.close()
        else:
            logger.info("🎧 Converting audio format...")
            sound = AudioSegment.from_file(file_path)
            # export() hands back the output file handle; close it so the
            # file is not kept open until garbage collection.
            exported = sound.export(output_audio_path, format="wav")
            exported.close()
    except Exception:
        # Do not leave a partially written WAV behind when conversion fails.
        _remove_quietly(output_audio_path)
        raise

    logger.info(f"Converted Audio saved to: {output_audio_path}")
    return output_audio_path


def transcribe_audio_whisper(audio_path, model_path="~/Documents/ggml-large-v3-turbo-q8_0.bin", language="zh"):
    """Transcribe *audio_path* by running the ``whisper-cli`` executable.

    Args:
        audio_path: Path of the audio file passed to ``--file``.
        model_path: Path of the model file passed to ``--model``; a leading
            ``~`` is expanded to the user's home directory.
        language: Language code passed to ``--language``.

    Returns:
        The transcription reformatted by ``split_into_sentences``, or an
        empty string when the command printed nothing on stdout.

    Raises:
        subprocess.CalledProcessError: If ``whisper-cli`` exits with a
            non-zero status.
        FileNotFoundError: If the ``whisper-cli`` executable cannot be found.
    """
    logger.info("🎙️ Transcribing with Whisper-cpp...")

    # Expand the ~ in the model path.
    model_path = os.path.expanduser(model_path)

    # Argument list (no shell involved). "-nt" is whisper.cpp's
    # no-timestamps switch, so stdout carries only the text.
    cmd = [
        "whisper-cli",
        "-nt",
        "--language", language,
        "--model", model_path,
        "--file", audio_path,
    ]

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            check=True,
        )
    except subprocess.CalledProcessError as e:
        logger.error(f"❌ Whisper-cpp transcription failed: {e}")
        logger.error(f"Error output: {e.stderr}")
        raise
    except FileNotFoundError:
        logger.error("❌ whisper-cli not found. Make sure whisper-cpp is installed and in your PATH.")
        raise

    # The transcription is whatever the command wrote to stdout.
    text = result.stdout.strip()
    if text:
        return split_into_sentences(text)

    logger.warning("⚠️ No transcription output received")
    return ""


def transcribe_audio_whisper_batch(audio_path, model_path="~/Documents/ggml-small-q8_0.bin", language="zh"):
    """Transcribe *audio_path* with the batch-mode default model.

    whisper-cpp has no separate batch processing, so this delegates to
    ``transcribe_audio_whisper``; only the default ``model_path`` differs.
    """
    return transcribe_audio_whisper(audio_path, model_path, language)


# Characters that terminate a sentence: full-width CJK forms, their ASCII
# counterparts, and the newline.
SENTENCE_ENDINGS = ["。", "!", "?", ".", "!", "?", "\n"]


def _split_at_sentence_endings(text, endings):
    """Return the non-empty, stripped sentences of *text*.

    A split happens after the last character of a run of terminators, so
    sequences such as "?!", "..." or "。" followed by a newline stay attached
    to their sentence instead of producing empty or punctuation-only entries.
    A "." between two digits is treated as a decimal point, not a terminator.
    """
    sentences = []
    start = 0
    last = len(text) - 1
    for i, char in enumerate(text):
        if char not in endings:
            continue
        has_next = i < last
        if has_next and text[i + 1] in endings:
            # Still inside a run of terminators; split after its final one.
            continue
        if (char == "." and i > 0 and has_next
                and text[i - 1].isdigit() and text[i + 1].isdigit()):
            continue
        segment = text[start:i + 1].strip()
        if segment:
            sentences.append(segment)
        start = i + 1

    # Trailing text without a terminator is kept as a final sentence.
    tail = text[start:].strip()
    if tail:
        sentences.append(tail)
    return sentences


def _wrap_long_sentence(sentence, max_length):
    """Break *sentence* into pieces of at most *max_length* characters.

    Breaks at the last space that fits; when there is none (e.g. text written
    without spaces) the piece is cut at exactly *max_length* characters.
    """
    pieces = []
    rest = sentence
    while len(rest) > max_length:
        # Search from index 1 so that every piece is non-empty and the loop
        # always makes progress.
        cut = rest.rfind(" ", 1, max_length + 1)
        if cut == -1:
            cut = max_length
        pieces.append(rest[:cut].rstrip())
        rest = rest[cut:].lstrip()
    if rest:
        pieces.append(rest)
    return pieces


def split_into_sentences(text, max_length=100):
    """Split text into one sentence per line, wrapping over-long sentences.

    Args:
        text (str): The text to split.
        max_length (int): Maximum number of characters per output line.

    Returns:
        str: The processed text with lines separated by newline characters;
        an empty string when *text* is empty.

    Raises:
        ValueError: If *max_length* is smaller than 1.
    """
    if not text:
        return ""
    if max_length < 1:
        raise ValueError("max_length must be a positive integer")

    # Read the module-level list at call time so later changes to it are seen.
    endings = frozenset(SENTENCE_ENDINGS)

    lines = []
    for sentence in _split_at_sentence_endings(text, endings):
        if len(sentence) <= max_length:
            lines.append(sentence)
        else:
            lines.extend(_wrap_long_sentence(sentence, max_length))
    return "\n".join(lines)


def convert_media(file_path, is_batch=False, save_to_disk=True):
    """Transcribe one media file and optionally save the transcript.

    Args:
        file_path: Path of the audio or video file to transcribe.
        is_batch: When true, use ``transcribe_audio_whisper_batch`` instead of
            ``transcribe_audio_whisper``.
        save_to_disk: When true, write the transcript to a ``.txt`` file that
            shares the input file's path and base name.

    Returns:
        The transcript text, or ``None`` when no audio could be extracted.

    Any exception raised by conversion or transcription propagates to the
    caller; the intermediate WAV file is removed in every case.
    """
    # Bound before the try block so the cleanup below is safe even when
    # extraction raises or returns None.
    audio_file = None
    try:
        audio_file = extract_or_convert_audio(file_path)
        if audio_file is None:
            return None

        if is_batch:
            transcript = transcribe_audio_whisper_batch(audio_file)
        else:
            transcript = transcribe_audio_whisper(audio_file)

        logger.info("\n📜 Transcript:")
        logger.info(transcript)

        # Save the transcript next to the source file as a .txt file.
        if save_to_disk:
            output_path = os.path.splitext(file_path)[0] + ".txt"
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(transcript)
            logger.info(f"✅ Transcript saved to: {output_path}")

        return transcript
    finally:
        _remove_quietly(audio_file)


def process_input(path, recursive=False):
    """Transcribe a single media file or every media file in a directory.

    Args:
        path: Path of a file or a directory.
        recursive: When *path* is a directory, also descend into its
            subdirectories.

    For a single file, errors from ``convert_media`` propagate. For a
    directory, an error on one file is logged and processing continues with
    the next file.
    """
    if not os.path.exists(path):
        logger.error(f"❌ Path does not exist: {path}")
        return

    supported_exts = {".mp4", ".mov", ".avi", ".mkv", ".mp3", ".wav", ".flac", ".m4a", ".aac"}

    if os.path.isfile(path):
        ext = os.path.splitext(path)[1].lower()
        if ext in supported_exts:
            convert_media(path)
        else:
            logger.warning(f"🚫 Unsupported file skipped: {path}")
    elif os.path.isdir(path):
        for root, _dirs, files in os.walk(path):
            for file in files:
                file_path = os.path.join(root, file)
                ext = os.path.splitext(file)[1].lower()
                if ext in supported_exts:
                    try:
                        convert_media(file_path, False)
                    except Exception as e:
                        logger.error(f"Error processing {file_path}: {e}")
                else:
                    logger.debug(f"Skipping non-media file: {file_path}")
            if not recursive:
                # os.walk yields the top directory first; stopping here
                # limits processing to that directory.
                break


def main():
    """Parse the command-line arguments and process the requested path."""
    parser = argparse.ArgumentParser(description="Convert audio/video to text using Whisper-cpp.")
    parser.add_argument("input_path", nargs='?', default="./media",
                        help="Path to a file or folder containing media files. Defaults to './media'.")
    parser.add_argument("--recursive", "-r", action="store_true",
                        help="Process subdirectories recursively.")
    args = parser.parse_args()

    input_path = args.input_path
    process_input(input_path, recursive=args.recursive)


if __name__ == '__main__':
    main()