diff --git a/README.md b/README.md index a303e18..17a2da1 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,8 @@ Works great when the files keep more or less the same name. (Photos downloaded f You can impose the same file *mtime*, tolerate few hours (to correct timezone confusion) or ignore the date altogether. +Note: differences smaller than a second are ignored. + * The file size, the image hash or the video frame count. The file must have the same size. Or take advantage of the media magic under the hood which ignores the file size but compares the image or the video inside. It is great whenever you end up with some files converted to a different format. @@ -66,7 +68,7 @@ Warnings: 1 We found out all the files in the *duplicates* folder seem to be useless but one. It's date is earlier than the original one. See with full log. ```python3 -Deduplidog("/home/user/duplicates", "/media/disk/origs", ignore_date=True, rename=True, set_both_to_older_date=True, logging_level=logging.INFO) +Deduplidog("/home/user/duplicates", "/media/disk/origs", ignore_date=True, rename=True, set_both_to_older_date=True, log_level=logging.INFO) ``` ``` @@ -116,27 +118,30 @@ from deduplidog import Deduplidog Or change these parameter from CLI or TUI, by launching `deduplidog`. -Find the duplicates. Normally, the file must have the same size, date and name. (Name might be just similar if parameters like strip_end_counter are set.) If media_magic=True, media files receive different rules: Neither the size nor the date are compared. See its help. +Find the duplicates. Normally, the file must have the same size, date and name. (Name might be just similar if parameters like strip_end_counter are set.) If `media_magic=True`, media files receive different rules: Neither the size nor the date are compared. See its help. | parameter | type | default | description | |-----------|------|---------|-------------| | work_dir | str \| Path | - | Folder of the files suspectible to be duplicates. 
| -| original_dir | str \| Path | - | Folder of the original files. Normally, these files will not be affected.
(However, they might get affected by treat_bigger_as_original or set_both_to_older_date). | +| original_dir | str \| Path | - | Folder of the original files. Normally, these files will not be affected.
(However, they might get affected by `treat_bigger_as_original` or `set_both_to_older_date`). | | **Actions** | | execute | bool | False | If False, nothing happens, just a safe run is performed. | | bashify | bool | False | Print bash commands that correspond to the actions that would have been executed if execute were True.
You can check and run them yourself. | -| affect_only_if_smaller | bool | False | If media_magic=True, all writing actions like rename, replace_with_original, set_both_to_older_date and treat_bigger_as_original
are executed only if the affectable file is smaller than the other. | -| rename | bool | False | If execute=True, prepend ✓ to the duplicated work file name (or possibly to the original file name if treat_bigger_as_original).
Mutually exclusive with replace_with_original and delete. | -| delete | bool | False | If execute=True, delete theduplicated work file name (or possibly to the original file name if treat_bigger_as_original).
Mutually exclusive with replace_with_original and rename. | -| replace_with_original | bool | False | If execute=True, replace duplicated work file with the original (or possibly vice versa if treat_bigger_as_original).
Mutually exclusive with rename and delete. | -| set_both_to_older_date | bool | False | If execute=True, media_magic=True or (media_magic=False and ignore_date=True), both files are set to the older date. Ex: work file get's the original file's date or vice versa. | -| treat_bigger_as_original | bool | False | If execute=True and rename=True and media_magic=True, the original file might be affected (by renaming) if smaller than the work file. | +| rename | bool | False | If `execute=True`, prepend ✓ to the duplicated work file name (or possibly to the original file name if treat_bigger_as_original).
Mutually exclusive with `replace_with_original` and `delete`. | +| delete | bool | False | If `execute=True`, delete the duplicated work file (or possibly the original file if `treat_bigger_as_original`).
Mutually exclusive with `replace_with_original` and `rename`. | +| replace_with_original | bool | False | If `execute=True`, replace the duplicated work file with the original (or possibly vice versa if `treat_bigger_as_original`).
Mutually exclusive with `rename` and `delete`. | +| set_both_to_older_date | bool | False | If `execute=True`, `media_magic=True` or (`media_magic=False` and `ignore_date=True`), both files are set to the older date. Ex: the work file gets the original file's date or vice versa. | +| treat_bigger_as_original | bool | False | If `execute=True` and `rename=True` and `media_magic=True`, the original file might be affected (by renaming) if smaller than the work file. | +| skip_bigger | bool | False | If `media_magic=True`, all writing actions, such as `rename`, `replace_with_original`, `set_both_to_older_date` and `treat_bigger_as_original`, are executed only if the affectable file is smaller than (or the same size as) the other. | +| skip_empty | bool | False | Skip files with zero size. | +| neglect_warning | bool | False | By default, when a file with a bigger size or an older date would be affected, only a warning is emitted and the file is skipped. Turn this on to suppress the warning and affect the file anyway. | | **Matching** | | casefold | bool | False | Case insensitive file name comparing. | -| checksum | bool | False | If media_magic=False and ignore_size=False, files will be compared by CRC32 checksum.
(This mode is considerably slower.) | -| tolerate_hour | int \| tuple[int, int] \| bool | False | When comparing files in work_dir and media_magic=False, tolerate hour difference.
Sometimes when dealing with FS changes, files might got shifted few hours.
* bool → -1 .. +1
* int → -int .. +int
* tuple → int1 .. int2
Ex: tolerate_hour=2 → work_file.st_mtime -7200 ... + 7200 is compared to the original_file.st_mtime | -| ignore_date | bool | False | If media_magic=False, files will not be compared by date. | -| ignore_size | bool | False | If media_magic=False, files will not be compared by size. | +| checksum | bool | False | If `media_magic=False` and `ignore_size=False`, files will be compared by CRC32 checksum.
(This mode is considerably slower.) | +| tolerate_hour | int \| tuple[int, int] \| bool | False | When comparing files in work_dir and `media_magic=False`, tolerate hour difference.
Sometimes when dealing with FS changes, files might get shifted by a few hours.
* bool → -1 .. +1
* int → -int .. +int
* tuple → int1 .. int2
Ex: tolerate_hour=2 → work_file.st_mtime -7200 ... + 7200 is compared to the original_file.st_mtime | +| ignore_name | bool | False | Files will be compared neither by stem nor by suffix. | +| ignore_date | bool | False | If `media_magic=False`, files will not be compared by date. | +| ignore_size | bool | False | If `media_magic=False`, files will not be compared by size. | | space2char | bool \| str | False | When comparing files in work_dir, consider space as another char. Ex: "file 012.jpg" is compared as "file_012.jpg" | | strip_end_counter | bool | False | When comparing files in work_dir, strip the counter. Ex: "00034(3).MTS" is compared as "00034.MTS" | | strip_suffix | str | False | When comparing files in work_dir, strip the file name end matched by a regular. Ex: "001-edited.jpg" is compared as "001.jpg" | @@ -145,7 +150,9 @@ Find the duplicates. Normally, the file must have the same size, date and name. | media_magic | bool | False | Nor the size or date is compared for files with media suffixes.
A video is considered a duplicate if it has the same name and a similar number of frames, even if it has a different extension.
An image is considered a duplicate if it has the same name and a similar image hash, even if the files are of different sizes.
(This mode is considerably slower.) | | accepted_frame_delta | int | 1 | Used only when media_magic is True | | accepted_img_hash_diff | int | 1 | Used only when media_magic is True | -| img_compare_date | bool | False | If True and media_magic=True, the file date or the EXIF date must match. | +| img_compare_date | bool | False | If True and `media_magic=True`, the work file date or the work file EXIF date must match the original file date (at most an hour apart). | +| **Helper** | +| log_level | int | 30 (warning) | 10 debug .. 50 critical | ## Utils In the `deduplidog.utils` packages, you'll find a several handsome tools to help you. You will find parameters by using you IDE hints. diff --git a/deduplidog/__main__.py b/deduplidog/__main__.py index ccd0780..6cf8881 100644 --- a/deduplidog/__main__.py +++ b/deduplidog/__main__.py @@ -2,118 +2,65 @@ from dataclasses import fields from typing import get_args -import click -from dataclass_click import dataclass_click -from textual import events -from textual.app import App, ComposeResult -from textual.containers import VerticalScroll -from textual.widgets import Checkbox, Footer, Input, Label +from click import MissingParameter -from .interface_utils import Field -from .deduplidog import Deduplidog - - -class CheckboxApp(App[None]): - CSS_PATH = "form.tcss" - - BINDINGS = [ - ("up", "go_up", "Go up"), - ("down", "go_up", "Go down"), - ("ctrl+s", "confirm", "Run"), # ctrl/alt+enter does not work; enter does not work with checkboxes - ("escape", "exit", "Exit"), - ] - - def compose(self) -> ComposeResult: - yield Footer() - self.inputs = INPUTS - with VerticalScroll(): - for input in self.inputs: - if isinstance(input, Input): - yield Label(input.placeholder) - yield input - yield Label(input._link.help) - yield Label("") - - def on_mount(self): - self.inputs[0].focus() - - def action_confirm(self): - self.exit(True) - - def action_exit(self): - self.exit() - - def on_key(self, event: events.Key) -> 
None: - try: - index = self.inputs.index(self.focused) - except ValueError: # probably some other element were focused - return - match event.key: - case "down": - self.inputs[(index + 1) % len(self.inputs)].focus() - case "up": - self.inputs[(index - 1) % len(self.inputs)].focus() - case letter if len(letter) == 1: # navigate by letters - for inp_ in self.inputs[index+1:] + self.inputs[:index]: - label = inp_.label if isinstance(inp_, Checkbox) else inp_.placeholder - if str(label).casefold().startswith(letter): - inp_.focus() - break - - -class RaiseOnMissingParam(click.Command): - def __call__(self, *args, **kwargs): - return super(RaiseOnMissingParam, self).__call__(*args, standalone_mode=False, **kwargs) +from .tui import CheckboxApp, tui_state +from .cli import cli - -@click.command(cls=RaiseOnMissingParam) -@dataclass_click(Deduplidog) -def cli(dd: Deduplidog): - return dd +from .helpers import Field +from .deduplidog import Deduplidog def main(): - global INPUTS - - # CLI try: - dd = cli() - if not dd: # maybe just --help - return - if input("See more options? [Y/n] ").casefold() not in ("", "y"): - sys.exit() - except click.MissingParameter: - # User launched the program without parameters. - # This is not a problem, we have TUI instead. - dd = None - - # TUI - dog_fields: list[Field] = [] - for f in fields(Deduplidog): + # CLI try: - dog_fields.append(Field(f.name, - getattr(dd, f.name, f.default), - get_args(f.type)[0], - get_args(f.type)[1].kwargs["help"])) - except Exception as e: - # we want only documented fields, in case of an incorrenctly defined field, we do not let user to edit - continue - while True: - print("") - INPUTS = [f.get_widgets() for f in dog_fields] - if not CheckboxApp().run(): - break - for form, field in zip(INPUTS, dog_fields): - field.value = form.value - try: - Deduplidog(**{f.name: f.convert() for f in dog_fields}) - except Exception as e: - print("-"*100) - print(e) - input() - continue - if input("See more options? 
[Y/n] ").casefold() not in ("y", ""): - break + deduplidog = cli() + if not deduplidog: # maybe just --help + return + if input("See more options? [Y/n] ").casefold() not in ("", "y"): + sys.exit() + except MissingParameter: + # User launched the program without parameters. + # This is not a problem, we have TUI instead. + deduplidog = None + + # TUI + dog_fields: list[Field] = [] + for f in fields(Deduplidog): + try: + dog_fields.append(Field(f.name, + getattr(deduplidog, f.name, f.default), + get_args(f.type)[0], + get_args(f.type)[1].kwargs["help"])) + except Exception as e: + # We want only documented fields; in case of an incorrectly defined field, we do not let the user edit it. + continue + tui_state.FOCUSED_I = 0 + while True: + print("") + tui_state.INPUTS = [f.get_widgets() for f in dog_fields] + if not CheckboxApp().run(): + break + for form, field in zip(tui_state.INPUTS, dog_fields): + field.value = form.value + try: + # if deduplidog: + # # To prevent full initialization with the slow metadata refresh, we re-use the same object. + # [setattr(deduplidog, f.name, f.convert()) for f in dog_fields] + # deduplidog.perform() + # else: + deduplidog = Deduplidog(**{f.name: f.convert() for f in dog_fields}) + except Exception as e: + print("-"*100) + print(e) + input() + continue + if input("See more options? 
[Y/n] ").casefold() not in ("y", ""): + break + except KeyboardInterrupt: + sys.exit() + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/deduplidog/cli.py b/deduplidog/cli.py new file mode 100644 index 0000000..fabe530 --- /dev/null +++ b/deduplidog/cli.py @@ -0,0 +1,15 @@ +import click +from dataclass_click import dataclass_click + +from .deduplidog import Deduplidog + + +class RaiseOnMissingParam(click.Command): + def __call__(self, *args, **kwargs): + return super(RaiseOnMissingParam, self).__call__(*args, standalone_mode=False, **kwargs) + + +@click.command(cls=RaiseOnMissingParam) +@dataclass_click(Deduplidog) +def cli(dd: Deduplidog): + return dd diff --git a/deduplidog/deduplidog.py b/deduplidog/deduplidog.py index ccf8772..0c4ae8f 100644 --- a/deduplidog/deduplidog.py +++ b/deduplidog/deduplidog.py @@ -1,3 +1,4 @@ +from concurrent.futures import ThreadPoolExecutor import logging import os import re @@ -11,17 +12,16 @@ from typing import Annotated, get_args, get_type_hints import click -import imagehash from dataclass_click import option from humanize import naturaldelta, naturalsize -from PIL import ExifTags, Image +from PIL import Image from tqdm.autonotebook import tqdm -from .interface_utils import Field +from .helpers import Field, FileMetadata, keydefaultdict from .utils import _qp, crc, get_frame_count VIDEO_SUFFIXES = ".mp4", ".mov", ".avi", ".vob", ".mts", ".3gp", ".mpg", ".mpeg", ".wmv" -IMAGE_SUFFIXES = ".jpg", ".jpeg", ".png", ".gif" +IMAGE_SUFFIXES = ".jpg", ".jpeg", ".png", ".gif", ".avif", ".webp" MEDIA_SUFFIXES = IMAGE_SUFFIXES + VIDEO_SUFFIXES logger = logging.getLogger(__name__) @@ -47,9 +47,9 @@ def conversion(_ctx, option, value): .convert() -def opt(help, default): +def opt(help, default, process_by_click=True): "CLI support" - return option(help=help, default=default, type=click.UNPROCESSED, callback=conversion) + return option(help=help, default=default, type=None if process_by_click else 
click.UNPROCESSED, callback=conversion) @dataclass @@ -59,14 +59,14 @@ class Deduplidog: Normally, the file must have the same size, date and name. (Name might be just similar if parameters like strip_end_counter are set.) - If media_magic=True, media files receive different rules: Neither the size nor the date are compared. See its help. + If `media_magic=True`, media files receive different rules: Neither the size nor the date are compared. See its help. """ work_dir: Annotated[str | Path, option( help="""Folder of the files suspectible to be duplicates.""", required=True, type=click.UNPROCESSED)] original_dir: Annotated[str | Path, option( help="""Folder of the original files. Normally, these files will not be affected. - (However, they might get affected by treat_bigger_as_original or set_both_to_older_date).""", default="", type=click.UNPROCESSED)] = "" + (However, they might get affected by `treat_bigger_as_original` or `set_both_to_older_date`).""", default="", type=click.UNPROCESSED)] = "" # Action section execute: Annotated[bool, flag( @@ -74,40 +74,42 @@ class Deduplidog: bashify: Annotated[bool, flag( """Print bash commands that correspond to the actions that would have been executed if execute were True. You can check and run them yourself.""")] = False - affect_only_if_smaller: Annotated[bool, flag( - """If media_magic=True, all writing actions like rename, replace_with_original, set_both_to_older_date and treat_bigger_as_original - are executed only if the affectable file is smaller than the other.""")] = False rename: Annotated[bool, flag( - """If execute=True, prepend ✓ to the duplicated work file name (or possibly to the original file name if treat_bigger_as_original). - Mutually exclusive with replace_with_original and delete.""")] = False + """If `execute=True`, prepend ✓ to the duplicated work file name (or possibly to the original file name if treat_bigger_as_original). 
+ Mutually exclusive with `replace_with_original` and `delete`.""")] = False delete: Annotated[bool, flag( - """If execute=True, delete theduplicated work file name (or possibly to the original file name if treat_bigger_as_original). + """If `execute=True`, delete the duplicated work file (or possibly the original file if treat_bigger_as_original). Mutually exclusive with replace_with_original and rename.""")] = False replace_with_original: Annotated[bool, flag( - """If execute=True, replace duplicated work file with the original (or possibly vice versa if treat_bigger_as_original). + """If `execute=True`, replace the duplicated work file with the original (or possibly vice versa if treat_bigger_as_original). Mutually exclusive with rename and delete.""")] = False set_both_to_older_date: Annotated[bool, flag( - "If execute=True, media_magic=True or (media_magic=False and ignore_date=True), both files are set to the older date. Ex: work file get's the original file's date or vice versa.")] = False + "If `execute=True`, `media_magic=True` or (`media_magic=False` and `ignore_date=True`), both files are set to the older date. 
Ex: work file get's the original file's date or vice versa.")] = False treat_bigger_as_original: Annotated[bool, flag( - "If execute=True and rename=True and media_magic=True, the original file might be affected (by renaming) if smaller than the work file.")] = False + "If `execute=True` and `rename=True` and `media_magic=True`, the original file might be affected (by renaming) if smaller than the work file.")] = False + skip_bigger: Annotated[bool, flag( + """If `media_magic=True`, all writing actions, such as `rename`, `replace_with_original`, `set_both_to_older_date` and `treat_bigger_as_original` + are executed only if the affectable file is smaller (or the same size) than the other.""")] = False + skip_empty: Annotated[bool, flag("Skip files with zero size.")] = False + neglect_warning: Annotated[bool, flag( + "By default, when a file with bigger size or older date should be affected, just warning is generated. Turn this to suppress it.")] = False # Match section casefold: Annotated[bool, flag( "Case insensitive file name comparing.")] = False checksum: Annotated[bool, flag( - """If media_magic=False and ignore_size=False, files will be compared by CRC32 checksum. + """If `media_magic=False` and `ignore_size=False`, files will be compared by CRC32 checksum. (This mode is considerably slower.)""")] = False tolerate_hour: Annotated[int | tuple[int, int] | bool, opt( - """When comparing files in work_dir and media_magic=False, tolerate hour difference. + """When comparing files in work_dir and `media_magic=False`, tolerate hour difference. Sometimes when dealing with FS changes, files might got shifted few hours. * bool → -1 .. +1 * int → -int .. +int * tuple → int1 .. int2 - Ex: tolerate_hour=2 → work_file.st_mtime -7200 ... 
+ 7200 is compared to the original_file.st_mtime """, False)] = False - ignore_date: Annotated[bool, flag( - "If media_magic=False, files will not be compared by date.")] = False - ignore_size: Annotated[bool, flag( - "If media_magic=False, files will not be compared by size.")] = False + Ex: tolerate_hour=2 → work_file.st_mtime -7200 ... + 7200 is compared to the original_file.st_mtime """, False, False)] = False + ignore_name: Annotated[bool, flag("Files will not be compared by stem nor suffix.")] = False + ignore_date: Annotated[bool, flag("If `media_magic=False`, files will not be compared by date.")] = False + ignore_size: Annotated[bool, flag("If `media_magic=False`, files will not be compared by size.")] = False space2char: Annotated[bool, flag( """When comparing files in work_dir, consider space as another char. Ex: "file 012.jpg" is compared as "file_012.jpg" """)] = False strip_end_counter: Annotated[bool, flag( @@ -129,7 +131,12 @@ class Deduplidog: accepted_img_hash_diff: Annotated[int, opt( "Used only when media_magic is True", 1)] = 1 img_compare_date: Annotated[bool, flag( - "If True and media_magic=True, the file date or the EXIF date must match.")] = False + "If True and `media_magic=True`, the work file date or the work file EXIF date must match the original file date (has to be no more than an hour around).")] = False + + # Helper section + log_level: Annotated[int, opt("10 debug .. 50 critical", logging.WARNING, 1)] = logging.WARNING + + # TODO output of log and of bashize should be outputtable to a file # Following parameters are undocumented: @@ -145,7 +152,6 @@ class Deduplidog: fail_on_error: bool = False shorter_log: bool = True "TODO deprecated If True, common prefix of the file names are not output to the log to save space." 
- logging_level: int = logging.WARNING ending_counter = re.compile(r"\(\d+\)$") @@ -155,9 +161,9 @@ def __repr__(self): return f'Deduplidog({text})' def __post_init__(self): - logging.basicConfig(level=self.logging_level, format="%(message)s", force=True) - logger.setLevel(self.logging_level) - [handler.setLevel(self.logging_level) for handler in logger.handlers] + logging.basicConfig(level=self.log_level, format="%(message)s", force=True) + logger.setLevel(self.log_level) + [handler.setLevel(self.log_level) for handler in logger.handlers] self.changes: list[Change] = [] "Path to the files to be changed and path to the original file and status" @@ -175,35 +181,14 @@ def __post_init__(self): "What unsuccessful candidates did work files have?" self.bar: tqdm | None = None "Work files iterator" - match self.tolerate_hour: - case True: - self.tolerate_hour = -1, 1 - case n if isinstance(n, int): - self.tolerate_hour = -abs(n), abs(n) - case n if isinstance(n, tuple) and all(isinstance(x, int) for x in n): - pass - case _: - raise AssertionError("Use whole hours only") self._files_cache: dict[str, set[Path]] = defaultdict(set) "Original files, grouped by stem" - + self.metadata: dict[Path, FileMetadata] = keydefaultdict(FileMetadata) + "File metadata like stat() (which is not cached by default)" self._common_prefix_length = 0 " TODO deprecated" - - # Distinguish paths - if not self.original_dir: - self.original_dir = self.work_dir - if not self.work_dir: - raise AssertionError("Missing work_dir") - else: - for a, b in zip(Path(self.work_dir).parts, Path(self.original_dir).parts): - if a != b: - self.work_dir_name = a - self.original_dir_name = b - break - else: - self.work_dir_name = a - self.original_dir_name = "(same superdir)" + self.original_dir_name = self.work_dir_name = None + "Shortened name, human readable" self.check() self.perform() @@ -219,9 +204,21 @@ def perform(self): print("Number of originals:", len(self.file_list)) self._files_cache.clear() - for p in 
self.file_list: - p_case = Path(str(p).casefold()) if self.casefold else p - self._files_cache[p_case.stem[:self.work_file_stem_shortened]].add(p) + if not self.ignore_name: + for p in self.file_list: + p_case = Path(str(p).casefold()) if self.casefold else p + self._files_cache[p_case.stem[:self.work_file_stem_shortened]].add(p) + elif self.media_magic: + # We preload the metadata cache since we expect a lot of candidates. + # This is because media_magic uses neither date nor size filtering, so evaluating the first work_file might + # take ages. Here, we put a nice progress bar. + # Strangely, using multiprocessing seems to have no benefit. It must be just IO-bound and the hashing function quick. + images = [x for x in self.file_list if x.suffix.lower() in IMAGE_SUFFIXES] + with ThreadPoolExecutor() as executor: + list(tqdm(executor.map( + lambda orig_file: self.metadata[orig_file].preload(), + images), total=len(images), desc="Caching image hashes")) + self._common_prefix_length = len(os.path.commonprefix([self.original_dir, self.work_dir])) \ if self.shorter_log else 0 @@ -240,8 +237,8 @@ def perform(self): raise finally: if self.bar: - print( - f"{'Affected' if self.execute else 'Affectable'}: {self.affected_count}/{len(self.file_list)- self.ignored_count}", end="") + print(f"{'Affected' if self.execute else 'Affectable'}:" + f" {self.affected_count}/{len(self.file_list)- self.ignored_count}", end="") if self.ignored_count: print(f" ({self.ignored_count} ignored)", end="") print("\nAffected size:", naturalsize(self.size_affected)) @@ -270,15 +267,46 @@ def perform(self): def check(self): """ Checks setup and prints out the description. 
""" - if self.affect_only_if_smaller and not self.media_magic: - raise AssertionError("The affect_only_if_smaller works only with media_magic") + + # Distinguish paths + if not self.original_dir: + self.original_dir = self.work_dir + if not self.work_dir: + raise AssertionError("Missing work_dir") + else: + for a, b in zip(Path(self.work_dir).parts, Path(self.original_dir).parts): + if a != b: + self.work_dir_name = a + self.original_dir_name = b + break + else: + self.work_dir_name = a + self.original_dir_name = "(same superdir)" + + if self.skip_bigger and not self.media_magic: + raise AssertionError("The skip_bigger works only with media_magic") + + match self.tolerate_hour: + case True: + self.tolerate_hour = -1, 1 + case n if isinstance(n, int): + self.tolerate_hour = -abs(n), abs(n) + case n if isinstance(n, tuple) and all(isinstance(x, int) for x in n): + pass + case _: + raise AssertionError("Use whole hours only") + + if self.ignore_name and self.ignore_date and self.ignore_size: + raise AssertionError("You cannot ignore everything.") if self.media_magic: - print("Only files with media suffixes are taken into consideration. Nor the size or date is compared.") + print("Only files with media suffixes are taken into consideration." + f" Neither the size nor the date is compared.{' Nor the name!' 
if self.ignore_name else ''}") else: if self.ignore_size and self.checksum: raise AssertionError("Checksum cannot be counted when ignore_size.") used, ignored = (", ".join(filter(None, x)) for x in zip( + self.ignore_name and ("", "name") or ("name", ""), self.ignore_size and ("", "size") or ("size", ""), self.ignore_date and ("", "date") or ("date", ""), self.checksum and ("crc32", "") or ("", "crc32"))) @@ -287,9 +315,10 @@ def check(self): which = f"either the file from the work dir at '{self.work_dir_name}' or the original dir at '{self.original_dir_name}' (whichever is bigger)" \ if self.treat_bigger_as_original \ else f"duplicates from the work dir at '{self.work_dir_name}'" - small = " (only if smaller than the pair file)" if self.affect_only_if_smaller else "" + small = " (only if smaller than the pair file)" if self.skip_bigger else "" + nonzero = " with non-zero size" if self.skip_empty else "" action = "will be" if self.execute else f"would be (if execute were True)" - print(f"{which.capitalize()}{small} {action} ", end="") + print(f"{which.capitalize()}{small}{nonzero} {action} ", end="") match self.rename, self.replace_with_original, self.delete: case False, False, False: @@ -305,6 +334,7 @@ def check(self): if self.set_both_to_older_date: print("Original file mtime date might be set backwards to the duplicate file.") + print("") # sometimes, this line is consumed def _loop_files(self): work_dir, skip = self.work_dir, self.skip @@ -351,6 +381,10 @@ def _process_file(self, work_file: Path, bar: tqdm): stem = stem.casefold() if work_file.is_symlink() or self.suffixes and work_file.suffix.lower() not in self.suffixes: + logger.debug("Skipping symlink or a non-wanted suffix: %s", work_file) + return + if self.skip_empty and not work_file.stat().st_size: + logger.debug("Skipping zero size: %s", work_file) return # print stats @@ -360,7 +394,7 @@ def _process_file(self, work_file: Path, bar: tqdm): }) # candidate = name matches - _candidates_fact = (p for p 
in self._files_cache[stem] if + _candidates_fact = (p for p in (self.file_list if self.ignore_name else self._files_cache[stem]) if work_file != p and p not in self.passed_away) @@ -391,7 +425,7 @@ def _affect(self, work_file: Path, original: Path): # which file will be affected? The work file or the mistakenly original file? change = {work_file: [], original: []} affected_file, other_file = work_file, original - warning = False + warning: Path | bool = False if affected_file == other_file: logger.error("Error, the file is the same", affected_file) return @@ -404,14 +438,12 @@ def _affect(self, work_file: Path, original: Path): affected_file, other_file = original, work_file case False, True: change[work_file].append(f"SIZE WARNING {naturalsize(work_size-orig_size)}") - warning = True - if self.affect_only_if_smaller and affected_file.stat().st_size >= other_file.stat().st_size: + warning = work_file + if self.skip_bigger and affected_file.stat().st_size > other_file.stat().st_size: logger.debug("Skipping %s as it is not smaller than %s", affected_file, other_file) return # execute changes or write a log - self.size_affected += affected_file.stat().st_size - self.affected_count += 1 # setting date affected_date, other_date = affected_file.stat().st_mtime, other_file.stat().st_mtime @@ -422,26 +454,35 @@ def _affect(self, work_file: Path, original: Path): self._change_file_date(affected_file, affected_date, other_date, change) elif other_date > affected_date: self._change_file_date(other_file, other_date, affected_date, change) - case False, True if (other_date > affected_date): - # attention, we do not want to tamper dates however the file marked as duplicate has - # lower timestamp (which might be genuine) + case False, True if other_date > affected_date and other_date-affected_date >= 1: + # Attention, we do not want to tamper with dates; however, the file marked as duplicate has + # a lower timestamp (which might be a hint that it is the genuine one). 
+ # However, too often I came across cases where the difference was less than a second. + # So we neglect sub-second differences. change[other_file].append(f"DATE WARNING + {naturaldelta(other_date-affected_date)}") - warning = True + warning = other_file - # other actions - if self.rename: - self._rename(change, affected_file) + if warning and not self.neglect_warning: + change[warning].append("🛟skipped on warning") + else: + self.size_affected += affected_file.stat().st_size + self.affected_count += 1 + + # other actions + if self.rename: + self._rename(change, affected_file) - if self.delete: - self._delete(change, affected_file) + if self.delete: + self._delete(change, affected_file) - if self.replace_with_original: - self._replace_with_original(change, affected_file, other_file) + if self.replace_with_original: + self._replace_with_original(change, affected_file, other_file) self.changes.append(change) if warning: self.warning_count += 1 - if (warning and self.logging_level <= logging.WARNING) or (self.logging_level <= logging.INFO): + if (warning and self.log_level <= logging.WARNING) or (self.log_level <= logging.INFO): + self.bar.clear() # this looks the same from jupyter and much better from terminal (does not leave a trace of abandoned bars) self._print_change(change) def _rename(self, change: Change, affected_file: Path): @@ -462,6 +503,7 @@ def _rename(self, change: Change, affected_file: Path): if self.bashify: print(f"mv -n {_qp(affected_file)} {_qp(target_path)}") self.passed_away.add(affected_file) + self.metadata.pop(affected_file, None) change[affected_file].append(msg) def _delete(self, change: Change, affected_file: Path): @@ -473,6 +515,7 @@ def _delete(self, change: Change, affected_file: Path): if self.bashify: print(f"rm {_qp(affected_file)}") self.passed_away.add(affected_file) + self.metadata.pop(affected_file, None) change[affected_file].append(msg) def _replace_with_original(self, change: Change, affected_file: Path, 
other_file: Path): @@ -492,6 +535,7 @@ def _replace_with_original(self, change: Change, affected_file: Path, other_file # TODO check print(f"cp --preserve {_qp(other_file)} {_qp(affected_file.parent)} && rm {_qp(affected_file)}") change[affected_file].append(msg) + self.metadata.pop(affected_file, None) def _change_file_date(self, path, old_date, new_date, change: Change): # Consider following usecase: @@ -505,6 +549,7 @@ def _change_file_date(self, path, old_date, new_date, change: Change): datetime.fromtimestamp(old_date), "->", datetime.fromtimestamp(new_date))) if self.execute: os.utime(path, (new_date,)*2) # change access time, modification time + self.metadata.pop(path, None) if self.bashify: print(f"touch -t {new_date} {_qp(path)}") # TODO check @@ -522,60 +567,48 @@ def _find_similar(self, work_file: Path, candidates: list[Path]): for original in candidates: ost, wst = original.stat(), work_file.stat() if (self.ignore_date - or wst.st_mtime == ost.st_mtime - or self.tolerate_hour and self.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.tolerate_hour[1] - ) and (self.ignore_size or wst.st_size == ost.st_size and (not self.checksum or crc(original) == crc(work_file))): + or wst.st_mtime == ost.st_mtime + or self.tolerate_hour and self.tolerate_hour[0] <= (wst.st_mtime - ost.st_mtime)/3600 <= self.tolerate_hour[1] + ) and (self.ignore_size or wst.st_size == ost.st_size and (not self.checksum or crc(original) == crc(work_file))): return original def _find_similar_media(self, work_file: Path, comparing_image: bool, candidates: list[Path]): similar = False - ref_time = False - work_pil = None + work_cache = self.metadata[work_file] if self.debug: print("File", work_file, "\n", "Candidates", candidates) - for original in candidates: - if not original.exists(): + for orig_file in candidates: + if not orig_file.exists(): continue if comparing_image: # comparing images - if not ref_time: - ref_time = work_file.stat().st_mtime - work_pil = 
Image.open(work_file) - similar = self.image_similar(original, work_file, work_pil, ref_time) + similar = self.image_similar(self.metadata[orig_file], work_cache) else: # comparing videos - frame_delta = abs(get_frame_count(work_file) - get_frame_count(original)) + frame_delta = abs(get_frame_count(work_file) - get_frame_count(orig_file)) similar = frame_delta <= self.accepted_frame_delta if not similar and self.debug: - print("Frame delta:", frame_delta, work_file, original) + print("Frame delta:", frame_delta, work_file, orig_file) if similar: break - return original if similar else False + work_cache.clean() + return orig_file if similar else False - def image_similar(self, original: Path, work_file: Path, work_pil: Image, ref_time: float): + def image_similar(self, orig_cache: FileMetadata, work_cache: FileMetadata): """ Returns true if images are similar. When? If their image hash difference are relatively small. - XIf original ref_time set - ref_time: the file date of the investigated file f or its EXIF date - has to be no more than an hour around. 
""" try: similar = False - original_pil = Image.open(original) - # compare time if self.img_compare_date: - try: - exif_times = {datetime.strptime(v, '%Y:%m:%d %H:%M:%S').timestamp() for k, v in original_pil._getexif().items() if - k in ExifTags.TAGS and "DateTime" in ExifTags.TAGS[k]} - except: - exif_times = tuple() - file_time = original.stat().st_mtime + exif_times = orig_cache.exif_times + file_time = orig_cache.stat.st_mtime + ref_time = work_cache.stat.st_mtime similar = abs(ref_time - file_time) <= 3600 \ or any(abs(ref_time - t) <= 3600 for t in exif_times) - # print("* čas",similar, original, ref_time, exif_times, file_time) if similar or not self.img_compare_date: - hash0 = imagehash.average_hash(original_pil) - hash1 = imagehash.average_hash(work_pil) + hash0 = orig_cache.average_hash + hash1 = work_cache.average_hash # maximum bits that could be different between the hashes hash_dist = abs(hash0 - hash1) similar = hash_dist <= self.accepted_img_hash_diff @@ -583,12 +616,17 @@ def image_similar(self, original: Path, work_file: Path, work_pil: Image, ref_ti print("Hash distance:", hash_dist) return similar except OSError as e: - print(e, original, work_file) + logger.error("OSError %s %s %s", e, orig_cache.file, work_cache.file) + finally: + orig_cache.clean() @staticmethod @cache def build_originals(original_dir: str | Path, suffixes: bool | tuple[str]): - return [p for p in tqdm(Path(original_dir).rglob("*"), desc="Caching original files", leave=False) if p.is_file() and not p.is_symlink() and (not suffixes or p.suffix.lower() in suffixes)] + return [p for p in tqdm(Path(original_dir).rglob("*"), desc="Caching original files", leave=False) + if p.is_file() + and not p.is_symlink() + and (not suffixes or p.suffix.lower() in suffixes)] def print_changes(self): "Prints performed/suggested changes to be inspected in a human readable form." 
@@ -602,4 +640,3 @@ def _print_change(self, change: Change): [print(text, *(str(s) for s in changes)) for text, changes in zip((f" {wicon}{self.work_dir_name}:", f" {oicon}{self.original_dir_name}:"), change.values()) if len(changes)] - diff --git a/deduplidog/helpers.py b/deduplidog/helpers.py new file mode 100644 index 0000000..40314aa --- /dev/null +++ b/deduplidog/helpers.py @@ -0,0 +1,100 @@ +from ast import literal_eval +from collections import defaultdict +from dataclasses import _MISSING_TYPE, dataclass +from datetime import datetime +from functools import cache, cached_property +from pathlib import Path +from types import UnionType +from typing import Any, get_args + +from PIL import ExifTags, Image +import imagehash +from textual.widgets import Checkbox, Input + + +@dataclass +class Field: + """ Bridge between the values given in CLI, TUI and real needed values (str to int conversion etc). """ + name: str + value: Any + type: Any + help: str = "" + + def __post_init__(self): + if isinstance(self.value, _MISSING_TYPE): + self.value = "" + self.types = get_args(self.type) \ + if isinstance(self.type, UnionType) else (self.type, ) + "All possible types in a tuple. Ex 'int | str' -> (int, str)" + + def get_widgets(self): + if self.type is bool: + o = Checkbox(self.name, self.value) + else: + o = Input(str(self.value), placeholder=self.name) + o._link = self + return o + + def convert(self): + """ Convert the self.value to the given self.type. + The value might be a str coming from the CLI or TUI whereas the program wants a bool.
+ """ + if self.value == "True": + return True + if self.value == "False": + return False + if type(self.value) is str and str not in self.types: + try: + return literal_eval(self.value) # ex: int, tuple[int, int] + except: + raise ValueError(f"{self.name}: Cannot convert value {self.value}") + return self.value + + +class keydefaultdict(defaultdict): + def __missing__(self, key): + self[key] = self.default_factory(key) + return self[key] + + +@dataclass +class FileMetadata: + file: Path + _pil = None + cleaned_count = 0 + "Not used, just for debugging: To determine whether the clean up is needed or not." + + @cached_property + def exif_times(self): + try: + return {datetime.strptime(v, '%Y:%m:%d %H:%M:%S').timestamp() + for k, v in self.get_pil()._getexif().items() + if k in ExifTags.TAGS and "DateTime" in ExifTags.TAGS[k]} + except: + return tuple() + + @cached_property + def average_hash(self): + return imagehash.average_hash(self.get_pil()) + + @cached_property + def stat(self): + return self.file.stat() + + def get_pil(self): + if not self._pil: + self._pil = Image.open(self.file) + return self._pil + + def preload(self): + """ Preload all values. """ + self.exif_times + self.average_hash + self.stat + self.clean() # PIL will never be needed anymore + return True + + def clean(self): + """ As PIL is the most memory consuming, we allow the easy clean up. 
""" + self._pil = None + self.cleaned_count += 1 diff --git a/deduplidog/interface_utils.py b/deduplidog/interface_utils.py deleted file mode 100644 index d41f2fa..0000000 --- a/deduplidog/interface_utils.py +++ /dev/null @@ -1,44 +0,0 @@ -from ast import literal_eval -from dataclasses import _MISSING_TYPE, dataclass -from types import UnionType -from typing import Any, get_args - -from textual.widgets import Checkbox, Input - - -@dataclass -class Field: - name: str - value: Any - type: Any - help: str = "" - - def __post_init__(self): - if isinstance(self.value, _MISSING_TYPE): - self.value = "" - self.types = get_args(self.type) \ - if isinstance(self.type, UnionType) else (self.type, ) - "All possible types in a tuple. Ex 'int | str' -> (int, str)" - - def get_widgets(self): - if self.type is bool: - o = Checkbox(self.name, self.value) - else: - o = Input(str(self.value), placeholder=self.name) - o._link = self - return o - - def convert(self): - """ Convert the self.value to the given self.type. - The value might be in str due to CLI or TUI whereas the programs wants bool. 
- """ - if self.value == "True": - return True - if self.value == "False": - return False - if type(self.value) is str and str not in self.types: - try: - return literal_eval(self.value) # ex: int, tuple[int, int] - except: - raise ValueError(f"{self.name}: Cannot convert value {self.value}") - return self.value diff --git a/deduplidog/tui.py b/deduplidog/tui.py new file mode 100644 index 0000000..b8ccd33 --- /dev/null +++ b/deduplidog/tui.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass, field + +from textual import events +from textual.app import App, ComposeResult +from textual.containers import VerticalScroll +from textual.widgets import Checkbox, Footer, Input, Label + + +@dataclass +class TuiState: + INPUTS: list = field(default_factory=list) + FOCUSED_I: int = 0 + + +tui_state = TuiState() + + +class CheckboxApp(App[None]): + CSS_PATH = "form.tcss" + + BINDINGS = [ + ("up", "go_up", "Go up"), + ("down", "go_up", "Go down"), + ("ctrl+s", "confirm", "Run"), # ctrl/alt+enter does not work; enter does not work with checkboxes + ("escape", "exit", "Exit"), + ] + + def compose(self) -> ComposeResult: + yield Footer() + self.inputs = tui_state.INPUTS + with VerticalScroll(): + for input in self.inputs: + if isinstance(input, Input): + yield Label(input.placeholder) + yield input + yield Label(input._link.help) + yield Label("") + + def on_mount(self): + self.inputs[tui_state.FOCUSED_I].focus() + + def action_confirm(self): + # next time, start on the same widget + tui_state.FOCUSED_I = next((i for i, inp in enumerate(self.inputs) if inp == self.focused), None) + self.exit(True) + + def action_exit(self): + self.exit() + + def on_key(self, event: events.Key) -> None: + try: + index = self.inputs.index(self.focused) + except ValueError: # probably some other element were focused + return + match event.key: + case "down": + self.inputs[(index + 1) % len(self.inputs)].focus() + case "up": + self.inputs[(index - 1) % len(self.inputs)].focus() + case letter if 
len(letter) == 1: # navigate by letters + for inp_ in self.inputs[index+1:] + self.inputs[:index]: + label = inp_.label if isinstance(inp_, Checkbox) else inp_.placeholder + if str(label).casefold().startswith(letter): + inp_.focus() + break diff --git a/deduplidog/utils.py b/deduplidog/utils.py index a597d4b..afc0399 100644 --- a/deduplidog/utils.py +++ b/deduplidog/utils.py @@ -15,6 +15,8 @@ from sh import find from tqdm.autonotebook import tqdm +__doc__ = """These utils might be useful for public external use.""" + @cache def crc(path: Path): # undocumented function diff --git a/tests.py b/tests.py index d556544..7e23ee3 100644 --- a/tests.py +++ b/tests.py @@ -129,9 +129,9 @@ def test_replace_with_original(self): state.check(suck=(4, 5, 6, 7, 8, 9, 11)) # No media file in the test case. - # def test_affect_only_if_smaller(self): + # def test_skip_bigger(self): # state = self.prepare() - # Deduplidog(*state, rename=True, execute=True, ignore_date=True, affect_only_if_smaller=True, media_magic=True) + # Deduplidog(*state, rename=True, execute=True, ignore_date=True, skip_bigger=True, media_magic=True) # state.check()
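A note on the `keydefaultdict` introduced in `deduplidog/helpers.py`: unlike a plain `defaultdict`, whose factory takes no arguments, its factory receives the missing key itself. The patch uses it to build a `FileMetadata` lazily per `Path`. A minimal standalone sketch of the same class (the trivial factory here is only for illustration):

```python
from collections import defaultdict


class keydefaultdict(defaultdict):
    """A defaultdict whose factory is called with the missing key
    (a plain defaultdict calls the factory with no arguments)."""

    def __missing__(self, key):
        self[key] = self.default_factory(key)
        return self[key]


# In the patch the factory would be FileMetadata; a trivial stand-in suffices:
d = keydefaultdict(lambda key: key.upper())
print(d["photo.jpg"])  # → "PHOTO.JPG", built from the key and cached on first access
```

This is why `self.metadata[work_file]` in `_find_similar_media` needs no explicit initialization: the first access constructs and caches the entry.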