From 34172585f455862d193b57e63b901a8d5194da03 Mon Sep 17 00:00:00 2001 From: Edvard Rejthar Date: Tue, 12 Mar 2024 18:25:00 +0100 Subject: [PATCH] CLI and TUI --- .github/workflows/run-unittest.yml | 3 +- README.md | 7 +- deduplidog/__main__.py | 112 +++++++++++++++++ deduplidog/deduplidog.py | 186 +++++++++++++++++------------ deduplidog/form.tcss | 10 ++ deduplidog/interface_utils.py | 44 +++++++ pyproject.toml | 5 +- 7 files changed, 288 insertions(+), 79 deletions(-) create mode 100644 deduplidog/__main__.py create mode 100644 deduplidog/form.tcss create mode 100644 deduplidog/interface_utils.py diff --git a/.github/workflows/run-unittest.yml b/.github/workflows/run-unittest.yml index 7a172c0..1da2119 100644 --- a/.github/workflows/run-unittest.yml +++ b/.github/workflows/run-unittest.yml @@ -5,8 +5,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - # python-version: [3.11, 3.12] TODO - python-version: [3.12] + python-version: [3.11, 3.12] steps: - uses: actions/checkout@v2 - name: Set up Python ${{ matrix.python-version }} diff --git a/README.md b/README.md index 5717ffd..936be18 100644 --- a/README.md +++ b/README.md @@ -34,9 +34,14 @@ These imply the folders have the same structure. Deduplidog is tolerant towards The program does not write anything to the disk, unless `execute=True` is set. Feel free to launch it just to inspect the recommended actions. Or set `bashify=True` to output bash commands you may launch after thorough examining. +# Launch + +It works as a standalone program with both CLI and TUI interfaces. Just launch the `deduplidog` command. +Moreover, it works best when imported from a [Jupyter Notebook](https://jupyter.org/). + # Examples -It works great when launched from a [Jupyter Notebook](https://jupyter.org/). +Let's take a closer look at a use case. 
```python3 import logging diff --git a/deduplidog/__main__.py b/deduplidog/__main__.py new file mode 100644 index 0000000..3f10e75 --- /dev/null +++ b/deduplidog/__main__.py @@ -0,0 +1,112 @@ +import sys +from dataclasses import fields +from typing import get_args + +import click +from dataclass_click import dataclass_click +from textual import events +from textual.app import App, ComposeResult +from textual.containers import VerticalScroll +from textual.widgets import Checkbox, Footer, Input, Label + +from .interface_utils import Field +from .deduplidog import Deduplidog + + +class CheckboxApp(App[None]): + CSS_PATH = "form.tcss" + + BINDINGS = [ + ("up", "go_up", "Go up"), + ("down", "go_up", "Go down"), + ("ctrl+s", "confirm", "Run"), # ctrl/alt+enter does not work; enter does not work with checkboxes + ("escape", "exit", "Exit"), + ] + + def compose(self) -> ComposeResult: + yield Footer() + self.inputs = INPUTS + with VerticalScroll(): + for input in self.inputs: + if isinstance(input, Input): + yield Label(input.placeholder) + yield input + yield Label(input._link.help) + yield Label("") + + def on_mount(self): + self.inputs[0].focus() + + def action_confirm(self): + self.exit(True) + + def action_exit(self): + self.exit() + + def on_key(self, event: events.Key) -> None: + try: + index = self.inputs.index(self.focused) + except ValueError: # probably some other element were focused + return + match event.key: + case "down": + self.inputs[(index + 1) % len(self.inputs)].focus() + case "up": + self.inputs[(index - 1) % len(self.inputs)].focus() + case letter if len(letter) == 1: # navigate by letters + for inp_ in self.inputs[index+1:] + self.inputs[:index]: + label = inp_.label if isinstance(inp_, Checkbox) else inp_.placeholder + if str(label).casefold().startswith(letter): + inp_.focus() + break + + +class RaiseOnMissingParam(click.Command): + def __call__(self, *args, **kwargs): + return super(RaiseOnMissingParam, self).__call__(*args, 
standalone_mode=False, **kwargs) + + +@click.command(cls=RaiseOnMissingParam) +@dataclass_click(Deduplidog) +def cli(dd: Deduplidog): + return dd + + +if __name__ == "__main__": + # CLI + try: + dd = cli() + if input("Continue? [Y/n] ").casefold() not in ("", "y"): + sys.exit() + except click.MissingParameter: + # User launched the program without parameters. + # This is not a problem, we have TUI instead. + dd = None + + # TUI + dog_fields: list[Field] = [] + for f in fields(Deduplidog): + try: + dog_fields.append(Field(f.name, + getattr(dd, f.name, f.default), + get_args(f.type)[0], + get_args(f.type)[1].kwargs["help"])) + except Exception as e: + # we want only documented fields, in case of an incorrenctly defined field, we do not let user to edit + continue + while True: + print("") + INPUTS = [f.get_widgets() for f in dog_fields] + if not CheckboxApp().run(): + break + for form, field in zip(INPUTS, dog_fields): + field.value = form.value + try: + Deduplidog(**{f.name: f.convert() for f in dog_fields}) + except Exception as e: + print("-"*100) + print(e) + input() + continue + if input("See more options? 
[Y/n] ").casefold() not in ("y", ""): + break diff --git a/deduplidog/deduplidog.py b/deduplidog/deduplidog.py index 6744895..5a9517d 100644 --- a/deduplidog/deduplidog.py +++ b/deduplidog/deduplidog.py @@ -10,16 +10,21 @@ from itertools import chain from pathlib import Path from time import sleep +from typing import Annotated, get_args, get_type_hints from zlib import crc32 +import click import cv2 import imagehash +from dataclass_click import option from humanize import naturaldelta, naturalsize from IPython.display import Image, clear_output, display from ipywidgets import HBox, widgets from PIL import ExifTags, Image from sh import find -from tqdm.notebook import tqdm +from tqdm.autonotebook import tqdm + +from .interface_utils import Field VIDEO_SUFFIXES = ".mp4", ".mov", ".avi", ".vob", ".mts", ".3gp", ".mpg", ".mpeg", ".wmv" IMAGE_SUFFIXES = ".jpg", ".jpeg", ".png", ".gif" @@ -30,6 +35,29 @@ "Lists changes performed/suggested to given path. First entry is the work file, the second is the original file." +# Unfortunately, Python offers no regular way to annotate dataclass attributes with brief docstrings. +# As bare strings are not kept at runtime, we have to use the cumbersome Annotated syntax. +# Pros: We do not have to duplicate the help text for the TUI and the CLI. +# Cons: +# Help text is not displayed during static analysis (as an IDE hint). +# We have to write the default value twice. (For the CLI and for direct import into e.g. a Jupyter notebook.) 
+def flag(help): + "CLI support" + return option(help=help, is_flag=True, default=False) + + +def conversion(_ctx, option, value): + return Field(option.name, + value, + get_args(get_type_hints(Deduplidog, include_extras=True)[option.name])[0]) \ + .convert() + + +def opt(help, default): + "CLI support" + return option(help=help, default=default, type=click.UNPROCESSED, callback=conversion) + + @dataclass class Deduplidog: """ @@ -40,75 +68,74 @@ class Deduplidog: If media_magic=True, media files receive different rules: Neither the size nor the date are compared. See its help. """ - work_dir: str | Path - "Folder of the files suspectible to be duplicates." - original_dir: str | Path - "Folder of the original files. Normally, these files will not be affected." \ - " (However, they might get affected by treat_bigger_as_original or set_both_to_older_date)." + work_dir: Annotated[str | Path, option( + help="""Folder of the files suspectible to be duplicates.""", required=True, type=click.UNPROCESSED)] + original_dir: Annotated[str | Path, option( + help="""Folder of the original files. Normally, these files will not be affected. + (However, they might get affected by treat_bigger_as_original or set_both_to_older_date).""", default="", type=click.UNPROCESSED)] = "" # Action section - execute: bool = False - "If False, nothing happens, just a safe run is performed." - bashify: bool = False - """Print bash commands that correspond to the actions that would have been executed if execute were True. - You can check and run them yourself.""" - affect_only_if_smaller: bool = False - """If media_magic=True, all writing actions like rename, replace_with_original, set_both_to_older_date and treat_bigger_as_original - are executed only if the affectable file is smaller than the other.""" - rename: bool = False - """If execute=True, prepend ✓ to the duplicated work file name (or possibly to the original file name if treat_bigger_as_original). 
- Mutually exclusive with replace_with_original and delete.""" - delete: bool = False - """If execute=True, delete theduplicated work file name (or possibly to the original file name if treat_bigger_as_original). - Mutually exclusive with replace_with_original and rename.""" - replace_with_original: bool = False - """If execute=True, replace duplicated work file with the original (or possibly vice versa if treat_bigger_as_original). - Mutually exclusive with rename and delete.""" - set_both_to_older_date: bool = False - "If execute=True, media_magic=True or (media_magic=False and ignore_date=True), both files are set to the older date. Ex: work file get's the original file's date or vice versa." - treat_bigger_as_original: bool = False - "If execute=True and rename=True and media_magic=True, the original file might be affected (by renaming) if smaller than the work file." + execute: Annotated[bool, flag( + "If False, nothing happens, just a safe run is performed.")] = False + bashify: Annotated[bool, flag( + """Print bash commands that correspond to the actions that would have been executed if execute were True. + You can check and run them yourself.""")] = False + affect_only_if_smaller: Annotated[bool, flag( + """If media_magic=True, all writing actions like rename, replace_with_original, set_both_to_older_date and treat_bigger_as_original + are executed only if the affectable file is smaller than the other.""")] = False + rename: Annotated[bool, flag( + """If execute=True, prepend ✓ to the duplicated work file name (or possibly to the original file name if treat_bigger_as_original). + Mutually exclusive with replace_with_original and delete.""")] = False + delete: Annotated[bool, flag( + """If execute=True, delete theduplicated work file name (or possibly to the original file name if treat_bigger_as_original). 
+ Mutually exclusive with replace_with_original and rename.""")] = False + replace_with_original: Annotated[bool, flag( + """If execute=True, replace duplicated work file with the original (or possibly vice versa if treat_bigger_as_original). + Mutually exclusive with rename and delete.""")] = False + set_both_to_older_date: Annotated[bool, flag( + "If execute=True, media_magic=True or (media_magic=False and ignore_date=True), both files are set to the older date. Ex: work file get's the original file's date or vice versa.")] = False + treat_bigger_as_original: Annotated[bool, flag( + "If execute=True and rename=True and media_magic=True, the original file might be affected (by renaming) if smaller than the work file.")] = False # Match section - casefold: bool = False - "Case insensitive file name comparing." - checksum: bool = False - """If media_magic=False and ignore_size=False, files will be compared by CRC32 checksum. - (This mode is considerably slower.)""" - tolerate_hour: int | tuple[int, int] | bool = False - """When comparing files in work_dir and media_magic=False, tolerate hour difference. + casefold: Annotated[bool, flag( + "Case insensitive file name comparing.")] = False + checksum: Annotated[bool, flag( + """If media_magic=False and ignore_size=False, files will be compared by CRC32 checksum. + (This mode is considerably slower.)""")] = False + tolerate_hour: Annotated[int | tuple[int, int] | bool, opt( + """When comparing files in work_dir and media_magic=False, tolerate hour difference. Sometimes when dealing with FS changes, files might got shifted few hours. * bool → -1 .. +1 * int → -int .. +int * tuple → int1 .. int2 - Ex: tolerate_hour=2 → work_file.st_mtime -7200 ... + 7200 is compared to the original_file.st_mtime """ - ignore_date: bool = False - "If media_magic=False, files will not be compared by date." - ignore_size: bool = False - "If media_magic=False, files will not be compared by size." 
- space2char: bool | str = False - """When comparing files in work_dir, consider space as another char. Ex: "file 012.jpg" is compared as "file_012.jpg" """ - strip_end_counter: bool = False - """When comparing files in work_dir, strip the counter. Ex: "00034(3).MTS" is compared as "00034.MTS" """ - strip_suffix: str = False - """When comparing files in work_dir, strip the file name end matched by a regular. Ex: "001-edited.jpg" is compared as "001.jpg" """ - work_file_stem_shortened: int = None - "Photos downloaded from Google have its stem shortened to 47 chars. For the comparing purpose, treat original folder file names shortened." + Ex: tolerate_hour=2 → work_file.st_mtime -7200 ... + 7200 is compared to the original_file.st_mtime """, False)] = False + ignore_date: Annotated[bool, flag( + "If media_magic=False, files will not be compared by date.")] = False + ignore_size: Annotated[bool, flag( + "If media_magic=False, files will not be compared by size.")] = False + space2char: Annotated[bool, flag( + """When comparing files in work_dir, consider space as another char. Ex: "file 012.jpg" is compared as "file_012.jpg" """)] = False + strip_end_counter: Annotated[bool, flag( + """When comparing files in work_dir, strip the counter. Ex: "00034(3).MTS" is compared as "00034.MTS" """)] = False + strip_suffix: Annotated[str, opt( + """When comparing files in work_dir, strip the file name end matched by a regular. Ex: "001-edited.jpg" is compared as "001.jpg" """, False)] = False + work_file_stem_shortened: Annotated[int, opt( + "Photos downloaded from Google have its stem shortened to 47 chars. For the comparing purpose, treat original folder file names shortened.", None)] = None # Media section - media_magic: bool = False - """ - Nor the size or date is compared for files with media suffixes. + media_magic: Annotated[bool, flag( + """Nor the size or date is compared for files with media suffixes. 
A video is considered a duplicate if it has the same name and a similar number of frames, even if it has a different extension. An image is considered a duplicate if it has the same name and a similar image hash, even if the files are of different sizes. (This mode is considerably slower.) - """ - accepted_frame_delta: int = 1 - "Used only when media_magic is True" - accepted_img_hash_diff: int = 1 - "Used only when media_magic is True" - img_compare_date: bool = False - "If True and media_magic=True, the file date or the EXIF date must match." + """)] = False + accepted_frame_delta: Annotated[int, opt( + "Used only when media_magic is True", 1)] = 1 + accepted_img_hash_diff: Annotated[int, opt( + "Used only when media_magic is True", 1)] = 1 + img_compare_date: Annotated[bool, flag( + "If True and media_magic=True, the file date or the EXIF date must match.")] = False # Following parameters are undocumented: @@ -128,6 +155,11 @@ class Deduplidog: ending_counter = re.compile(r"\(\d+\)$") + def __repr__(self): + text = ', '.join(f'{attr}={len(v) if isinstance(v, (set, list, dict)) else v}' for attr, + v in vars(self).items()) + return f'Deduplidog({text})' + def __post_init__(self): logging.basicConfig(level=self.logging_level, format="%(message)s", force=True) logger.setLevel(self.logging_level) @@ -154,6 +186,8 @@ def __post_init__(self): self.tolerate_hour = -1, 1 case n if isinstance(n, int): self.tolerate_hour = -abs(n), abs(n) + case n if isinstance(n, tuple) and all(isinstance(x, int) for x in n): + pass case _: raise AssertionError("Use whole hours only") self._files_cache: dict[str, set[Path]] = defaultdict(set) @@ -163,13 +197,19 @@ def __post_init__(self): " TODO deprecated" # Distinguish paths - for a, b in zip(Path(self.work_dir).parts, Path(self.original_dir).parts): - if a != b: - self.work_dir_name = a - self.original_dir_name = b - break + if not self.original_dir: + self.original_dir = self.work_dir + if not self.work_dir: + raise 
AssertionError("Missing work_dir") else: - self.work_dir_name = self.original_dir_name = "(same superdir)" + for a, b in zip(Path(self.work_dir).parts, Path(self.original_dir).parts): + if a != b: + self.work_dir_name = a + self.original_dir_name = b + break + else: + self.work_dir_name = a + self.original_dir_name = "(same superdir)" self.check() self.perform() @@ -281,7 +321,7 @@ def _loop_files(self): else: [next(work_files) for _ in range(skip)] print("Skipped", skip) - self.bar = bar = tqdm(work_files) + self.bar = bar = tqdm(work_files, leave=False) for work_file in bar: for attempt in range(5): try: @@ -363,7 +403,7 @@ def _affect(self, work_file: Path, original: Path): return if self.media_magic: # why checking media_magic? # This is just a double check because if not media_magic, - # the files must have the same size nevertheless. + # the files must have the same size nevertheless.) work_size, orig_size = work_file.stat().st_size, original.stat().st_size match self.treat_bigger_as_original, work_size > orig_size: case True, True: @@ -372,7 +412,7 @@ def _affect(self, work_file: Path, original: Path): change[work_file].append(f"SIZE WARNING {naturalsize(work_size-orig_size)}") warning = True if self.affect_only_if_smaller and affected_file.stat().st_size >= other_file.stat().st_size: - logger.debug("Skipping %s as it is smaller than %s", affected_file, other_file) # TODO check + logger.debug("Skipping %s as it is not smaller than %s", affected_file, other_file) return # execute changes or write a log @@ -426,7 +466,7 @@ def _rename(self, change: Change, affected_file: Path): affected_file.rename(target_path) msg = "renaming" if self.bashify: - print(f"mv -n {_qp(affected_file)} {_qp(target_path)}") # TODO check + print(f"mv -n {_qp(affected_file)} {_qp(target_path)}") self.passed_away.add(affected_file) change[affected_file].append(msg) @@ -437,7 +477,7 @@ def _delete(self, change: Change, affected_file: Path): affected_file.unlink() msg = "deleting" if 
self.bashify: - print(f"rm {_qp(affected_file)}") # TODO check + print(f"rm {_qp(affected_file)}") self.passed_away.add(affected_file) change[affected_file].append(msg) @@ -488,8 +528,8 @@ def _find_similar(self, work_file: Path, candidates: list[Path]): for original in candidates: ost, wst = original.stat(), work_file.stat() if (self.ignore_date - or wst.st_mtime == ost.st_mtime - or self.tolerate_hour and self.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.tolerate_hour[1] + or wst.st_mtime == ost.st_mtime + or self.tolerate_hour and self.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.tolerate_hour[1] ) and (self.ignore_size or wst.st_size == ost.st_size and (not self.checksum or crc(original) == crc(work_file))): return original @@ -554,7 +594,7 @@ def image_similar(self, original: Path, work_file: Path, work_pil: Image, ref_ti @staticmethod @cache def build_originals(original_dir: str | Path, suffixes: bool | tuple[str]): - return [p for p in tqdm(Path(original_dir).rglob("*"), desc="Caching original files") if p.is_file() and not p.is_symlink() and (not suffixes or p.suffix.lower() in suffixes)] + return [p for p in tqdm(Path(original_dir).rglob("*"), desc="Caching original files", leave=False) if p.is_file() and not p.is_symlink() and (not suffixes or p.suffix.lower() in suffixes)] def print_changes(self): "Prints performed/suggested changes to be inspected in a human readable form." 
@@ -775,7 +815,3 @@ def mtime_files_in_dir_according_to_json(dir_, json_dir): # mtime_files_in_dir_according_to_json("/media/user/disk2/Takeoutuser/Google Photos/Photos from 2019/", # "/media/user/disk2/photos_json/") - - -# DISK1_PHOTOS = mdf.file_list -# NAHRAVKY_LIST = mdf.file_list diff --git a/deduplidog/form.tcss b/deduplidog/form.tcss new file mode 100644 index 0000000..d1b0929 --- /dev/null +++ b/deduplidog/form.tcss @@ -0,0 +1,10 @@ +Screen { + align: center middle; +} + +VerticalScroll { + width: auto; + height: auto; + background: $boost; + padding: 2; +} diff --git a/deduplidog/interface_utils.py b/deduplidog/interface_utils.py new file mode 100644 index 0000000..d41f2fa --- /dev/null +++ b/deduplidog/interface_utils.py @@ -0,0 +1,44 @@ +from ast import literal_eval +from dataclasses import _MISSING_TYPE, dataclass +from types import UnionType +from typing import Any, get_args + +from textual.widgets import Checkbox, Input + + +@dataclass +class Field: + name: str + value: Any + type: Any + help: str = "" + + def __post_init__(self): + if isinstance(self.value, _MISSING_TYPE): + self.value = "" + self.types = get_args(self.type) \ + if isinstance(self.type, UnionType) else (self.type, ) + "All possible types in a tuple. Ex 'int | str' -> (int, str)" + + def get_widgets(self): + if self.type is bool: + o = Checkbox(self.name, self.value) + else: + o = Input(str(self.value), placeholder=self.name) + o._link = self + return o + + def convert(self): + """ Convert the self.value to the given self.type. + The value might be in str due to CLI or TUI whereas the programs wants bool. 
+ """ + if self.value == "True": + return True + if self.value == "False": + return False + if type(self.value) is str and str not in self.types: + try: + return literal_eval(self.value) # ex: int, tuple[int, int] + except: + raise ValueError(f"{self.name}: Cannot convert value {self.value}") + return self.value diff --git a/pyproject.toml b/pyproject.toml index b17ef87..877e464 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,12 +4,14 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "deduplidog" -version = "0.5.0" +version = "0.6.0" description = "Deduplicate folders" authors = ["Edvard Rejthar "] license = "GPL-3.0-or-later" [tool.poetry.dependencies] +click = "~=8.1.7" +dataclass_click = "~=1.0.2" python = ">=3.11" humanize = "*" imagehash = "*" @@ -17,5 +19,6 @@ IPython = "*" ipywidgets = "*" opencv-python = "*" Pillow = "*" +textual = "~=0.52.1" sh = "*" tqdm = "*" \ No newline at end of file