# Files
# dt_audio/courses/parse_course.py
# 2024-05-26 09:36:58 +08:00
#
# 99 lines
# 3.4 KiB
# Python

import json
import logging
import os
from pathlib import Path
import requests
from pydub import AudioSegment
def get_course_id(date_file_path):
    """Collect the ``id`` of every course listed in a course-list JSON file.

    Args:
        date_file_path: Path to a UTF-8 encoded JSON file whose
            ``data.items`` entry is a list of objects with an ``id`` key.

    Returns:
        A list with the ``id`` value of each item, in file order.
    """
    with open(date_file_path, encoding="UTF-8") as source:
        payload = json.load(source)
    # One id per entry under data -> items.
    return [entry['id'] for entry in payload['data']['items']]
def request_date(course_id, request_token, timeout=30):
    """Fetch the JSON description of one course, including its contents.

    Args:
        course_id: Identifier of the course; it is converted to text and
            inserted into the request URL.
        request_token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the server before giving up. Without
            a timeout ``requests`` may block forever on a stalled
            connection.

    Returns:
        The raw response body as ``bytes``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = f'https://bandu-api.songy.info/v2/courses/{course_id}?expand=contents'
    headers = {"Authorization": f"Bearer {request_token}"}
    response = requests.get(url, headers=headers, timeout=timeout)
    # Fail here with the real HTTP error (e.g. an expired token) instead of
    # handing an error body to the JSON parsing code further down the line.
    response.raise_for_status()
    return response.content
def download_mp3(url, filename, timeout=30):
    """Stream one MP3 file from ``url`` into the local file ``filename``.

    Args:
        url: Address of the MP3 file to download.
        filename: Destination path; it is opened in binary write mode, so
            an existing file is overwritten.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        ``True`` when the server answered with status 200 and the body was
        written, ``False`` otherwise (a message is printed and no file is
        created).
    """
    # The context manager releases the streamed connection even when writing
    # fails part-way or the status is not 200.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print(f"Failed to download {url}, status code {response.status_code}")
            return False
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    return True
def merge_mp3_files(mp3_files, output_filename):
    """Concatenate audio files in the given order and export them as one MP3.

    Args:
        mp3_files: Iterable of paths to the audio files to join, in playback
            order.
        output_filename: Destination path of the merged MP3 file.
    """
    combined = AudioSegment.empty()
    for path in mp3_files:
        # Append each file's audio to the end of what has been joined so far.
        combined += AudioSegment.from_file(path)
    combined.export(output_filename, format="mp3")
def get_audio(audio_data):
    """Download every audio part of a course and merge them into one MP3.

    The merged file is written to ``<title>/<title>.mp3`` relative to the
    current working directory, where ``<title>`` is the course title with
    ``.`` and ``/`` replaced by ``_``.

    Args:
        audio_data: JSON text (``str`` or ``bytes``) describing a course. It
            must provide ``data.title`` and ``data.course_contents``; entries
            whose ``category`` is ``"audio"`` must provide
            ``attachment.raw_url``.

    Returns:
        The path of the merged MP3 file, or ``None`` when the course has no
        audio entries.
    """
    # Local import: only this function needs it.
    import tempfile

    course = json.loads(audio_data)["data"]
    mp3_urls = [
        item["attachment"]["raw_url"]
        for item in course["course_contents"]
        if item["category"] == "audio"
    ]
    # Replace characters that are not allowed in the file name.
    output_title = course["title"].replace(".", "_").replace("/", "_")
    if not mp3_urls:
        logging.warning("No audio content found for %s; nothing to merge.", output_title)
        return None

    output_filename = Path(f"{output_title}.mp3")
    save_file_path = os.path.join(output_title, output_filename)

    # Create the folder named after the title. exist_ok avoids a race between
    # the check and the creation.
    if os.path.isdir(output_title):
        logging.info("Folder already exists")
    else:
        os.makedirs(output_title, exist_ok=True)
        logging.info("Folder created")

    # The parts go into a private temporary directory: files such as "0.mp3"
    # in the working directory are never overwritten or deleted, and the
    # parts are removed even when a download or the merge raises.
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_mp3_files = [Path(temp_dir) / f"{idx}.mp3" for idx in range(len(mp3_urls))]
        for url, filename in zip(mp3_urls, temp_mp3_files):
            download_mp3(url, filename)
        merge_mp3_files(temp_mp3_files, save_file_path)

    logging.info(
        "All MP3 files have been downloaded, merged into %s, and temporary files have been removed.",
        output_filename,
    )
    return save_file_path
if __name__ == '__main__':
    # Without a configured handler the INFO messages emitted while
    # downloading are dropped by the logging module's default settings.
    logging.basicConfig(level=logging.INFO)

    # The API bearer token is a credential and must not live in the source
    # file; read it from the environment instead.
    token = os.environ.get("DT_AUDIO_TOKEN")
    if not token:
        raise SystemExit("Set the DT_AUDIO_TOKEN environment variable to a valid API bearer token.")

    ids = get_course_id('all/course.json')
    if not ids:
        raise SystemExit("No course ids found in all/course.json")

    # Only the first listed course is processed.
    json_data = request_date(ids[0], token)
    get_audio(json_data)