Files
songyi/markdown_generator.py
2025-04-15 09:11:14 +08:00

232 lines
8.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import configparser
import os
import shutil
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from os import makedirs
import requests
from gradio_client import Client, handle_file
import json
from logging_config import setup_logging
from sense_voice_process import short_audio_process
# Load runtime settings from config.ini. The relative path is resolved against
# the current working directory; ConfigParser.read() silently skips a missing
# file, in which case the key lookup below is what fails.
config = configparser.ConfigParser()
config.read('config.ini')
# Size of the thread pool used by json_to_markdown to process course items.
max_download_threads = int(config['DEFAULT']['max_download_threads'])
# Module-wide logger produced by the project's logging_config.setup_logging().
logger = setup_logging()
# DDL for the transcription cache table: one row per (course_id, filename)
# pair, enforced by the UNIQUE constraint, holding the transcribed text.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS audio_transcriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
course_id INTEGER NOT NULL,
filename TEXT NOT NULL,
text TEXT,
UNIQUE(course_id, filename)
);
"""
def create_audio_transcriptions_table(db_path):
    """Create the ``audio_transcriptions`` table if it is not already present.

    Args:
        db_path: Path of the SQLite database file to open.

    A ``sqlite3.Error`` raised while running the DDL is logged instead of
    being propagated. The connection is closed in every case.
    """
    connection = sqlite3.connect(db_path)
    db_cursor = connection.cursor()
    try:
        db_cursor.execute(CREATE_TABLE_SQL)
    except sqlite3.Error as exc:
        logger.error(f"创建表时出错: {exc}")
    else:
        # Only reached when the statement ran without a database error.
        logger.info("表audio_transcriptions创建成功。")
    finally:
        connection.close()
# Database file path, kept as the argument for
# create_audio_transcriptions_table (that call is currently disabled in the
# __main__ block). Nothing is executed here.
db_path = 'courses.db' # path of the SQLite database file
# Download a remote file (used for course audio attachments).
def download_file(url, local_path, timeout=30):
    """Stream ``url`` into ``local_path`` and return the path on success.

    Args:
        url: Address of the file to fetch.
        local_path: Destination file path; its parent directory must exist.
        timeout: Seconds passed to ``requests.get`` as the connect/read
            timeout, so a stalled server cannot block a worker thread
            indefinitely.

    Returns:
        ``local_path`` when the download completed, otherwise ``None``.
        Every failure is logged and reported through the ``None`` return
        value rather than raised.
    """
    started_writing = False
    try:
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(local_path, 'wb') as out_file:
                started_writing = True
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        out_file.write(chunk)
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"文件下载后未找到:{local_path}")
        return local_path
    except Exception as e:
        logger.error(f"下载文件时出错:{e}")
        # Only delete what this call created: a truncated download must not be
        # mistaken for a complete file, but a file that existed before a
        # failed request (e.g. HTTP 404) is left alone.
        if started_writing:
            try:
                os.remove(local_path)
            except OSError:
                pass  # best-effort cleanup; the failure is already logged
        return None
# Speech-to-text entry point.
def voice2txt(voice_path):
    """Transcribe the audio file at ``voice_path``.

    Delegates to ``short_audio_process`` and returns its result unchanged.
    """
    # Disabled alternative that sent the file to a remote Gradio endpoint:
    # client = Client("http://192.168.31.3:7860/")
    # text = client.predict(
    #     input_wav=handle_file(voice_path),
    #     language="zh",
    #     api_name="/model_inference"
    # )
    # logger.info(text)
    return short_audio_process(voice_path)
# Persist a transcription in the database.
def save_to_db(course_id, filename, text_value):
    """Store the transcription for ``(course_id, filename)`` in ``courses.db``.

    Args:
        course_id: Identifier of the course the audio belongs to.
        filename: Name of the audio file that was transcribed.
        text_value: Transcribed text to store.

    Uses ``INSERT OR REPLACE`` so that a row already present for the same
    ``(course_id, filename)`` pair (for example one holding empty text) is
    overwritten instead of raising ``sqlite3.IntegrityError`` on the table's
    UNIQUE constraint. The connection is closed even when the statement fails.
    """
    conn = sqlite3.connect('courses.db')
    try:
        conn.execute(
            "INSERT OR REPLACE INTO audio_transcriptions (course_id, filename, text) VALUES (?, ?, ?)",
            (course_id, filename, text_value))
        conn.commit()
    finally:
        conn.close()
# Look up a previously stored transcription.
def check_db_for_text(course_id, filename):
    """Return the stored transcription for ``(course_id, filename)``.

    Args:
        course_id: Identifier of the course the audio belongs to.
        filename: Name of the audio file.

    Returns:
        The ``text`` column of the matching row in ``courses.db``, or ``None``
        when no row matches.

    The connection is closed even when the query raises (for example because
    the table does not exist); the exception itself still propagates.
    """
    conn = sqlite3.connect('courses.db')
    try:
        row = conn.execute(
            "SELECT text FROM audio_transcriptions WHERE course_id=? AND filename=?",
            (course_id, filename)).fetchone()
    finally:
        conn.close()
    return row[0] if row else None
def audio_to_text(audio_url, filename, course_id):
    """Return the transcription of an audio attachment, using the DB as cache.

    Args:
        audio_url: URL the audio file is downloaded from.
        filename: Name used both for the temporary download under ``course/``
            and as the cache key in the database.
        course_id: Identifier of the course the audio belongs to.

    Returns:
        The cached text when one exists; otherwise the freshly transcribed
        text (stored in the database only when it is non-empty). On failure a
        human-readable error string is returned instead of raising, so the
        caller can still emit a line for this item.
    """
    # A non-empty cached transcription short-circuits download and conversion.
    db_text = check_db_for_text(course_id, filename)
    if db_text:
        logger.info(f"文本已存在,无需重复转换:{filename}")
        return db_text

    local_audio_path = None
    try:
        logger.info(f"Downloading audio file: {audio_url}")
        # The download target directory may not exist yet on a fresh checkout.
        os.makedirs('course', exist_ok=True)
        download_path = os.path.join('course', filename)
        local_audio_path = download_file(audio_url, download_path)
        if local_audio_path is None:
            logger.error("音频文件下载失败")
            return "音频文件下载失败"
        text_value = voice2txt(local_audio_path)
        if text_value:  # only a successful conversion is written to the cache
            save_to_db(course_id, filename, text_value)
        return text_value
    except Exception as e:
        logger.error(f"转换音频到文本时出错:{e}")
        return f"音频转文本失败: {e}"
    finally:
        # Remove the temporary download on every path, including failures in
        # transcription or the database write, so audio files do not pile up.
        if local_audio_path is not None:
            try:
                os.remove(local_audio_path)
            except OSError as cleanup_error:
                logger.error(f"Failed to remove temporary audio file {local_audio_path}: {cleanup_error}")
def process_item(item):
    """Render one course item as a line of plain Markdown.

    ``item`` is a mapping with at least a ``category`` key:

    * ``text``  -> the content itself
    * ``image`` -> an image embed built from the content and attachment URL
    * ``audio`` -> the transcription returned by ``audio_to_text``
    * anything else -> a regular link built from the content and attachment URL

    Every result ends with a newline.
    """
    kind = item['category']
    if kind == 'text':
        return "{}\n".format(item['content'])
    if kind == 'audio':
        source_url = item['attachment']['raw_url']
        audio_name = f"audio_{item['id']}.mp3"
        transcription = audio_to_text(source_url, audio_name, item['course_id'])
        return "{}\n".format(transcription)
    # Images and generic attachments share the link syntax; an image embed
    # only adds the leading '!'.
    label = item['content']
    target = item['attachment']['url']
    bang = '!' if kind == 'image' else ''
    return f"{bang}[{label}]({target})\n"
def process_logseq_item(item):
    """Render one course item as Logseq-style bullet line(s).

    ``item`` is a mapping with at least a ``category`` key:

    * ``text``  -> ``- <content>``
    * ``image`` -> ``- ![<content>](<attachment url>)``
    * ``audio`` -> the same embed line followed by a second bullet holding the
      transcription returned by ``audio_to_text``
    * anything else -> ``- [<content>](<attachment url>)``

    Every emitted line ends with a newline.
    """
    kind = item['category']
    if kind == 'text':
        return "- {}\n".format(item['content'])

    # For audio the transcription is obtained before the embed line is built.
    trailing_bullet = ''
    if kind == 'audio':
        transcription = audio_to_text(
            item['attachment']['raw_url'],
            f"audio_{item['id']}.mp3",
            item['course_id'],
        )
        trailing_bullet = f"- {transcription}\n"

    bang = '!' if kind in ('image', 'audio') else ''
    label = item['content']
    target = item['attachment']['url']
    return f"- {bang}[{label}]({target})\n" + trailing_bullet
def json_to_markdown(json_file, markdown_file, logseq=False):
    """Convert a course JSON export into a Markdown file.

    Args:
        json_file: Path of the JSON file; its ``data`` entry is the list of
            items to render.
        markdown_file: Path of the Markdown file to (over)write.
        logseq: When true, items are rendered with ``process_logseq_item``
            instead of ``process_item``.

    Items are processed on a thread pool sized by ``max_download_threads``
    and written in their original order. Any exception is logged and
    swallowed.
    """
    try:
        logger.info(f"Reading JSON file: {json_file}")
        with open(json_file, 'r', encoding='utf-8') as source:
            payload = json.load(source)

        logger.info(f"Writing Markdown file: {markdown_file}")
        with open(markdown_file, 'w', encoding='utf-8') as target, \
                ThreadPoolExecutor(max_workers=max_download_threads) as pool:
            renderer = process_logseq_item if logseq else process_item
            # Submit everything first so the items run concurrently, then
            # collect in submission order to keep the document order stable.
            pending = []
            for entry in payload['data']:
                pending.append(pool.submit(renderer, entry))
            target.writelines(job.result() for job in pending)
    except Exception as e:
        logger.error(f"处理JSON文件时出错{e}")
def get_content():
    """Render Markdown (plain and Logseq flavoured) for the newest courses.

    Reads ``courses.db`` and selects the courses whose id is at most 5 below
    the highest course id. For each of them that has a cached
    ``json/<id>.json`` file, that file is copied to
    ``course/<id>/json/<title>.json`` and rendered to
    ``course/<id>/<title>.md`` and ``course/<id>/<title>_logseq.md``.
    Courses without a cached JSON file are skipped.
    """
    conn = sqlite3.connect('courses.db')
    try:
        cursor = conn.cursor()
        # execute() returns the cursor itself, which is always truthy, so the
        # row has to be fetched before an empty table can be detected.
        newest_row = cursor.execute('SELECT id FROM courses ORDER BY id DESC LIMIT 1').fetchone()
        if newest_row:
            max_course_id = newest_row[0]
            logger.info(f"The maximum course ID is {max_course_id}")
        else:
            logger.info("No courses found in the database.")
            max_course_id = 11
        start_course_id = max_course_id - 5

        cursor.execute('SELECT id, title FROM courses where id >= ?', (start_course_id,))
        course_ids_data = cursor.fetchall()
    finally:
        # All rows are already in memory; release the database handle before
        # the long-running rendering work starts.
        conn.close()

    course_ids = [row[0] for row in course_ids_data]
    course_ids_dict = dict(course_ids_data)
    logger.info(course_ids_dict)

    # Directory holding the cached per-course JSON files.
    os.makedirs('json', exist_ok=True)

    for course_id in course_ids:
        logger.info(f"Processing course ID: {course_id}")
        json_filename = os.path.join('json', f'{course_id}.json')
        if not os.path.exists(json_filename):
            continue  # nothing cached for this course

        title = course_ids_dict[course_id]
        # NOTE(review): only the JSON copy has '?' stripped from its path; the
        # two Markdown file names keep the raw title. Confirm whether they
        # should be sanitised the same way.
        copy_json_file_name = os.path.join('course', f'{course_id}', 'json',
                                           f'{title}.json').replace('?', '')
        md_file_name = os.path.join('course', f'{course_id}', f'{title}.md')
        logseq_md_file_name = os.path.join('course', f'{course_id}', f'{title}_logseq.md')

        logger.info(f"Course {course_id} JSON file already exists, using local file.")
        makedirs(f'course/{course_id}/json', exist_ok=True)
        shutil.copy2(json_filename, copy_json_file_name)
        # The same copied JSON feeds both output flavours.
        json_to_markdown(copy_json_file_name, md_file_name)
        json_to_markdown(copy_json_file_name, logseq_md_file_name, logseq=True)
if __name__ == '__main__':
    # Table creation is left disabled; uncomment the next line to create the
    # audio_transcriptions table before processing.
    # create_audio_transcriptions_table(db_path)
    get_content()