import json
import logging
import os
from pathlib import Path

import requests
from pydub import AudioSegment

from courses.Attachment import Attachment

# Seconds to wait for the server (connect, and between received bytes) before
# giving up; without a timeout `requests` can block indefinitely.
_REQUEST_TIMEOUT = 30


def get_course_id(date_file_path):
    """Return the ids of every course listed in a course-list JSON file.

    Args:
        date_file_path: Path of a UTF-8 encoded JSON file whose
            ``data.items`` entry is a list of objects carrying an ``id`` key.

    Returns:
        A list with the ``id`` of each item, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If ``data``, ``items`` or an item's ``id`` is missing.
    """
    with open(date_file_path, encoding="UTF-8") as file:
        data = json.load(file)
    return [item['id'] for item in data['data']['items']]


def request_date(course_id, request_token):
    """Fetch the JSON description of one course, with its contents expanded.

    Args:
        course_id: Identifier of the course; converted with ``str()``.
        request_token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The raw response body as ``bytes``. The HTTP status code is not
        checked here, so an error response body is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    url = 'https://bandu-api.songy.info/v2/courses/' + str(course_id) + '?expand=contents'
    headers = {"Authorization": "Bearer " + request_token}
    response = requests.get(url, headers=headers, timeout=_REQUEST_TIMEOUT)
    return response.content


def download_mp3(url, filename):
    """Stream the file at *url* into *filename*.

    A non-200 response is reported on stdout and nothing is written.

    Args:
        url: Address of the file to download.
        filename: Destination path, opened in binary write mode.

    Raises:
        requests.RequestException: On connection failure or timeout.
        OSError: If the destination file cannot be written.
    """
    # The context manager releases the streamed connection on every path,
    # including the early return for a failed status code.
    with requests.get(url, stream=True, timeout=_REQUEST_TIMEOUT) as response:
        if response.status_code != 200:
            print(f"Failed to download {url}, status code {response.status_code}")
            return
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)


def merge_mp3_files(mp3_files, output_filename):
    """Concatenate audio files in the given order and export them as one MP3.

    Args:
        mp3_files: Iterable of paths to the audio files to join.
        output_filename: Path of the merged MP3 file to write.
    """
    merged_audio = AudioSegment.empty()
    for file in mp3_files:
        merged_audio += AudioSegment.from_file(file)
    merged_audio.export(output_filename, format="mp3")


def get_audio(audio_data):
    """Download every audio item of a course and merge them into one MP3.

    The merged file is written to ``<title>/<title>.mp3`` relative to the
    current working directory, where ``<title>`` is the course title with
    ``.`` and ``/`` replaced by ``_``. The individual downloads are stored
    as ``0.mp3``, ``1.mp3``, ... in the current working directory and are
    removed afterwards, even when a download or the merge fails.

    Args:
        audio_data: JSON text (``str`` or ``bytes``) of a course, containing
            ``data.title`` and ``data.course_contents``.
    """
    data = json.loads(audio_data)

    # Collect the MP3 links of the audio items only.
    mp3_urls = [
        item["attachment"]["raw_url"]
        for item in data["data"]["course_contents"]
        if item["category"] == "audio"
    ]

    # Replace characters that are not allowed in file names.
    output_title = data["data"]["title"].replace(".", "_").replace("/", "_")
    output_filename = Path(f"{output_title}.mp3")
    save_file_path = os.path.join(output_title, output_filename)
    temp_mp3_files = [Path(f"{idx}.mp3") for idx in range(len(mp3_urls))]

    # Create the folder named after the course title.
    if not os.path.exists(output_title):
        os.makedirs(output_title)
        logging.info("Folder created")
    else:
        logging.info("Folder already exists")

    try:
        # Download each MP3 file, then merge them in order.
        for url, filename in zip(mp3_urls, temp_mp3_files):
            download_mp3(url, filename)
        merge_mp3_files(temp_mp3_files, save_file_path)
    finally:
        # Clean up the temporary files. A file may be missing when its
        # download failed, so only remove the ones that exist.
        for file in temp_mp3_files:
            if file.exists():
                file.unlink()

    logging.info(f"All MP3 files have been downloaded, merged into {output_filename}, and temporary files have been removed.")


def get_all_attachments(attachment_json_data):
    """Build an ``Attachment`` for every content item of a course.

    Each created object, and finally the raw list of content items, is
    printed to stdout.

    Args:
        attachment_json_data: JSON text (``str`` or ``bytes``) of a course,
            containing ``data.course_contents``.

    Returns:
        A list of ``Attachment`` objects, one per content item, in order.
    """
    data = json.loads(attachment_json_data)
    attachments = list(data["data"]["course_contents"])
    attachmentlist = []
    for item in attachments:
        # Read the fields from the current item; indexing attachments[0]
        # here would build every Attachment from the first item.
        # NOTE(review): "content" is passed for two consecutive arguments;
        # confirm against the Attachment constructor that this is intended.
        attachment = Attachment(
            item["id"],
            item["course_id"],
            item["content"],
            item["content"],
            item["attachment"].get("url"),
        )
        print(attachment)
        attachmentlist.append(attachment)
    print(attachments)
    return attachmentlist


if __name__ == '__main__':
    ids = get_course_id('all/course.json')
    # NOTE(review): a bearer token is hard-coded here; credentials should not
    # live in source control. Consider loading it from the environment or a
    # local, untracked configuration file.
    token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiIxMDAwMDgzNDciLCJleHAiOjE3MTkxODk0ODQsImp0aSI6IjU3ZTJhMzdmLTMyZGEtNGQ2My1hZjQxLTY5NTRlNmU1OTg2MiIsImlhdCI6MTcxNjUxMTA4NCwiaXNzIjoiYXBwdXNlciIsInVpZCI6ImJlMmViOGIyLTFhOTItNGVmMC05ZDAwLTA1YTlkN2E2OWRiMiIsInNjaGVtZSI6Imp3dGhzIiwic2lkIjoiMWI4ZjE1ZTItYjQ5ZC00MmRmLWEwNDUtZmQxYTUwNzI5ZjkxIn0.IO7C2gtsi8lMdrOgWGNuxK-t2zzmDPvmI4BqISHeZEI"
    json_data = request_date(ids[0], token)
    # get_audio(json_data)
    get_all_attachments(json_data)