122 lines
3.8 KiB
Python
122 lines
3.8 KiB
Python
import logging
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from textwrap import shorten
|
|
from typing import List
|
|
|
|
import requests
|
|
from pytion._types import DictLike
|
|
from pytion.auth import AuthorizationHeader
|
|
from pytion.exception import APIQueryException
|
|
from pytion.helper import pformat
|
|
from pytion.schemas import (
|
|
NewThingSchema,
|
|
NotionAPI,
|
|
NumberProperty,
|
|
PageSchema,
|
|
ParentSchema,
|
|
StringProperty,
|
|
URLProperty,
|
|
)
|
|
|
|
__all__ = "NotionClient"
|
|
|
|
LOG = logging.getLogger("pytion.core")
|
|
|
|
|
|
class Record:
|
|
def __init__(
|
|
self,
|
|
id: int,
|
|
author: str,
|
|
name: str,
|
|
price: str,
|
|
source: str,
|
|
url: str,
|
|
issue: int,
|
|
):
|
|
self.id = NumberProperty(number=id)
|
|
self.author = StringProperty(value=author)
|
|
self.name = StringProperty(value=name, type="title")
|
|
self.price = StringProperty(value=price)
|
|
self.source = StringProperty(value=source)
|
|
self.url = URLProperty(url=url)
|
|
self.issue = NumberProperty(number=issue)
|
|
|
|
def dict(self):
|
|
target = {}
|
|
for key in self.__dict__.keys():
|
|
if not key.startswith("_"):
|
|
target[key] = self.__dict__[key]
|
|
return target
|
|
|
|
|
|
class NotionClient:
|
|
def __init__(self, token: str, database_id: str) -> None:
|
|
self._token = token
|
|
self._database_id = database_id
|
|
self._auth = AuthorizationHeader(authorization=token).dict()
|
|
self._database_info: DictLike = {}
|
|
self._properties: DictLike = {}
|
|
self._parent = ParentSchema(database_id=self._database_id)
|
|
|
|
def __repr__(self) -> str:
|
|
masked_token = self._mask_token(self._token)
|
|
return f"NotionClient<token={masked_token}, database={self._database_id}>"
|
|
|
|
__str__ = __repr__
|
|
|
|
def _mask_token(self, token: str) -> str:
|
|
return shorten(token, width=10, placeholder="...")
|
|
|
|
def query_database(self) -> "DictLike":
|
|
payload = {"page_size": 100}
|
|
url = NotionAPI.QUERY_DATABASE_BY_ID.format(database_id=self._database_id)
|
|
response = requests.post(url, headers=self._auth, json=payload)
|
|
data = response.json()
|
|
if response.status_code != 200:
|
|
LOG.debug("query failed, see exception as below:")
|
|
raise APIQueryException(status_code=response.status_code, detail=data)
|
|
|
|
# Caching the database info
|
|
self._database_info = data
|
|
LOG.debug(f"database response is: {pformat(data)}")
|
|
return data
|
|
|
|
@property
|
|
def properties(self):
|
|
return self._extract_properties()
|
|
|
|
def _extract_properties(self) -> List[str]:
|
|
if not self._database_info:
|
|
self.query_database()
|
|
properties = self._database_info["results"][0]["properties"]
|
|
LOG.debug(f"properties are: {pformat(properties)}")
|
|
|
|
return list(properties.keys())
|
|
|
|
def add_row(self, record: NewThingSchema) -> DictLike:
|
|
properties = Record(**record.dict()).dict()
|
|
params = PageSchema(parent=self._parent, properties=properties)
|
|
response = requests.post(
|
|
NotionAPI.CREATE_PAGE, json=params.dict(), headers=self._auth
|
|
)
|
|
data = response.json()
|
|
if response.status_code != 200:
|
|
LOG.debug("add data failed, see exception as below:")
|
|
raise APIQueryException(status_code=response.status_code, detail=data)
|
|
|
|
LOG.debug(f"page response is: {pformat(data)}")
|
|
return data
|
|
|
|
def add_rows(self, records: List[NewThingSchema]) -> List[DictLike]:
|
|
responses = []
|
|
with ThreadPoolExecutor(max_workers=2) as worker:
|
|
features = [
|
|
worker.submit(self.add_row, record=record) for record in records
|
|
]
|
|
|
|
for feature in as_completed(features):
|
|
responses.append(feature.result())
|
|
|
|
return responses
|