diff --git a/Makefile b/Makefile
index 024b4f3..c127ce3 100644
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-PY_FILES=fluster
+PY_FILES=fluster scripts
CONTRIB_DIR=contrib
DECODERS_DIR=decoders
PYTHONPATH=.
@@ -105,7 +105,7 @@ mpeg_2_aac_reference_decoder: ## build ISO MPEG2 AAC reference decoder
if ! dpkg -l | grep g++-multilib -c >>/dev/null; then sudo apt-get install g++-multilib; fi
if [ ! $(wildcard /usr/include/asm) ] && [ $(wildcard /usr/include/asm-generic) ]; then sudo ln -s /usr/include/asm-generic /usr/include/asm; fi
-ifeq ($(wildcard $(CONTRIB_DIR)/C039486_Electronic_inserts), )
+ifeq ($(wildcard $(CONTRIB_DIR)/C039486_Electronic_inserts),)
$(create_dirs)
cd $(CONTRIB_DIR) && rm -f iso_cookies.txt
cd $(CONTRIB_DIR) && wget -qO- --keep-session-cookies --save-cookies iso_cookies.txt \
diff --git a/fluster/decoders/ffmpeg.py b/fluster/decoders/ffmpeg.py
index 35d3f13..76823a6 100644
--- a/fluster/decoders/ffmpeg.py
+++ b/fluster/decoders/ffmpeg.py
@@ -152,11 +152,7 @@ def check(self, verbose: bool) -> bool:
# Get ffmpeg version
output = _run_ffmpeg_command(self.binary, "-version", verbose=verbose)
version = re.search(r" version n?(\d+)\.(\d+)(?:\.(\d+))?", output)
- self.ffmpeg_version = (
- tuple(map(lambda x: int(x) if x else 0, version.groups()))
- if version
- else None
- )
+ self.ffmpeg_version = tuple((int(x) if x else 0 for x in version.groups())) if version else None
# Check if codec can be used
output = _run_ffmpeg_command(self.binary, "-codecs", verbose=verbose)
diff --git a/fluster/fluster.py b/fluster/fluster.py
index bcba665..cd1d0db 100644
--- a/fluster/fluster.py
+++ b/fluster/fluster.py
@@ -172,9 +172,7 @@ def _load_test_suites(self) -> None:
if len(self.test_suites) == 0:
raise Exception(f'No test suites found in "{self.test_suites_dir}"')
- def list_decoders(
- self, check: bool, verbose: bool, codec: Optional[Codec] = None
- ) -> None:
+ def list_decoders(self, check: bool, verbose: bool, codec: Optional[Codec] = None) -> None:
"""List all the available decoders"""
print("\nList of available decoders:")
decoders_dict: Dict[Codec, List[Decoder]] = {}
diff --git a/fluster/main.py b/fluster/main.py
index bcc2e2f..9879e67 100644
--- a/fluster/main.py
+++ b/fluster/main.py
@@ -25,7 +25,6 @@
from tempfile import gettempdir
from typing import Any, Tuple
-
from fluster import utils
from fluster.codec import Codec
from fluster.fluster import Context, Fluster, SummaryFormat
diff --git a/scripts/gen_aac.py b/scripts/gen_aac.py
index 8a38197..255d285 100755
--- a/scripts/gen_aac.py
+++ b/scripts/gen_aac.py
@@ -18,21 +18,20 @@
# License along with this library. If not, see .
import argparse
-import re
-from html.parser import HTMLParser
-from multiprocessing import Pool
+import multiprocessing
import os
+import re
import sys
import urllib.request
-import multiprocessing
+from html.parser import HTMLParser
+from multiprocessing import Pool
+from typing import Any, List, Optional, Tuple
-# pylint: disable=wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from fluster import utils
from fluster.codec import Codec, OutputFormat
-from fluster.test_suite import TestSuite, TestVector
-
-# pylint: enable=wrong-import-position
+from fluster.test_suite import TestSuite
+from fluster.test_vector import TestVector
BASE_URL = "https://standards.iso.org/"
@@ -49,21 +48,21 @@
BITSTREAM_EXTS = [".adts", ".adif"]
MD5_EXTS = [".wav.md5sum"]
-MD5_EXCLUDES = []
+MD5_EXCLUDES: List[str] = []
RAW_EXTS = [".wav"]
class HREFParser(HTMLParser):
"""Custom parser to find href links"""
- def __init__(self):
- self.links = []
+ def __init__(self) -> None:
+ self.links: List[Any] = []
super().__init__()
- def error(self, message):
+ def error(self, message: str) -> None:
print(message)
- def handle_starttag(self, tag, attrs):
+ def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
# Only parse the 'anchor' tag.
if tag == "a":
# Check the list of defined attributes.
@@ -71,7 +70,7 @@ def handle_starttag(self, tag, attrs):
# If href is defined, print it.
if name == "href":
base_url = BASE_URL if BASE_URL[-1] != "/" else BASE_URL[0:-1]
- self.links.append(base_url + value)
+ self.links.append(base_url + str(value))
class AACGenerator:
@@ -97,11 +96,13 @@ def __init__(
self.url_reference_vectors_checksums = url_reference_vectors_checksums
self.use_ffprobe = use_ffprobe
- def _download_raw_output_references_and_checksums(self, jobs, test_suite, raw_bitstream_links,
- raw_bitstream_md5_links):
+ def _download_raw_output_references_and_checksums(
+ self, jobs: int, test_suite: TestSuite, raw_bitstream_links: List[str], raw_bitstream_md5_links: List[str]
+ ) -> None:
"""Downloads raw output reference bitstreams and their checksums"""
with Pool(jobs) as pool:
- def _callback_error(err):
+
+ def _callback_error(err: Any) -> None:
print(f"\nError downloading -> {err}\n")
pool.terminate()
@@ -111,8 +112,8 @@ def _callback_error(err):
for link in raw_bitstream_links:
file_name = os.path.basename(link)
- base_name = file_name.split('.')[0]
- main_prefix = "_".join(base_name.split('_')[:2])
+ base_name = file_name.split(".")[0]
+ main_prefix = "_".join(base_name.split("_")[:2])
directory = os.path.join(test_suite.resources_dir, test_suite.name, main_prefix)
if not os.path.exists(directory):
@@ -130,8 +131,8 @@ def _callback_error(err):
for link in raw_bitstream_md5_links:
file_name = os.path.basename(link)
- base_name = file_name.split('.')[0]
- main_prefix = "_".join(base_name.split('_')[:2])
+ base_name = file_name.split(".")[0]
+ main_prefix = "_".join(base_name.split("_")[:2])
directory = os.path.join(test_suite.resources_dir, test_suite.name, main_prefix)
if not os.path.exists(directory):
@@ -156,7 +157,7 @@ def _callback_error(err):
except Exception as e:
sys.exit(f"Some download failed: {e}")
- def generate(self, download, jobs):
+ def generate(self, download: bool, jobs: int) -> None:
"""Generates the test suite and saves it to a file"""
output_filepath = os.path.join(self.suite_name + ".json")
test_suite = TestSuite(
@@ -165,7 +166,7 @@ def generate(self, download, jobs):
self.suite_name,
self.codec,
self.description,
- dict(),
+ {},
)
hparser = HREFParser()
@@ -181,11 +182,9 @@ def generate(self, download, jobs):
hparser.feed(data)
raw_bitstream_links = [url for url in hparser.links if url.endswith(tuple(RAW_EXTS))]
- raw_bitstream_names = [
- os.path.splitext(os.path.basename(x))[0].split('_f')[0] for x in raw_bitstream_links
- ]
+ raw_bitstream_names = [os.path.splitext(os.path.basename(x))[0].split("_f")[0] for x in raw_bitstream_links]
- missing_files = [x for x in set(compressed_bitstream_names).difference(raw_bitstream_names)]
+ missing_files = list(set(compressed_bitstream_names).difference(raw_bitstream_names))
if missing_files:
print(f"Missing reference files: {missing_files}")
for missing_file in missing_files:
@@ -204,11 +203,11 @@ def generate(self, download, jobs):
raw_bitstream_md5_links = [url for url in hparser.links if url.endswith(tuple(MD5_EXTS))]
raw_bitstream_md5_names = [
- os.path.splitext(os.path.splitext(os.path.basename(x))[0].split('_f')[0])[0] for x in
- raw_bitstream_md5_links
+ os.path.splitext(os.path.splitext(os.path.basename(x))[0].split("_f")[0])[0]
+ for x in raw_bitstream_md5_links
]
- missing_checksum_files = [x for x in set(compressed_bitstream_names).difference(raw_bitstream_md5_names)]
+ missing_checksum_files = list(set(compressed_bitstream_names).difference(raw_bitstream_md5_names))
if missing_checksum_files:
print(f"Missing reference checksum files: {missing_checksum_files}")
for missing_checksum in missing_checksum_files:
@@ -225,9 +224,7 @@ def generate(self, download, jobs):
for source_url in compressed_bitstream_links:
input_filename = os.path.basename(source_url)
test_vector_name = os.path.splitext(input_filename)[0]
- test_vector = TestVector(
- test_vector_name, source_url, "__skip__", input_filename, OutputFormat.UNKNOWN, ""
- )
+ test_vector = TestVector(test_vector_name, source_url, "__skip__", input_filename, OutputFormat.UNKNOWN, "")
test_suite.test_vectors[test_vector_name] = test_vector
print(f"Download list of compressed bitstreams from {self.url_test_vectors}")
@@ -241,8 +238,9 @@ def generate(self, download, jobs):
)
# Download test suite output reference and md5 checksum files
- self._download_raw_output_references_and_checksums(jobs, test_suite, raw_bitstream_links,
- raw_bitstream_md5_links)
+ self._download_raw_output_references_and_checksums(
+ jobs, test_suite, raw_bitstream_links, raw_bitstream_md5_links
+ )
for test_vector in test_suite.test_vectors.values():
dest_dir = os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name)
@@ -286,7 +284,7 @@ def generate(self, download, jobs):
print("Generate new test suite: " + test_suite.name + ".json")
@staticmethod
- def _fill_checksum_aac(test_vector, dest_dir):
+ def _fill_checksum_aac(test_vector: TestVector, dest_dir: str) -> None:
base_name = test_vector.name
raw_file = None
ext = None
@@ -299,23 +297,24 @@ def _fill_checksum_aac(test_vector, dest_dir):
if not raw_file:
for ext in RAW_EXTS:
- fallback_file = os.path.join(dest_dir, base_name + '_f00' + ext)
+ fallback_file = os.path.join(dest_dir, base_name + "_f00" + ext)
if os.path.exists(fallback_file):
raw_file = fallback_file
break
if not raw_file:
raise Exception(
- f"Neither {base_name + ext} nor {base_name + '_f00' + ext} found with extensions {RAW_EXTS} in {dest_dir}"
+ f"Neither {base_name + ext} nor {base_name + '_f00' + ext} found with extensions {RAW_EXTS} "
+ f"in {dest_dir}"
)
checksum_file = utils.find_by_ext(dest_dir, MD5_EXTS)
if checksum_file is None:
raise Exception("MD5 not found")
- with open(checksum_file, "r") as checksum_file:
- regex = re.compile(rf"([a-fA-F0-9]{{32,}}).*(?:\.(wav))?")
- lines = checksum_file.readlines()
+ with open(checksum_file, "r") as checksum_fh:
+ regex = re.compile(r"([a-fA-F0-9]{32,}).*(?:\.(wav))?")
+ lines = checksum_fh.readlines()
# Filter out empty lines
filtered_lines = [line.strip() for line in lines if line.strip()]
# Prefer lines matching the regex pattern
@@ -327,8 +326,7 @@ def _fill_checksum_aac(test_vector, dest_dir):
test_vector.result = match.group(1).lower()
# Assert that we have extracted a valid MD5 from the file
assert (
- len(test_vector.result) == 32
- and re.search(r"^[a-fA-F0-9]{32}$", test_vector.result) is not None
+ len(test_vector.result) == 32 and re.search(r"^[a-fA-F0-9]{32}$", test_vector.result) is not None
), f"{test_vector.result} is not a valid MD5 hash"
test_vector.result = utils.file_checksum(raw_file)
diff --git a/scripts/gen_av1_argon.py b/scripts/gen_av1_argon.py
index e0faac3..5b9c707 100755
--- a/scripts/gen_av1_argon.py
+++ b/scripts/gen_av1_argon.py
@@ -24,14 +24,13 @@
import sys
import urllib.error
import zipfile
+from typing import Any
-# pylint: disable=wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from fluster import utils
from fluster.codec import Codec, OutputFormat
-from fluster.test_suite import TestSuite, TestVector
-
-# pylint: enable=wrong-import-position
+from fluster.test_suite import TestSuite
+from fluster.test_vector import TestVector
ARGON_URL = "https://storage.googleapis.com/downloads.aomedia.org/assets/zip/"
@@ -55,7 +54,7 @@ def __init__(
self.site = site
self.use_ffprobe = use_ffprobe
- def generate(self, download):
+ def generate(self, download: bool) -> None:
"""Generates the test suite and saves it to a file"""
output_filepath = os.path.join(self.suite_name + ".json")
extract_folder = "resources"
@@ -65,7 +64,7 @@ def generate(self, download):
self.suite_name,
self.codec,
self.description,
- dict(),
+ {},
)
os.makedirs(extract_folder, exist_ok=True)
# Download the zip file
@@ -76,9 +75,7 @@ def generate(self, download):
utils.download(source_url, extract_folder)
except urllib.error.URLError as ex:
exception_str = str(ex)
- print(
- f"\tUnable to download {source_url} to {extract_folder}, {exception_str}"
- )
+ print(f"\tUnable to download {source_url} to {extract_folder}, {exception_str}")
except Exception as ex:
raise Exception(str(ex)) from ex
@@ -93,96 +90,89 @@ def generate(self, download):
test_vector_files.append(file_info)
# Extract md5 files
- if (
- file_info.endswith(".md5")
- and "md5_ref/" in file_info
- and "layers/" not in file_info
- ):
+ if file_info.endswith(".md5") and "md5_ref/" in file_info and "layers/" not in file_info:
zip_ref.extract(file_info, extract_folder)
- # Create the test vector and test suite
- print("Creating test vectors and test suite")
- source_checksum = utils.file_checksum(extract_folder + "/" + self.name)
- for file in test_vector_files:
- filename = os.path.splitext(os.path.basename(file))[0]
- # ffprobe execution
- if self.use_ffprobe:
- full_path = os.path.abspath(extract_folder + "/" + file)
- ffprobe = utils.normalize_binary_cmd("ffprobe")
- command = [
- ffprobe,
- "-v",
- "error",
- "-select_streams",
- "v:0",
- "-show_entries",
- "stream=pix_fmt",
- "-of",
- "default=nokey=1:noprint_wrappers=1",
- full_path,
- ]
- try:
- result = utils.run_command_with_output(command).splitlines()
- pix_fmt = result[0]
- if pix_fmt == "unknown":
- pix_fmt = "Unknown"
- except subprocess.CalledProcessError:
- pix_fmt = "None"
-
- # Processing md5 files
- md5_file_to_find = os.path.splitext(filename)[0] + ".md5"
- full_path_split = full_path.split("/")
- md5_directory_path = (
- "/".join(full_path_split[: len(full_path_split) - 2])
- + "/"
- + "md5_ref"
- )
- md5_file_path = os.path.join(md5_directory_path, md5_file_to_find)
-
- # Check the .md5 file and get checksum
- if os.path.exists(md5_file_path):
- try:
- result_checksum = self._fill_checksum_argon(md5_file_path)
- except Exception as ex:
- print("MD5 does not match")
- raise ex
- else:
- try:
- result_checksum = utils.file_checksum(full_path)
- except Exception as ex:
- print("MD5 cannot be calculated")
- raise ex
-
- # Add data to the test vector and the test suite
- test_vector = TestVector(
- filename,
- source_url,
- source_checksum,
- file,
- OutputFormat[pix_fmt.upper()],
- result_checksum,
- )
- test_suite.test_vectors[filename] = test_vector
+ # Create the test vector and test suite
+ print("Creating test vectors and test suite")
+ source_checksum = utils.file_checksum(extract_folder + "/" + self.name)
+ for file in test_vector_files:
+ filename = os.path.splitext(os.path.basename(file))[0]
+ # ffprobe execution
+ if self.use_ffprobe:
+ full_path = os.path.abspath(extract_folder + "/" + file)
+ ffprobe = utils.normalize_binary_cmd("ffprobe")
+ command = [
+ ffprobe,
+ "-v",
+ "error",
+ "-select_streams",
+ "v:0",
+ "-show_entries",
+ "stream=pix_fmt",
+ "-of",
+ "default=nokey=1:noprint_wrappers=1",
+ full_path,
+ ]
+ try:
+ result = utils.run_command_with_output(command).splitlines()
+ pix_fmt = result[0]
+ if pix_fmt == "unknown":
+ pix_fmt = "Unknown"
+ except subprocess.CalledProcessError:
+ pix_fmt = "None"
+
+ # Processing md5 files
+ md5_file_to_find = os.path.splitext(filename)[0] + ".md5"
+ full_path_split = full_path.split("/")
+ md5_directory_path = "/".join(full_path_split[: len(full_path_split) - 2]) + "/" + "md5_ref"
+ md5_file_path = os.path.join(md5_directory_path, md5_file_to_find)
+
+ # Check the .md5 file and get checksum
+ if os.path.exists(md5_file_path):
+ try:
+ result_checksum = self._fill_checksum_argon(md5_file_path)
+ except Exception as ex:
+ print("MD5 does not match")
+ raise ex
+ else:
+ try:
+ result_checksum = utils.file_checksum(full_path)
+ except Exception as ex:
+ print("MD5 cannot be calculated")
+ raise ex
+
+ # Add data to the test vector and the test suite
+ test_vector = TestVector(
+ filename,
+ source_url,
+ source_checksum,
+ file,
+ OutputFormat[pix_fmt.upper()],
+ result_checksum,
+ )
+ test_suite.test_vectors[filename] = test_vector
test_suite.to_json_file(output_filepath)
print("Generate new test suite: " + test_suite.name + ".json")
@staticmethod
- def _fill_checksum_argon(dest_dir):
+ def _fill_checksum_argon(dest_dir: str) -> Any:
checksum_file = dest_dir
if checksum_file is None:
raise Exception("MD5 not found")
- with open(checksum_file, "r") as checksum_file:
+ with open(checksum_file, "r") as checksum_fh:
regex = re.compile(r"([a-fA-F0-9]{32,}).*\.(yuv|rgb|gbr)")
- lines = checksum_file.readlines()
- if any((match := regex.match(line)) for line in lines):
+ lines = checksum_fh.readlines()
+ # Prefer lines matching the regex pattern
+ match = next((regex.match(line) for line in lines if regex.match(line)), None)
+ if match:
result = match.group(1)[:32].lower()
else:
result = -1
# Assert that we have extracted a valid MD5 from the file
assert (
- len(result) == 32
- and re.search(r"^[a-fA-F0-9]{32}$", result) is not None
+ len(result) == 32 and re.search(r"^[a-fA-F0-9]{32}$", result) is not None
), f"{result} is not a valid MD5 hash"
return result
diff --git a/scripts/gen_av1_chromium.py b/scripts/gen_av1_chromium.py
old mode 100644
new mode 100755
index fece574..03fc7ac
--- a/scripts/gen_av1_chromium.py
+++ b/scripts/gen_av1_chromium.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python3
+
# Fluster - testing framework for decoders conformance
#
# This library is free software; you can redistribute it and/or
@@ -13,20 +15,19 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library. If not, see .
-import os
import argparse
-import sys
import multiprocessing
+import os
import re
+import sys
+from typing import Any, Optional, Tuple # noqa: F401
-# pylint: disable=wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
-from fluster.test_suite import TestSuite, TestVector
-from fluster.codec import Codec, OutputFormat
from fluster import utils
+from fluster.codec import Codec, OutputFormat
from fluster.decoders import av1_aom
-
-# pylint: enable=wrong-import-position
+from fluster.test_suite import TestSuite
+from fluster.test_vector import TestVector
DOWNLOAD_URL = "https://storage.googleapis.com/chromiumos-test-assets-public/tast/cros/video/test_vectors/av1"
@@ -48,7 +49,7 @@
)
TESTS_10BPP = (
- #10 bit
+ # 10 bit
"00000671_20210310.ivf",
"00000672_20210310.ivf",
"00000673_20210310.ivf",
@@ -71,9 +72,10 @@
"av1-1-b10-00-quantizer-40_20210310.ivf",
"av1-1-b10-00-quantizer-50_20210310.ivf",
"av1-1-b10-00-quantizer-60_20210310.ivf",
- "av1-1-b10-23-film_grain-50_20210310.ivf"
+ "av1-1-b10-23-film_grain-50_20210310.ivf",
)
+
class ChromiumAV1Generator:
"""Generates a test suite from the conformance bitstreams used in tast tests for Chromium"""
@@ -92,8 +94,7 @@ def __init__(
self.decoder = av1_aom.AV1AOMDecoder()
self.bpp = bpp
-
- def generate(self, download, jobs):
+    def generate(self, download: bool, jobs: int) -> None:
"""Generates the test suite and saves it to a file"""
output_filepath = os.path.join(self.suite_name + ".json")
test_suite = TestSuite(
@@ -102,24 +103,24 @@ def generate(self, download, jobs):
self.suite_name,
self.codec,
self.description,
- dict(),
+ {},
)
print(f"Download list of bitstreams from {DOWNLOAD_URL}")
+ tests: Optional[Tuple[str, ...]]
if self.bpp == 10:
- TESTS = TESTS_10BPP
+ tests = TESTS_10BPP
elif self.bpp == 8:
- TESTS = TESTS_8BPP
+ tests = TESTS_8BPP
else:
return
- for test in TESTS:
+ for test in tests:
file_url = f"{DOWNLOAD_URL}/{test}"
- name = re.sub("_[\d]*", "", test)
+ name = re.sub(r"_[\d]*", "", test)
- test_vector = TestVector(
- name, file_url, "__skip__", test, OutputFormat.YUV420P, "")
+ test_vector = TestVector(name, file_url, "__skip__", test, OutputFormat.YUV420P, "")
test_suite.test_vectors[name] = test_vector
@@ -133,16 +134,10 @@ def generate(self, download, jobs):
)
for test_vector in test_suite.test_vectors.values():
- dest_dir = os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
- dest_path = os.path.join(
- dest_dir, os.path.basename(test_vector.source))
+ dest_dir = os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name)
+ dest_path = os.path.join(dest_dir, os.path.basename(test_vector.source))
test_vector.input_file = dest_path.replace(
- os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
- + os.sep,
+ os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name) + os.sep,
"",
)
@@ -151,8 +146,7 @@ def generate(self, download, jobs):
test_vector.source_checksum = utils.file_checksum(dest_path)
out420 = f"{dest_path}.i420"
# Run the libaom av1 decoder to get the checksum as the .md5 in the JSONs are per-frame
- test_vector.result = self.decoder.decode(
- dest_path, out420, test_vector.output_format, 30, False, False)
+ test_vector.result = self.decoder.decode(dest_path, out420, test_vector.output_format, 30, False, False)
os.remove(out420)
test_suite.to_json_file(output_filepath)
diff --git a/scripts/gen_jct_vc.py b/scripts/gen_jct_vc.py
old mode 100644
new mode 100755
index a2934b4..51e37f7
--- a/scripts/gen_jct_vc.py
+++ b/scripts/gen_jct_vc.py
@@ -20,42 +20,38 @@
# License along with this library. If not, see .
import argparse
-from html.parser import HTMLParser
+import multiprocessing
import os
+import re
import sys
import urllib.request
-import multiprocessing
-import re
+from html.parser import HTMLParser
+from typing import Any, List, Optional, Tuple
-# pylint: disable=wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from fluster import utils
from fluster.codec import Codec, OutputFormat
-from fluster.test_suite import TestSuite, TestVector
-
-# pylint: enable=wrong-import-position
+from fluster.test_suite import TestSuite
+from fluster.test_vector import TestVector
BASE_URL = "https://www.itu.int/"
H265_URL = BASE_URL + "wftp3/av-arch/jctvc-site/bitstream_exchange/draft_conformance/"
-BITSTREAM_EXTS = (
- ".bin",
- ".bit",
-)
-MD5_EXTS = ("yuv_2.md5", "yuv.md5", ".md5", ".MD5", "md5.txt", "md5sum.txt")
-MD5_EXCLUDES = (".bin.md5", "bit.md5")
+BITSTREAM_EXTS = [".bin", ".bit"]
+MD5_EXTS = ["yuv_2.md5", "yuv.md5", ".md5", ".MD5", "md5.txt", "md5sum.txt"]
+MD5_EXCLUDES = [".bin.md5", "bit.md5"]
class HREFParser(HTMLParser):
"""Custom parser to find href links"""
- def __init__(self):
- self.links = []
+ def __init__(self) -> None:
+ self.links: List[Any] = []
super().__init__()
- def error(self, message):
+ def error(self, message: str) -> None:
print(message)
- def handle_starttag(self, tag, attrs):
+ def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
# Only parse the 'anchor' tag.
if tag == "a":
# Check the list of defined attributes.
@@ -63,20 +59,14 @@ def handle_starttag(self, tag, attrs):
# If href is defined, print it.
if name == "href":
base_url = BASE_URL if BASE_URL[-1] != "/" else BASE_URL[0:-1]
- self.links.append(base_url + value)
+ self.links.append(base_url + str(value))
class JCTVCGenerator:
"""Generates a test suite from the conformance bitstreams"""
def __init__(
- self,
- name: str,
- suite_name: str,
- codec: Codec,
- description: str,
- site: str,
- use_ffprobe: bool = False
+ self, name: str, suite_name: str, codec: Codec, description: str, site: str, use_ffprobe: bool = False
):
self.name = name
self.suite_name = suite_name
@@ -85,7 +75,7 @@ def __init__(
self.site = site
self.use_ffprobe = use_ffprobe
- def generate(self, download, jobs):
+ def generate(self, download: bool, jobs: int) -> None:
"""Generates the test suite and saves it to a file"""
output_filepath = os.path.join(self.suite_name + ".json")
test_suite = TestSuite(
@@ -94,7 +84,7 @@ def generate(self, download, jobs):
self.suite_name,
self.codec,
self.description,
- dict(),
+ {},
)
hparser = HREFParser()
@@ -129,17 +119,12 @@ def generate(self, download, jobs):
break
for test_vector in test_suite.test_vectors.values():
- dest_dir = os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
+ dest_dir = os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name)
dest_path = os.path.join(dest_dir, os.path.basename(test_vector.source))
- test_vector.input_file = utils.find_by_ext(dest_dir, BITSTREAM_EXTS)
+            test_vector.input_file = utils.find_by_ext(dest_dir, BITSTREAM_EXTS) or ""
absolute_input_path = test_vector.input_file
test_vector.input_file = test_vector.input_file.replace(
- os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
- + os.sep,
+ os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name) + os.sep,
"",
)
if not test_vector.input_file:
@@ -148,11 +133,19 @@ def generate(self, download, jobs):
if "main10" in test_vector.name.lower():
test_vector.output_format = OutputFormat.YUV420P10LE
elif self.use_ffprobe:
- ffprobe = utils.normalize_binary_cmd('ffprobe')
- command = [ffprobe, '-v', 'error', '-select_streams', 'v:0',
- '-show_entries', 'stream=pix_fmt', '-of',
- 'default=nokey=1:noprint_wrappers=1',
- absolute_input_path]
+ ffprobe = utils.normalize_binary_cmd("ffprobe")
+ command = [
+ ffprobe,
+ "-v",
+ "error",
+ "-select_streams",
+ "v:0",
+ "-show_entries",
+ "stream=pix_fmt",
+ "-of",
+ "default=nokey=1:noprint_wrappers=1",
+ absolute_input_path,
+ ]
result = utils.run_command_with_output(command).splitlines()
pix_fmt = result[0]
@@ -176,7 +169,7 @@ def generate(self, download, jobs):
"GENERAL_16b_400_RExt_Sony_1": OutputFormat.GRAY16LE,
"GENERAL_16b_444_highThroughput_RExt_Sony_2": OutputFormat.YUV444P16LE,
"GENERAL_16b_444_RExt_Sony_2": OutputFormat.YUV444P16LE,
- "WAVETILES_RExt_Sony_2": OutputFormat.YUV444P16LE
+ "WAVETILES_RExt_Sony_2": OutputFormat.YUV444P16LE,
}
if test_vector.name in exceptions.keys():
test_vector.output_format = exceptions[test_vector.name]
@@ -188,11 +181,11 @@ def generate(self, download, jobs):
test_suite.to_json_file(output_filepath)
print("Generate new test suite: " + test_suite.name + ".json")
- def _fill_checksum_h265(self, test_vector, dest_dir):
+ def _fill_checksum_h265(self, test_vector: TestVector, dest_dir: str) -> None:
checksum_file = utils.find_by_ext(dest_dir, MD5_EXTS, MD5_EXCLUDES)
if checksum_file is None:
raise Exception("MD5 not found")
- with open(checksum_file, "r") as checksum_file:
+ with open(checksum_file, "r") as checksum_fh:
# The md5 is in several formats
# Example 1
# 158312a1a35ef4b20cb4aeee48549c03 *WP_A_Toshiba_3.bit
@@ -211,15 +204,15 @@ def _fill_checksum_h265(self, test_vector, dest_dir):
# Example 6:
# 9cab6bcd74491062a8523b5a7ff6a540 CCP_8bit_RExt_QCOM.bin
# f3e914fccdb820eac85f46642ea0e168 CCP_8bit_RExt_QCOM.gbr
- regex = re.compile(rf"([a-fA-F0-9]{{32,}}).*\.(yuv|rgb|gbr)")
- lines = checksum_file.readlines()
+ regex = re.compile(r"([a-fA-F0-9]{32,}).*\.(yuv|rgb|gbr)")
+ lines = checksum_fh.readlines()
# Filter out empty lines and lines that start with "#"
filtered_lines = [line.strip() for line in lines if line.strip() and not line.strip().startswith("#")]
# Prefer lines matching the regex pattern
match = next((regex.match(line) for line in filtered_lines if regex.match(line)), None)
if match:
test_vector.result = match.group(1).lower()
- elif self.name in {"RExt", "MV-HEVC", "SCC", "SHVC"}:
+ elif self.name in ["RExt", "MV-HEVC", "SCC", "SHVC"]:
# Handle special cases where checksum is at the end
test_vector.result = filtered_lines[-1].split(" ")[0].strip().lower()
else:
@@ -230,9 +223,9 @@ def _fill_checksum_h265(self, test_vector, dest_dir):
test_vector.result = line.split(" ")[0].strip().lower()
break
# Assert that we have extracted a valid MD5 from the file
- assert len(test_vector.result) == 32 and re.search(
- r"^[a-fA-F0-9]{32}$",
- test_vector.result) is not None, f"{test_vector.result} is not a valid MD5 hash"
+ assert (
+ len(test_vector.result) == 32 and re.search(r"^[a-fA-F0-9]{32}$", test_vector.result) is not None
+ ), f"{test_vector.result} is not a valid MD5 hash"
if __name__ == "__main__":
@@ -260,44 +253,20 @@ def _fill_checksum_h265(self, test_vector, dest_dir):
)
generator.generate(not args.skip_download, args.jobs)
- generator = JCTVCGenerator(
- "RExt",
- "JCT-VC-RExt",
- Codec.H265,
- "JCT-VC HEVC Range Extension",
- H265_URL,
- True
- )
+ generator = JCTVCGenerator("RExt", "JCT-VC-RExt", Codec.H265, "JCT-VC HEVC Range Extension", H265_URL, True)
generator.generate(not args.skip_download, args.jobs)
generator = JCTVCGenerator(
- "SCC",
- "JCT-VC-SCC",
- Codec.H265,
- "JCT-VC HEVC Screen Content Coding Extension",
- H265_URL,
- True
+ "SCC", "JCT-VC-SCC", Codec.H265, "JCT-VC HEVC Screen Content Coding Extension", H265_URL, True
)
generator.generate(not args.skip_download, args.jobs)
generator = JCTVCGenerator(
- "MV-HEVC",
- "JCT-VC-MV-HEVC",
- Codec.H265,
- "JCT-VC HEVC Multiview Extension",
- H265_URL,
- True
+ "MV-HEVC", "JCT-VC-MV-HEVC", Codec.H265, "JCT-VC HEVC Multiview Extension", H265_URL, True
)
generator.generate(not args.skip_download, args.jobs)
- generator = JCTVCGenerator(
- "3D-HEVC",
- "JCT-VC-3D-HEVC",
- Codec.H265,
- "JCT-VC HEVC 3D Extension",
- H265_URL,
- True
- )
+ generator = JCTVCGenerator("3D-HEVC", "JCT-VC-3D-HEVC", Codec.H265, "JCT-VC HEVC 3D Extension", H265_URL, True)
generator.generate(not args.skip_download, args.jobs)
# TODO see comment (https://fluendo.atlassian.net/browse/COM-10938?focusedCommentId=86998)
diff --git a/scripts/gen_jvet.py b/scripts/gen_jvet.py
index 7e6dcaf..bd09f5a 100755
--- a/scripts/gen_jvet.py
+++ b/scripts/gen_jvet.py
@@ -18,42 +18,41 @@
# License along with this library. If not, see .
import argparse
-from html.parser import HTMLParser
+import multiprocessing
import os
import re
import sys
import urllib.request
-import multiprocessing
+from html.parser import HTMLParser
from subprocess import CalledProcessError
+from typing import Any, List, Optional, Tuple
-# pylint: disable=wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from fluster import utils
from fluster.codec import Codec, OutputFormat
-from fluster.test_suite import TestSuite, TestVector
-
-# pylint: enable=wrong-import-position
+from fluster.test_suite import TestSuite
+from fluster.test_vector import TestVector
BASE_URL = "https://www.itu.int/"
H266_URL = BASE_URL + "wftp3/av-arch/jvet-site/bitstream_exchange/VVC/draft_conformance/"
# When there is only 1 element in below variables there must be a ", " at the end.
# Otherwise utils.find_by_ext() considers each character of the string as an individual
# element in the list
-BITSTREAM_EXTS = (".bit", )
-MD5_EXTS = (".yuv.md5", )
+BITSTREAM_EXTS = [".bit"]
+MD5_EXTS = [".yuv.md5"]
class HREFParser(HTMLParser):
"""Custom parser to find href links"""
- def __init__(self):
- self.links = []
+ def __init__(self) -> None:
+ self.links: List[Any] = []
super().__init__()
- def error(self, message):
+ def error(self, message: str) -> None:
print(message)
- def handle_starttag(self, tag, attrs):
+ def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
# Only parse the 'anchor' tag.
if tag == "a":
# Check the list of defined attributes.
@@ -61,20 +60,14 @@ def handle_starttag(self, tag, attrs):
# If href is defined, print it.
if name == "href":
base_url = BASE_URL if BASE_URL[-1] != "/" else BASE_URL[0:-1]
- self.links.append(base_url + value)
+ self.links.append(base_url + str(value))
class JVETGenerator:
"""Generates a test suite from the conformance bitstreams"""
def __init__(
- self,
- name: str,
- suite_name: str,
- codec: Codec,
- description: str,
- site: str,
- use_ffprobe: bool = False
+ self, name: str, suite_name: str, codec: Codec, description: str, site: str, use_ffprobe: bool = False
):
self.name = name
self.suite_name = suite_name
@@ -83,7 +76,7 @@ def __init__(
self.site = site
self.use_ffprobe = use_ffprobe
- def generate(self, download, jobs):
+ def generate(self, download: bool, jobs: int) -> None:
"""Generates the test suite and saves it to a file"""
output_filepath = os.path.join(self.suite_name + ".json")
test_suite = TestSuite(
@@ -92,7 +85,7 @@ def generate(self, download, jobs):
self.suite_name,
self.codec,
self.description,
- dict(),
+ {},
)
hparser = HREFParser()
@@ -118,17 +111,12 @@ def generate(self, download, jobs):
)
for test_vector in test_suite.test_vectors.values():
- dest_dir = os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
+ dest_dir = os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name)
dest_path = os.path.join(dest_dir, os.path.basename(test_vector.source))
- test_vector.input_file = utils.find_by_ext(dest_dir, BITSTREAM_EXTS)
+            test_vector.input_file = utils.find_by_ext(dest_dir, BITSTREAM_EXTS) or ""
absolute_input_path = test_vector.input_file
test_vector.input_file = test_vector.input_file.replace(
- os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
- + os.sep,
+ os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name) + os.sep,
"",
)
if not test_vector.input_file:
@@ -136,14 +124,24 @@ def generate(self, download, jobs):
test_vector.source_checksum = utils.file_checksum(dest_path)
if self.use_ffprobe:
try:
- ffprobe = utils.normalize_binary_cmd('ffprobe')
- command = [ffprobe, '-v', 'error', '-strict', '-2',
- '-select_streams', 'v:0',
- '-show_entries', 'stream=pix_fmt', '-of',
- 'default=nokey=1:noprint_wrappers=1',
- absolute_input_path]
+ ffprobe = utils.normalize_binary_cmd("ffprobe")
+ command = [
+ ffprobe,
+ "-v",
+ "error",
+ "-strict",
+ "-2",
+ "-select_streams",
+ "v:0",
+ "-show_entries",
+ "stream=pix_fmt",
+ "-of",
+ "default=nokey=1:noprint_wrappers=1",
+ absolute_input_path,
+ ]
result = utils.run_command_with_output(command).splitlines()
+                # result[0] is the pix_fmt reported by ffprobe (see -show_entries stream=pix_fmt above)
pix_fmt = result[0]
test_vector.output_format = OutputFormat[pix_fmt.upper()]
except KeyError as key_err:
@@ -162,7 +160,7 @@ def generate(self, download, jobs):
"MNUT_A_Nokia_3": OutputFormat.NONE,
"MNUT_B_Nokia_2": OutputFormat.NONE,
"SUBPIC_C_ERICSSON_1": OutputFormat.NONE,
- "SUBPIC_D_ERICSSON_1": OutputFormat.NONE
+ "SUBPIC_D_ERICSSON_1": OutputFormat.NONE,
}
if test_vector.name in exceptions.keys():
test_vector.output_format = exceptions[test_vector.name]
@@ -175,13 +173,13 @@ def generate(self, download, jobs):
print("Generate new test suite: " + test_suite.name + ".json")
@staticmethod
- def _fill_checksum_h266(test_vector, dest_dir):
+ def _fill_checksum_h266(test_vector: TestVector, dest_dir: str) -> None:
checksum_file = utils.find_by_ext(dest_dir, MD5_EXTS)
if checksum_file is None:
raise Exception("MD5 not found")
- with open(checksum_file, "r") as checksum_file:
- regex = re.compile(rf"([a-fA-F0-9]{{32,}}).*(?:\.(yuv|rgb|gbr))?")
- lines = checksum_file.readlines()
+ with open(checksum_file, "r") as checksum_fh:
+ regex = re.compile(r"([a-fA-F0-9]{32,}).*(?:\.(yuv|rgb|gbr))?")
+ lines = checksum_fh.readlines()
# Filter out empty lines
filtered_lines = [line.strip() for line in lines if line.strip()]
# Prefer lines matching the regex pattern
@@ -189,9 +187,9 @@ def _fill_checksum_h266(test_vector, dest_dir):
if match:
test_vector.result = match.group(1).lower()
# Assert that we have extracted a valid MD5 from the file
- assert len(test_vector.result) == 32 and re.search(
- r"^[a-fA-F0-9]{32}$",
- test_vector.result) is not None, f"{test_vector.result} is not a valid MD5 hash"
+ assert (
+ len(test_vector.result) == 32 and re.search(r"^[a-fA-F0-9]{32}$", test_vector.result) is not None
+ ), f"{test_vector.result} is not a valid MD5 hash"
if __name__ == "__main__":
@@ -211,10 +209,10 @@ def _fill_checksum_h266(test_vector, dest_dir):
)
args = parser.parse_args()
generator = JVETGenerator(
- 'draft6',
- 'JVET-VVC_draft6',
+ "draft6",
+ "JVET-VVC_draft6",
Codec.H266,
- 'JVET VVC draft6',
+ "JVET VVC draft6",
H266_URL,
True,
)
diff --git a/scripts/gen_jvt.py b/scripts/gen_jvt.py
index fb30933..8cd62f8 100755
--- a/scripts/gen_jvt.py
+++ b/scripts/gen_jvt.py
@@ -19,32 +19,23 @@
import argparse
import copy
-import re
-from html.parser import HTMLParser
+import multiprocessing
import os
+import re
import sys
import urllib.request
-import multiprocessing
+from html.parser import HTMLParser
+from typing import Any, List, Optional, Tuple
-# pylint: disable=wrong-import-position
sys.path.append(os.path.join(os.path.dirname(__file__), ".."))
from fluster import utils
from fluster.codec import Codec, OutputFormat
-from fluster.test_suite import TestSuite, TestVector
-
-# pylint: enable=wrong-import-position
+from fluster.test_suite import TestSuite
+from fluster.test_vector import TestVector
BASE_URL = "https://www.itu.int/"
H264_URL = BASE_URL + "wftp3/av-arch/jvt-site/draft_conformance/"
-BITSTREAM_EXTS = [
- ".264",
- ".h264",
- ".jsv",
- ".jvt",
- ".avc",
- ".26l",
- ".bits",
-]
+BITSTREAM_EXTS = [".264", ".h264", ".jsv", ".jvt", ".avc", ".26l", ".bits"]
MD5_EXTS = ["yuv_2.md5", "yuv.md5", ".md5", "md5.txt", "md5sum.txt"]
MD5_EXCLUDES = [".bin.md5", "bit.md5"]
RAW_EXTS = ["nogray.yuv", ".yuv", ".YUV", ".qcif"]
@@ -53,14 +44,14 @@
class HREFParser(HTMLParser):
"""Custom parser to find href links"""
- def __init__(self):
- self.links = []
+ def __init__(self) -> None:
+ self.links: List[Any] = []
super().__init__()
- def error(self, message):
+ def error(self, message: str) -> None:
print(message)
- def handle_starttag(self, tag, attrs):
+ def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]) -> None:
# Only parse the 'anchor' tag.
if tag == "a":
# Check the list of defined attributes.
@@ -68,7 +59,7 @@ def handle_starttag(self, tag, attrs):
# If href is defined, print it.
if name == "href":
base_url = BASE_URL if BASE_URL[-1] != "/" else BASE_URL[0:-1]
- self.links.append(base_url + value)
+ self.links.append(base_url + str(value))
class JVTGenerator:
@@ -90,7 +81,7 @@ def __init__(
self.site = site
self.use_ffprobe = use_ffprobe
- def generate(self, download, jobs):
+ def generate(self, download: bool, jobs: int) -> None:
"""Generates the test suite and saves it to a file"""
new_test_vectors = []
output_filepath = os.path.join(self.suite_name + ".json")
@@ -100,7 +91,7 @@ def generate(self, download, jobs):
self.suite_name,
self.codec,
self.description,
- dict(),
+ {},
)
hparser = HREFParser()
@@ -119,9 +110,7 @@ def generate(self, download, jobs):
file_url = os.path.basename(url)
name = os.path.splitext(file_url)[0]
file_input = f"{name}.bin"
- test_vector = TestVector(
- name, url, "__skip__", file_input, OutputFormat.YUV420P, ""
- )
+ test_vector = TestVector(name, url, "__skip__", file_input, OutputFormat.YUV420P, "")
test_suite.test_vectors[name] = test_vector
if download:
@@ -134,17 +123,12 @@ def generate(self, download, jobs):
)
for test_vector in test_suite.test_vectors.values():
- dest_dir = os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
+ dest_dir = os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name)
dest_path = os.path.join(dest_dir, os.path.basename(test_vector.source))
- test_vector.input_file = utils.find_by_ext(dest_dir, BITSTREAM_EXTS)
+            test_vector.input_file = utils.find_by_ext(dest_dir, BITSTREAM_EXTS) or ""
absolute_input_path = test_vector.input_file
test_vector.input_file = test_vector.input_file.replace(
- os.path.join(
- test_suite.resources_dir, test_suite.name, test_vector.name
- )
- + os.sep,
+ os.path.join(test_suite.resources_dir, test_suite.name, test_vector.name) + os.sep,
"",
)
if not test_vector.input_file:
@@ -220,16 +204,10 @@ def generate(self, download, jobs):
"Professional_profiles",
"MVC",
): # result md5 generated from h264_reference_decoder
- if (
- self.name == "SVC"
- ): # result md5 generated for different Lines (L0, L1...)
- new_vectors = self._fill_checksum_h264_multiple(
- test_vector, dest_dir
- )
+ if self.name == "SVC": # result md5 generated for different Lines (L0, L1...)
+ new_vectors = self._fill_checksum_h264_multiple(test_vector, dest_dir)
new_test_vectors.extend(new_vectors)
- test_suite.test_vectors = {
- vector.name: vector for vector in new_test_vectors
- }
+ test_suite.test_vectors = {vector.name: vector for vector in new_test_vectors}
else:
self._fill_checksum_h264(test_vector, dest_dir)
@@ -237,15 +215,15 @@ def generate(self, download, jobs):
print("Generate new test suite: " + test_suite.name + ".json")
@staticmethod
- def _fill_checksum_h264(test_vector, dest_dir):
+ def _fill_checksum_h264(test_vector: TestVector, dest_dir: str) -> None:
raw_file = utils.find_by_ext(dest_dir, RAW_EXTS)
if raw_file is None or len(raw_file) == 0:
raise Exception(f"RAW file not found in {dest_dir}")
test_vector.result = utils.file_checksum(raw_file)
@staticmethod
- def _fill_checksum_h264_multiple(test_vector, dest_dir):
- def remove_r1_from_path(path):
+ def _fill_checksum_h264_multiple(test_vector: TestVector, dest_dir: str) -> List[TestVector]:
+ def remove_r1_from_path(path: str) -> str:
parts = path.split("/")
if len(parts) >= 2:
parts[-2] = re.sub(r"-r1", "", parts[-2])
@@ -258,19 +236,13 @@ def remove_r1_from_path(path):
new_vector = copy.deepcopy(test_vector)
new_vector.name = test_vector.name + suffix
- input_file_path = os.path.join(
- dest_dir, test_vector.name, f"{test_vector.name}{suffix}.264"
- )
- result_file_path = os.path.join(
- dest_dir, test_vector.name, f"{test_vector.name}{suffix}.yuv"
- )
+ input_file_path = os.path.join(dest_dir, test_vector.name, f"{test_vector.name}{suffix}.264")
+ result_file_path = os.path.join(dest_dir, test_vector.name, f"{test_vector.name}{suffix}.yuv")
corrected_input_path = remove_r1_from_path(input_file_path)
corrected_result_path = remove_r1_from_path(result_file_path)
- if os.path.exists(corrected_input_path) and os.path.exists(
- corrected_result_path
- ):
+ if os.path.exists(corrected_input_path) and os.path.exists(corrected_result_path):
new_vector.input_file = os.path.relpath(corrected_input_path, dest_dir)
new_vector.result = utils.file_checksum(corrected_result_path)