feat: 新增Notion自动化综合案例示例代码

This commit is contained in:
100gle
2022-09-10 17:41:03 +08:00
parent ccc37da694
commit c3a251aca5
18 changed files with 2998 additions and 0 deletions

View File

@@ -0,0 +1,8 @@
from typing import Mapping, TypeVar, Union
# Names exported by a star-import of this module.
__all__ = ["T", "DictLike", "PropertyLike", "Numeric"]
# Generic type variable; ``DictLike`` uses it for both keys and values.
T = TypeVar("T")
# Read-only mapping alias parameterised by ``T``.
DictLike = Mapping[T, T]
# Second name for the same alias as ``DictLike``.
PropertyLike = DictLike
# Type variable whose upper bound is ``Union[int, float]``.
Numeric = TypeVar("Numeric", bound=Union[int, float])

View File

@@ -0,0 +1,23 @@
from pydantic import BaseModel, validator
from pytion import _types
# ``__all__`` has to be a sequence of names; a bare string would be iterated
# character by character by a star-import.
__all__ = ["AuthorizationHeader"]


class AuthorizationHeader(BaseModel):
    """HTTP headers sent with every Notion API request.

    Fields use snake_case names; ``dict()`` converts them to the
    ``Header-Case`` names expected on the wire (for example
    ``notion_version`` becomes ``Notion-Version``).
    """

    authorization: str
    notion_version: str = "2022-06-28"
    content_type: str = "application/json"
    accept: str = "application/json"

    @validator("authorization")
    def has_bearer_prefix(cls, value: str) -> str:
        """Prepend ``Bearer `` unless the value already starts with ``Bearer``."""
        return f"Bearer {value}" if not value.startswith("Bearer") else value

    def dict(self, **kwargs) -> _types.DictLike:
        """Return the fields as a header mapping.

        Args:
            **kwargs: forwarded unchanged to ``BaseModel.dict``.

        Returns:
            A new dict whose keys are the field names split on ``_``, with
            each part capitalised and the parts joined by ``-``.
        """
        data = super().dict(**kwargs)
        return {
            "-".join(part.capitalize() for part in key.split("_")): value
            for key, value in data.items()
        }

View File

@@ -0,0 +1,121 @@
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from textwrap import shorten
from typing import List
import requests
from pytion._types import DictLike
from pytion.auth import AuthorizationHeader
from pytion.exception import APIQueryException
from pytion.helper import pformat
from pytion.schemas import (
NewThingSchema,
NotionAPI,
NumberProperty,
PageSchema,
ParentSchema,
StringProperty,
URLProperty,
)
# ``__all__`` has to be a sequence of names; a bare string would be iterated
# character by character by a star-import.
__all__ = ["NotionClient"]
# Logger shared by everything in this module.
LOG = logging.getLogger("pytion.core")
class Record:
    """One database row expressed as Notion property objects.

    Every constructor argument is wrapped in the matching property schema
    and stored on the instance under the same name.
    """

    def __init__(
        self,
        id: int,
        author: str,
        name: str,
        price: str,
        source: str,
        url: str,
        issue: int,
    ):
        self.id = NumberProperty(number=id)
        self.author = StringProperty(value=author)
        # ``name`` is the only column built as a "title" property.
        self.name = StringProperty(value=name, type="title")
        # ``price`` and ``source`` are optional in ``NewThingSchema`` and may
        # arrive as ``None``; the text content field requires a string, so
        # fall back to an empty one instead of failing validation.
        self.price = StringProperty(value="" if price is None else price)
        self.source = StringProperty(value="" if source is None else source)
        self.url = URLProperty(url=url)
        self.issue = NumberProperty(number=issue)

    def dict(self):
        """Return the instance attributes whose names do not start with ``_``."""
        return {
            key: value for key, value in vars(self).items() if not key.startswith("_")
        }
class NotionClient:
    """Small client for querying one Notion database and appending pages to it."""

    def __init__(self, token: str, database_id: str, timeout: float = 30.0) -> None:
        """Store credentials and pre-build the request headers.

        Args:
            token: Notion integration token (with or without ``Bearer``).
            database_id: id of the database every call operates on.
            timeout: seconds passed to ``requests`` for each call; pass
                ``None`` to wait indefinitely.
        """
        self._token = token
        self._database_id = database_id
        self._timeout = timeout
        self._auth = AuthorizationHeader(authorization=token).dict()
        self._database_info: "DictLike" = {}
        self._properties: "DictLike" = {}
        self._parent = ParentSchema(database_id=self._database_id)

    def __repr__(self) -> str:
        masked_token = self._mask_token(self._token)
        return f"NotionClient<token={masked_token}, database={self._database_id}>"

    __str__ = __repr__

    def _mask_token(self, token: str) -> str:
        """Return ``token`` shortened to at most 10 characters for display."""
        return shorten(token, width=10, placeholder="...")

    @staticmethod
    def _json_or_raise(response, failure_message: str) -> "DictLike":
        """Return the decoded JSON body, or raise ``APIQueryException``.

        For any status other than 200 the exception carries the decoded
        body, or the raw text when the body is not valid JSON (for example
        an HTML error page from a proxy).
        """
        if response.status_code != 200:
            try:
                detail = response.json()
            except ValueError:
                detail = response.text
            LOG.debug(failure_message)
            raise APIQueryException(status_code=response.status_code, detail=detail)
        return response.json()

    def query_database(self) -> "DictLike":
        """Query the database and cache the response on the client.

        A single request with ``page_size`` 100 is sent.

        Raises:
            APIQueryException: the API answered with a status other than 200.
        """
        payload = {"page_size": 100}
        url = NotionAPI.QUERY_DATABASE_BY_ID.format(database_id=self._database_id)
        response = requests.post(
            url, headers=self._auth, json=payload, timeout=self._timeout
        )
        data = self._json_or_raise(response, "query failed, see exception as below:")
        # Caching the database info
        self._database_info = data
        LOG.debug(f"database response is: {pformat(data)}")
        return data

    @property
    def properties(self):
        """Property names of the database, see ``_extract_properties``."""
        return self._extract_properties()

    def _extract_properties(self) -> List[str]:
        """Return the property names of the first page in the query result.

        The database is queried first when nothing is cached yet. A result
        without any page yields an empty list.
        """
        if not self._database_info:
            self.query_database()
        results = self._database_info.get("results") or []
        if not results:
            return []
        properties = results[0]["properties"]
        LOG.debug(f"properties are: {pformat(properties)}")
        return list(properties)

    def add_row(self, record: "NewThingSchema") -> "DictLike":
        """Create one page in the database from ``record``.

        Returns:
            The decoded JSON body of the create-page response.

        Raises:
            APIQueryException: the API answered with a status other than 200.
        """
        properties = Record(**record.dict()).dict()
        params = PageSchema(parent=self._parent, properties=properties)
        response = requests.post(
            NotionAPI.CREATE_PAGE,
            json=params.dict(),
            headers=self._auth,
            timeout=self._timeout,
        )
        data = self._json_or_raise(
            response, "add data failed, see exception as below:"
        )
        LOG.debug(f"page response is: {pformat(data)}")
        return data

    def add_rows(self, records: "List[NewThingSchema]") -> "List[DictLike]":
        """Create one page per record using two worker threads.

        Responses are collected in completion order, which may differ from
        the order of ``records``. The first failing ``add_row`` call
        re-raises its exception here.
        """
        responses = []
        with ThreadPoolExecutor(max_workers=2) as worker:
            futures = [
                worker.submit(self.add_row, record=record) for record in records
            ]
            for future in as_completed(futures):
                responses.append(future.result())
        return responses

View File

@@ -0,0 +1,7 @@
class APIQueryException(Exception):
    """Error carrying the HTTP status code and body of a failed API call."""

    def __init__(self, status_code: int, detail) -> None:
        # Hand both values to ``Exception`` so that ``args`` is populated and
        # the exception can be copied or pickled like any other.
        super().__init__(status_code, detail)
        self.status_code = status_code
        self.detail = detail

    def __str__(self) -> str:
        return f"status_code={self.status_code}, detail={self.detail}"

View File

@@ -0,0 +1,62 @@
import logging
import re
from functools import partial
from pprint import pformat
from typing import List, Optional
import bs4
from pytion._types import DictLike
from pytion.schemas import NewThingSchema, ProductInfoSchema
# Logger shared by everything in this module.
LOG = logging.getLogger("pytion.helper")
# Rebind ``pformat`` to a variant fixed at ``depth=2``: containers nested
# deeper than two levels are rendered as ``...``.
pformat = partial(pformat, depth=2)
class HTMLBodyParser:
    """Extract product entries from an article's HTML.

    The issue number is read from ``<title>``; each ``<h2>`` matching
    ``@author:name`` starts one product whose source and price are looked up
    after the next ``<ul>``.
    """

    _REGEX_ISSUE = re.compile(r"(?P<issue>\d+).*")
    # NOTE(review): only an ASCII colon separates author and name here;
    # confirm whether the pages also use a full-width colon.
    _REGEX_TITLE = re.compile(r"@(?P<author>.+)(?:[:])(?P<name>.+)")  # noqa:ignore
    _REGEX_PRICE_TAG = re.compile(r"价格.*")
    _REGEX_SOURCE_TAG = re.compile(r"渠道|平台")

    def __init__(self, html: str, url: Optional[str] = None) -> None:
        """Parse ``html`` and remember the optional source ``url``."""
        self.html = bs4.BeautifulSoup(html, "html.parser")
        self.raw_html = self.html.prettify()
        self.url = url

    def _parse_issue(self) -> DictLike:
        """Return ``{"issue": <digits>}`` from the first ``<title>``.

        An empty dict is returned when there is no ``<title>`` or it holds
        no digits.
        """
        titles = self.html.select("title")
        if not titles:
            return {}
        match = self._REGEX_ISSUE.search(titles[0].text)
        return match.groupdict() if match else {}

    def _parse_entity(self) -> List[DictLike]:
        """Return one dict (author, name, price, source) per product heading.

        Headings that do not match the ``@author:name`` pattern are skipped
        rather than aborting the whole parse.
        """
        data = []
        for h2 in self.html.find_all("h2"):
            match = self._REGEX_TITLE.search(h2.text)
            if match is None:
                continue
            info = ProductInfoSchema()
            ul = h2.find_next("ul")
            if ul is not None:
                # NOTE(review): ``find_next`` keeps searching past the end of
                # this ``<ul>``, so a value may come from a later section.
                sources = ul.find_next(text=self._REGEX_SOURCE_TAG)
                prices = ul.find_next(text=self._REGEX_PRICE_TAG)
                # ``str()`` detaches the text from the parse tree.
                if sources:
                    info.source = str(sources)
                if prices:
                    info.price = str(prices)
            data.append(dict(**match.groupdict(), **info.dict()))
        return data

    def get(self) -> List[NewThingSchema]:
        """Return every product as a ``NewThingSchema``.

        ``id`` is the 1-based position of the product among the parsed
        entries; ``url`` is filled in when the parser was given one.
        """
        data = []
        issue = self._parse_issue()
        entities = self._parse_entity()
        for index, entity in enumerate(entities, start=1):
            newthing = NewThingSchema(id=index, **entity, **issue)
            if self.url:
                newthing.url = self.url
            data.append(newthing)
        LOG.debug(f"all data: {pformat(data)}")
        return data

View File

@@ -0,0 +1,42 @@
import logging
from concurrent.futures import ThreadPoolExecutor
import requests
from pytion.core import NotionClient
from pytion.helper import HTMLBodyParser
from pytion.settings import settings
# Configure logging from the project settings; ``style="{"`` makes the
# format string use ``str.format``-style ``{name}`` placeholders.
logging.basicConfig(
    level=settings.PYTION_DEBUG,
    format=settings.PYTION_LOG_FORMAT,
    style="{",
)
# Module-level client used by every ``query`` call below.
client = NotionClient(
    token=settings.NOTION_TOKEN,
    database_id=settings.NOTION_DATABASE_ID,
)
def query(url):
    """Download ``url``, parse its products and add them to the database.

    Returns:
        ``True`` once all rows have been added.

    Raises:
        requests.HTTPError: the page answered with a 4xx/5xx status.
        requests.Timeout: the page did not answer within 30 seconds.
    """
    response = requests.get(url=url, timeout=30)
    # Fail here rather than feeding an error page to the parser.
    response.raise_for_status()
    items = HTMLBodyParser(response.text, url=url)
    records = items.get()
    client.add_rows(records)
    return True
def main(urls=None):
    """Import every article in ``urls`` using two worker threads.

    Args:
        urls: iterable of article URLs; defaults to the built-in list.

    A failure for one URL is logged with its traceback and does not stop
    the remaining URLs.
    """
    if urls is None:
        urls = [
            "https://sspai.com/post/74158",
            "https://sspai.com/post/73964",
            "https://sspai.com/post/73826",
            "https://sspai.com/post/73036",
            "https://sspai.com/post/68115",
        ]
    log = logging.getLogger(__name__)
    with ThreadPoolExecutor(2) as w:
        submitted = [(url, w.submit(query, url)) for url in urls]
        # Ask every future for its result: an exception raised inside a
        # worker only surfaces when the result is requested.
        for url, future in submitted:
            try:
                future.result()
            except Exception:
                log.exception("failed to import %s", url)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,11 @@
from .api import NotionAPI, PageSchema, ParentSchema # noqa: ignore
from .property import ( # noqa: ignore
IdProvider,
NumberProperty,
Property,
RichTextProperty,
StringProperty,
TextProperty,
URLProperty,
)
from .sspai import NewThingSchema, ProductInfoSchema # noqa: ignore

View File

@@ -0,0 +1,25 @@
from pydantic import BaseModel
from pytion._types import DictLike
# Names exported by a star-import of this module.
__all__ = ("NotionAPI", "ParentSchema", "PageSchema")
BASE_URL = "https://api.notion.com/v1"


class NotionAPI:
    """Endpoint URLs; the ``{...}`` parts are filled in with ``str.format``."""

    # databases
    QUERY_DATABASE_BY_ID: str = f"{BASE_URL}/databases/{{database_id}}/query"

    # pages
    QUERY_PAGE_BY_ID: str = f"{BASE_URL}/pages/{{page_id}}"
    CREATE_PAGE: str = f"{BASE_URL}/pages"
class ParentSchema(BaseModel):
    """Parent reference of a page: a ``type`` tag plus the database id."""

    # Defaults to the literal "database_id".
    type: str = "database_id"
    database_id: str
class PageSchema(BaseModel):
    """Body of a create-page request: the parent plus the page properties."""

    parent: ParentSchema
    # Mapping from property name to property object.
    properties: DictLike

View File

@@ -0,0 +1,74 @@
import random
from string import ascii_letters, digits, punctuation
from typing import List, Optional, Union
from uuid import UUID, uuid4
from pydantic import BaseModel, Field
from pytion._types import Numeric
# ===============
# Base type
# ===============
class IdProvider:
    """Callable that produces a fresh id string on every call.

    With ``version="string"`` (the default) the id is five characters drawn
    from ``letters``; any other value produces a UUID4 string.
    """

    letters = ascii_letters + punctuation + digits

    def __init__(self, version="string") -> None:
        self.version = version

    def __call__(self, *args, **kwargs) -> str:
        # Positional and keyword arguments are accepted and ignored.
        if self.version != "string":
            return str(uuid4())
        picked = random.choices(self.letters, k=5)
        return "".join(picked)
class Property(BaseModel):
    """Base model for property objects: an ``id`` and an optional ``type``."""

    # One ``IdProvider`` instance (default version "string") serves as the
    # factory, so an omitted id becomes a random five-character string.
    id: Union[str, UUID] = Field(default_factory=IdProvider())
    type: Optional[str]
# ===============
# String type
# ===============
class Content(BaseModel):
    """Wrapper holding a single text string under the key ``content``."""

    content: str
class TextProperty(BaseModel):
    """One text fragment: a ``type`` tag (default "text") plus its content."""

    type: str = "text"
    text: Content
class RichTextProperty(Property):
    """Property of type "rich_text" holding a list of text fragments."""

    type: str = "rich_text"
    rich_text: List[TextProperty]
class TitleProperty(Property):
    """Property of type "title" holding a list of text fragments."""

    type: str = "title"
    title: List[TextProperty]
class StringProperty(dict):
    """Factory that turns a plain string into a text property model.

    ``__new__`` never returns a ``StringProperty``: it builds a
    ``TitleProperty`` when ``type`` equals "title" and a
    ``RichTextProperty`` for every other value.
    """

    def __new__(cls, value: str, type=None, *args, **kwargs):
        fragments = [TextProperty(text=Content(content=value))]
        return (
            TitleProperty(title=fragments)
            if type == "title"
            else RichTextProperty(rich_text=fragments)
        )
# ===============
# Numeric type
# ===============
class NumberProperty(Property):
    """Property of type "number" holding one numeric value."""

    type: str = "number"
    # ``Numeric`` is a type variable bounded by ``Union[int, float]``.
    number: Numeric
# ===============
# URL type
# ===============
class URLProperty(Property):
    """Property of type "url" holding an optional URL string."""

    type: str = "url"
    url: Optional[str]

View File

@@ -0,0 +1,18 @@
from typing import Optional
from pydantic import BaseModel, Field
# Names exported by a star-import of this module.
__all__ = ("NewThingSchema", "ProductInfoSchema")
class ProductInfoSchema(BaseModel):
    """Optional price and source text of a product; both default to ``None``."""

    price: Optional[str] = None
    source: Optional[str] = None
class NewThingSchema(ProductInfoSchema):
    """A product entry: required name and author plus optional metadata."""

    # ``default_factory=int`` makes an omitted id default to 0.
    id: Optional[int] = Field(default_factory=int)
    name: str
    author: str
    url: Optional[str] = None
    issue: Optional[int] = None

View File

@@ -0,0 +1,20 @@
import os
from dotenv import load_dotenv
from pydantic import BaseSettings
load_dotenv()
# ``__all__`` has to be a sequence of names; a bare string would be iterated
# character by character by a star-import.
__all__ = ["settings"]


class Settings(BaseSettings):
    """Project configuration; defaults are read from the process environment.

    ``load_dotenv()`` runs before this class is defined, so the
    ``os.getenv`` defaults are evaluated after that call.
    """

    NOTION_TOKEN: str = os.getenv("NOTION_TOKEN", "")
    NOTION_DATABASE_ID: str = os.getenv("NOTION_DATABASE_ID", "")
    # Passed to ``logging.basicConfig`` as the level; 20 is ``logging.INFO``.
    PYTION_DEBUG: int = os.getenv("PYTION_DEBUG", 20)
    # ``{``-style logging format string.
    PYTION_LOG_FORMAT: str = "[{asctime}] [{levelname}] [{module}] - {message}"


# Single shared instance imported by the rest of the package.
settings = Settings()

View File

@@ -0,0 +1,116 @@
import os
import pathlib
import unittest
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional
import requests
from dotenv import load_dotenv
from pytion import auth, helper
from pytion.core import NotionClient
from pytion.schemas import NewThingSchema
from pytion.settings import settings
# Load ``.env`` before reading the credentials below.
load_dotenv()
TOKEN = os.getenv("NOTION_TOKEN")
DATABASE_ID = os.getenv("NOTION_DATABASE_ID")
# Directory one level above the one containing this file; fixtures are
# resolved relative to it.
ROOT = pathlib.Path(__file__).parents[1]
def _teardown_data(block_id):
    """Send a DELETE request for ``block_id`` to the blocks endpoint.

    Returns:
        ``True`` when the JSON reply has no ``"status"`` key, else ``False``.
        NOTE(review): this presumes only error replies carry ``status``;
        confirm against the API.
    """
    response = requests.delete(
        url=f"https://api.notion.com/v1/blocks/{block_id}",
        headers=auth.AuthorizationHeader(authorization=TOKEN).dict(),
        # Do not let a stalled connection hang the test teardown.
        timeout=30,
    )
    return "status" not in response.json()
class TestSettings(unittest.TestCase):
    """Check that ``settings`` exposes the same values as the environment."""

    def test_token(self):
        self.assertEqual(settings.NOTION_TOKEN, TOKEN)

    def test_database_id(self):
        self.assertEqual(settings.NOTION_DATABASE_ID, DATABASE_ID)
class TestAuth(unittest.TestCase):
    """Check the header mapping produced by ``AuthorizationHeader.dict``."""

    def test_AuthorizationHeader(self):
        header = auth.AuthorizationHeader(
            authorization="token",
            notion_version="2022-01-01",
        )
        # Keys are expected in Header-Case and the token with a Bearer prefix.
        expected = {
            "Authorization": "Bearer token",
            "Notion-Version": "2022-01-01",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        self.assertDictEqual(header.dict(), expected)
class TestHelper(unittest.TestCase):
    """Smoke tests for ``helper.HTMLBodyParser`` using a local HTML fixture."""

    HTML: Optional[str] = None
    parser: Optional[helper.HTMLBodyParser] = None

    @classmethod
    def setUpClass(cls) -> None:
        # Read and parse the fixture once for all tests in this class.
        cls.HTML = ROOT.joinpath("resources/testdata.html").read_text("utf-8")
        cls.parser = helper.HTMLBodyParser(cls.HTML)

    def test__parse_issue(self):
        self.assertNotEqual(self.parser._parse_issue(), {})

    def test__parse_entity(self):
        self.assertNotEqual(self.parser._parse_entity(), [])

    def test_get(self):
        self.assertNotEqual(self.parser.get(), [])
class TestCore(unittest.TestCase):
    """Tests for ``NotionClient`` that call the live API.

    Ids of pages created by the tests are collected in ``blocks`` and
    deleted again in ``tearDownClass``.
    """

    client: Optional[NotionClient] = None
    items: Optional[List[NewThingSchema]] = None
    # Deliberately one list shared through the class: every test appends the
    # ids it created so that the class-level teardown can remove them.
    blocks: Optional[List[str]] = []

    @classmethod
    def setUpClass(cls) -> None:
        cls.client = NotionClient(settings.NOTION_TOKEN, settings.NOTION_DATABASE_ID)
        HTML = ROOT.joinpath("resources/testdata.html").read_text("utf-8")
        cls.items = helper.HTMLBodyParser(HTML).get()

    @classmethod
    def tearDownClass(cls) -> None:
        if not cls.blocks:
            return
        # The pool is created only when there is something to delete, and
        # the ``with`` block waits for the deletions and shuts it down.
        with ThreadPoolExecutor(max_workers=2) as worker:
            _ = worker.map(_teardown_data, cls.blocks)

    def test_query_database(self):
        response = self.client.query_database()
        self.assertEqual(response.get("status"), None)

    def test_properties(self):
        properties = set(self.client.properties)
        expected = {"id", "name", "price", "source", "url", "issue", "author"}
        self.assertSetEqual(properties, expected)

    def test_add_row(self):
        record = self.items[0]
        data = self.client.add_row(record)
        self.assertEqual(data.get("status"), None)
        self.blocks.append(data["id"])

    def test_add_rows(self):
        data = self.client.add_rows(self.items)
        for response in data:
            with self.subTest(response=response):
                self.assertEqual(response.get("status"), None)
                self.blocks.append(response["id"])