import json
import logging
import os
import random
import sqlite3
from datetime import datetime
from pathlib import Path
from time import sleep

import requests
from pydub import AudioSegment

from courses.Attachment import Attachment


def get_course_id(date_file_path):
    with open(date_file_path, encoding="UTF-8") as file:
        course_json_data = file.read()
    # Parse the JSON data
    data = json.loads(course_json_data)
    # Initialise an empty list to store the id values
    course_ids = []
    # Iterate over the items list
    for item in data['data']['items']:
        # Extract the id and append it to the list
        course_ids.append(item['id'])
    return course_ids


# Fetch the course information JSON data from the API
def request_date(course_id, request_token):
    url = 'https://bandu-api.songy.info/v2/courses/' + str(course_id) + '?expand=contents'
    headers = {"Authorization": "Bearer " + request_token}
    course_json = requests.get(url, headers=headers)
    return course_json.json()


# Download an MP3 file (the files are merged in order afterwards)
def download_mp3(url, filename):
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
    else:
        print(f"Failed to download {url}, status code {response.status_code}")


# Merge the downloaded MP3 files
def merge_mp3_files(mp3_files, output_filename):
    audio_segments = []
    for file in mp3_files:
        audio = AudioSegment.from_file(file)
        audio_segments.append(audio)
    # Merge the audio segments
    merged_audio = AudioSegment.empty()
    for audio_segment in audio_segments:
        merged_audio += audio_segment
    # Export the merged audio
    merged_audio.export(output_filename, format="mp3")


def get_audio(audio_data):
    # Parse the JSON data
    data = json.loads(audio_data)
    # Extract the MP3 URLs
    mp3_urls = [item["attachment"]["raw_url"] for item in data["data"]["course_contents"]
                if item["category"] == "audio"]
    # Prepare to download and merge the MP3 files
    output_title = data["data"]["title"].replace(".", "_").replace("/", "_")  # replace characters not allowed in file names
    output_filename = Path(f"{output_title}.mp3")
    save_file_path = os.path.join(output_title, output_filename)
    temp_mp3_files = [Path(f"{idx}.mp3") for idx in range(len(mp3_urls))]
    # Create a folder named after the title
    if not os.path.exists(output_title):
        os.makedirs(output_title)
        logging.info("Folder created")
    else:
        logging.info("Folder already exists")
    # Download each MP3 file
    for url, filename in zip(mp3_urls, temp_mp3_files):
        download_mp3(url, filename)
    # Merge the MP3 files
    merge_mp3_files(temp_mp3_files, save_file_path)
    # Clean up the temporary files
    for file in temp_mp3_files:
        file.unlink()
    logging.info(
        f"All MP3 files have been downloaded, merged into {output_filename}, and temporary files have been removed.")


# Get all attachments of a course
def get_all_attachments(attachment_json_data):
    data = json.loads(attachment_json_data)
    attachments = [item for item in data["data"]["course_contents"]]
    attachment_list = []
    for item in attachments:
        attachment = Attachment(item["id"], item["course_id"], item["content"],
                                item["content"], item["attachment"].get("url"))
        print(attachment)
        attachment_list.append(attachment)
    print(attachments)
    return attachment_list


def query_all_course():
    conn = sqlite3.connect('course_database.db')
    print("Database opened successfully")
    c = conn.cursor()
    all_course_json = c.execute('SELECT JSON FROM JSON_DATA WHERE "TYPE" = ?', ('ALL',)).fetchall()
    conn.close()
    return all_course_json[0][0]


def query_course_by_id(course_id):
    conn = sqlite3.connect('course_database.db')
    print("Database opened successfully")
    c = conn.cursor()
    all_course_json = c.execute('SELECT JSON FROM JSON_DATA WHERE ID = ?', (course_id,)).fetchall()
    conn.close()
    # return re.sub(r'[\r\n]', '', str(all_course_json[0]))
    return all_course_json[0][0]


# Save the course JSON data to the database
def save_course_json(ids):
    conn = sqlite3.connect('course_database.db')
    print("Database opened successfully")
    c = conn.cursor()
    # Hard-coded API bearer token (JWT)
    token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiIxMDAwMDgzNDciLCJleHAiOjE3MTkxODk0ODQsImp0aSI6IjU3ZTJhMzdmLTMyZGEtNGQ2My1hZjQxLTY5NTRlNmU1OTg2MiIsImlhdCI6MTcxNjUxMTA4NCwiaXNzIjoiYXBwdXNlciIsInVpZCI6ImJlMmViOGIyLTFhOTItNGVmMC05ZDAwLTA1YTlkN2E2OWRiMiIsInNjaGVtZSI6Imp3dGhzIiwic2lkIjoiMWI4ZjE1ZTItYjQ5ZC00MmRmLWEwNDUtZmQxYTUwNzI5ZjkxIn0.IO7C2gtsi8lMdrOgWGNuxK-t2zzmDPvmI4BqISHeZEI"
    for course_id in ids:
        # if course_id > 7:
        #     continue
        json_data = request_date(course_id, token)
        title = json_data["data"]["title"].replace(".", "_").replace("/", "_")
        created_at = datetime.fromisoformat(json_data["data"]["created_at"].replace('Z', '+00:00'))
        updated_at = datetime.fromisoformat(json_data["data"]["updated_at"].replace('Z', '+00:00'))
        # Insert the JSON string into the SQLite table
        c.execute("INSERT OR IGNORE INTO JSON_DATA (ID,JSON,TYPE,REMARK,CREATED_AT,UPDATED_AT) VALUES (?,?,?,?,?,?)",
                  (course_id, json.dumps(json_data), "COURSE", title, created_at, updated_at))
        conn.commit()
        # Sleep for a random interval between requests
        secs = random.normalvariate(1, 0.4)
        if secs <= 0:
            secs = 1  # reset to the mean if the sampled value is too small
        sleep(secs)
    conn.close()


if __name__ == '__main__':
    # ids = get_course_id('all/course.json')
    token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiIxMDAwMDgzNDciLCJleHAiOjE3MTkxODk0ODQsImp0aSI6IjU3ZTJhMzdmLTMyZGEtNGQ2My1hZjQxLTY5NTRlNmU1OTg2MiIsImlhdCI6MTcxNjUxMTA4NCwiaXNzIjoiYXBwdXNlciIsInVpZCI6ImJlMmViOGIyLTFhOTItNGVmMC05ZDAwLTA1YTlkN2E2OWRiMiIsInNjaGVtZSI6Imp3dGhzIiwic2lkIjoiMWI4ZjE1ZTItYjQ5ZC00MmRmLWEwNDUtZmQxYTUwNzI5ZjkxIn0.IO7C2gtsi8lMdrOgWGNuxK-t2zzmDPvmI4BqISHeZEI"
    # json_data = request_date(ids[0], token)
    # json_data = query_course_by_id(488)
    # get_audio(json_data)
    # print(json_data)
    # get_all_attachments(json_data)
    ids = [489, 490, 491]
    save_course_json(ids)
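

# --- Assumed database schema (sketch) ---
# The script does not show how course_database.db is created. The helper below
# is a minimal sketch of the JSON_DATA table that save_course_json(),
# query_all_course() and query_course_by_id() appear to expect. Only the table
# and column names come from the queries above; the column types and the ID
# primary key (which is what lets INSERT OR IGNORE skip already-stored
# courses) are assumptions. Run it once before save_course_json() if the
# table does not exist yet.
def init_json_data_table(db_path='course_database.db'):
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS JSON_DATA ("
        " ID INTEGER PRIMARY KEY,"
        " JSON TEXT,"
        " TYPE TEXT,"
        " REMARK TEXT,"
        " CREATED_AT TIMESTAMP,"
        " UPDATED_AT TIMESTAMP"
        ")"
    )
    conn.commit()
    conn.close()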