feat: 新增技能扩展N9一章相关示例源码

This commit is contained in:
100gle
2022-12-15 09:02:32 +08:00
parent a93c74e29c
commit 7686cabcd2
9 changed files with 382 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
import pathlib
import peewee as pw
db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3")
db = pw.SqliteDatabase(db_url)
class BaseModel(pw.Model):
class Meta:
database = db
class User(BaseModel):
uid = pw.AutoField()
username = pw.CharField(max_length=10)
def main():
User.create(username="Steve Jobs")
user = User(username="Bill Gates")
user.save()
User.insert(username="Larry Page").execute()
rows = [
dict(username="100gle"),
dict(username="Jay Chou"),
]
User.insert_many(rows).execute()
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,35 @@
import pathlib
import peewee as pw
db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3")
db = pw.SqliteDatabase(db_url)
class BaseModel(pw.Model):
class Meta:
database = db
class User(BaseModel):
uid = pw.AutoField()
username = pw.CharField(max_length=10)
def __repr__(self) -> str:
return f"User<uid={self.uid}, username={self.username}>"
__str__ = __repr__
def main():
user = User.select().where(User.username.startswith("Jay")).get()
ok = user.delete_instance()
print(f"delete rows: {ok}")
stmt = User.delete().where(User.username.in_(["bill gates", "larry page"]))
ok = stmt.execute()
print(f"delete rows: {ok}")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,55 @@
import pathlib
import peewee as pw
db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3")
db = pw.SqliteDatabase(db_url)
class BaseModel(pw.Model):
class Meta:
database = db
class User(BaseModel):
uid = pw.AutoField()
username = pw.CharField(max_length=10)
def __repr__(self) -> str:
return f"User<uid={self.uid}, username={self.username}>"
__str__ = __repr__
def main():
# get*
User.get(User.uid == 1)
User.get_by_id(2)
User.get_or_none(User.uid == 3)
User[1]
# select
users = User.select()
print(list(users))
users = User.select().limit(3)
print(list(users))
users = User.select().limit(100).iterator()
print(list(users))
# select with condition
users = User.select().where(User.uid.in_([1, 3]))
print(list(users))
users = User.select().where((User.uid == 1) | (User.uid == 4))
print(list(users))
users = User.select().where(
(User.username.startswith("Steve")) & (User.username.endswith("Jobs"))
)
print(list(users))
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,77 @@
import pathlib
from datetime import datetime
import peewee as pw
db_url = pathlib.Path(__file__).parents[1].joinpath("./mydb.sqlite3")
db = pw.SqliteDatabase(db_url)
class BaseModel(pw.Model):
class Meta:
database = db
class User(BaseModel):
uid = pw.AutoField()
username = pw.CharField(max_length=10)
def __repr__(self) -> str:
return f"User<uid={self.uid}, username={self.username}>"
__str__ = __repr__
class Weibo(BaseModel):
wid = pw.AutoField()
content = pw.TextField()
timestamp = pw.DateTimeField(default=datetime.now)
user = pw.ForeignKeyField(User, backref="user", field="uid")
class Likes(BaseModel):
user = pw.ForeignKeyField(User, backref="likes", field="uid")
weibo = pw.ForeignKeyField(Weibo, backref="likes", field="wid")
class Meta:
primary_key = pw.CompositeKey("user", "weibo")
def main():
# --------------
# inplace change
user = User.select().where(User.username.startswith("Steve")).get()
print(f"before updating: {user}")
user.username = user.username.lower()
user.save()
user = User.select().where(User.username.startswith("Steve")).get()
print(f"after updating: {user}")
# -------
# update
user = User.select().where(User.username == "100gle").get()
print(f"before updating: {user}")
query = User.update(username="100GLE").where(User.username == "100gle")
query.execute()
user = User.select().where(User.username.startswith("100")).get()
print(f"after updating: {user}")
# --------------------
# update multiple rows
users = User.select().where(User.username.in_(["Bill Gates", "Larry Page"]))
print(f"before updating: {list(users)}")
u1, u2 = users
u1.username = u1.username.lower()
u2.username = u2.username.lower()
User.bulk_update([u1, u2], fields=[User.username])
users = user.select().where(User.username.in_(["bill gates", "larry page"]))
print(f"after updating: {list(users)}")
if __name__ == '__main__':
main()

Binary file not shown.

View File

@@ -0,0 +1,21 @@
import pathlib
import peewee as pw
db_url = pathlib.Path(__file__).parent.joinpath("./mydb.sqlite3")
db = pw.SqliteDatabase(db_url)
# # equivalent to this:
# db_url = pathlib.Path(__file__).parent.joinpath("./mydb.sqlite3")
# db = pw.SqliteDatabase(None)
# db.init(db_url)
db.connect()
stmt = db.execute_sql("SELECT 1 as foo;").fetchall()
print(stmt)
db.close()

View File

@@ -0,0 +1,20 @@
import pathlib
import peewee as pw
db_url = pathlib.Path(__file__).parent.joinpath("./mydb.sqlite3")
db = pw.SqliteDatabase(db_url)
class BaseModel(pw.Model):
class Meta:
database = db
class User(BaseModel):
uid = pw.AutoField()
username = pw.CharField(max_length=10)
if __name__ == '__main__':
db.create_tables([User])

View File

@@ -0,0 +1,139 @@
import pathlib
import typing as t
from datetime import datetime
import peewee as pw
from fastapi import FastAPI
from playhouse.shortcuts import model_to_dict
from pydantic import BaseModel, Field
db_url = pathlib.Path(__file__).parent.joinpath("web.sqlite3")
db = pw.SqliteDatabase(db_url)
app = FastAPI()
T = t.TypeVar("T")
class NoteORM(pw.Model):
id = pw.AutoField()
title = pw.CharField()
content = pw.TextField()
tags = pw.CharField()
create_at = pw.DateTimeField(default=datetime.now())
class Meta:
database = db
class GenericResponse(BaseModel):
code: int = 0
msg: t.Optional[str]
data: t.Optional[t.List[T]]
class NoteModel(BaseModel):
id: t.Optional[int] = Field(example="null")
title: str
content: str
tags: t.Optional[t.List[str]] = Field(example="null")
@app.on_event("startup")
def connect():
db.create_tables([NoteORM])
db.connect(reuse_if_open=True)
@app.on_event("shutdown")
def disconnect():
if not db.is_closed():
db.close()
@app.get("/")
def index():
data = NoteORM.select()
return GenericResponse(
code=200, msg="success", data=[model_to_dict(orm) for orm in data]
)
@app.get("/note/{id}", response_model=GenericResponse)
def query_note_by_id(id: int):
result = NoteORM.get_or_none(NoteORM.id == id)
if not result:
response = GenericResponse(
code=404, msg=f"can't get the note which id is: `{id}`."
)
else:
response = GenericResponse(
code=200,
msg="success",
data=[model_to_dict(result)],
)
return response
@app.post("/note", response_model=GenericResponse)
def add_note(payload: NoteModel):
tags = "" if not payload.tags else ",".join(payload.tags)
note = NoteORM.create(
title=payload.title,
content=payload.content,
tags=tags,
)
return GenericResponse(
code=200,
msg="success",
data=[model_to_dict(note)],
)
@app.put("/note", response_model=GenericResponse)
def update_note(payload: NoteModel):
data = payload.dict()
id = data.pop("id")
if not id:
return GenericResponse(code=400, msg="`id` is invalid value.")
result: t.Optional[NoteORM] = NoteORM.get_or_none(NoteORM.id == id)
if not result:
return GenericResponse(
code=404, msg=f"can't update the note which id is: `{id}`."
)
result.update(**data)
result.save()
return GenericResponse(code=200, msg="success", data=[model_to_dict(result)])
@app.delete("/note/{id}", response_model=GenericResponse)
def delete_note_by_id(id: int):
data: t.Optional[NoteORM] = NoteORM.get_or_none(NoteORM.id == id)
if not data:
return GenericResponse(
code=404, msg=f"can't update the note which id is: `{id}`."
)
id = data.id
ok = data.delete_instance()
if ok >= 1:
return GenericResponse(code=200, msg="success", data=[{"id": id}])
return GenericResponse(
code=500,
msg=f"something wrong when delete note which id is: {id}.",
data=[{"id": id}],
)
if __name__ == '__main__':
import uvicorn
uvicorn.run("main:app", reload=True)

Binary file not shown.