From a8abb1649ec21eaad83ffd6e43c22a7828bd806f Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 10 Dec 2024 15:58:32 +0000 Subject: [PATCH] capabilities: use dataclasses to represent complicated return types --- capa/capabilities/common.py | 30 +++++-- capa/capabilities/dynamic.py | 101 ++++++++++++---------- capa/capabilities/static.py | 94 +++++++++++--------- capa/ghidra/capa_explorer.py | 6 +- capa/ghidra/capa_ghidra.py | 34 ++++---- capa/ida/plugin/form.py | 14 +-- capa/loader.py | 14 +-- capa/main.py | 48 +++++----- capa/render/result_document.py | 18 +++- scripts/bulk-process.py | 8 +- scripts/capa-as-library.py | 12 +-- scripts/detect-binexport2-capabilities.py | 8 +- scripts/import-to-ida.py | 2 +- scripts/lint.py | 4 +- scripts/show-capabilities-by-function.py | 10 +-- tests/test_capabilities.py | 48 +++++----- tests/test_dynamic_sequence_scope.py | 29 +++---- tests/test_freeze_dynamic.py | 4 +- tests/test_freeze_static.py | 4 +- tests/test_result_document.py | 3 +- 20 files changed, 274 insertions(+), 217 deletions(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index e9b6f253d..ed9d3456b 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -9,17 +9,28 @@ import logging import itertools import collections -from typing import Any +from typing import Optional +from dataclasses import dataclass from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS +from capa.render.result_document import LibraryFunction, StaticFeatureCounts, DynamicFeatureCounts from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor logger = logging.getLogger(__name__) -def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): +@dataclass +class FileCapabilities: + features: FeatureSet + matches: MatchResults + feature_count: int + + +def find_file_capabilities( + ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet +) -> FileCapabilities: file_features: FeatureSet = collections.defaultdict(set) for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): @@ -36,8 +47,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi file_features.update(function_features) - _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) - return matches, len(file_features) + features, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) + return FileCapabilities(features, matches, len(file_features)) def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: @@ -62,9 +73,14 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon return False -def find_capabilities( - ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs -) -> tuple[MatchResults, Any]: +@dataclass +class Capabilities: + matches: MatchResults + feature_counts: StaticFeatureCounts | DynamicFeatureCounts + library_functions: Optional[tuple[LibraryFunction, ...]] = None + + +def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs) -> Capabilities: from capa.capabilities.static import find_static_capabilities from capa.capabilities.dynamic import find_dynamic_capabilities diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index c280a888b..e52a60d99 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -9,14 +9,14 @@ import logging import itertools import collections -from typing import Any +from dataclasses import dataclass import capa.perf import capa.features.freeze as frz import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults -from capa.capabilities.common import find_file_capabilities +from capa.capabilities.common import Capabilities, find_file_capabilities from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor logger = logging.getLogger(__name__) @@ -26,13 +26,17 @@ SEQUENCE_SIZE = 5 +@dataclass +class CallCapabilities: + features: FeatureSet + matches: MatchResults + + def find_call_capabilities( ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> tuple[FeatureSet, MatchResults]: +) -> CallCapabilities: """ find matches for the given rules for the given call. - - returns: tuple containing (features for call, match results for call) """ # all features found for the call. features: FeatureSet = collections.defaultdict(set) @@ -50,16 +54,22 @@ def find_call_capabilities( for addr, _ in res: capa.engine.index_rule_matches(features, rule, [addr]) - return features, matches + return CallCapabilities(features, matches) + + +@dataclass +class ThreadCapabilities: + features: FeatureSet + thread_matches: MatchResults + sequence_matches: MatchResults + call_matches: MatchResults def find_thread_capabilities( ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle -) -> tuple[FeatureSet, MatchResults, MatchResults, MatchResults]: +) -> ThreadCapabilities: """ find matches for the given rules within the given thread. - - returns: tuple containing (features for thread, match results for thread, match results for sequences, match results for calls) """ # all features found within this thread, # includes features found within calls. @@ -75,20 +85,20 @@ def find_thread_capabilities( sequence: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE) for ch in extractor.get_calls(ph, th): - cfeatures, cmatches = find_call_capabilities(ruleset, extractor, ph, th, ch) - for feature, vas in cfeatures.items(): + call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch) + for feature, vas in call_capabilities.features.items(): features[feature].update(vas) - for rule_name, res in cmatches.items(): + for rule_name, res in call_capabilities.matches.items(): call_matches[rule_name].extend(res) - sequence.append(cfeatures) - sfeatures: FeatureSet = collections.defaultdict(set) + sequence.append(call_capabilities.features) + sequence_features: FeatureSet = collections.defaultdict(set) for call in sequence: for feature, vas in call.items(): - sfeatures[feature].update(vas) + sequence_features[feature].update(vas) - _, smatches = ruleset.match(Scope.SEQUENCE, sfeatures, ch.address) + _, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address) for rule_name, res in smatches.items(): sequence_matches[rule_name].extend(res) @@ -103,16 +113,23 @@ def find_thread_capabilities( for va, _ in res: capa.engine.index_rule_matches(features, rule, [va]) - return features, matches, sequence_matches, call_matches + return ThreadCapabilities(features, matches, sequence_matches, call_matches) + + +@dataclass +class ProcessCapabilities: + process_matches: MatchResults + thread_matches: MatchResults + sequence_matches: MatchResults + call_matches: MatchResults + feature_count: int def find_process_capabilities( ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle -) -> tuple[MatchResults, MatchResults, MatchResults, MatchResults, int]: +) -> ProcessCapabilities: """ find matches for the given rules within the given process. - - returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) """ # all features found within this process, # includes features found within threads (and calls). @@ -131,29 +148,29 @@ def find_process_capabilities( call_matches: MatchResults = collections.defaultdict(list) for th in extractor.get_threads(ph): - features, tmatches, smatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) - for feature, vas in features.items(): + thread_capabilities = find_thread_capabilities(ruleset, extractor, ph, th) + for feature, vas in thread_capabilities.features.items(): process_features[feature].update(vas) - for rule_name, res in tmatches.items(): + for rule_name, res in thread_capabilities.thread_matches.items(): thread_matches[rule_name].extend(res) - for rule_name, res in smatches.items(): + for rule_name, res in thread_capabilities.sequence_matches.items(): sequence_matches[rule_name].extend(res) - for rule_name, res in cmatches.items(): + for rule_name, res in thread_capabilities.call_matches.items(): call_matches[rule_name].extend(res) for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): process_features[feature].add(va) _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) - return process_matches, thread_matches, sequence_matches, call_matches, len(process_features) + return ProcessCapabilities(process_matches, thread_matches, sequence_matches, call_matches, len(process_features)) def find_dynamic_capabilities( ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None -) -> tuple[MatchResults, Any]: +) -> Capabilities: all_process_matches: MatchResults = collections.defaultdict(list) all_thread_matches: MatchResults = collections.defaultdict(list) all_sequence_matches: MatchResults = collections.defaultdict(list) @@ -170,21 +187,21 @@ def find_dynamic_capabilities( ) as pbar: task = pbar.add_task("matching", total=n_processes, unit="processes") for p in processes: - process_matches, thread_matches, sequence_matches, call_matches, feature_count = find_process_capabilities( - ruleset, extractor, p - ) + process_capabilities = find_process_capabilities(ruleset, extractor, p) feature_counts.processes += ( - rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), + rdoc.ProcessFeatureCount( + address=frz.Address.from_capa(p.address), count=process_capabilities.feature_count + ), ) - logger.debug("analyzed %s and extracted %d features", p.address, feature_count) + logger.debug("analyzed %s and extracted %d features", p.address, process_capabilities.feature_count) - for rule_name, res in process_matches.items(): + for rule_name, res in process_capabilities.process_matches.items(): all_process_matches[rule_name].extend(res) - for rule_name, res in thread_matches.items(): + for rule_name, res in process_capabilities.thread_matches.items(): all_thread_matches[rule_name].extend(res) - for rule_name, res in sequence_matches.items(): + for rule_name, res in process_capabilities.sequence_matches.items(): all_sequence_matches[rule_name].extend(res) - for rule_name, res in call_matches.items(): + for rule_name, res in process_capabilities.call_matches.items(): all_call_matches[rule_name].extend(res) pbar.advance(task) @@ -199,8 +216,8 @@ def find_dynamic_capabilities( rule = ruleset[rule_name] capa.engine.index_rule_matches(process_and_lower_features, rule, locations) - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) - feature_counts.file = feature_count + all_file_capabilities = find_file_capabilities(ruleset, extractor, process_and_lower_features) + feature_counts.file = all_file_capabilities.feature_count matches = dict( itertools.chain( @@ -211,12 +228,8 @@ def find_dynamic_capabilities( all_sequence_matches.items(), all_thread_matches.items(), all_process_matches.items(), - all_file_matches.items(), + all_file_capabilities.matches.items(), ) ) - meta = { - "feature_counts": feature_counts, - } - - return matches, meta + return Capabilities(matches, feature_counts) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index df8cd7e78..5ce032cf8 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -10,7 +10,7 @@ import logging import itertools import collections -from typing import Any +from dataclasses import dataclass import capa.perf import capa.helpers @@ -18,19 +18,23 @@ import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults -from capa.capabilities.common import find_file_capabilities +from capa.capabilities.common import Capabilities, find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor logger = logging.getLogger(__name__) +@dataclass +class InstructionCapabilities: + features: FeatureSet + matches: MatchResults + + def find_instruction_capabilities( ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle -) -> tuple[FeatureSet, MatchResults]: +) -> InstructionCapabilities: """ find matches for the given rules for the given instruction. - - returns: tuple containing (features for instruction, match results for instruction) """ # all features found for the instruction. features: FeatureSet = collections.defaultdict(set) @@ -48,16 +52,21 @@ def find_instruction_capabilities( for addr, _ in res: capa.engine.index_rule_matches(features, rule, [addr]) - return features, matches + return InstructionCapabilities(features, matches) + + +@dataclass +class BasicBlockCapabilities: + features: FeatureSet + basic_block_matches: MatchResults + instruction_matches: MatchResults def find_basic_block_capabilities( ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle -) -> tuple[FeatureSet, MatchResults, MatchResults]: +) -> BasicBlockCapabilities: """ find matches for the given rules within the given basic block. - - returns: tuple containing (features for basic block, match results for basic block, match results for instructions) """ # all features found within this basic block, # includes features found within instructions. @@ -68,11 +77,11 @@ def find_basic_block_capabilities( insn_matches: MatchResults = collections.defaultdict(list) for insn in extractor.get_instructions(f, bb): - ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) - for feature, vas in ifeatures.items(): + instruction_capabilities = find_instruction_capabilities(ruleset, extractor, f, bb, insn) + for feature, vas in instruction_capabilities.features.items(): features[feature].update(vas) - for rule_name, res in imatches.items(): + for rule_name, res in instruction_capabilities.matches.items(): insn_matches[rule_name].extend(res) for feature, va in itertools.chain( @@ -88,16 +97,20 @@ def find_basic_block_capabilities( for va, _ in res: capa.engine.index_rule_matches(features, rule, [va]) - return features, matches, insn_matches + return BasicBlockCapabilities(features, matches, insn_matches) + + +@dataclass +class CodeCapabilities: + function_matches: MatchResults + basic_block_matches: MatchResults + instruction_matches: MatchResults + feature_count: int -def find_code_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle -) -> tuple[MatchResults, MatchResults, MatchResults, int]: +def find_code_capabilities(ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle) -> CodeCapabilities: """ find matches for the given rules within the given function. - - returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) """ # all features found within this function, # includes features found within basic blocks (and instructions). @@ -112,26 +125,26 @@ def find_code_capabilities( insn_matches: MatchResults = collections.defaultdict(list) for bb in extractor.get_basic_blocks(fh): - features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) - for feature, vas in features.items(): + basic_block_capabilities = find_basic_block_capabilities(ruleset, extractor, fh, bb) + for feature, vas in basic_block_capabilities.features.items(): function_features[feature].update(vas) - for rule_name, res in bmatches.items(): + for rule_name, res in basic_block_capabilities.basic_block_matches.items(): bb_matches[rule_name].extend(res) - for rule_name, res in imatches.items(): + for rule_name, res in basic_block_capabilities.instruction_matches.items(): insn_matches[rule_name].extend(res) for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): function_features[feature].add(va) _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) - return function_matches, bb_matches, insn_matches, len(function_features) + return CodeCapabilities(function_matches, bb_matches, insn_matches, len(function_features)) def find_static_capabilities( ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None -) -> tuple[MatchResults, Any]: +) -> Capabilities: all_function_matches: MatchResults = collections.defaultdict(list) all_bb_matches: MatchResults = collections.defaultdict(list) all_insn_matches: MatchResults = collections.defaultdict(list) @@ -165,30 +178,36 @@ def find_static_capabilities( pbar.advance(task) continue - function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f) + code_capabilities = find_code_capabilities(ruleset, extractor, f) feature_counts.functions += ( - rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), + rdoc.FunctionFeatureCount( + address=frz.Address.from_capa(f.address), count=code_capabilities.feature_count + ), ) t1 = time.time() match_count = 0 - for name, matches_ in itertools.chain(function_matches.items(), bb_matches.items(), insn_matches.items()): + for name, matches_ in itertools.chain( + code_capabilities.function_matches.items(), + code_capabilities.basic_block_matches.items(), + code_capabilities.instruction_matches.items(), + ): if not ruleset.rules[name].is_subscope_rule(): match_count += len(matches_) logger.debug( "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", f.address, - feature_count, + code_capabilities.feature_count, match_count, t1 - t0, ) - for rule_name, res in function_matches.items(): + for rule_name, res in code_capabilities.function_matches.items(): all_function_matches[rule_name].extend(res) - for rule_name, res in bb_matches.items(): + for rule_name, res in code_capabilities.basic_block_matches.items(): all_bb_matches[rule_name].extend(res) - for rule_name, res in insn_matches.items(): + for rule_name, res in code_capabilities.instruction_matches.items(): all_insn_matches[rule_name].extend(res) pbar.advance(task) @@ -203,8 +222,8 @@ def find_static_capabilities( rule = ruleset[rule_name] capa.engine.index_rule_matches(function_and_lower_features, rule, locations) - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) - feature_counts.file = feature_count + all_file_capabilities = find_file_capabilities(ruleset, extractor, function_and_lower_features) + feature_counts.file = all_file_capabilities.feature_count matches: MatchResults = dict( itertools.chain( @@ -214,13 +233,8 @@ def find_static_capabilities( all_insn_matches.items(), all_bb_matches.items(), all_function_matches.items(), - all_file_matches.items(), + all_file_capabilities.matches.items(), ) ) - meta = { - "feature_counts": feature_counts, - "library_functions": library_functions, - } - - return matches, meta + return Capabilities(matches, feature_counts, library_functions) diff --git a/capa/ghidra/capa_explorer.py b/capa/ghidra/capa_explorer.py index 0fe5243c8..289379f43 100644 --- a/capa/ghidra/capa_explorer.py +++ b/capa/ghidra/capa_explorer.py @@ -238,13 +238,13 @@ def get_capabilities(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True) - if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=False): popup("capa explorer encountered warnings during analysis. Please check the console output for more information.") # type: ignore [name-defined] # noqa: F821 logger.info("capa encountered warnings during analysis") - return capa.render.json.render(meta, rules, capabilities) + return capa.render.json.render(meta, rules, capabilities.matches) def get_locations(match_dict): diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index db43ecfac..a7b00a7fa 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -74,23 +74,23 @@ def run_headless(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor, False) - meta.analysis.feature_counts = counts["feature_counts"] - meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta.analysis.feature_counts = capabilities.feature_counts + meta.analysis.library_functions = capabilities.library_functions + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) - if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): + if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=True): logger.info("capa encountered warnings during analysis") if args.json: - print(capa.render.json.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.json.render(meta, rules, capabilities.matches)) # noqa: T201 elif args.vverbose: - print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.vverbose.render(meta, rules, capabilities.matches)) # noqa: T201 elif args.verbose: - print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.verbose.render(meta, rules, capabilities.matches)) # noqa: T201 else: - print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.default.render(meta, rules, capabilities.matches)) # noqa: T201 return 0 @@ -124,21 +124,21 @@ def run_ui(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor, True) - meta.analysis.feature_counts = counts["feature_counts"] - meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta.analysis.feature_counts = capabilities.feature_counts + meta.analysis.library_functions = capabilities.library_functions + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) - if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(rules, capabilities.matches, is_standalone=False): logger.info("capa encountered warnings during analysis") if verbose == "vverbose": - print(capa.render.vverbose.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.vverbose.render(meta, rules, capabilities.matches)) # noqa: T201 elif verbose == "verbose": - print(capa.render.verbose.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.verbose.render(meta, rules, capabilities.matches)) # noqa: T201 else: - print(capa.render.default.render(meta, rules, capabilities)) # noqa: T201 + print(capa.render.default.render(meta, rules, capabilities.matches)) # noqa: T201 return 0 diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 54bd70409..1838a3e73 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -769,13 +769,15 @@ def slot_progress_feature_extraction(text): try: meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])]) - capabilities, counts = capa.capabilities.common.find_capabilities( + capabilities = capa.capabilities.common.find_capabilities( ruleset, self.feature_extractor, disable_progress=True ) - meta.analysis.feature_counts = counts["feature_counts"] - meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.loader.compute_layout(ruleset, self.feature_extractor, capabilities) + meta.analysis.feature_counts = capabilities.feature_counts + meta.analysis.library_functions = capabilities.library_functions + meta.analysis.layout = capa.loader.compute_layout( + ruleset, self.feature_extractor, capabilities.matches + ) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -811,7 +813,7 @@ def slot_progress_feature_extraction(text): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.capabilities.common.has_file_limitation(ruleset, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(ruleset, capabilities.matches, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.exception("Failed to check for file limitations (error: %s)", e) @@ -825,7 +827,7 @@ def slot_progress_feature_extraction(text): try: self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa( - meta, ruleset, capabilities + meta, ruleset, capabilities.matches ) except Exception as e: logger.exception("Failed to collect results (error: %s)", e) diff --git a/capa/loader.py b/capa/loader.py index 5bca6096a..661d4c019 100644 --- a/capa/loader.py +++ b/capa/loader.py @@ -52,6 +52,7 @@ FORMAT_BINEXPORT2, ) from capa.features.address import Address +from capa.capabilities.common import Capabilities from capa.features.extractors.base_extractor import ( SampleHashes, FeatureExtractor, @@ -443,7 +444,7 @@ def get_signatures(sigs_path: Path) -> list[Path]: return paths -def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): +def get_sample_analysis(format_, arch, os_, extractor, rules_path, feature_counts, library_functions): if isinstance(extractor, StaticFeatureExtractor): return rdoc.StaticAnalysis( format=format_, @@ -459,8 +460,8 @@ def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): # # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } ), - feature_counts=counts["feature_counts"], - library_functions=counts["library_functions"], + feature_counts=feature_counts, + library_functions=library_functions, ) elif isinstance(extractor, DynamicFeatureExtractor): return rdoc.DynamicAnalysis( @@ -472,7 +473,7 @@ def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): layout=rdoc.DynamicLayout( processes=(), ), - feature_counts=counts["feature_counts"], + feature_counts=feature_counts, ) else: raise ValueError("invalid extractor type") @@ -485,7 +486,7 @@ def collect_metadata( os_: str, rules_path: list[Path], extractor: FeatureExtractor, - counts: dict, + capabilities: Capabilities, ) -> rdoc.Metadata: # if it's a binary sample we hash it, if it's a report # we fetch the hashes from the report @@ -528,7 +529,8 @@ def collect_metadata( os_, extractor, rules, - counts, + capabilities.feature_counts, + capabilities.library_functions, ), ) diff --git a/capa/main.py b/capa/main.py index 2e3a5900c..ce8ed8ddf 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,7 +17,7 @@ import textwrap import contextlib from types import TracebackType -from typing import Any, Optional, TypedDict +from typing import Optional, TypedDict from pathlib import Path import colorama @@ -42,7 +42,6 @@ import capa.render.result_document as rdoc import capa.features.extractors.common from capa.rules import RuleSet -from capa.engine import MatchResults from capa.loader import ( BACKEND_IDA, BACKEND_VIV, @@ -95,7 +94,7 @@ FORMAT_BINJA_DB, FORMAT_BINEXPORT2, ) -from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities +from capa.capabilities.common import Capabilities, find_capabilities, has_file_limitation, find_file_capabilities from capa.features.extractors.base_extractor import ( ProcessFilter, FunctionFilter, @@ -758,7 +757,7 @@ def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[F continue try: - pure_file_capabilities, _ = find_file_capabilities(rules, file_extractor, {}) + pure_file_capabilities = find_file_capabilities(rules, file_extractor, {}) except PEFormatError as e: logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e)) raise ShouldExitError(E_CORRUPT_FILE) from e @@ -768,7 +767,7 @@ def find_file_limitations_from_cli(args, rules: RuleSet, file_extractors: list[F # file limitations that rely on non-file scope won't be detected here. # nor on FunctionName features, because pefile doesn't support this. - found_file_limitation = has_file_limitation(rules, pure_file_capabilities) + found_file_limitation = has_file_limitation(rules, pure_file_capabilities.matches) if found_file_limitation: # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. @@ -968,8 +967,7 @@ def main(argv: Optional[list[str]] = None): return e.status_code meta: rdoc.Metadata - capabilities: MatchResults - counts: dict[str, Any] + capabilities: Capabilities if input_format == FORMAT_RESULT: # result document directly parses into meta, capabilities @@ -991,10 +989,12 @@ def main(argv: Optional[list[str]] = None): except ShouldExitError as e: return e.status_code - capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) + capabilities = find_capabilities(rules, extractor, disable_progress=args.quiet) - meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata( + argv, args.input_file, input_format, os_, args.rules, extractor, capabilities + ) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) if isinstance(extractor, StaticFeatureExtractor) and found_file_limitation: # bail if capa's static feature extractor encountered file limitation e.g. a packed binary @@ -1003,13 +1003,13 @@ def main(argv: Optional[list[str]] = None): return E_FILE_LIMITATION if args.json: - print(capa.render.json.render(meta, rules, capabilities)) + print(capa.render.json.render(meta, rules, capabilities.matches)) elif args.vverbose: - print(capa.render.vverbose.render(meta, rules, capabilities)) + print(capa.render.vverbose.render(meta, rules, capabilities.matches)) elif args.verbose: - print(capa.render.verbose.render(meta, rules, capabilities)) + print(capa.render.verbose.render(meta, rules, capabilities.matches)) else: - print(capa.render.default.render(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities.matches)) colorama.deinit() logger.debug("done.") @@ -1045,16 +1045,16 @@ def ida_main(): meta = capa.ida.helpers.collect_metadata([rules_path]) - capabilities, counts = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) + capabilities = find_capabilities(rules, capa.features.extractors.ida.extractor.IdaFeatureExtractor()) - meta.analysis.feature_counts = counts["feature_counts"] - meta.analysis.library_functions = counts["library_functions"] + meta.analysis.feature_counts = capabilities.feature_counts + meta.analysis.library_functions = capabilities.library_functions - if has_file_limitation(rules, capabilities, is_standalone=False): + if has_file_limitation(rules, capabilities.matches, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered warnings during analysis") colorama.init(strip=True) - print(capa.render.default.render(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities.matches)) def ghidra_main(): @@ -1079,19 +1079,19 @@ def ghidra_main(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) - capabilities, counts = find_capabilities( + capabilities = find_capabilities( rules, capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor(), not capa.ghidra.helpers.is_running_headless(), ) - meta.analysis.feature_counts = counts["feature_counts"] - meta.analysis.library_functions = counts["library_functions"] + meta.analysis.feature_counts = capabilities.feature_counts + meta.analysis.library_functions = capabilities.library_functions - if has_file_limitation(rules, capabilities, is_standalone=False): + if has_file_limitation(rules, capabilities.matches, is_standalone=False): logger.info("capa encountered warnings during analysis") - print(capa.render.default.render(meta, rules, capabilities)) + print(capa.render.default.render(meta, rules, capabilities.matches)) if __name__ == "__main__": diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 8aece5c9c..d12345a5d 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -8,7 +8,7 @@ import datetime import collections from enum import Enum -from typing import Union, Literal, Optional, TypeAlias +from typing import TYPE_CHECKING, Union, Literal, Optional, TypeAlias from pathlib import Path from pydantic import Field, BaseModel, ConfigDict @@ -23,6 +23,9 @@ from capa.engine import MatchResults from capa.helpers import assert_never, load_json_from_path +if TYPE_CHECKING: + from capa.capabilities.common import Capabilities + class FrozenModel(BaseModel): model_config = ConfigDict(frozen=True, extra="forbid") @@ -647,8 +650,8 @@ def from_capa(cls, meta: Metadata, rules: RuleSet, capabilities: MatchResults) - return ResultDocument(meta=meta, rules=rule_matches) - def to_capa(self) -> tuple[Metadata, dict]: - capabilities: dict[str, list[tuple[capa.features.address.Address, capa.features.common.Result]]] = ( + def to_capa(self) -> tuple[Metadata, "Capabilities"]: + matches: dict[str, list[tuple[capa.features.address.Address, capa.features.common.Result]]] = ( collections.defaultdict(list) ) @@ -661,7 +664,14 @@ def to_capa(self) -> tuple[Metadata, dict]: for addr, match in rule_match.matches: result: capa.engine.Result = match.to_capa(rules_by_name) - capabilities[rule_name].append((addr.to_capa(), result)) + matches[rule_name].append((addr.to_capa(), result)) + + if isinstance(self.meta.analysis, StaticAnalysis): + capabilities = Capabilities( + matches, self.meta.analysis.feature_counts, self.meta.analysis.library_functions + ) + elif isinstance(self.meta.analysis, DynamicAnalysis): + capabilities = Capabilities(matches, self.meta.analysis.feature_counts) return self.meta, capabilities diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index ad977a76b..d9b90e6cd 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -147,12 +147,12 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) - meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, counts) - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, format_, os_, [], extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) - doc = rd.ResultDocument.from_capa(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches) return {"path": input_file, "status": "ok", "ok": doc.model_dump()} diff --git a/scripts/capa-as-library.py b/scripts/capa-as-library.py index 0555a0263..1ead36623 100644 --- a/scripts/capa-as-library.py +++ b/scripts/capa-as-library.py @@ -177,25 +177,25 @@ def capa_details(rules_path: Path, input_file: Path, output_format="dictionary") extractor = capa.loader.get_extractor( input_file, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], should_save_workspace=False, disable_progress=True ) - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata([], input_file, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) capa_output: Any = False if output_format == "dictionary": # ...as python dictionary, simplified as textable but in dictionary - doc = rd.ResultDocument.from_capa(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches) capa_output = render_dictionary(doc) elif output_format == "json": # render results # ...as json - capa_output = json.loads(capa.render.json.render(meta, rules, capabilities)) + capa_output = json.loads(capa.render.json.render(meta, rules, capabilities.matches)) elif output_format == "texttable": # ...as human readable text table - capa_output = capa.render.default.render(meta, rules, capabilities) + capa_output = capa.render.default.render(meta, rules, capabilities.matches) return capa_output diff --git a/scripts/detect-binexport2-capabilities.py b/scripts/detect-binexport2-capabilities.py index 3c914de2c..5a7897446 100644 --- a/scripts/detect-binexport2-capabilities.py +++ b/scripts/detect-binexport2-capabilities.py @@ -94,12 +94,12 @@ def main(argv=None): except capa.main.ShouldExitError as e: return e.status_code - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor) - meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) - doc = rd.ResultDocument.from_capa(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches) pb = capa.render.proto.doc_to_pb2(doc) sys.stdout.buffer.write(pb.SerializeToString(deterministic=True)) diff --git a/scripts/import-to-ida.py b/scripts/import-to-ida.py index 6dd1fb63b..27708468a 100644 --- a/scripts/import-to-ida.py +++ b/scripts/import-to-ida.py @@ -85,7 +85,7 @@ def main(): return -2 rows = [] - for name in capabilities.keys(): + for name in capabilities.matches.keys(): rule = result_doc.rules[name] if rule.meta.lib: continue diff --git a/scripts/lint.py b/scripts/lint.py index 6acf0aa50..0bc3bdb90 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -352,10 +352,10 @@ def get_sample_capabilities(ctx: Context, path: Path) -> set[str]: disable_progress=True, ) - capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) + capabilities = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) # mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())? # so we ignore a few types here. - capabilities = set(capabilities.keys()) # type: ignore + capabilities = set(capabilities.matches.keys()) # type: ignore assert isinstance(capabilities, set) logger.debug("computed results: %s: %d capabilities", nice_path, len(capabilities)) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index e0e8fabc3..353e05587 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -156,18 +156,18 @@ def main(argv=None): except capa.main.ShouldExitError as e: return e.status_code - capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) + capabilities = capa.capabilities.common.find_capabilities(rules, extractor) - meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) - meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities.matches) - if capa.capabilities.common.has_file_limitation(rules, capabilities): + if capa.capabilities.common.has_file_limitation(rules, capabilities.matches): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): return capa.main.E_FILE_LIMITATION - doc = rd.ResultDocument.from_capa(meta, rules, capabilities) + doc = rd.ResultDocument.from_capa(meta, rules, capabilities.matches) print(render_matches_by_function(doc)) colorama.deinit() diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index 5c6de51b4..fb5d8e9ec 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -75,10 +75,10 @@ def test_match_across_scopes_file_function(z9324d_extractor): ), ] ) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "install service" in capabilities - assert ".text section" in capabilities - assert ".text section and install service" in capabilities + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "install service" in capabilities.matches + assert ".text section" in capabilities.matches + assert ".text section and install service" in capabilities.matches def test_match_across_scopes(z9324d_extractor): @@ -143,10 +143,10 @@ def test_match_across_scopes(z9324d_extractor): ), ] ) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "tight loop" in capabilities - assert "kill thread loop" in capabilities - assert "kill thread program" in capabilities + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "tight loop" in capabilities.matches + assert "kill thread loop" in capabilities.matches + assert "kill thread program" in capabilities.matches def test_subscope_bb_rules(z9324d_extractor): @@ -171,8 +171,8 @@ def test_subscope_bb_rules(z9324d_extractor): ] ) # tight loop at 0x403685 - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "test rule" in capabilities + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "test rule" in capabilities.matches def test_match_specific_functions(z9324d_extractor): @@ -198,8 +198,8 @@ def test_match_specific_functions(z9324d_extractor): ] ) extractor = FunctionFilter(z9324d_extractor, {0x4019C0}) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, extractor) - matches = capabilities["receive data"] + capabilities = capa.capabilities.common.find_capabilities(rules, extractor) + matches = capabilities.matches["receive data"] # test that we received only one match assert len(matches) == 1 # and that this match is from the specified function @@ -226,8 +226,8 @@ def test_byte_matching(z9324d_extractor): ) ] ) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "byte match test" in capabilities + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "byte match test" in capabilities.matches def test_com_feature_matching(z395eb_extractor): @@ -252,8 +252,8 @@ def test_com_feature_matching(z395eb_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor) - assert "initialize IWebBrowser2" in capabilities + capabilities = capa.main.find_capabilities(rules, z395eb_extractor) + assert "initialize IWebBrowser2" in capabilities.matches def test_count_bb(z9324d_extractor): @@ -277,8 +277,8 @@ def test_count_bb(z9324d_extractor): ) ] ) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "count bb" in capabilities + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "count bb" in capabilities.matches def test_instruction_scope(z9324d_extractor): @@ -304,9 +304,9 @@ def test_instruction_scope(z9324d_extractor): ) ] ) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "push 1000" in capabilities - assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000" in capabilities.matches + assert 0x4071A4 in {result[0] for result in capabilities.matches["push 1000"]} def test_instruction_subscope(z9324d_extractor): @@ -336,6 +336,6 @@ def test_instruction_subscope(z9324d_extractor): ) ] ) - capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "push 1000 on i386" in capabilities - assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} + capabilities = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000 on i386" in capabilities.matches + assert 0x406F60 in {result[0] for result in capabilities.matches["push 1000 on i386"]} diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index 810dc5b34..d1f42f338 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -91,9 +91,9 @@ def test_dynamic_call_scope(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name in matches - assert 8 in get_call_ids(matches[r.name]) + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 8 in get_call_ids(capabilities.matches[r.name]) # match the first 5-tuple sequence. @@ -129,9 +129,9 @@ def test_dynamic_sequence_scope(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name in matches - assert 12 in get_call_ids(matches[r.name]) + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 12 in get_call_ids(capabilities.matches[r.name]) # show the sequence is only 5 calls long, and doesn't match beyond that 5-tuple. @@ -168,8 +168,8 @@ def test_dynamic_sequence_scope2(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name not in matches + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name not in capabilities.matches # show how you might use a sequence rule: to match a small window for a collection of features. @@ -209,9 +209,9 @@ def test_dynamic_sequence_example(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name in matches - assert 14 in get_call_ids(matches[r.name]) + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert 14 in get_call_ids(capabilities.matches[r.name]) # show how sequences that overlap a single event are handled. @@ -250,7 +250,6 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event(): r = capa.rules.Rule.from_yaml(rule) ruleset = capa.rules.RuleSet([r]) - matches, features = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) - assert r.name in matches - assert [11, 12, 13, 14, 15] == list(get_call_ids(matches[r.name])) - + capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True) + assert r.name in capabilities.matches + assert [11, 12, 13, 14, 15] == list(get_call_ids(capabilities.matches[r.name])) diff --git a/tests/test_freeze_dynamic.py b/tests/test_freeze_dynamic.py index ead4d50c2..be91e4ea0 100644 --- a/tests/test_freeze_dynamic.py +++ b/tests/test_freeze_dynamic.py @@ -118,8 +118,8 @@ def test_null_feature_extractor(): ), ] ) - capabilities, _ = capa.main.find_capabilities(rules, EXTRACTOR) - assert "create file" in capabilities + capabilities = capa.main.find_capabilities(rules, EXTRACTOR) + assert "create file" in capabilities.matches def compare_extractors(a: DynamicFeatureExtractor, b: DynamicFeatureExtractor): diff --git a/tests/test_freeze_static.py b/tests/test_freeze_static.py index bd0c90b5d..743a44515 100644 --- a/tests/test_freeze_static.py +++ b/tests/test_freeze_static.py @@ -100,8 +100,8 @@ def test_null_feature_extractor(): ), ] ) - capabilities, meta = capa.main.find_capabilities(rules, EXTRACTOR) - assert "xor loop" in capabilities + capabilities = capa.main.find_capabilities(rules, EXTRACTOR) + assert "xor loop" in capabilities.matches def compare_extractors(a, b): diff --git a/tests/test_result_document.py b/tests/test_result_document.py index 769709fa4..9ab3f2573 100644 --- a/tests/test_result_document.py +++ b/tests/test_result_document.py @@ -14,6 +14,7 @@ import capa.engine as ceng import capa.render.result_document as rdoc import capa.features.freeze.features as frzf +from capa.capabilities.common import Capabilities def test_optional_node_from_capa(): @@ -282,4 +283,4 @@ def test_rdoc_to_capa(): meta, capabilites = rd.to_capa() assert isinstance(meta, rdoc.Metadata) - assert isinstance(capabilites, dict) + assert isinstance(capabilites, Capabilities)