# Source: songyi/course.py (last modified 2025-03-07 15:10:44 +08:00; 263 lines, 10 KiB, Python, executable file).
# NOTE: The repository viewer flagged this file as containing ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can be ignored; otherwise use the
# viewer's Escape button to reveal them.
# -*- coding: utf-8 -*-
import configparser
import json
import os
import shutil
import sqlite3
import subprocess
from queue import Queue
from threading import Thread
import requests
from headers import headers
import logging
from video_voice_process import process_audio_file
from logging.handlers import RotatingFileHandler
# Configure logging: every record goes both to the console and to a rotating
# log file (5 MiB per file, 3 backups kept).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        RotatingFileHandler('app.log', maxBytes=5 * 1024 * 1024, backupCount=3),
    ],
)

# Read the runtime settings from config.ini ([DEFAULT] section).
config = configparser.ConfigParser()
config.read('config.ini')
authorization_token = config['DEFAULT']['authorization_token']
max_download_threads = int(config['DEFAULT']['max_download_threads'])
max_retry_attempts = int(config['DEFAULT']['max_retry_attempts'])

# The imported `headers` dict is updated in place with the bearer token.
headers['authorization'] = 'Bearer ' + authorization_token
def download_attachment(attachment, course_id_folder, course_audio_filename, max_retries):
    """Download a single attachment with aria2c, retrying on failure.

    Args:
        attachment: Mapping with at least the keys ``'name'`` and ``'url'``.
            When the name is empty, ``None`` or ends with ``.m3u8``, the file
            name is taken from the last path segment of the URL instead.
        course_id_folder: Base folder of the course. Files whose name does not
            end in ``mp3`` are stored in a sub-folder named after the
            extension of the attachment name.
        course_audio_filename: Path of the merged course audio. When it
            already exists, ``mp3`` attachments are not downloaded again.
        max_retries: Maximum number of aria2c attempts.

    Returns:
        None. Failures are logged, never raised.
    """
    # Normalise a missing name to '' so the string operations below are safe.
    name = attachment['name'] or ''
    url = attachment['url']

    if name == '' or name.endswith('.m3u8'):
        logging.info("字符串为空")
        download_filename = url[url.rfind('/') + 1:]
    else:
        download_filename = name

    # Resolve the destination once, outside the retry loop, so that a retry
    # does not nest the extension folder one level deeper each time.
    file_extension = name.split('.')[-1].lower()
    if file_extension != 'mp3':
        target_folder = os.path.join(course_id_folder, file_extension)
    elif os.path.exists(course_audio_filename):
        logging.info(f"File {course_audio_filename} already exists, skipping download.")
        return
    else:
        target_folder = course_id_folder

    filename = os.path.join(target_folder, download_filename)
    if os.path.exists(filename):
        logging.info(f"File {filename} already exists, skipping download.")
        return

    # Argument list instead of a shell string: URLs commonly contain '&' or
    # other shell metacharacters, and file names may contain spaces.
    command = ['aria2c', '-o', filename, '-x', '16', '-s', '16', url]

    for attempt in range(1, max_retries + 1):
        try:
            subprocess.run(command, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing aria2c executable now that no shell
            # is involved.
            logging.error(f"Failed to download {attachment['name']}: {e}")
            if attempt == max_retries:
                logging.error(f"Failed to download {attachment['name']} after {max_retries} attempts.")
            else:
                logging.warning(f"Retrying {attachment['name']}... ({attempt}/{max_retries})")
        else:
            logging.info(f"Download Command: {' '.join(command)}")
            return
def worker(queue, course_id_folder, course_audio_filename, max_retries):
    """Drain the attachment queue, downloading each attachment in turn.

    Intended to run in several threads that share one queue.

    Args:
        queue: ``queue.Queue`` holding attachment mappings.
        course_id_folder: Forwarded to ``download_attachment``.
        course_audio_filename: Forwarded to ``download_attachment``.
        max_retries: Forwarded to ``download_attachment``.

    Returns:
        None, once the queue has no more items.
    """
    # Local import: the file only imports ``Queue``, and the ``queue``
    # parameter shadows the module name inside this function.
    from queue import Empty

    while True:
        # A non-blocking get avoids the empty()/get() race, where another
        # thread takes the last item and this one blocks forever.
        try:
            attachment = queue.get_nowait()
        except Empty:
            return
        try:
            download_attachment(attachment, course_id_folder, course_audio_filename, max_retries)
        except Exception:
            # Keep the thread alive so the remaining items are still handled.
            logging.exception("Unexpected error while downloading attachment")
        finally:
            # Always mark the item done, otherwise Queue.join() never returns.
            queue.task_done()
def convert_mp4(mp4_file):
    """Extract the audio track of an MP4 file into a WAV file next to it.

    ffmpeg is invoked with the video stream dropped and the audio encoded as
    16-bit signed little-endian PCM, 44100 Hz, 2 channels. An existing output
    file is overwritten.

    Args:
        mp4_file: Path of the MP4 file to convert.

    Returns:
        The path of the WAV file, or None when ffmpeg fails or is not found.
    """
    folder, base_name = os.path.split(mp4_file)
    stem, _extension = os.path.splitext(base_name)
    wav_file = os.path.join(folder, f"{stem}.wav")

    ffmpeg_args = [
        'ffmpeg', '-y',
        '-i', mp4_file,
        '-vn',                   # drop the video stream
        '-acodec', 'pcm_s16le',  # 16-bit signed little-endian PCM
        '-ar', '44100',          # sample rate: 44100 Hz
        '-ac', '2',              # two channels (stereo)
        wav_file,
    ]

    try:
        subprocess.run(ffmpeg_args, check=True)
    except subprocess.CalledProcessError as e:
        logging.error(f"转换失败: {e}")
        return None
    except FileNotFoundError:
        logging.error("未找到 FFmpeg请确保已安装并配置好 FFmpeg 环境。")
        return None

    logging.info(f"成功将 {mp4_file} 转换为 {wav_file}")
    return wav_file
def _latest_course_ids(cursor):
    """Return ``(course_ids, titles_by_id)`` for the most recent courses in the DB."""
    cursor.execute('SELECT id FROM courses ORDER BY id DESC LIMIT 1')
    row = cursor.fetchone()
    # fetchone() yields None on an empty table; the cursor itself is always
    # truthy, so the row is what has to be tested.
    if row:
        max_course_id = row[0]
        logging.info(f"The maximum course ID is {max_course_id}")
    else:
        logging.info("No courses found in the database.")
        max_course_id = 11
    start_course_id = max_course_id - 5
    cursor.execute('SELECT id, title FROM courses where id >= ?', (start_course_id,))
    rows = cursor.fetchall()
    return [r[0] for r in rows], dict(rows)


def _load_course_contents(course_id):
    """Return the contents payload of a course from the JSON cache or the API, or None on failure."""
    json_filename = os.path.join('json', f'{course_id}.json')
    if os.path.exists(json_filename):
        logging.info(f"Course {course_id} JSON file already exists, using local file.")
        with open(json_filename, 'r', encoding='utf-8') as json_file:
            contents_data = json.load(json_file)
        if not (isinstance(contents_data, dict) and 'data' in contents_data):
            logging.error(f"Cached file {json_filename} has no 'data' entry, skipping course {course_id}.")
            return None
        return contents_data

    try:
        response = requests.get(
            f'https://bandu-api.songy.info/v2/courses/{course_id}/contents',
            headers=headers,
            timeout=30,  # never wait forever on a stalled connection
        )
        response.raise_for_status()
        contents_data = response.json()
    except (requests.RequestException, ValueError) as e:
        logging.error(f"Failed to fetch contents of course {course_id}: {e}")
        return None

    # Only cache a payload that has the expected shape; an error response
    # written to disk would be reused on every later run.
    if not (isinstance(contents_data, dict) and 'data' in contents_data):
        logging.error(f"Unexpected contents payload for course {course_id}, skipping.")
        return None

    with open(json_filename, 'w', encoding='utf-8') as save_json_file:
        json.dump(contents_data, save_json_file, ensure_ascii=False, indent=4)
    return contents_data


def _store_contents(cursor, course_id, contents_data):
    """Insert every content item of a course into the ``contents`` table (existing ids are kept)."""
    for item in contents_data['data']:
        attachment = item['attachment']
        cursor.execute('''
            INSERT INTO contents (id, course_id, content, category, audio_order, attachment_url, mime_type)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO NOTHING
        ''', (item['id'], course_id, item['content'], item['category'], item['order'],
              attachment['url'] if attachment else None,
              attachment['mime_type'] if attachment else None))


def _download_course_attachments(items, course_id_folder, course_audio_filename):
    """Download all attachments of a course using ``max_download_threads`` worker threads."""
    attachment_queue = Queue()
    for item in items:
        if item['attachment']:
            attachment_queue.put(item['attachment'])

    threads = [
        Thread(target=worker,
               args=(attachment_queue, course_id_folder, course_audio_filename, max_retry_attempts))
        for _ in range(max_download_threads)
    ]
    for t in threads:
        t.start()
    # Joining the threads is sufficient: each one returns once the queue is
    # drained, and unlike Queue.join() this cannot hang if a worker died
    # before marking its item as done.
    for t in threads:
        t.join()


def _merge_course_audio(items, course_id_folder, course_audio_filename):
    """Concatenate the course's audio parts (by ``order``) into one mp3 and remove the parts."""
    audio_items = [
        item for item in items
        if item['category'] == 'audio' and item['attachment'] and item['attachment']['name']
    ]
    if not audio_items:
        return
    audio_items.sort(key=lambda item: item['order'])
    part_names = [item['attachment']['name'] for item in audio_items]

    if not os.path.exists(course_audio_filename):
        combined_audio_filename = os.path.join(course_id_folder, 'combined_audio.mp3')
        text_file = os.path.join(course_id_folder, 'input_files.txt')
        # Paths in the list are relative to the list file; single quotes in
        # a name are escaped the way the ffmpeg concat demuxer expects.
        with open(text_file, 'w', encoding='utf-8') as f:
            for part_name in part_names:
                escaped = part_name.replace("'", "'\\''")
                f.write(f"file '{escaped}'\n")
        # '-y' keeps ffmpeg from waiting on an overwrite prompt when a
        # previous run left a partial combined file behind.
        ffmpeg_command = ['ffmpeg', '-y', '-f', 'concat', '-safe', '0',
                          '-i', text_file, '-c', 'copy', combined_audio_filename]
        try:
            subprocess.run(ffmpeg_command, check=True)
        except (subprocess.CalledProcessError, OSError) as e:
            # Keep the downloaded parts so that a later run can retry.
            logging.error(f"Failed to merge audio in {course_id_folder}: {e}")
            return
        finally:
            os.remove(text_file)
        shutil.move(combined_audio_filename, course_audio_filename)

    # The merged file exists at this point, so the parts are redundant.
    merged_path = os.path.normpath(course_audio_filename)
    for part_name in part_names:
        audio_file_path = os.path.join(course_id_folder, part_name)
        if os.path.normpath(audio_file_path) == merged_path:
            continue  # never delete the merged file itself
        try:
            os.remove(audio_file_path)
        except FileNotFoundError:
            pass  # part was not downloaded (e.g. merged file already existed)
        except OSError:
            logging.error('delete file fail')


def _sort_attachments_into_folders(items, course_id_folder):
    """Move non-mp3 attachment files lying in the course folder into per-extension sub-folders."""
    for item in items:
        attachment = item['attachment']
        if not attachment or not attachment['name']:
            continue
        name = attachment['name']
        filename = os.path.join(course_id_folder, name)
        # isfile() rather than exists(): only regular files are moved.
        if not os.path.isfile(filename):
            continue
        folder_name = name.split('.')[-1].lower()
        if folder_name == 'mp3':
            continue
        folder_path = os.path.join(course_id_folder, folder_name)
        os.makedirs(folder_path, exist_ok=True)
        shutil.move(filename, os.path.join(folder_path, name))


def _write_course_text(items, course_id, course_id_folder):
    """Write the content of every ``text`` item to ``<course_id>.txt``, one item per line."""
    text_contents = [item['content'] for item in items if item['category'] == 'text']
    if not text_contents:
        return
    with open(os.path.join(course_id_folder, f'{course_id}.txt'), 'w', encoding='utf-8') as txt_file:
        for content in text_contents:
            txt_file.write(content + '\n')


def _process_course_video(course_id_folder):
    """Convert the course's mp4 to WAV and pass it to ``process_audio_file`` unless a .md file already exists."""
    mp4_folder = os.path.join(course_id_folder, 'mp4')
    if not os.path.exists(mp4_folder):
        return
    mp4_file = None
    for root, _dirs, files in os.walk(mp4_folder):
        for file in files:
            lowered = file.lower()
            if lowered.endswith('.md'):
                # A .md file in the folder means the video is not processed again.
                return
            if lowered.endswith('.mp4'):
                mp4_file = os.path.join(root, file)
    if mp4_file is None:
        return
    wav_file = convert_mp4(mp4_file)
    if wav_file is None:
        return
    try:
        process_audio_file(wav_file)
    except Exception:
        logging.exception('process_audio_file fail')


def get_course():
    """Synchronise the most recent courses: metadata, attachments, audio, text and video.

    For the courses whose id is within 5 of the highest id in ``courses.db``:
    the contents JSON is loaded (local cache first, API otherwise) and stored
    in the ``contents`` table; then, per course, attachments are downloaded,
    audio parts are merged into ``<title>.mp3``, other files are sorted into
    per-extension folders, text items are written to ``<course_id>.txt`` and
    an mp4 (if any) is converted and passed to ``process_audio_file``.

    Courses whose contents cannot be loaded are logged and skipped.
    """
    conn = sqlite3.connect('courses.db')
    try:
        cursor = conn.cursor()
        course_ids, course_ids_dict = _latest_course_ids(cursor)

        os.makedirs('json', exist_ok=True)
        os.makedirs('course', exist_ok=True)

        # Keep the loaded payloads in memory so that they do not have to be
        # re-read from disk in the processing phase below.
        contents_by_course = {}
        for course_id in course_ids:
            logging.info(f"Processing course ID: {course_id}")
            contents_data = _load_course_contents(course_id)
            if contents_data is None:
                continue
            _store_contents(cursor, course_id, contents_data)
            conn.commit()
            contents_by_course[course_id] = contents_data
        cursor.close()
    finally:
        # Release the database even when a step above raises.
        conn.close()

    for course_id, contents_data in contents_by_course.items():
        course_id_folder = os.path.join('course', str(course_id))
        os.makedirs(course_id_folder, exist_ok=True)
        course_audio_filename = os.path.join(course_id_folder, f'{course_ids_dict[course_id]}.mp3')
        items = contents_data['data']

        _download_course_attachments(items, course_id_folder, course_audio_filename)
        _merge_course_audio(items, course_id_folder, course_audio_filename)
        _sort_attachments_into_folders(items, course_id_folder)
        _write_course_text(items, course_id, course_id_folder)
        _process_course_video(course_id_folder)
# Script entry point: run the course synchronisation only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    get_course()