{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 爬取定投课堂资料\n",
    "通过解析课程json文件,获取对应资源的url从而下载所有资料\n",
    "获取课程id"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports and configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import io\n",
    "import json\n",
    "import os\n",
    "import posixpath\n",
    "from urllib.parse import urlsplit\n",
    "\n",
    "import requests\n",
    "from IPython.display import Audio, display\n",
    "from pydub import AudioSegment"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "MESSAGES_FILE = 'audio.json'  # course export; expected to hold a JSON list of message dicts\n",
    "REQUEST_TIMEOUT = 30  # seconds to wait on a single HTTP request before giving up\n",
    "MAX_ATTEMPTS = 3  # download attempts per attachment before it is skipped"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Helpers"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def load_messages(path):\n",
    "    \"\"\"Read the course export at ``path`` and return its list of messages.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if the top-level JSON value is not a non-empty list.\n",
    "    \"\"\"\n",
    "    with open(path, encoding='UTF-8') as source:\n",
    "        messages = json.load(source)\n",
    "    if not isinstance(messages, list) or not messages:\n",
    "        raise ValueError(\n",
    "            f'{path} must hold a non-empty JSON list of messages, '\n",
    "            f'got {type(messages).__name__}'\n",
    "        )\n",
    "    return messages\n",
    "\n",
    "\n",
    "def find_course_id(messages):\n",
    "    \"\"\"Return, as a string, the ``course_id`` of the first message that has one.\n",
    "\n",
    "    Assumes every message of one export belongs to the same course -- TODO confirm.\n",
    "\n",
    "    Raises:\n",
    "        ValueError: if no message carries a ``course_id``.\n",
    "    \"\"\"\n",
    "    for message in messages:\n",
    "        if isinstance(message, dict) and 'course_id' in message:\n",
    "            return str(message['course_id'])\n",
    "    raise ValueError('no message carries a course_id')\n",
    "\n",
    "\n",
    "def attachment_url(message):\n",
    "    \"\"\"Return the attachment URL of ``message``, or ``None`` when it has none.\"\"\"\n",
    "    attachment = message.get('attachment')\n",
    "    if isinstance(attachment, dict):\n",
    "        return attachment.get('url')\n",
    "    return None\n",
    "\n",
    "\n",
    "def filename_from_url(url):\n",
    "    \"\"\"Return the last path component of ``url``, ignoring query string and fragment.\"\"\"\n",
    "    return posixpath.basename(urlsplit(url).path)\n",
    "\n",
    "\n",
    "def report_failure(url):\n",
    "    \"\"\"Print the failure marker followed by the URL that could not be handled.\"\"\"\n",
    "    print('get file failed')\n",
    "    print(url)\n",
    "\n",
    "\n",
    "def fetch(url):\n",
    "    \"\"\"Download ``url`` and return the body as bytes, or ``None`` if every attempt fails.\n",
    "\n",
    "    At most ``MAX_ATTEMPTS`` requests are made, each limited to ``REQUEST_TIMEOUT``\n",
    "    seconds. Only ``requests`` errors are retried, so a permanent failure cannot\n",
    "    loop forever and the cell can still be interrupted from the keyboard.\n",
    "    \"\"\"\n",
    "    for _ in range(MAX_ATTEMPTS):\n",
    "        try:\n",
    "            response = requests.get(url, timeout=REQUEST_TIMEOUT)\n",
    "            # Without this check an HTTP error page would be stored as if it were the file.\n",
    "            response.raise_for_status()\n",
    "        except requests.RequestException:\n",
    "            report_failure(url)\n",
    "        else:\n",
    "            return response.content\n",
    "    return None\n",
    "\n",
    "\n",
    "def save_attachment(content, url, directory):\n",
    "    \"\"\"Write ``content`` into ``directory`` under the file name taken from ``url``.\"\"\"\n",
    "    filename = filename_from_url(url)\n",
    "    if not filename:\n",
    "        # A URL whose path ends in '/' yields no usable file name.\n",
    "        report_failure(url)\n",
    "        return\n",
    "    with open(os.path.join(directory, filename), 'wb') as target:\n",
    "        target.write(content)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load the course messages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "audios = load_messages(MESSAGES_FILE)\n",
    "course_id = find_course_id(audios)\n",
    "\n",
    "# Everything downloaded for this course is stored in a directory named after it.\n",
    "os.makedirs(course_id, exist_ok=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Download attachments and collect the text notes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "notes = []  # text of the PLAIN_TEXT messages, in message order\n",
    "audio_list = []  # raw bytes of the PLAIN_AUDIO attachments, in message order\n",
    "\n",
    "for message in audios:\n",
    "    category = message.get('category')\n",
    "    url = attachment_url(message)\n",
    "\n",
    "    if category == 'PLAIN_TEXT':\n",
    "        text = message.get('text')\n",
    "        if text is not None:\n",
    "            print(text)\n",
    "            notes.append(str(text))\n",
    "    elif category == 'MESSAGE_RECALL':\n",
    "        # Recalled messages are only reported, never downloaded.\n",
    "        print(category)\n",
    "        if url:\n",
    "            print(url)\n",
    "    elif url:\n",
    "        content = fetch(url)\n",
    "        if content is None:\n",
    "            continue  # already reported by fetch(); keep going with the next message\n",
    "        if category == 'PLAIN_AUDIO':\n",
    "            # Audio clips are kept in memory so they can be merged into one file below.\n",
    "            audio_list.append(content)\n",
    "        else:\n",
    "            save_attachment(content, url, course_id)\n",
    "\n",
    "if notes:\n",
    "    # Written in one go with an explicit encoding: re-running the cell does not\n",
    "    # duplicate notes, and non-ASCII text does not depend on the platform default.\n",
    "    note_path = os.path.join(course_id, course_id + 'note.txt')\n",
    "    with open(note_path, 'w', encoding='UTF-8') as note_file:\n",
    "        note_file.writelines(text + '\\n' for text in notes)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Merge the audio clips into one file"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Decode each clip straight from memory; no temporary file is left behind.\n",
    "audio_seg_list = [AudioSegment.from_mp3(io.BytesIO(clip)) for clip in audio_list]\n",
    "\n",
    "if audio_seg_list:\n",
    "    audio_name = course_id + '/' + course_id + '.mp3'\n",
    "    # Adding segments concatenates them in order.\n",
    "    sum(audio_seg_list).export(audio_name, format='mp3')\n",
    "    display(Audio(audio_name))\n",
    "else:\n",
    "    # sum([]) would be the integer 0, which has no export().\n",
    "    print('no audio clips were downloaded')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 获取图片url"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Print the URL of every image message, in message order.\n",
    "for message in audios:\n",
    "    if message.get('category') == 'PLAIN_IMAGE':\n",
    "        image_url = attachment_url(message)\n",
    "        if image_url:\n",
    "            print(image_url)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 1
}