diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 170abea98..3d70cc8e8 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -26,5 +26,4 @@ jobs: - name: mypy run: | mypy --version - rm -rf .mypy_cache mypy src diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index de2dea8f5..e1b5a3167 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -exclude: ^(docs|tests/files|cmd_line|tasks.py) +exclude: ^(docs|tests/test_files|tasks.py) ci: autoupdate_schedule: monthly diff --git a/src/monty/subprocess.py b/src/monty/subprocess.py index 104261c2e..ff77117b2 100644 --- a/src/monty/subprocess.py +++ b/src/monty/subprocess.py @@ -74,11 +74,9 @@ def run(self, timeout: Optional[float] = None, **kwargs) -> Self: def target(**kw): try: - # print('Thread started') with Popen(self.command, **kw) as self.process: self.output, self.error = self.process.communicate() self.retcode = self.process.returncode - # print('Thread stopped') except Exception: self.error = traceback.format_exc() self.retcode = -1 @@ -96,7 +94,6 @@ def target(**kw): thread.join(timeout) if thread.is_alive() and self.process is not None: - # print("Terminating process") self.process.terminate() self.killed = True thread.join() diff --git a/src/monty/termcolor.py b/src/monty/termcolor.py index 336ef8829..2e236d6c9 100644 --- a/src/monty/termcolor.py +++ b/src/monty/termcolor.py @@ -128,12 +128,7 @@ def cprint( It accepts arguments of print function. """ - try: - print((colored(text, color, on_color, attrs)), **kwargs) - except TypeError: - # flush is not supported by py2.7 - kwargs.pop("flush", None) - print((colored(text, color, on_color, attrs)), **kwargs) + print((colored(text, color, on_color, attrs)), **kwargs) def colored_map(text: str, cmap: dict) -> str: @@ -162,12 +157,7 @@ def cprint_map(text: str, cmap: dict, **kwargs) -> None: Examples: cprint_map("Hello world", {"Hello": "red"}) """ - try: - print(colored_map(text, cmap), **kwargs) - except TypeError: - # flush is not supported by py2.7 - kwargs.pop("flush", None) - print(colored_map(text, cmap), **kwargs) + print(colored_map(text, cmap), **kwargs) def get_terminal_size(): diff --git a/tests/test_design_patterns.py b/tests/test_design_patterns.py index 67a7ffc2b..6e3766f4c 100644 --- a/tests/test_design_patterns.py +++ b/tests/test_design_patterns.py @@ -53,24 +53,24 @@ def test_pickle(self): @cached_class -class TestClass: +class ClassForTest: def __init__(self, value: Any) -> None: self.value = value def test_caching(): # Test that instances are cached - inst1 = TestClass(1) - inst2 = TestClass(1) + inst1 = ClassForTest(1) + inst2 = ClassForTest(1) assert inst1 is inst2 - inst3 = TestClass(2) + inst3 = ClassForTest(2) assert inst1 is not inst3 def test_picklability(): # Test that instances can be pickled and unpickled - original = TestClass(42) + original = ClassForTest(42) pickled = pickle.dumps(original) unpickled = pickle.loads(pickled) @@ -78,7 +78,7 @@ def test_picklability(): assert original.value == unpickled.value # Check that the unpickled instance is the same as a newly created instance - new_instance = TestClass(42) + new_instance = ClassForTest(42) assert unpickled is new_instance @@ -98,8 +98,8 @@ def __init__(self): def test_class_identity(): # Ensure the decorated class is still recognized as the original class - assert isinstance(TestClass(1), TestClass) - assert type(TestClass(1)) is TestClass + assert isinstance(ClassForTest(1), ClassForTest) + assert type(ClassForTest(1)) is ClassForTest def test_multiple_arguments(): diff --git a/tests/test_dev.py b/tests/test_dev.py index 13bb211be..f5cbf9390 100644 --- a/tests/test_dev.py +++ b/tests/test_dev.py @@ -143,7 +143,7 @@ class TestClassNew: """A dummy class for tests.""" def __post_init__(self): - print("Hello.") + pass def method_a(self): pass @@ -156,7 +156,7 @@ class TestClassOld: class_attrib_old = "OLD_ATTRIB" def __post_init__(self): - print("Hello.") + pass def method_b(self): """This is method_b.""" @@ -246,7 +246,7 @@ def test_requires(self): @requires(fictitious_mod is not None, err_msg) def use_fictitious_mod(): - print("success") + pass with pytest.raises(RuntimeError, match=err_msg): use_fictitious_mod() diff --git a/tests/test_json.py b/tests/test_json.py index 91eb48aad..96afc52ba 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -679,7 +679,7 @@ def test_pint_quantity(self): cls = ClassContainingQuantity(qty=pint.Quantity("9.81 m/s**2")) d = json.loads(MontyEncoder().encode(cls)) - print(d) + assert isinstance(d, dict) assert d["qty"]["@module"] == "pint" assert d["qty"]["@class"] == "Quantity" diff --git a/tests/test_pprint.py b/tests/test_pprint.py index 581f80795..1f3a120e3 100644 --- a/tests/test_pprint.py +++ b/tests/test_pprint.py @@ -34,4 +34,4 @@ def __str__(self): ], ) - print(draw_tree(root)) + assert draw_tree(root) diff --git a/tests/test_shutil.py b/tests/test_shutil.py index 81e0ea859..dc18c3588 100644 --- a/tests/test_shutil.py +++ b/tests/test_shutil.py @@ -20,61 +20,61 @@ remove, ) -test_dir = os.path.join(os.path.dirname(__file__), "test_files") +TEST_DIR = os.path.join(os.path.dirname(__file__), "test_files") class TestCopyR: def setup_method(self): - os.mkdir(os.path.join(test_dir, "cpr_src")) + os.mkdir(os.path.join(TEST_DIR, "cpr_src")) with open( - os.path.join(test_dir, "cpr_src", "test"), "w", encoding="utf-8" + os.path.join(TEST_DIR, "cpr_src", "test"), "w", encoding="utf-8" ) as f: f.write("what") - os.mkdir(os.path.join(test_dir, "cpr_src", "sub")) + os.mkdir(os.path.join(TEST_DIR, "cpr_src", "sub")) with open( - os.path.join(test_dir, "cpr_src", "sub", "testr"), "w", encoding="utf-8" + os.path.join(TEST_DIR, "cpr_src", "sub", "testr"), "w", encoding="utf-8" ) as f: f.write("what2") if platform.system() != "Windows": os.symlink( - os.path.join(test_dir, "cpr_src", "test"), - os.path.join(test_dir, "cpr_src", "mysymlink"), + os.path.join(TEST_DIR, "cpr_src", "test"), + os.path.join(TEST_DIR, "cpr_src", "mysymlink"), ) def test_recursive_copy_and_compress(self): - copy_r(os.path.join(test_dir, "cpr_src"), os.path.join(test_dir, "cpr_dst")) - assert os.path.exists(os.path.join(test_dir, "cpr_dst", "test")) - assert os.path.exists(os.path.join(test_dir, "cpr_dst", "sub", "testr")) - - compress_dir(os.path.join(test_dir, "cpr_src")) - assert os.path.exists(os.path.join(test_dir, "cpr_src", "test.gz")) - assert os.path.exists(os.path.join(test_dir, "cpr_src", "sub", "testr.gz")) - - decompress_dir(os.path.join(test_dir, "cpr_src")) - assert os.path.exists(os.path.join(test_dir, "cpr_src", "test")) - assert os.path.exists(os.path.join(test_dir, "cpr_src", "sub", "testr")) - with open(os.path.join(test_dir, "cpr_src", "test"), encoding="utf-8") as f: + copy_r(os.path.join(TEST_DIR, "cpr_src"), os.path.join(TEST_DIR, "cpr_dst")) + assert os.path.exists(os.path.join(TEST_DIR, "cpr_dst", "test")) + assert os.path.exists(os.path.join(TEST_DIR, "cpr_dst", "sub", "testr")) + + compress_dir(os.path.join(TEST_DIR, "cpr_src")) + assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "test.gz")) + assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "sub", "testr.gz")) + + decompress_dir(os.path.join(TEST_DIR, "cpr_src")) + assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "test")) + assert os.path.exists(os.path.join(TEST_DIR, "cpr_src", "sub", "testr")) + with open(os.path.join(TEST_DIR, "cpr_src", "test"), encoding="utf-8") as f: txt = f.read() assert txt == "what" def test_pathlib(self): - test_path = Path(test_dir) + test_path = Path(TEST_DIR) copy_r(test_path / "cpr_src", test_path / "cpr_dst") assert (test_path / "cpr_dst" / "test").exists() assert (test_path / "cpr_dst" / "sub" / "testr").exists() def teardown_method(self): - shutil.rmtree(os.path.join(test_dir, "cpr_src")) - shutil.rmtree(os.path.join(test_dir, "cpr_dst")) + shutil.rmtree(os.path.join(TEST_DIR, "cpr_src")) + shutil.rmtree(os.path.join(TEST_DIR, "cpr_dst")) class TestCompressFileDir: def setup_method(self): - with open(os.path.join(test_dir, "tempfile"), "w", encoding="utf-8") as f: + with open(os.path.join(TEST_DIR, "tempfile"), "w", encoding="utf-8") as f: f.write("hello world") def test_compress_and_decompress_file(self): - fname = os.path.join(test_dir, "tempfile") + fname = os.path.join(TEST_DIR, "tempfile") for fmt in ["gz", "bz2"]: compress_file(fname, fmt) @@ -97,8 +97,8 @@ def test_compress_and_decompress_file(self): assert decompress_file("non-existent.bz2") is None def test_compress_and_decompress_with_target_dir(self): - fname = os.path.join(test_dir, "tempfile") - target_dir = os.path.join(test_dir, "temp_target_dir") + fname = os.path.join(TEST_DIR, "tempfile") + target_dir = os.path.join(TEST_DIR, "temp_target_dir") for fmt in ["gz", "bz2"]: compress_file(fname, fmt, target_dir) @@ -121,22 +121,22 @@ def test_compress_and_decompress_with_target_dir(self): assert f.read() == "hello world" def teardown_method(self): - os.remove(os.path.join(test_dir, "tempfile")) + os.remove(os.path.join(TEST_DIR, "tempfile")) class TestGzipDir: def setup_method(self): - os.mkdir(os.path.join(test_dir, "gzip_dir")) + os.mkdir(os.path.join(TEST_DIR, "gzip_dir")) with open( - os.path.join(test_dir, "gzip_dir", "tempfile"), "w", encoding="utf-8" + os.path.join(TEST_DIR, "gzip_dir", "tempfile"), "w", encoding="utf-8" ) as f: f.write("what") - self.mtime = os.path.getmtime(os.path.join(test_dir, "gzip_dir", "tempfile")) + self.mtime = os.path.getmtime(os.path.join(TEST_DIR, "gzip_dir", "tempfile")) def test_gzip_dir(self): - full_f = os.path.join(test_dir, "gzip_dir", "tempfile") - gzip_dir(os.path.join(test_dir, "gzip_dir")) + full_f = os.path.join(TEST_DIR, "gzip_dir", "tempfile") + gzip_dir(os.path.join(TEST_DIR, "gzip_dir")) assert os.path.exists(f"{full_f}.gz") assert not os.path.exists(full_f) @@ -148,7 +148,7 @@ def test_gzip_dir(self): def test_gzip_dir_file_coexist(self): """Test case where both file and file.gz exist.""" - full_f = os.path.join(test_dir, "gzip_dir", "temptestfile") + full_f = os.path.join(TEST_DIR, "gzip_dir", "temptestfile") gz_f = f"{full_f}.gz" # Create both the file and its gzipped version @@ -160,7 +160,7 @@ def test_gzip_dir_file_coexist(self): with pytest.warns( UserWarning, match="Both temptestfile and temptestfile.gz exist." ): - gzip_dir(os.path.join(test_dir, "gzip_dir")) + gzip_dir(os.path.join(TEST_DIR, "gzip_dir")) # Verify contents of the files with open(full_f, "r", encoding="utf-8") as f: @@ -170,13 +170,13 @@ def test_gzip_dir_file_coexist(self): assert g.read() == b"gzipped" def test_handle_sub_dirs(self): - sub_dir = os.path.join(test_dir, "gzip_dir", "sub_dir") + sub_dir = os.path.join(TEST_DIR, "gzip_dir", "sub_dir") sub_file = os.path.join(sub_dir, "new_tempfile") os.mkdir(sub_dir) with open(sub_file, "w", encoding="utf-8") as f: f.write("anotherwhat") - gzip_dir(os.path.join(test_dir, "gzip_dir")) + gzip_dir(os.path.join(TEST_DIR, "gzip_dir")) assert os.path.exists(f"{sub_file}.gz") assert not os.path.exists(sub_file) @@ -185,13 +185,13 @@ def test_handle_sub_dirs(self): assert g.readline().decode("utf-8") == "anotherwhat" def teardown_method(self): - shutil.rmtree(os.path.join(test_dir, "gzip_dir")) + shutil.rmtree(os.path.join(TEST_DIR, "gzip_dir")) class TestRemove: @unittest.skipIf(platform.system() == "Windows", "Skip on windows") def test_remove_file(self): - tempdir = tempfile.mkdtemp(dir=test_dir) + tempdir = tempfile.mkdtemp(dir=TEST_DIR) tempf = tempfile.mkstemp(dir=tempdir)[1] remove(tempf) assert not os.path.isfile(tempf) @@ -199,17 +199,17 @@ def test_remove_file(self): @unittest.skipIf(platform.system() == "Windows", "Skip on windows") def test_remove_folder(self): - tempdir = tempfile.mkdtemp(dir=test_dir) + tempdir = tempfile.mkdtemp(dir=TEST_DIR) remove(tempdir) assert not os.path.isdir(tempdir) @unittest.skipIf(platform.system() == "Windows", "Skip on windows") def test_remove_symlink(self): - tempdir = tempfile.mkdtemp(dir=test_dir) + tempdir = tempfile.mkdtemp(dir=TEST_DIR) tempf = tempfile.mkstemp(dir=tempdir)[1] - os.symlink(tempdir, os.path.join(test_dir, "temp_link")) - templink = os.path.join(test_dir, "temp_link") + os.symlink(tempdir, os.path.join(TEST_DIR, "temp_link")) + templink = os.path.join(TEST_DIR, "temp_link") remove(templink) assert os.path.isfile(tempf) assert os.path.isdir(tempdir) @@ -218,11 +218,11 @@ def test_remove_symlink(self): @unittest.skipIf(platform.system() == "Windows", "Skip on windows") def test_remove_symlink_follow(self): - tempdir = tempfile.mkdtemp(dir=test_dir) + tempdir = tempfile.mkdtemp(dir=TEST_DIR) tempf = tempfile.mkstemp(dir=tempdir)[1] - os.symlink(tempdir, os.path.join(test_dir, "temp_link")) - templink = os.path.join(test_dir, "temp_link") + os.symlink(tempdir, os.path.join(TEST_DIR, "temp_link")) + templink = os.path.join(TEST_DIR, "temp_link") remove(templink, follow_symlink=True) assert not os.path.isfile(tempf) assert not os.path.isdir(tempdir) diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index 223211c2d..c5c82dd60 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -8,8 +8,9 @@ def test_command(): sleep05 = Command("sleep 0.5") sleep05.run(timeout=1) - print(sleep05) - assert sleep05.retcode == 0 + # DEBUG: this unit test fail in Windows CI intermittently (PR702) + full_msg = f"{sleep05=}\n{sleep05.error=}\n{sleep05.output}" + assert sleep05.retcode == 0, full_msg assert not sleep05.killed sleep05.run(timeout=0.1)