diff --git a/code/newsletter/N9/crud/pw_create.py b/code/newsletter/N9/crud/pw_create.py new file mode 100644 index 0000000..246611d --- /dev/null +++ b/code/newsletter/N9/crud/pw_create.py @@ -0,0 +1,35 @@ +import pathlib + +import peewee as pw + +db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3") +db = pw.SqliteDatabase(db_url) + + +class BaseModel(pw.Model): + class Meta: + database = db + + +class User(BaseModel): + uid = pw.AutoField() + username = pw.CharField(max_length=10) + + +def main(): + User.create(username="Steve Jobs") + + user = User(username="Bill Gates") + user.save() + + User.insert(username="Larry Page").execute() + + rows = [ + dict(username="100gle"), + dict(username="Jay Chou"), + ] + User.insert_many(rows).execute() + + +if __name__ == '__main__': + main() diff --git a/code/newsletter/N9/crud/pw_delete.py b/code/newsletter/N9/crud/pw_delete.py new file mode 100644 index 0000000..140983f --- /dev/null +++ b/code/newsletter/N9/crud/pw_delete.py @@ -0,0 +1,35 @@ +import pathlib + +import peewee as pw + +db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3") +db = pw.SqliteDatabase(db_url) + + +class BaseModel(pw.Model): + class Meta: + database = db + + +class User(BaseModel): + uid = pw.AutoField() + username = pw.CharField(max_length=10) + + def __repr__(self) -> str: + return f"User" + + __str__ = __repr__ + + +def main(): + user = User.select().where(User.username.startswith("Jay")).get() + ok = user.delete_instance() + print(f"delete rows: {ok}") + + stmt = User.delete().where(User.username.in_(["bill gates", "larry page"])) + ok = stmt.execute() + print(f"delete rows: {ok}") + + +if __name__ == '__main__': + main() diff --git a/code/newsletter/N9/crud/pw_query.py b/code/newsletter/N9/crud/pw_query.py new file mode 100644 index 0000000..be755da --- /dev/null +++ b/code/newsletter/N9/crud/pw_query.py @@ -0,0 +1,55 @@ +import pathlib + +import peewee as pw + +db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3") +db = pw.SqliteDatabase(db_url) + + +class BaseModel(pw.Model): + class Meta: + database = db + + +class User(BaseModel): + uid = pw.AutoField() + username = pw.CharField(max_length=10) + + def __repr__(self) -> str: + return f"User" + + __str__ = __repr__ + + +def main(): + # get* + User.get(User.uid == 1) + User.get_by_id(2) + User.get_or_none(User.uid == 3) + User[1] + + # select + users = User.select() + print(list(users)) + + users = User.select().limit(3) + print(list(users)) + + users = User.select().limit(100).iterator() + print(list(users)) + + # select with condition + users = User.select().where(User.uid.in_([1, 3])) + print(list(users)) + + users = User.select().where((User.uid == 1) | (User.uid == 4)) + print(list(users)) + + users = User.select().where( + (User.username.startswith("Steve")) & (User.username.endswith("Jobs")) + ) + print(list(users)) + + +if __name__ == '__main__': + main() diff --git a/code/newsletter/N9/crud/pw_update.py b/code/newsletter/N9/crud/pw_update.py new file mode 100644 index 0000000..429429c --- /dev/null +++ b/code/newsletter/N9/crud/pw_update.py @@ -0,0 +1,77 @@ +import pathlib +from datetime import datetime + +import peewee as pw + +db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3") +db = pw.SqliteDatabase(db_url) + + +class BaseModel(pw.Model): + class Meta: + database = db + + +class User(BaseModel): + uid = pw.AutoField() + username = pw.CharField(max_length=10) + + def __repr__(self) -> str: + return f"User" + + __str__ = __repr__ + + +class Weibo(BaseModel): + wid = pw.AutoField() + content = pw.TextField() + timestamp = pw.DateTimeField(default=datetime.now) + user = pw.ForeignKeyField(User, backref="user", field="uid") + + +class Likes(BaseModel): + user = pw.ForeignKeyField(User, backref="likes", field="uid") + weibo = pw.ForeignKeyField(Weibo, backref="likes", field="wid") + + class Meta: + primary_key = pw.CompositeKey("user", "weibo") + + +def main(): + + # -------------- + # inplace change + user = User.select().where(User.username.startswith("Steve")).get() + print(f"before updating: {user}") + + user.username = user.username.lower() + user.save() + user = User.select().where(User.username.startswith("Steve")).get() + print(f"after updating: {user}") + + # ------- + # update + user = User.select().where(User.username == "100gle").get() + print(f"before updating: {user}") + + query = User.update(username="100GLE").where(User.username == "100gle") + query.execute() + + user = User.select().where(User.username.startswith("100")).get() + print(f"after updating: {user}") + + # -------------------- + # update multiple rows + users = User.select().where(User.username.in_(["Bill Gates", "Larry Page"])) + print(f"before updating: {list(users)}") + u1, u2 = users + u1.username = u1.username.lower() + u2.username = u2.username.lower() + User.bulk_update([u1, u2], fields=[User.username]) + + users = user.select().where(User.username.in_(["bill gates", "larry page"])) + print(f"after updating: {list(users)}") + + +if __name__ == '__main__': + main() diff --git a/code/newsletter/N9/mydb.sqlite3 b/code/newsletter/N9/mydb.sqlite3 new file mode 100644 index 0000000..cfc00c2 Binary files /dev/null and b/code/newsletter/N9/mydb.sqlite3 differ diff --git a/code/newsletter/N9/pw_connect.py b/code/newsletter/N9/pw_connect.py new file mode 100644 index 0000000..e6d2123 --- /dev/null +++ b/code/newsletter/N9/pw_connect.py @@ -0,0 +1,21 @@ +import pathlib + +import peewee as pw + +db_url = pathlib.Path(__file__).parent.joinpath("./mydb.sqlite3") +db = pw.SqliteDatabase(db_url) + + +# # equivalent to this: +# db_url = pathlib.Path(__file__).parent.joinpath("./mydb.sqlite3") +# db = pw.SqliteDatabase(None) +# db.init(db_url) + + +db.connect() + +stmt = db.execute_sql("SELECT 1 as foo;").fetchall() +print(stmt) + + +db.close() diff --git a/code/newsletter/N9/pw_models.py b/code/newsletter/N9/pw_models.py new file mode 100644 index 0000000..8ff1395 --- /dev/null +++ b/code/newsletter/N9/pw_models.py @@ -0,0 +1,20 @@ +import pathlib + +import peewee as pw + +db_url = pathlib.Path(__file__).parent.joinpath("./mydb.sqlite3") +db = pw.SqliteDatabase(db_url) + + +class BaseModel(pw.Model): + class Meta: + database = db + + +class User(BaseModel): + uid = pw.AutoField() + username = pw.CharField(max_length=10) + + +if __name__ == '__main__': + db.create_tables([User]) diff --git a/code/newsletter/N9/web/main.py b/code/newsletter/N9/web/main.py new file mode 100644 index 0000000..d05e4f7 --- /dev/null +++ b/code/newsletter/N9/web/main.py @@ -0,0 +1,139 @@ +import pathlib +import typing as t +from datetime import datetime + +import peewee as pw +from fastapi import FastAPI +from playhouse.shortcuts import model_to_dict +from pydantic import BaseModel, Field + +db_url = pathlib.Path(__file__).parent.joinpath("web.sqlite3") +db = pw.SqliteDatabase(db_url) + +app = FastAPI() + +T = t.TypeVar("T") + + +class NoteORM(pw.Model): + id = pw.AutoField() + title = pw.CharField() + content = pw.TextField() + tags = pw.CharField() + create_at = pw.DateTimeField(default=datetime.now()) + + class Meta: + database = db + + +class GenericResponse(BaseModel): + code: int = 0 + msg: t.Optional[str] + data: t.Optional[t.List[T]] + + +class NoteModel(BaseModel): + id: t.Optional[int] = Field(example="null") + title: str + content: str + tags: t.Optional[t.List[str]] = Field(example="null") + + +@app.on_event("startup") +def connect(): + db.create_tables([NoteORM]) + db.connect(reuse_if_open=True) + + +@app.on_event("shutdown") +def disconnect(): + if not db.is_closed(): + db.close() + + +@app.get("/") +def index(): + data = NoteORM.select() + return GenericResponse( + code=200, msg="success", data=[model_to_dict(orm) for orm in data] + ) + + +@app.get("/note/{id}", response_model=GenericResponse) +def query_note_by_id(id: int): + result = NoteORM.get_or_none(NoteORM.id == id) + + if not result: + response = GenericResponse( + code=404, msg=f"can't get the note which id is: `{id}`." + ) + else: + response = GenericResponse( + code=200, + msg="success", + data=[model_to_dict(result)], + ) + return response + + +@app.post("/note", response_model=GenericResponse) +def add_note(payload: NoteModel): + tags = "" if not payload.tags else ",".join(payload.tags) + + note = NoteORM.create( + title=payload.title, + content=payload.content, + tags=tags, + ) + + return GenericResponse( + code=200, + msg="success", + data=[model_to_dict(note)], + ) + + +@app.put("/note", response_model=GenericResponse) +def update_note(payload: NoteModel): + data = payload.dict() + id = data.pop("id") + if not id: + return GenericResponse(code=400, msg="`id` is invalid value.") + + result: t.Optional[NoteORM] = NoteORM.get_or_none(NoteORM.id == id) + + if not result: + return GenericResponse( + code=404, msg=f"can't update the note which id is: `{id}`." + ) + + result.update(**data) + result.save() + + return GenericResponse(code=200, msg="success", data=[model_to_dict(result)]) + + +@app.delete("/note/{id}", response_model=GenericResponse) +def delete_note_by_id(id: int): + data: t.Optional[NoteORM] = NoteORM.get_or_none(NoteORM.id == id) + if not data: + return GenericResponse( + code=404, msg=f"can't update the note which id is: `{id}`." + ) + + id = data.id + ok = data.delete_instance() + if ok >= 1: + return GenericResponse(code=200, msg="success", data=[{"id": id}]) + + return GenericResponse( + code=500, + msg=f"something wrong when delete note which id is: {id}.", + data=[{"id": id}], + ) + + +if __name__ == '__main__': + import uvicorn + + uvicorn.run("main:app", reload=True) diff --git a/code/newsletter/N9/web/web.sqlite3 b/code/newsletter/N9/web/web.sqlite3 new file mode 100644 index 0000000..274816f Binary files /dev/null and b/code/newsletter/N9/web/web.sqlite3 differ