sspai-100-hours-series-python/code/01/bilibili.py
#!/usr/bin/env python3
# coding:utf-8
import itertools
import logging
import math
import pathlib
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, TypeVar
import pandas as pd
import requests
import requests_html
# --------------------
# Prerequisites
# --------------------
# internal types
_T = TypeVar("_T")
APIData = Dict[str, _T]
Records = List[APIData]
# api or data url
VIDEO_API_URL = "https://api.bilibili.com/x/space/arc/search"
VIDEO_PAGE_URL = "https://www.bilibili.com/video/{bvid}"
# request header
USER_AGENTS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_2)",
    "AppleWebKit/537.36 (KHTML, like Gecko)",
    "Chrome/81.0.4044.92",
    "Safari/537.36",
    "Edg/81.0.416.53",
]
HEADERS = {"User-Agent": " ".join(USER_AGENTS)}
MID_NUMBER = "533459953"
PARAMS = {"ps": "30", "tid": "0", "order": "pubdate"}
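# NOTE (assumption from observed API behaviour, not documented): "ps" appears
# to be the page size, "tid" the category id (0 = all categories), and
# "order" the sort key; MID_NUMBER is the numeric id of the bilibili user
# space being scraped.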
# logger
logging.basicConfig(
    level=logging.INFO,
    format="[{asctime}]-[{levelname:<8}]-[{funcName}:{lineno}] - {message}",
    datefmt="%Y-%m-%d %H:%M:%S",
    style="{",
)
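# style="{" makes the format string above use str.format-style fields
# ({asctime}, {levelname}, ...) instead of logging's default %-style.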
log = logging.getLogger(__name__)
# -----------------------
# Query and Handle Method
# -----------------------
def fetch_page_number(mid: str) -> int:
    """Fetch the total page count from the first API query."""
    payloads = {"mid": mid, "pn": 1, **PARAMS}
    with requests.Session() as sess:
        response = sess.get(
            url=VIDEO_API_URL,
            headers=HEADERS,
            params=payloads,
        )
        response.raise_for_status()
        count = response.json()["data"]["page"]["count"]
    # one API page holds up to int(PARAMS["ps"]) videos
    return math.ceil(int(count) / int(PARAMS["ps"]))
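
# worked example for the maths above: a channel reporting count=95 with
# ps=30 needs math.ceil(95 / 30) == 4 API pages.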


def fetch_video_data(mid: str, page: int) -> List[APIData]:
    """Fetch one page of video metadata from the API."""
    payload = {"mid": mid, "pn": str(page), **PARAMS}
    with requests.Session() as sess:
        response = sess.get(
            url=VIDEO_API_URL,
            headers=HEADERS,
            params=payload,
        )
        response.raise_for_status()
        jsons = response.json()["data"]["list"]["vlist"]
    log.info(f"fetched page {page} of videos from mid '{mid}'.")
    return jsons


async def fetch_stats(bvid: str, asess: requests_html.AsyncHTMLSession) -> APIData:
    """Fetch rank, like, coin, collect and share counts from a video page."""
    info = {}
    stats = ["rank", "like", "coin", "collect", "share"]
    response = await asess.get(
        url=VIDEO_PAGE_URL.format(bvid=bvid),
        headers=HEADERS,
    )
    response.raise_for_status()
    html = response.html
    # the rank badge only exists on videos that have charted
    rank = html.find(".video-data .rank", first=True)
    if rank:
        info["rank"] = rank.text.strip()
    try:
        for key in ("like", "coin", "collect", "share"):
            info[key] = html.find(f".toolbar-left .{key}", first=True).text.strip()
    except AttributeError:
        # .find() returned None for a selector, so .text raised
        log.warning(f"can't get stats from '{bvid}', use defaults.")
        return {k: "" for k in stats}
    log.info(f"fetched stats from '{bvid}'.")
    return info
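
# caveat: the selectors above assume the stats appear in the initial HTML.
# if bilibili fills them in with client-side JS instead, requests_html can
# render the page first (downloads Chromium on first use), e.g.:
#     await response.html.arender()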


async def bundle(record: APIData, asess: requests_html.AsyncHTMLSession) -> APIData:
    """Merge one API record with the stats scraped from its video page."""
    stats = await fetch_stats(record["bvid"], asess)
    return {**record, **stats}


def query(mid: str) -> Records:
    """Query all video records for the given mid number."""
    log.info(f"querying data from '{mid}'...")
    total_page = fetch_page_number(mid)
    with ThreadPoolExecutor(max_workers=2) as p:
        futures = [
            p.submit(fetch_video_data, mid=mid, page=page)
            for page in range(1, total_page + 1)
        ]
        jsons = itertools.chain(*[f.result() for f in futures])
    # async session for html requests
    asess = requests_html.AsyncHTMLSession(workers=2)
    # AsyncHTMLSession.run() expects zero-argument callables, so the loop
    # variables are bound as lambda defaults to keep each coroutine's inputs
    # fixed; see: https://github.com/psf/requests-html/issues/362
    results = asess.run(
        *[lambda record=record, asess=asess: bundle(record, asess) for record in jsons]
    )
    return results
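
# why the default arguments in the lambdas above matter: a bare closure like
#     lambda: bundle(record, asess)
# would late-bind `record`, so every coroutine would see the final item of
# `jsons`; binding it as a default argument freezes each value per lambda.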


def parse(jsons: Records) -> pd.DataFrame:
    """Normalize and combine the JSON records into a DataFrame."""
    return pd.json_normalize(jsons)
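
# pd.json_normalize flattens nested objects into dotted column names, e.g. a
# hypothetical field {"stat": {"view": 123}} would become a "stat.view" column.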


def main():
    csvfile = pathlib.Path("~/Desktop/bilibili.csv").expanduser()
    jsons = query(MID_NUMBER)
    data = parse(jsons)
    data.to_csv(csvfile, index=False)


if __name__ == "__main__":
    main()
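
# usage sketch (assuming the third-party dependencies are installed):
#     pip install requests requests-html pandas
#     python3 bilibili.py
# the result is written to ~/Desktop/bilibili.csv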