diff --git a/projects/click-cli/basic/echo.py b/projects/click-cli/basic/echo.py new file mode 100644 index 0000000..467c4db --- /dev/null +++ b/projects/click-cli/basic/echo.py @@ -0,0 +1,25 @@ +import argparse + + +def echo(name=None): + print(f"Hello, {name}!") + + +def main(): + parser = argparse.ArgumentParser( + description="the argparse usage example", + ) + parser.add_argument( + "-n", + "--name", + dest="name", + help="the name pass to echo", + default="World", + ) + args = parser.parse_args() + + echo(name=args.name) + + +if __name__ == '__main__': + main() diff --git a/projects/click-cli/basic/fake_pip.py b/projects/click-cli/basic/fake_pip.py new file mode 100644 index 0000000..75acffa --- /dev/null +++ b/projects/click-cli/basic/fake_pip.py @@ -0,0 +1,45 @@ +def fetch(target): + print(f"fetch {target}...") + + +def is_url(src): + if not (src.startswith("http") and src.startswith("https")): + return False + return True + + +def download(url_or_package): + if not is_url(url_or_package): + fetch(url_or_package) + return + + +def _install(package=None, requirements=None): + if not package and requirements: + for _package in requirements: + download(_package) + elif package and not requirements: + download(package) + else: + raise ValueError( + "`package` parameter is required " + "if there isn't `requirements` parameter." + ) + + +class PipCommand: + def __init__(self) -> None: + pass + + def install(self, *args, **kwargs): + _install(*args, **kwargs) + + +if __name__ == '__main__': + pip = PipCommand() + package = "pandas" + requirements = ["pandas", "numpy"] + + # usage + pip.install(package=package) + pip.install(requirements=requirements) diff --git a/projects/click-cli/basic/fake_pip_cli.py b/projects/click-cli/basic/fake_pip_cli.py new file mode 100644 index 0000000..d845de4 --- /dev/null +++ b/projects/click-cli/basic/fake_pip_cli.py @@ -0,0 +1,80 @@ +import argparse +from importlib.metadata import requires + + +def fetch(target): + print(f"fetch {target}...") + + +def is_url(src): + if not (src.startswith("http") and src.startswith("https")): + return False + return True + + +def download(url_or_package): + if not is_url(url_or_package): + fetch(url_or_package) + return + + +def _install(package=None, requirements=None): + if not package and requirements: + for _package in requirements: + download(_package) + elif package and not requirements: + download(package) + else: + raise ValueError( + "`package` parameter is required " + "if there isn't `requirements` parameter." + ) + + +class PipCommand: + def __init__(self) -> None: + pass + + def install(self, *args, **kwargs): + _install(*args, **kwargs) + + +def _tidy_requirements(requirements): + filtered = filter(lambda r: not r.startswith("#") and r.strip() != "", requirements) + keep = list(map(lambda r: r.strip(), filtered)) + return keep + + +def main(): + parser = argparse.ArgumentParser( + description="the fake pip usage example", + ) + subparsers = parser.add_subparsers( + description="the sub-command for fake pip tools", + ) + + parser_install = subparsers.add_parser("install") + parser_install.add_argument( + "package", + type=str, + nargs="?", + help="the name of package", + ) + parser_install.add_argument( + "-r", + "--requirements", + dest="requirements", + help="the requirements file", + type=argparse.FileType("r", encoding="utf-8"), + ) + + args = parser.parse_args() + deps = ( + _tidy_requirements(args.requirements.readlines()) if args.requirements else None + ) + pip = PipCommand() + pip.install(args.package, deps) + + +if __name__ == '__main__': + main() diff --git a/projects/click-cli/basic/pipeline.py b/projects/click-cli/basic/pipeline.py new file mode 100644 index 0000000..5135958 --- /dev/null +++ b/projects/click-cli/basic/pipeline.py @@ -0,0 +1,54 @@ +def pipeline(handler, **opts): + def decorator(func): + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + return handler(result, **opts) + + return wrapper + + return decorator + + +def grep(content: str, pattern: str): + import re + + filtered = [] + content = content.splitlines() + for line in content: + if re.search(pattern, line): + filtered.append(line) + + return "\n".join(filtered) + + +def tr(content: str, delete: bool, char: str): + final = [] + + if delete: + content = content.splitlines() + for line in content: + new_line = line.replace(char, "") + final.append(new_line) + if final: + return "".join(final) + + return content + + +@pipeline(tr, delete=True, char="\n") +@pipeline(grep, pattern="ed") +def echo(): + poetry = """ +Beautiful is better than ugly. +Explicit is better than implicit. +Simple is better than complex. +Complex is better than complicated. +Flat is better than nested. +Sparse is better than dense. + """ + return poetry.strip() + + +if __name__ == '__main__': + result = echo() + print(result) diff --git a/projects/click-cli/click-usage/click_command.py b/projects/click-cli/click-usage/click_command.py new file mode 100644 index 0000000..a79ae07 --- /dev/null +++ b/projects/click-cli/click-usage/click_command.py @@ -0,0 +1,10 @@ +import click + + +@click.command() +def greet(): + pass + + +if __name__ == '__main__': + greet() diff --git a/projects/click-cli/click-usage/click_group.py b/projects/click-cli/click-usage/click_group.py new file mode 100644 index 0000000..b3a577a --- /dev/null +++ b/projects/click-cli/click-usage/click_group.py @@ -0,0 +1,20 @@ +import click + + +@click.group() +def pip(): + ... + + +@pip.command(name="foo") +def install(): + click.echo("Installing...") + + +@pip.command(name="nonfoo") +def uninstall(): + click.echo("Uninstalling...") + + +if __name__ == '__main__': + pip() diff --git a/projects/click-cli/click-usage/click_multi_commands.py b/projects/click-cli/click-usage/click_multi_commands.py new file mode 100644 index 0000000..1b99be6 --- /dev/null +++ b/projects/click-cli/click-usage/click_multi_commands.py @@ -0,0 +1,38 @@ +import click + + +@click.group() +def pip(): + pass + + +@pip.command() +def install(): + """pip install method""" + click.echo("Installing...") + + +@pip.command() +def uninstall(): + """pip uninstall method""" + click.echo("Uninstalling...") + + +@click.group() +def pytest(): + pass + + +@pytest.command() +def test(): + """run the tests.""" + click.echo("Running tests...") + + +cli = click.CommandCollection( + sources=[pip, pytest], + help="the collections of custom commands", +) + +if __name__ == '__main__': + cli() diff --git a/projects/click-cli/click-usage/decorator.ipynb b/projects/click-cli/click-usage/decorator.ipynb new file mode 100644 index 0000000..4ca41cf --- /dev/null +++ b/projects/click-cli/click-usage/decorator.ipynb @@ -0,0 +1,195 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Curry and Closure" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "initializing for request function.\n", + ".wrapper at 0x7fb82839f0d0>\n", + "Query for https://www.sspai.com using GET method.\n", + "response\n" + ] + } + ], + "source": [ + "# Initialization without decorator\n", + "\n", + "def init(func):\n", + " print(f\"initializing for {func.__name__} function.\")\n", + " def wrapper(*args, **kwargs):\n", + " return func(*args, **kwargs)\n", + " return wrapper\n", + "\n", + "def request(url, method):\n", + " print(f\"Query for {url} using {method} method.\")\n", + " return \"response\"\n", + "\n", + "prepared_request = init(func=request)\n", + "print(prepared_request)\n", + "response = prepared_request(\"https://www.sspai.com\", method=\"GET\")\n", + "print(response)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "initializing for request function.\n", + "Query for https://www.sspai.com using GET method.\n", + "response\n" + ] + } + ], + "source": [ + "\n", + "# Initialization wit decorator\n", + "\n", + "def init(func):\n", + " print(f\"initializing for {func.__name__} function.\")\n", + " def wrapper(*args, **kwargs):\n", + " return func(*args, **kwargs)\n", + " return wrapper\n", + "\n", + "@init\n", + "def request(url, method):\n", + " print(f\"Query for {url} using {method} method.\")\n", + " return \"response\"\n", + "\n", + "response = request(\"https://www.sspai.com\", method=\"GET\")\n", + "print(response)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "initializing for request function.\n", + "Use headers: {'User-Agent': 'Chrome/101.0.4951.64'}\n", + "Query for https://www.sspai.com using GET method.\n", + "None\n" + ] + } + ], + "source": [ + "# Initialization with closure\n", + "\n", + "def init(func):\n", + " headers = {\"User-Agent\": \"Chrome/101.0.4951.64\"}\n", + " print(f\"initializing for {func.__name__} function.\")\n", + " def wrapper(*args, **kwargs):\n", + " result = func(headers=headers, *args, **kwargs)\n", + " return result\n", + " return wrapper\n", + "\n", + "@init\n", + "def request(url, method, **kwargs):\n", + " headers = kwargs.get(\"headers\", None)\n", + " if headers:\n", + " print(f\"Use headers: {headers}\")\n", + " print(f\"Query for {url} using {method} method.\")\n", + "\n", + "response = request(\"https://www.sspai.com\", method=\"GET\")\n", + "print(response)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "# currying function\n", + "\n", + "def curry(func):\n", + "\n", + " f_args = []\n", + "\n", + " def wrapper(*args):\n", + " if args:\n", + " f_args.extend(args)\n", + " return wrapper\n", + "\n", + " result = func(f_args)\n", + " return result\n", + "\n", + " return wrapper\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "the result is .wrapper at 0x7fb820779b80> before evaling.\n", + "the result is 145 after evaling.\n" + ] + } + ], + "source": [ + "total = curry(sum)\n", + "result = total(1)(2)(3, 4)(5, 6)(7, 8, 9)(10, 20, 30, 40)\n", + "print(f\"the result is {result} before evaling.\")\n", + "result = result()\n", + "print(f\"the result is {result} after evaling.\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "interpreter": { + "hash": "13977d4cc82dee5f9d9535ceb495bd0ab12a43c33c664e5f0d53c24cf634b67f" + }, + "kernelspec": { + "display_name": "Python 3.9.0 ('pandas-startup')", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.0" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/projects/click-cli/click-usage/mocked_curl.py b/projects/click-cli/click-usage/mocked_curl.py new file mode 100644 index 0000000..5bd492e --- /dev/null +++ b/projects/click-cli/click-usage/mocked_curl.py @@ -0,0 +1,69 @@ +import click + + +def record_params(**kwargs): + tmpl = "" + for key, value in kwargs.items(): + tmpl += f" {key}: {value}\n" + return tmpl + + +@click.command() +@click.argument("url", type=str) +@click.option( + "-X", + "--method", + default="GET", + type=click.Choice(["GET", "POST"], case_sensitive=False), + show_default=True, + help="the HTTP method to use", +) +@click.option( + "--data", + default=None, + multiple=True, + help="data to be sent with the request", +) +@click.option( + "-d", + "--header", + default=None, + multiple=True, + help="the request header", +) +@click.option( + "-v", + "--verbose", + is_flag=True, + help="verbose mode", +) +def mocked_curl(url, method, verbose=False, **kwargs): + """request for target url""" + + default_params = dict(method=method) + extra = {} + + data = kwargs.get("data") + header = kwargs.get("header") + if header: + default_params.update(dict(header=header)) + + if method == "POST": + if data: + extra = dict(method="POST", data=data) + else: + extra = dict(method="POST") + + if extra: + default_params.update(extra) + + if verbose: + log = record_params(**default_params) + click.echo(f"request for {url} with: \n{log}") + return + + click.echo(f"request for {url} ...") + + +if __name__ == '__main__': + mocked_curl() diff --git a/projects/click-cli/setup.py b/projects/click-cli/setup.py new file mode 100644 index 0000000..2f478ff --- /dev/null +++ b/projects/click-cli/setup.py @@ -0,0 +1,18 @@ +from setuptools import find_packages, setup + +setup( + name="watermark", + version="0.1", + author="100gle", + py_modules=["watermark"], + packages=find_packages(), + install_requires=[ + "Click", + "pillow", + ], + entry_points={ + "console_scripts": [ + "watermark=watermark.main:cli", + ] + }, +) diff --git a/projects/click-cli/testdata/banner.png b/projects/click-cli/testdata/banner.png new file mode 100644 index 0000000..6c75ec2 Binary files /dev/null and b/projects/click-cli/testdata/banner.png differ diff --git a/projects/click-cli/testdata/logo.png b/projects/click-cli/testdata/logo.png new file mode 100644 index 0000000..f5d1d6c Binary files /dev/null and b/projects/click-cli/testdata/logo.png differ diff --git a/projects/click-cli/watermark/PingFang.ttc b/projects/click-cli/watermark/PingFang.ttc new file mode 100644 index 0000000..68add37 Binary files /dev/null and b/projects/click-cli/watermark/PingFang.ttc differ diff --git a/projects/click-cli/watermark/__init__.py b/projects/click-cli/watermark/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/projects/click-cli/watermark/cmd.sh b/projects/click-cli/watermark/cmd.sh new file mode 100644 index 0000000..0a1bfa0 --- /dev/null +++ b/projects/click-cli/watermark/cmd.sh @@ -0,0 +1,18 @@ +# text watermark +watermark text \ + --font-name=./PingFang.ttc \ + --font-size=30 \ + --color=red \ + --alpha=0.5 \ + --location=all \ + --offset=30 \ + -v "@少数派 sspai" ./testdata/banner.png + +# image watermark +watermark image \ + --image-watermark-path ./testdata/logo.png \ + --zoom=0.8 \ + --alpha=0.8 \ + --location=bottom \ + --offset=30 \ + -v ./testdata/banner.png \ No newline at end of file diff --git a/projects/click-cli/watermark/helper.py b/projects/click-cli/watermark/helper.py new file mode 100644 index 0000000..50bb6a2 --- /dev/null +++ b/projects/click-cli/watermark/helper.py @@ -0,0 +1,107 @@ +import logging + +from PIL import Image, ImageEnhance + +logger = logging.getLogger("watermark.helper") + + +def set_opacity(layer, alpha): + if layer.mode != "RGBA": + layer = layer.convert("RGBA") + + if 0 <= alpha <= 1: + alpha_layer = layer.split()[-1] + alpha_layer = ImageEnhance.Brightness(alpha_layer).enhance(alpha) + layer.putalpha(alpha_layer) + else: + logger.warning( + f"alpha not works, because the value {alpha} not in [0, 1] interval." + ) + + return layer + + +def get_location(image_size, marker_size, loc, offset=0): + img_x, img_y = image_size + mx, my = marker_size + + half_x = int((img_x - mx) / 2) + half_y = int((img_y - my) / 2) + + locations = { + # top orient + "top-left": (0, 0), + "top": (half_x, 0), + "top-right": (img_x - mx, 0), + # center orient + "center-left": (0, half_y), + "center": (half_x, half_y), + "center-right": (img_x - mx, half_y), + # bottom orient + "bottom-left": (0, img_y - my), + "bottom": (half_x, img_y - my), + "bottom-right": (img_x - mx, img_y - my), + } + location = locations[loc] + logger.debug( + f"image size: {image_size}, marker size: {marker_size}, {loc} default location: {location}" + ) + + if offset: + x, y = location + + if x == 0: + x += offset + elif x != half_x: + x -= offset + + if loc.startswith("top"): + y += offset + elif loc.startswith("bottom"): + y -= offset + + location = (x, y) + + logger.debug(f"{loc} location: {location}") + + return location + + +def adjust_marker_layer( + image, + marker, + loc="center", + offset=0, + **kwargs, +): + layer = Image.new("RGBA", size=image.size) + + locations = [ + "top", + "top-left", + "top-right", + "bottom", + "bottom-left", + "bottom-right", + "center", + "center-left", + "center-right", + ] + + if loc != "all": + if loc not in locations: + logger.warning(f"{loc} location not found, use default location: center") + locations = ["center"] + else: + locations = [loc] + + for location in locations: + position = get_location( + image.size, + marker.size, + loc=location, + offset=offset, + ) + layer.paste(marker, position, mask=marker) + + return layer diff --git a/projects/click-cli/watermark/main.py b/projects/click-cli/watermark/main.py new file mode 100644 index 0000000..cfa9f0b --- /dev/null +++ b/projects/click-cli/watermark/main.py @@ -0,0 +1,211 @@ +import logging +import pathlib + +import click +from watermark.markers import make + +ImagePath = click.Path(exists=True, allow_dash=True, dir_okay=False) + + +def ensure_logger(level): + logging.basicConfig( + level=level, + format="[{asctime}] [{levelname}] [{name}] - {message}", + style="{", + ) + + +logger = logging.getLogger("watermark.cli") + +COMMON_OPTIONS = [ + click.option( + "--alpha", + type=click.FloatRange(0.0, 1.0), + help="the alpha value", + show_default=True, + ), + click.option( + "--rotation", + type=int, + default=0, + help="the rotation value", + show_default=True, + ), + click.option( + "--offset", + type=int, + default=30, + help="the offset value", + show_default=True, + ), + click.option( + "--location", + "loc", + type=click.Choice( + [ + "top-left", + "top", + "top-right", + "center-left", + "center", + "center-right", + "bottom-left", + "bottom", + "bottom-right", + "all", + ] + ), + default="bottom-right", + help="""\ + the location value, + text watermark will be placed at the all locations if set "all" option. + """, + show_default=True, + ), + click.option( + "-v", + "--verbose", + is_flag=True, + default=False, + help="show debug info", + show_default=True, + ), +] + + +def shared_options(func): + for option in COMMON_OPTIONS[::-1]: + func = option(func) + return func + + +@click.group(invoke_without_command=True) +@click.pass_context +def cli(ctx: click.Context, **kwargs): + r"""the watermark cli tools implemented by 100gle.""" + if not ctx.invoked_subcommand: + banner() + + +def banner(): + msg = r""" + _ __ __ __ +| | / /____ _ / /_ ___ _____ ____ ___ ____ _ _____ / /__ +| | /| / // __ `// __// _ \ / ___// __ `__ \ / __ `// ___// //_/ +| |/ |/ // /_/ // /_ / __// / / / / / / // /_/ // / / ,< +|__/|__/ \__,_/ \__/ \___//_/ /_/ /_/ /_/ \__,_//_/ /_/|_| +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Hope you enjoy it. +You can use this command to get help: + + $ watermark -h +""" + click.echo(msg) + + +@cli.command() +@click.argument("content") +@click.argument("path", type=ImagePath) +@click.option( + "-f", + "--font-name", + "font", + type=str, + help="the font name value", +) +@click.option( + "-F", + "--font-path", + type=ImagePath, + help="the font path if font name isn't passed.", +) +@click.option( + "-s", + "--font-size", + "size", + type=int, + default=20, + help="the font size value", + show_default=True, +) +@click.option( + "--color", + type=str, + default="black", + help="""\ + the color value, + you can use hex color code like "#000000" or color name "black". + """, + show_default=True, +) +@shared_options +def text(path, font, font_path, **kwargs): + """the sub-command for text watermark.""" + verbose = kwargs.get("verbose") + if verbose: + ensure_logger(level=logging.DEBUG) + logger.debug("enable verbose mode.") + + font_or_path = font or font_path + logger.debug(f"image watermark options are: {kwargs}") + + fpath = pathlib.Path(path) + + name = fpath.stem + ext = fpath.suffix + image_suffixes = {".jpg", ".jpeg", ".png"} + if ext not in image_suffixes: + click.echo(f"[WARNING] can't handle {ext} image type.") + exit(1) + + marked = make(path=path, text=font_or_path, **kwargs) + file = fpath.parent.joinpath(f"{name}_marked{ext}") + + marked.save(file, quality=95) + + logger.debug(f"add watermark to {path} success. see {file}") + + +@cli.command() +@click.argument("path", type=ImagePath) +@click.option( + "--image-watermark-path", + "image_path", + type=ImagePath, + help="the path of image watermark", +) +@click.option( + "--zoom", + type=float, + default=0.5, + help="the zoom value", + show_default=True, +) +@shared_options +def image(path, image_path, **kwargs): + """the sub-command for image watermark.""" + verbose = kwargs.pop("verbose") + if verbose: + ensure_logger(level=logging.DEBUG) + logger.debug("enable verbose mode.") + + logger.debug(f"image watermark options are: {kwargs}") + fpath = pathlib.Path(path) + + name = fpath.stem + ext = fpath.suffix + image_suffixes = {".jpg", ".jpeg", ".png"} + if ext not in image_suffixes: + click.echo(f"[WARNING] can't handle {ext} image type.") + exit(1) + + marked = make(path=path, image=image_path, **kwargs) + file = fpath.parent.joinpath(f"{name}_marked{ext}") + + marked.save(file, quality=95) + + logger.debug(f"add watermark to {path} success. see {file}") + + +if __name__ == "__main__": + cli() diff --git a/projects/click-cli/watermark/markers.py b/projects/click-cli/watermark/markers.py new file mode 100644 index 0000000..7100935 --- /dev/null +++ b/projects/click-cli/watermark/markers.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +import logging + +from PIL import Image, ImageDraw, ImageFont +from watermark.helper import adjust_marker_layer, set_opacity + +logger = logging.getLogger("watermark.markers") + + +def text_marker( + font_or_path, + content, + size=20, + color="black", + rotation=0, + alpha: int | float = 1, + **kwargs, +): + + # set font and get the font size which determined by content. + font = ImageFont.truetype(font_or_path, size=size) + CONTENT_SIZE = font.getsize(text=content) + + layer = Image.new("RGBA", CONTENT_SIZE) + + plot = ImageDraw.Draw(layer) + plot.text( + xy=(0, 0), # x=0, y=0 + text=content, + fill=color, + font=font, + align="center", + ) + if rotation: + layer = layer.rotate(rotation, expand=True) + + if alpha: + layer = set_opacity(layer, alpha) + + return layer + + +def image_marker(fpath, zoom=0.8, rotation=0, alpha: int | float = 1, **kwargs): + + # convert image watermarker to RGBA mode. + layer = Image.open(fpath).convert("RGBA") + + width, height = layer.size + + if 0 <= zoom <= 1: + layer = layer.resize((int(width * zoom), int(height * zoom))) + + if rotation: + layer = layer.rotate(rotation, expand=True) + + if alpha: + layer = set_opacity(layer, alpha) + + return layer + + +def make(path, *, text=None, image=None, **kwargs): + pic = Image.open(path).convert("RGBA") + if text and not image: + marker = text_marker(text, **kwargs) + elif not text and image: + marker = image_marker(image, **kwargs) + else: + raise ValueError("use text or image") + + layer = adjust_marker_layer(pic, marker, **kwargs) + marked = Image.alpha_composite(pic, layer) + + return marked