Files
songyi/markdown_generator.py
2024-11-28 09:55:22 +08:00

171 lines
6.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import shutil
import json
import logging
import requests
from concurrent.futures import ThreadPoolExecutor
import os
import sqlite3
import configparser
# Load runtime settings from config.ini in the current working directory.
# Both keys are required: if the file or a key is missing, the lookups below
# raise KeyError at import time.
config = configparser.ConfigParser()
config.read('config.ini')
max_download_threads = int(config['DEFAULT']['max_download_threads'])
start_course_id = int(config['DEFAULT']['start_course_id'])

# Speech-to-text (transcription) endpoint.
trans_url = 'https://api.siliconflow.cn/v1/audio/transcriptions'
# NOTE(review): an API key is committed in this file. Rotate it and supply the
# real key through the SILICONFLOW_API_KEY environment variable; the literal
# is kept only as a backward-compatible fallback.
_api_key = os.environ.get(
    'SILICONFLOW_API_KEY',
    'sk-lakndqcjlmtukekcliwkkryaxquifduhvzgcnlhofzvofllv',
)
headers = {
    "Authorization": f"Bearer {_api_key}",
    # Content-Type is deliberately left unset so that requests can build the
    # multipart header (including the boundary) for the upload itself.
    # "Content-Type": "multipart/form-data"
}

# Logging configuration shared by the whole module.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Make sure the output directories exist (JSON copies, plain Markdown, Logseq
# Markdown). exist_ok=True avoids the check-then-create race of testing
# os.path.exists first.
for _output_dir in ('data/json', 'data/markdown', 'data/markdown_logseq'):
    os.makedirs(_output_dir, exist_ok=True)
def download_file(url, local_path, timeout=30):
    """Stream the resource at ``url`` into the file ``local_path``.

    Args:
        url: Address of the file to download.
        local_path: Destination path; an existing file is overwritten.
        timeout: Seconds passed to ``requests.get`` as its timeout. Without
            one, a server that stops responding would block the calling
            worker thread indefinitely.

    Returns:
        ``local_path`` on success, or ``None`` on any failure. The error is
        logged, and a file this call started writing is removed.
    """
    created = False  # True once this call has opened (and truncated) local_path
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(local_path, 'wb') as f:
                created = True
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        if not os.path.exists(local_path):
            raise FileNotFoundError(f"文件下载后未找到:{local_path}")
        return local_path
    except Exception as e:
        logging.error(f"下载文件时出错:{e}")
        if created:
            # Do not leave a truncated download behind; a file that was never
            # opened by this call is left untouched.
            try:
                os.remove(local_path)
            except OSError:
                pass
        return None
def voice2txt(voice_path):
    """Transcribe a local audio file by POSTing it to ``trans_url``.

    Args:
        voice_path: Path of the audio file to upload.

    Returns:
        The value of the ``text`` key in the JSON response (``None`` if the
        key is absent), or ``None`` when the response status is not 200.
    """
    # The context manager closes the file handle once the upload is done; the
    # bare open() used previously leaked one handle per call.
    with open(voice_path, 'rb') as audio_file:
        multipart_form_data = {
            'file': ('audio.mp3', audio_file),
            'model': (None, 'FunAudioLLM/SenseVoiceSmall'),
        }
        response = requests.request("POST", trans_url, files=multipart_form_data, headers=headers)

    # Report failures through logging, like the rest of the module, and make
    # the None result explicit.
    if response.status_code != 200:
        logging.error(f"请求失败,状态码: {response.status_code}")
        return None

    data = response.json()
    # .get avoids a KeyError when the response has no 'text' key.
    text_value = data.get('text', None)
    logging.info(f"Text value: {text_value}")
    return text_value
def audio_to_text(audio_url, filename):
    """Download an audio file, transcribe it, and delete the local copy.

    Args:
        audio_url: Address of the audio file.
        filename: File name used for the temporary copy under ``data/``.

    Returns:
        Whatever ``voice2txt`` returns for the downloaded file (which may be
        ``None``), or a failure message string when the download fails or an
        exception is raised.
    """
    try:
        logging.info(f"Downloading audio file: {audio_url}")
        download_path = os.path.join('data', filename)
        local_audio_path = download_file(audio_url, download_path)
        if local_audio_path is None:
            logging.error("音频文件下载失败")
            return "音频文件下载失败"
        try:
            return voice2txt(local_audio_path)
        finally:
            # Remove the temporary audio file even when transcription raises,
            # so failed runs do not pile up files under data/.
            os.remove(local_audio_path)
    except Exception as e:
        logging.error(f"转换音频到文本时出错:{e}")
        return f"音频转文本失败: {e}"
def process_item(item):
    """Render one course item as a plain-Markdown paragraph.

    ``item['category']`` selects the form: ``text`` yields the content,
    ``image`` an image link, ``audio`` the transcription of the attachment's
    ``raw_url``, and anything else a regular link to the attachment's ``url``.
    Every result ends with a blank line.
    """
    kind = item['category']
    if kind == 'text':
        return f"{item['content']}\n\n"
    if kind == 'audio':
        source_url = item['attachment']['raw_url']
        clip_name = f"audio_{item['id']}.mp3"
        transcript = audio_to_text(source_url, clip_name)
        return f"{transcript}\n\n"
    # Images and all remaining categories differ only by the leading "!".
    marker = "!" if kind == 'image' else ""
    return f"{marker}[{item['content']}]({item['attachment']['url']})\n\n"
def process_logseq_item(item):
    """Render one course item as Logseq-style bullet Markdown.

    Each result is one or more ``- `` bullets followed by a blank line. An
    ``audio`` item produces two bullets: an embed of the attachment ``url``
    and the transcription of the attachment ``raw_url``.
    """
    kind = item['category']
    if kind == 'audio':
        source_url = item['attachment']['raw_url']
        clip_name = f"audio_{item['id']}.mp3"
        transcript = audio_to_text(source_url, clip_name)
        embed_bullet = f"- ![{item['content']}]({item['attachment']['url']})\n\n"
        return f"{embed_bullet}- {transcript}\n\n"
    if kind == 'text':
        body = f"{item['content']}"
    elif kind == 'image':
        body = f"![{item['content']}]({item['attachment']['url']})"
    else:
        body = f"[{item['content']}]({item['attachment']['url']})"
    return f"- {body}\n\n"
def json_to_markdown(json_file, markdown_file, logseq=False):
    """Render the items of a course JSON file into a Markdown file.

    Args:
        json_file: Path of the JSON document; the list under its ``data`` key
            is rendered item by item.
        markdown_file: Output path, overwritten if it already exists.
        logseq: When true, items are rendered with ``process_logseq_item``
            instead of ``process_item``.

    Any exception is logged and swallowed; the function returns ``None``.
    """
    try:
        logging.info(f"Reading JSON file: {json_file}")
        with open(json_file, 'r', encoding='utf-8') as source:
            payload = json.load(source)

        logging.info(f"Writing Markdown file: {markdown_file}")
        with open(markdown_file, 'w', encoding='utf-8') as out:
            # The pool size is max_download_threads, read from config.ini.
            with ThreadPoolExecutor(max_workers=max_download_threads) as pool:
                pending = []
                for entry in payload['data']:
                    renderer = process_logseq_item if logseq else process_item
                    pending.append(pool.submit(renderer, entry))
                # Collect results in submission order so the document keeps
                # the order of the source items.
                for job in pending:
                    out.write(job.result())
    except Exception as err:
        logging.error(f"处理JSON文件时出错{err}")
def get_content():
    """Convert locally stored course JSON files to Markdown and Logseq Markdown.

    Reads the ``id`` and ``title`` of every row of the ``courses`` table in
    ``courses.db`` whose ``id`` is at least ``start_course_id``. For each
    course that has a ``json/<id>.json`` file, the file is copied to
    ``data/json/<title>.json`` and rendered to ``data/markdown/<title>.md``
    and ``data/markdown_logseq/<title>.md``. Courses without a local JSON
    file are skipped.
    """
    # Fetch the course list, then close the connection right away; it is not
    # needed while the files are processed.
    conn = sqlite3.connect('courses.db')
    try:
        cursor = conn.cursor()
        cursor.execute('SELECT id, title FROM courses where id >= ?', (start_course_id,))
        course_ids_data = cursor.fetchall()
    finally:
        conn.close()

    course_ids = [row[0] for row in course_ids_data]
    course_ids_dict = dict(course_ids_data)
    logging.info(course_ids_dict)

    # Directory holding the raw per-course JSON files.
    os.makedirs('json', exist_ok=True)

    for course_id in course_ids:
        print(f"Processing course ID: {course_id}")
        json_filename = os.path.join('json', f'{course_id}.json')
        if not os.path.exists(json_filename):
            continue

        title = course_ids_dict[course_id]
        # NOTE(review): only the JSON copy strips '?' from its path; the
        # Markdown names keep the title unchanged — confirm this is intended.
        copy_json_file_name = os.path.join('data', 'json', f'{title}.json').replace('?', '')
        md_file_name = os.path.join('data', 'markdown', f'{title}.md')
        logseq_md_file_name = os.path.join('data', 'markdown_logseq', f'{title}.md')

        print(f"Course {course_id} JSON file already exists, using local file.")
        shutil.copy2(json_filename, copy_json_file_name)
        json_to_markdown(copy_json_file_name, md_file_name)
        # logseq=True selects the Logseq renderer; without it this directory
        # would receive the same output as the plain Markdown one.
        json_to_markdown(copy_json_file_name, logseq_md_file_name, logseq=True)
# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    get_content()