feat(projects): 新增Scrapy综合案例示例代码

This commit is contained in:
100gle
2022-07-13 16:21:55 +08:00
parent 7e44811195
commit 5cf5991d2d
14 changed files with 954 additions and 0 deletions

View File

@@ -0,0 +1,49 @@
import re
from datetime import datetime
pats = dict(
rank=re.compile(r"(\d+)"),
length=re.compile(r"(?P<minute>\d{2}):(?P<second>\d{2})"),
)
def parse_rank(value):
matched = pats["rank"].search(value.strip())
if matched:
number = matched.group()
return int(number)
return 0
def parse_ops(value):
if not value:
return 0
value = value.strip()
if "" in value:
digits = float(value.replace("", "")) * 10000
return int(digits)
else:
return int(value)
def parse_length(value):
length = value.strip()
pat = pats["length"]
matched = pat.search(length).groupdict()
if matched:
minute = int(matched["minute"])
second = int(matched["second"])
total = minute * 60 + second
else:
total = 0
return total
def parse_timestamp(value):
timestamp = int(value)
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")

View File

@@ -0,0 +1,38 @@
from itemloaders import ItemLoader
from itemloaders.processors import Compose, TakeFirst
from scrapy import Field, Item
# isort:skip_file
from bilibili_crawler.helper import (
parse_length,
parse_rank,
parse_ops,
parse_timestamp,
)
class DefaultLoader(ItemLoader):
default_output_processor = TakeFirst()
class APIData(Item):
title = Field()
play = Field()
comment = Field()
typeid = Field()
author = Field()
mid = Field()
created = Field(output_processor=Compose(TakeFirst(), parse_timestamp))
length = Field(output_processor=Compose(TakeFirst(), parse_length))
bvid = Field()
class VideoData(Item):
rank = Field(output_processor=Compose(TakeFirst(), parse_rank))
like = Field(output_processor=Compose(TakeFirst(), parse_ops))
coin = Field(output_processor=Compose(TakeFirst(), parse_ops))
collect = Field(output_processor=Compose(TakeFirst(), parse_ops))
share = Field(output_processor=Compose(TakeFirst(), parse_ops))

View File

@@ -0,0 +1,103 @@
# Define here the models for your spider middleware
#
# See documentation in:
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
from scrapy import signals
# useful for handling different item types with a single interface
from itemadapter import is_item, ItemAdapter
class BilibiliCrawlerSpiderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the spider middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(self, response, spider):
# Called for each response that goes through the spider
# middleware and into the spider.
# Should return None or raise an exception.
return None
def process_spider_output(self, response, result, spider):
# Called with the results returned from the Spider, after
# it has processed the response.
# Must return an iterable of Request, or item objects.
for i in result:
yield i
def process_spider_exception(self, response, exception, spider):
# Called when a spider or process_spider_input() method
# (from other spider middleware) raises an exception.
# Should return either None or an iterable of Request or item objects.
pass
def process_start_requests(self, start_requests, spider):
# Called with the start requests of the spider, and works
# similarly to the process_spider_output() method, except
# that it doesnt have a response associated.
# Must return only requests (not items).
for r in start_requests:
yield r
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
class BilibiliCrawlerDownloaderMiddleware:
# Not all methods need to be defined. If a method is not defined,
# scrapy acts as if the downloader middleware does not modify the
# passed objects.
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_request(self, request, spider):
# Called for each request that goes through the downloader
# middleware.
# Must either:
# - return None: continue processing this request
# - or return a Response object
# - or return a Request object
# - or raise IgnoreRequest: process_exception() methods of
# installed downloader middleware will be called
return None
def process_response(self, request, response, spider):
# Called with the response returned from the downloader.
# Must either;
# - return a Response object
# - return a Request object
# - or raise IgnoreRequest
return response
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.
# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)

View File

@@ -0,0 +1,13 @@
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
from itemadapter import ItemAdapter
class BilibiliCrawlerPipeline:
def process_item(self, item, spider):
return item

View File

@@ -0,0 +1,98 @@
# Scrapy settings for bilibili_crawler project
#
# For simplicity, this file contains only settings considered important or
# commonly used. You can find more settings consulting the documentation:
#
# https://docs.scrapy.org/en/latest/topics/settings.html
# https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# https://docs.scrapy.org/en/latest/topics/spider-middleware.html
BOT_NAME = "bilibili_crawler"
SPIDER_MODULES = ["bilibili_crawler.spiders"]
NEWSPIDER_MODULE = "bilibili_crawler.spiders"
# Crawl responsibly by identifying yourself (and your website) on the user-agent
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# Configure maximum concurrent requests performed by Scrapy (default: 16)
CONCURRENT_REQUESTS = 10
# Configure a delay for requests for the same website (default: 0)
# See https://docs.scrapy.org/en/latest/topics/settings.html#download-delay
# See also autothrottle settings and docs
# DOWNLOAD_DELAY = 3
# The download delay setting will honor only one of:
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# CONCURRENT_REQUESTS_PER_IP = 16
# Disable cookies (enabled by default)
COOKIES_ENABLED = False
# Disable Telnet Console (enabled by default)
# TELNETCONSOLE_ENABLED = False
# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
"accept-encoding": "gzip, deflate, br",
"origin": "https://www.bilibili.com",
"user-agent": " ".join(
[
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
"AppleWebKit/537.36 (KHTML, like Gecko)",
"Chrome/101.0.4951.64",
"Safari/537.36",
]
),
}
# Enable or disable spider middlewares
# See https://docs.scrapy.org/en/latest/topics/spider-middleware.html
# SPIDER_MIDDLEWARES = {
# 'bilibili_crawler.middlewares.BilibiliCrawlerSpiderMiddleware': 543,
# }
# Enable or disable downloader middlewares
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html
# DOWNLOADER_MIDDLEWARES = {
# 'bilibili_crawler.middlewares.BilibiliCrawlerDownloaderMiddleware': 543,
# }
# Enable or disable extensions
# See https://docs.scrapy.org/en/latest/topics/extensions.html
# EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
# }
# Configure item pipelines
# See https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# ITEM_PIPELINES = {
# 'bilibili_crawler.pipelines.BilibiliCrawlerPipeline': 300,
# }
# Enable and configure the AutoThrottle extension (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/autothrottle.html
AUTOTHROTTLE_ENABLED = True
# The initial download delay
# AUTOTHROTTLE_START_DELAY = 5
# The maximum download delay to be set in case of high latencies
# AUTOTHROTTLE_MAX_DELAY = 60
# The average number of requests Scrapy should be sending in parallel to
# each remote server
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# Enable showing throttling stats for every response received:
AUTOTHROTTLE_DEBUG = True
# Enable and configure HTTP caching (disabled by default)
# See https://docs.scrapy.org/en/latest/topics/downloader-middleware.html#httpcache-middleware-settings
# HTTPCACHE_ENABLED = True
# HTTPCACHE_EXPIRATION_SECS = 0
# HTTPCACHE_DIR = 'httpcache'
# HTTPCACHE_IGNORE_HTTP_CODES = []
# HTTPCACHE_STORAGE = 'scrapy.extensions.httpcache.FilesystemCacheStorage'

View File

@@ -0,0 +1,4 @@
# This package will contain the spiders of your Scrapy project
#
# Please refer to the documentation for information on how to create and manage
# your spiders.

View File

@@ -0,0 +1,69 @@
import math
from urllib.parse import urlencode
import scrapy
from bilibili_crawler.items import APIData, DefaultLoader, VideoData
VIDEO_PAGE_URL = "https://www.bilibili.com/video/{bvid}"
API_URL = "https://api.bilibili.com/x/space/arc/search?"
API_QUERY_PARAMS = {
"mid": "533459953",
"ps": "30",
"tid": "0",
"order": "pubdate",
}
class BilibiliSpider(scrapy.Spider):
name = "bilibili"
start_urls = [
API_URL + urlencode({"pn": "1", **API_QUERY_PARAMS}),
]
def parse(self, response, **kwargs):
jsons = response.json()
count = jsons["data"]["page"]["count"]
total = math.ceil(int(count) / 30)
for page in range(1, total + 1):
url = API_URL + urlencode({"pn": str(page), **API_QUERY_PARAMS})
yield scrapy.Request(url=url, callback=self.parse_api)
def parse_api(self, response, **kwargs):
jsons = response.json()
api_data = jsons["data"]["list"]["vlist"]
for data in api_data:
bvid = data["bvid"]
loader = DefaultLoader(item=APIData())
for k in APIData.fields.keys():
loader.add_value(k, data[k])
yield scrapy.Request(
url=VIDEO_PAGE_URL.format(bvid=bvid),
callback=self.parse_video_data,
cb_kwargs={"api_data": loader.load_item()},
)
def parse_video_data(self, response, api_data, **kwargs):
loader = DefaultLoader(
item=VideoData(), response=response, selector=response.selector
)
loader.add_css("like", ".ops .like::text")
loader.add_css("coin", ".ops .coin::text")
loader.add_css("collect", ".ops .collect::text")
loader.add_css("share", ".ops .share::text")
loader.add_css("rank", ".video-data .rank::text")
video_data = loader.load_item()
data = dict(**api_data, **video_data)
yield data
if __name__ == "__main__":
from scrapy.crawler import CrawlerProcess
process = CrawlerProcess()
process.crawl(BilibiliSpider)
process.start()

View File

@@ -0,0 +1,11 @@
# Automatically created by: scrapy startproject
#
# For more information about the [deploy] section see:
# https://scrapyd.readthedocs.io/en/latest/deploy.html
[settings]
default = bilibili_crawler.settings
[deploy]
#url = http://localhost:6800/
project = bilibili_crawler

View File

@@ -0,0 +1,39 @@
class Order:
def __init__(self, item, telephone):
self.item = item
self.telephone = telephone
def query_item_number(name):
print(f"Query database for {name}...")
if name == "Macbook Pro M1":
return 0
def notify(telephone, message):
print(f"Send message to {telephone}: {message}")
def register(order, callback=None):
number = query_item_number(order.item)
if number == 0:
message = "Sorry, we are out of stock."
if callback:
callback(order.telephone, message=message)
return "register failed."
return "register success."
def main():
order = Order(item="Macbook Pro M1", telephone="000-0001")
status = register(order, callback=notify)
print(f"Register status: {status}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,39 @@
class GrandFather:
genre = "sports"
gender = "male"
def __init__(self):
pass
class GrandMother:
gender = "female"
def __init__(self):
pass
class Father(GrandFather, GrandMother):
gender = "male"
def __init__(self):
print("Father")
class Mother(GrandFather, GrandMother):
genre = "music"
gender = "female"
def __init__(self):
print("Mother")
class Child(Father, Mother):
def __init__(self):
pass
if __name__ == '__main__':
child = Child()
print(child.genre)
print(child.gender)

View File

@@ -0,0 +1,400 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"# Regular Expression"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['Hello, world']\n"
]
}
],
"source": [
"import re\n",
"\n",
"html = \"\"\"\n",
"<!DOCTYPE html>\n",
"<html lang=\"en\">\n",
" <head>\n",
" <meta charset=\"UTF-8\" />\n",
" <title>Hello, world</title>\n",
" </head>\n",
" <body>\n",
" <h1>从 HTML 提取数据的方式</h1>\n",
" <ul>\n",
" <li>正则表达式</li>\n",
" <li>CSS 选择器</li>\n",
" <li>XPath</li>\n",
" </ul>\n",
" </body>\n",
"</html>\n",
"\"\"\"\n",
"\n",
"\n",
"pattern = r\"<title>(.*)</title>\"\n",
"head_title = re.findall(pattern, html)\n",
"print(head_title)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['正则表达式</li>\\n <li>CSS 选择器</li>\\n <li>XPath']\n"
]
}
],
"source": [
"# greedy mode\n",
"\n",
"li_pattern = r\"<li>(.*)</li>\"\n",
"print(re.findall(li_pattern, html, re.DOTALL))"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['正则表达式', 'CSS 选择器', 'XPath']\n"
]
}
],
"source": [
"# non-greedy mode\n",
"\n",
"li_pattern = r\"<li>(.*?)</li>\"\n",
"print(re.findall(li_pattern, html, re.DOTALL))"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"# CSS Selector"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"# uncomment below command to install beautifulsoup\n",
"# if you haven't install it yet\n",
"\n",
"# !pip install beautifulsoup4"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[<li id=\"css-selector\">CSS 选择器</li>]\n"
]
}
],
"source": [
"from bs4 import BeautifulSoup\n",
"\n",
"html = \"\"\"\n",
"<!DOCTYPE html>\n",
"<html lang=\"en\">\n",
" <head>\n",
" <meta charset=\"UTF-8\" />\n",
" <title>Hello, world</title>\n",
" </head>\n",
" <body>\n",
" <h1>从 HTML 提取数据的方式有哪些</h1>\n",
" <ul>\n",
" <li class=\"option\">正则表达式</li>\n",
" <li id=\"css-selector\">CSS 选择器</li>\n",
" <li class=\"option\">XPath</li>\n",
" <li>...</li>\n",
" </ul>\n",
" </body>\n",
"</html>\n",
"\"\"\"\n",
"soup = BeautifulSoup(html)\n",
"css = soup.select(\"li#css-selector\")\n",
"print(css)\n"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[<li class=\"option\">正则表达式</li>, <li class=\"option\">XPath</li>]\n"
]
}
],
"source": [
"option = soup.select(\"li.option\")\n",
"print(option)"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['正则表达式', 'CSS 选择器', 'XPath', '...']\n"
]
}
],
"source": [
"li_content = [li.text for li in soup.select(\"li\")]\n",
"print(li_content)"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"source": [
"# XPath"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# uncomment below command to install beautifulsoup\n",
"# if you haven't install it yet\n",
"\n",
"# !pip install lxml"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['<li class=\"option\">正则表达式</li>\\n ', '<li id=\"css-selector\">CSS 选择器</li>\\n ', '<li class=\"option\">XPath</li>\\n ', '<li>...</li>\\n ']\n",
"<li id=\"css-selector\">CSS 选择器</li>\n",
" \n",
"['<li class=\"option\">正则表达式</li>\\n ', '<li class=\"option\">XPath</li>\\n ']\n",
"['正则表达式', 'CSS 选择器', 'XPath', '...']\n"
]
}
],
"source": [
"from lxml import etree\n",
"\n",
"html = \"\"\"\n",
"<!DOCTYPE html>\n",
"<html lang=\"en\">\n",
" <head>\n",
" <meta charset=\"UTF-8\" />\n",
" <title>Hello, world</title>\n",
" </head>\n",
" <body>\n",
" <h1>从 HTML 提取数据的方式有哪些</h1>\n",
" <ul>\n",
" <li class=\"option\">正则表达式</li>\n",
" <li id=\"css-selector\">CSS 选择器</li>\n",
" <li class=\"option\">XPath</li>\n",
" <li>...</li>\n",
" </ul>\n",
" </body>\n",
"</html>\n",
"\"\"\"\n",
"\n",
"tree = etree.HTML(html)\n",
"\n",
"\n",
"lis = tree.xpath(\"//li\") # equal to //li\n",
"print([etree.tounicode(li) for li in lis])\n",
"\n",
"css = tree.xpath(\"//li[@id='css-selector']\")[0]\n",
"print(etree.tounicode(css))\n",
"\n",
"options = tree.xpath(\"//li[contains(@class, 'option')]\")\n",
"print([etree.tounicode(opt) for opt in options])\n",
"\n",
"li_content = tree.xpath(\"//li/text()\")\n",
"print(li_content)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# JSON"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from pprint import pprint as print\n",
"API_CONTENT = \"\"\"\n",
"{\n",
" \"name\": \"100gle\",\n",
" \"platform\": \"sspai\",\n",
" \"projects\": [\n",
" {\n",
" \"id\": 148,\n",
" \"name\": \"《Python 自学手册》\",\n",
" \"pubDate\": \"2020-08-24\"\n",
" },\n",
" {\n",
" \"id\": 271,\n",
" \"name\": \"《100 小时后请叫我程序员》\",\n",
" \"pubDate\": \"2022-04-20\"\n",
" }\n",
" ]\n",
"}\n",
"\"\"\"\n",
"\n",
"JSON = json.loads(API_CONTENT)"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'dict'>\n"
]
}
],
"source": [
"print(type(JSON))"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[{'id': 148, 'name': '《Python 自学手册》', 'pubDate': '2020-08-24'},\n",
" {'id': 271, 'name': '《100 小时后请叫我程序员》', 'pubDate': '2022-04-20'}]\n"
]
}
],
"source": [
"print(JSON[\"projects\"])"
]
}
],
"metadata": {
"interpreter": {
"hash": "13977d4cc82dee5f9d9535ceb495bd0ab12a43c33c664e5f0d53c24cf634b67f"
},
"kernelspec": {
"display_name": "Python 3.9.0 ('pandas-startup')",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.0"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -0,0 +1,49 @@
import re
from itemloaders.processors import Compose, TakeFirst
from scrapy import Field, Item, Spider
from scrapy.loader import ItemLoader
def extract_number(value):
value = value.strip()
number = re.findall(r"(\d+)", value)[0]
return int(number)
class DefaultLoader(ItemLoader):
default_output_processor = TakeFirst()
class VideoData(Item):
title = Field()
play = Field(output_processor=Compose(TakeFirst(), extract_number))
danmu = Field(output_processor=Compose(TakeFirst(), extract_number))
pubdate = Field()
like = Field()
coin = Field()
collect = Field()
share = Field()
class QuickStartSpider(Spider):
name = "quickstart"
start_urls = ["https://www.bilibili.com/video/BV1PQ4y167xk"]
def parse(self, response, **kwargs):
loader = DefaultLoader(
item=VideoData(), response=response, selector=response.selector
)
loader.add_css("title", "span.tit::text")
loader.add_css("play", "span.view::attr(title)")
loader.add_css("danmu", "span.dm::attr(title)")
loader.add_css("pubdate", ".video-data>span:nth-child(3)::text")
loader.add_css("like", "span.like::text")
loader.add_css("coin", "span.coin::text")
loader.add_css("collect", "span.collect::text")
loader.add_css("share", "span.share::text")
data = loader.load_item()
yield data

View File

@@ -0,0 +1,42 @@
class Father: # 1
genre = "sports" # 2
def __init__(self, name): # 4
self.name = name
def exercise(self): # 4
print('exercise better!')
class Child(Father): # 5
def __init__(self, name, gender): # 6
self.name = name # 7
self.gender = gender # 8
def exercise(self): # 9
if self.genre == "sports":
print("exercise good!")
else:
print("exercise")
def hobbies(self): # 10
return ["reading", "watching movies", "music"]
def main():
elder_ming = Father("Ming")
elder_ming.exercise()
elder_ming_has_hobbies = hasattr(elder_ming, "hobbies") or False
print(f"Elder Ming has other hobbies? {elder_ming_has_hobbies}")
print("=" * 20)
young_ming = Child("Ming", "boy")
young_ming.exercise()
young_ming_has_hobbies = hasattr(young_ming, "hobbies")
print(f"Young Ming has other hobbies? {young_ming_has_hobbies}")
if young_ming_has_hobbies:
print(f"Young Ming's hobbies: {young_ming.hobbies()}")
if __name__ == '__main__':
main()