From cb8a449048e99a1f137efb57486913c534721f69 Mon Sep 17 00:00:00 2001 From: 100gle <569590461@qq.com> Date: Thu, 27 Oct 2022 15:19:09 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E6=8A=80=E8=83=BD?= =?UTF-8?q?=E6=8B=93=E5=B1=95N4=E4=B8=80=E7=AB=A0=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E7=A4=BA=E4=BE=8B=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- code/newsletter/N4/switcher.py | 38 +++++++++++++++++++++ code/newsletter/N4/threadpool_simulation.py | 33 ++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 code/newsletter/N4/switcher.py create mode 100644 code/newsletter/N4/threadpool_simulation.py diff --git a/code/newsletter/N4/switcher.py b/code/newsletter/N4/switcher.py new file mode 100644 index 0000000..4ceed89 --- /dev/null +++ b/code/newsletter/N4/switcher.py @@ -0,0 +1,38 @@ +class Device: + def __init__(self, name: str): + self.name = name + + def boot(self): + print(f"Device<{self.name}> is booting...") + + +class Switcher: + def __init__(self) -> None: + pass + + def __enter__(self): + self._on() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._off() + + def _on(self): + print(f"Loading switcher...") + + def _off(self): + print(f"Closing switcher...") + + def plug(self, device: Device): + device.boot() + + +def main(): + + with Switcher() as switcher: + device = Device("PC") + switcher.plug(device) + + +if __name__ == '__main__': + main() diff --git a/code/newsletter/N4/threadpool_simulation.py b/code/newsletter/N4/threadpool_simulation.py new file mode 100644 index 0000000..6cc3a3f --- /dev/null +++ b/code/newsletter/N4/threadpool_simulation.py @@ -0,0 +1,33 @@ +import contextlib + + +class Thread: + def __init__(self, max_worker): + self.max_worker = max_worker + + def execute(self, func, *args, **kwargs): + print(f"Using {self.max_worker} workers to execute {func.__name__} function...") + func(*args, **kwargs) + + +@contextlib.contextmanager +def ThreadPoolExecutor(max_worker=1): + pool = Thread(max_worker) + print("Initial for threads...") + try: + yield pool + finally: + print("Recycling threads and closing pool...") + + +def echo(s): + print(f"\techo: {s}") + + +def main(): + with ThreadPoolExecutor(max_worker=2) as pool: + pool.execute(echo, s="Hello, world!") + + +if __name__ == '__main__': + main()