# -*- coding: utf-8 -*-
"""Download course audio attachments, transcribe them, and cache the text in SQLite."""
import configparser
import os
import shutil
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from os import makedirs
from pathlib import Path

import requests
import json

from course_content_parser import max_download_threads
from logging_config import setup_logging
from transcribe_media import convert_media

# Read the config file (thread count from config.ini is currently disabled).
config = configparser.ConfigParser()
config.read('config.ini')
# max_download_threads = int(config['DEFAULT']['max_download_threads'])
# NOTE: deliberately shadows the value imported from course_content_parser.
max_download_threads = 10

logger = setup_logging()

# DDL: one transcription row per (course_id, filename) pair.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS audio_transcriptions (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    course_id INTEGER NOT NULL,
    filename TEXT NOT NULL,
    text TEXT,
    UNIQUE(course_id, filename)
);
"""


def create_audio_transcriptions_table(db_path):
    """
    Create the audio_transcriptions table if it does not already exist.

    Parameters:
    db_path -- path to the SQLite database file
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    try:
        cursor.execute(CREATE_TABLE_SQL)
        logger.info("表audio_transcriptions创建成功。")
    except sqlite3.Error as e:
        logger.error(f"创建表时出错: {e}")
    finally:
        # Always release the connection, even when the DDL fails.
        conn.close()


db_path = 'courses.db'  # database file path
# BUG FIX: the comment said "call the function to create the table" but the call
# itself was missing, so save_to_db() could hit a missing table. The DDL uses
# IF NOT EXISTS, so calling at import time is idempotent.
create_audio_transcriptions_table(db_path)


def download_file(url, local_path):
    """
    Stream-download *url* to *local_path*.

    Returns the local path on success, or None on any failure (logged).
    """
    logger.info("download voice file: " + url + " to " + local_path)
    try:
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(local_path, 'wb') as f:
                # 8 KiB chunks keep memory flat for large audio files.
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"文件下载后未找到:{local_path}")
        return local_path
    except Exception as e:
        logger.error(f"下载文件时出错:{e}")
        return None


def voice2txt(voice_path):
    """Transcribe the media file at *voice_path* via the external conversion API."""
    text = convert_media(voice_path, True, False)
    return text


def save_to_db(course_id, filename, text_value):
    """
    Persist one transcription row for (course_id, filename).

    The connection context manager commits on success and rolls back on error;
    the connection itself is always closed.
    """
    conn = sqlite3.connect('courses.db')
    try:
        with conn:
            conn.execute(
                "INSERT INTO audio_transcriptions (course_id, filename, text) VALUES (?, ?, ?)",
                (course_id, filename, text_value))
    finally:
        conn.close()
# Check whether the transcription already exists in the database.
def check_db_for_text(course_id, filename):
    """Return the cached transcription for (course_id, filename), or None if absent."""
    conn = sqlite3.connect('courses.db')
    try:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT text FROM audio_transcriptions WHERE course_id=? AND filename=?",
            (course_id, filename))
        result = cursor.fetchone()
    finally:
        conn.close()
    return result[0] if result else None


def audio_to_text(audio_url, filename, course_id):
    """
    Download the audio at *audio_url*, transcribe it, cache the text, and return it.

    Returns the transcription text, or a Chinese error string on failure.
    """
    # Serve from the cache when this file was already transcribed.
    db_text = check_db_for_text(course_id, filename)
    if db_text:
        # BUG FIX: the log message had lost its filename placeholder.
        logger.info(f"文本已存在,无需重复转换:{filename}")
        return db_text  # return the cached text
    try:
        logger.info(f"Downloading audio file: {audio_url}")
        # BUG FIX: ensure the download directory exists before writing into it.
        os.makedirs('course', exist_ok=True)
        download_path = os.path.join('course', filename)
        local_audio_path = download_file(audio_url, download_path)
        if local_audio_path is None:
            logger.error("音频文件下载失败")
            return "音频文件下载失败"
        text_value = voice2txt(local_audio_path)
        if text_value:
            # Only cache successful conversions.
            save_to_db(course_id, filename, text_value)
        os.remove(local_audio_path)
        return text_value
    except Exception as e:
        logger.error(f"转换音频到文本时出错:{e}")
        return f"音频转文本失败: {e}"


def process_item(item):
    """Render one content item as plain Markdown (images and other attachments are skipped)."""
    if item['category'] == 'text':
        return f"{item['content']}\n"
    elif item['category'] == 'audio':
        transcription = audio_to_text(
            item['attachment']['raw_url'], f"audio_{item['id']}.mp3", item['course_id'])
        return f"{transcription}\n"
    else:
        return ""


def process_hugo_item(item):
    """Render one content item as Hugo-flavoured Markdown, keeping attachment links."""
    if item['category'] == 'text':
        return f"{item['content']}\n\n"
    elif item['category'] == 'image':
        return f"![{item['content']}]({item['attachment']['url']})\n\n"
    elif item['category'] == 'audio':
        transcription = audio_to_text(
            item['attachment']['raw_url'], f"audio_{item['id']}.mp3", item['course_id'])
        return f"![{item['content']}]({item['attachment']['url']})\n\n{transcription}\n\n"
    else:
        return f"[{item['content']}]({item['attachment']['url']})\n\n"


def json_to_markdown(json_file, markdown_file, logseq=False):
    """
    Convert a course JSON dump into a Markdown file.

    Parameters:
    json_file     -- path of the source JSON file ({'data': [item, ...]})
    markdown_file -- path of the Markdown file to write
    logseq        -- when True, render items via process_hugo_item instead of process_item
    """
    p = Path(markdown_file)
    curse_name = p.stem
    try:
        logger.info(f"Reading JSON file: {json_file}")
        with open(json_file, 'r', encoding='utf-8') as file:
            data = json.load(file)
        # Hugo front matter (currently not written to the output).
        # BUG FIX: the original nested the outer quote inside the f-string
        # expression (f'...{data['data']...}'), a SyntaxError on Python < 3.12.
        created_at = data['data'][0]['created_at']
        metadata = f"+++\ndate = '{created_at}'\ndraft = false\ntitle = '{curse_name}'\n+++\n\n"
        logger.info(f"Writing Markdown file: {markdown_file}")
        with open(markdown_file, 'w', encoding='utf-8') as md_file:
            # md_file.write(metadata)
            with ThreadPoolExecutor(max_workers=max_download_threads) as executor:
                # Fan out the per-item work, then write results in original order.
                futures = [
                    executor.submit(process_hugo_item if logseq else process_item, item)
                    for item in data['data']
                ]
                for future in futures:
                    md_file.write(future.result())
        # Drop the output file when nothing was written.
        if os.path.exists(markdown_file) and os.path.getsize(markdown_file) == 0:
            logger.info(f"文件 '{markdown_file}' 是空的,将被删除。")
            os.remove(markdown_file)
    except Exception as e:
        logger.error(f"处理JSON文件时出错:{e}")


def get_content():
    """Export the most recent courses (max id - 5 onward) from the DB to Markdown files."""
    conn = sqlite3.connect('courses.db')
    try:
        cursor = conn.cursor()
        # BUG FIX: cursor.execute() returns the cursor object, which is always
        # truthy, so the old "no courses" branch was unreachable and
        # fetchone()[0] raised TypeError on an empty table. Fetch first, then test.
        row = cursor.execute('SELECT id FROM courses ORDER BY id DESC LIMIT 1').fetchone()
        if row:
            max_course_id = row[0]
            logger.info(f"The maximum course ID is {max_course_id}")
        else:
            logger.info("No courses found in the database.")
            max_course_id = 11
        start_course_id = max_course_id - 5
        # Query the IDs and titles of the courses to process.
        cursor.execute('SELECT id, title FROM courses where id >= ?', (start_course_id,))
        course_ids_data = cursor.fetchall()
    finally:
        # BUG FIX: the connection was never closed.
        conn.close()
    course_ids = [r[0] for r in course_ids_data]
    course_ids_dict = dict(course_ids_data)
    logger.info(course_ids_dict)
    # Ensure the working directories exist before any file is written.
    os.makedirs('json', exist_ok=True)
    os.makedirs('markdown', exist_ok=True)
    for course_id in course_ids:
        logger.info(f"Processing course ID: {course_id}")
        json_filename = os.path.join('json', f'{course_id}.json')
        logseq_md_file_name = os.path.join(
            'markdown', f'{course_id}-{course_ids_dict[course_id]}.md')
        if os.path.exists(json_filename):
            logger.info(f"Course {course_id} JSON file already exists, using local file.")
            json_to_markdown(json_filename, logseq_md_file_name, logseq=False)
        else:
            continue


if __name__ == '__main__':
    get_content()