feat: 新增技能拓展N4一章相关示例代码

This commit is contained in:
100gle
2022-10-27 15:19:09 +08:00
parent 226d721608
commit cb8a449048
2 changed files with 71 additions and 0 deletions

View File

@@ -0,0 +1,38 @@
class Device:
def __init__(self, name: str):
self.name = name
def boot(self):
print(f"Device<{self.name}> is booting...")
class Switcher:
def __init__(self) -> None:
pass
def __enter__(self):
self._on()
return self
def __exit__(self, exc_type, exc_value, traceback):
self._off()
def _on(self):
print(f"Loading switcher...")
def _off(self):
print(f"Closing switcher...")
def plug(self, device: Device):
device.boot()
def main():
with Switcher() as switcher:
device = Device("PC")
switcher.plug(device)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,33 @@
import contextlib
class Thread:
def __init__(self, max_worker):
self.max_worker = max_worker
def execute(self, func, *args, **kwargs):
print(f"Using {self.max_worker} workers to execute {func.__name__} function...")
func(*args, **kwargs)
@contextlib.contextmanager
def ThreadPoolExecutor(max_worker=1):
pool = Thread(max_worker)
print("Initial for threads...")
try:
yield pool
finally:
print("Recycling threads and closing pool...")
def echo(s):
print(f"\techo: {s}")
def main():
with ThreadPoolExecutor(max_worker=2) as pool:
pool.execute(echo, s="Hello, world!")
if __name__ == '__main__':
main()