167 lines
6.2 KiB
Python
Executable File
167 lines
6.2 KiB
Python
Executable File
import json
|
|
import logging
|
|
import os
|
|
import sqlite3
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
import random
|
|
from time import sleep
|
|
|
|
import requests
|
|
from pydub import AudioSegment
|
|
|
|
from courses.Attachment import Attachment
|
|
|
|
|
|
def get_course_id(date_file_path):
    """Return the ``id`` of every entry listed in a course-index JSON file.

    Args:
        date_file_path: Path of a UTF-8 encoded JSON file whose top-level
            object has a ``data`` object containing an ``items`` list; each
            item must carry an ``id`` key.

    Returns:
        A list with the ``id`` values, in the order they appear in the file.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file is not valid JSON.
        KeyError: If ``data``, ``items`` or an item's ``id`` is missing.
    """
    with open(date_file_path, encoding="UTF-8") as file:
        data = json.load(file)
    return [item['id'] for item in data['data']['items']]
|
|
|
|
|
|
def request_date(course_id, request_token, timeout=30):
    """Fetch one course, with its contents expanded, as decoded JSON.

    Sends a GET request to
    ``https://bandu-api.songy.info/v2/courses/<course_id>?expand=contents``.

    Args:
        course_id: Course identifier; converted with ``str()`` and placed in
            the URL path.
        request_token: Token sent in the ``Authorization: Bearer`` header.
        timeout: Value handed to ``requests.get`` as its ``timeout``. Without
            one, ``requests`` waits indefinitely for an unresponsive server.

    Returns:
        The response body as decoded by ``Response.json()``. The HTTP status
        code is not checked, so an error payload is returned as-is.
    """
    url = 'https://bandu-api.songy.info/v2/courses/' + str(course_id) + '?expand=contents'
    headers = {"Authorization": "Bearer " + request_token}
    course_json = requests.get(url, headers=headers, timeout=timeout)
    return course_json.json()
|
|
|
|
|
|
def download_mp3(url, filename, timeout=30):
    """Stream the resource at *url* into the local file *filename*.

    This function only downloads a single file; it does no merging.

    Args:
        url: Address of the file to download.
        filename: Path the body is written to (opened in binary write mode).
        timeout: Value handed to ``requests.get`` as its ``timeout`` so a
            stalled server cannot block the caller forever.

    Returns:
        None. When the status code is not 200 a message is printed and no
        file is written.
    """
    # Using the response as a context manager releases the streamed
    # connection even when writing the file fails part-way through.
    with requests.get(url, stream=True, timeout=timeout) as response:
        if response.status_code != 200:
            print(f"Failed to download {url}, status code {response.status_code}")
            return
        with open(filename, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
|
|
|
|
|
|
def merge_mp3_files(mp3_files, output_filename):
    """Concatenate the given audio files, in order, into one MP3 file.

    Args:
        mp3_files: Iterable of paths, each loaded with
            ``AudioSegment.from_file``.
        output_filename: Destination passed to ``AudioSegment.export`` with
            ``format="mp3"``.
    """
    # Load every input first, so a bad file fails before any concatenation.
    loaded = [AudioSegment.from_file(path) for path in mp3_files]

    # Append the segments one after another, starting from an empty segment.
    combined = AudioSegment.empty()
    for segment in loaded:
        combined += segment

    # Write the concatenated audio out.
    combined.export(output_filename, format="mp3")
|
|
|
|
|
|
def get_audio(audio_data):
    """Download a course's audio attachments and merge them into one MP3.

    Args:
        audio_data: JSON text of a course. ``data.title`` names the output
            and every entry of ``data.course_contents`` whose ``category``
            is ``"audio"`` contributes its ``attachment.raw_url``.

    Side effects:
        Creates a folder named after the sanitized title (relative to the
        current working directory) if it does not exist and writes
        ``<title>/<title>.mp3`` into it. Temporary files ``0.mp3``,
        ``1.mp3``, ... are written to the current working directory and
        removed again, including when a download or the merge raises.
    """
    data = json.loads(audio_data)

    # Collect the MP3 links of all audio entries, in course order.
    mp3_urls = [
        item["attachment"]["raw_url"]
        for item in data["data"]["course_contents"]
        if item["category"] == "audio"
    ]

    # "." and "/" are replaced so the title can serve as folder and file name.
    output_title = data["data"]["title"].replace(".", "_").replace("/", "_")
    output_filename = Path(f"{output_title}.mp3")
    save_file_path = os.path.join(output_title, output_filename)
    temp_mp3_files = [Path(f"{idx}.mp3") for idx in range(len(mp3_urls))]

    # Create the folder named after the title.
    if not os.path.exists(output_title):
        os.makedirs(output_title)
        logging.info("Folder created")
    else:
        logging.info("Folder already exists")

    try:
        # Download each MP3 into its numbered temporary file.
        for url, filename in zip(mp3_urls, temp_mp3_files):
            download_mp3(url, filename)

        # Merge the temporary files into the final output.
        merge_mp3_files(temp_mp3_files, save_file_path)
    finally:
        # Always clean up. A failed download leaves no file behind, so only
        # remove what exists; otherwise unlink() would raise and mask the
        # original error.
        for file in temp_mp3_files:
            if file.exists():
                file.unlink()

    logging.info(
        f"All MP3 files have been downloaded, merged into {output_filename}, and temporary files have been removed.")
|
|
|
|
|
|
def get_all_attachments(attachment_json_data):
    """Build one ``Attachment`` for every entry of a course's contents.

    Args:
        attachment_json_data: JSON text of a course; ``data.course_contents``
            must be a list of objects with ``id``, ``course_id``, ``content``
            and an ``attachment`` mapping.

    Returns:
        A list of ``Attachment`` objects, one per entry, in source order.
        Each object and the raw entry list are also printed.
    """
    data = json.loads(attachment_json_data)
    course_contents = data["data"]["course_contents"]
    attachment_list = []
    for item in course_contents:
        # Read the fields from the current item. Indexing the first entry
        # here would make every result a copy of that first entry.
        # NOTE(review): "content" is passed for both the third and fourth
        # argument, as before — confirm against Attachment's constructor.
        attachment = Attachment(item["id"], item["course_id"], item["content"],
                                item["content"], item["attachment"].get("url"))
        print(attachment)
        attachment_list.append(attachment)
    print(course_contents)
    return attachment_list
|
|
|
|
|
|
def query_all_course():
    """Return the stored JSON text of the ``JSON_DATA`` row of type ``ALL``.

    Opens ``course_database.db`` relative to the current working directory
    and closes the connection again before returning.

    Returns:
        The ``JSON`` column of the first row whose ``TYPE`` equals ``'ALL'``.

    Raises:
        IndexError: If no such row exists.
    """
    conn = sqlite3.connect('course_database.db')
    try:
        print("数据库打开成功")
        # Bind the value as a parameter: a double-quoted "ALL" is an
        # identifier in standard SQL and is only treated as a string by an
        # SQLite compatibility quirk.
        row = conn.execute(
            'SELECT JSON from JSON_DATA jd WHERE "TYPE" = ?', ("ALL",)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        raise IndexError("no row with TYPE = 'ALL' in JSON_DATA")
    return row[0]
|
|
|
|
|
|
def query_course_by_id(course_id):
    """Return the stored JSON text of the ``JSON_DATA`` row with the given ID.

    Opens ``course_database.db`` relative to the current working directory
    and closes the connection again before returning.

    Args:
        course_id: Value compared against the ``ID`` column. It is bound as
            a query parameter, never spliced into the SQL text.

    Returns:
        The ``JSON`` column of the first matching row.

    Raises:
        IndexError: If no row has that ID.
    """
    conn = sqlite3.connect('course_database.db')
    try:
        print("数据库打开成功")
        row = conn.execute(
            'SELECT JSON from JSON_DATA jd WHERE ID = ?', (course_id,)
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        raise IndexError(f"no row with ID = {course_id!r} in JSON_DATA")
    return row[0]
|
|
|
|
|
|
def save_course_json(ids, request_token=None):
    """Fetch each course in *ids* and store its JSON in ``course_database.db``.

    For every id the course is requested through ``request_date`` and
    inserted into ``JSON_DATA`` with ``INSERT OR IGNORE`` (TYPE ``"COURSE"``,
    REMARK set to the sanitized title). Each row is committed right away and
    followed by a randomized pause.

    Args:
        ids: Iterable of course identifiers.
        request_token: Bearer token for the API. When omitted, the token
            embedded below is used so existing ``save_course_json(ids)``
            calls keep working.
    """
    if request_token is None:
        # NOTE(security): this credential is embedded in source control.
        # It is kept only for backward compatibility; prefer passing
        # request_token explicitly and rotate this token.
        request_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiIxMDAwMDgzNDciLCJleHAiOjE3MTkxODk0ODQsImp0aSI6IjU3ZTJhMzdmLTMyZGEtNGQ2My1hZjQxLTY5NTRlNmU1OTg2MiIsImlhdCI6MTcxNjUxMTA4NCwiaXNzIjoiYXBwdXNlciIsInVpZCI6ImJlMmViOGIyLTFhOTItNGVmMC05ZDAwLTA1YTlkN2E2OWRiMiIsInNjaGVtZSI6Imp3dGhzIiwic2lkIjoiMWI4ZjE1ZTItYjQ5ZC00MmRmLWEwNDUtZmQxYTUwNzI5ZjkxIn0.IO7C2gtsi8lMdrOgWGNuxK-t2zzmDPvmI4BqISHeZEI"

    conn = sqlite3.connect('course_database.db')
    try:
        print("数据库打开成功")
        c = conn.cursor()
        for course_id in ids:
            json_data = request_date(course_id, request_token)
            course = json_data["data"]
            title = course["title"].replace(".", "_").replace("/", "_")
            # fromisoformat() is given "+00:00" instead of a trailing "Z".
            created_at = datetime.fromisoformat(course["created_at"].replace('Z', '+00:00'))
            updated_at = datetime.fromisoformat(course["updated_at"].replace('Z', '+00:00'))

            # Insert the JSON string into the SQLite table. The timestamps
            # are converted to text explicitly, in the same
            # space-separated ISO form sqlite3's implicit datetime adapter
            # produced; that adapter is deprecated since Python 3.12.
            c.execute("INSERT OR IGNORE INTO JSON_DATA (ID,JSON,TYPE,REMARK,CREATED_AT,UPDATED_AT) VALUES (?,?,?,?,?,?)",
                      (course_id, json.dumps(json_data), "COURSE", title,
                       created_at.isoformat(" "), updated_at.isoformat(" ")))
            conn.commit()

            # Randomized pause between requests; a non-positive draw is
            # reset to the mean of one second.
            secs = random.normalvariate(1, 0.4)
            if secs <= 0:
                secs = 1
            sleep(secs)
    finally:
        # Close the connection even when a request or insert fails.
        conn.close()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: fetch a fixed set of courses and store them in the
    # database. The other helpers in this module (get_course_id,
    # request_date, query_course_by_id, get_audio, get_all_attachments) can
    # be driven from here when needed.
    ids = [489, 490, 491]
    save_course_json(ids)
|