Skip to content

Commit

Permalink
sequence: only match first overlapping sequence
Browse files Browse the repository at this point in the history
also, for repeating behavior, match only the first instance.
  • Loading branch information
williballenthin committed Dec 12, 2024
1 parent 37f6ccb commit b10d591
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
37 changes: 35 additions & 2 deletions capa/capabilities/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@


# The number of calls that make up a sequence.
SEQUENCE_SIZE = 5
#
# The larger this is, the more calls are grouped together to match rule logic.
# This means a longer chain can be recognized; however, it's a bit more expensive.
SEQUENCE_SIZE = 20


@dataclass
Expand Down Expand Up @@ -69,7 +72,8 @@ def find_thread_capabilities(
ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
) -> ThreadCapabilities:
"""
find matches for the given rules within the given thread.
find matches for the given rules within the given thread,
which includes matches for all the sequences and calls within it.
"""
# all features found within this thread,
# includes features found within calls.
Expand All @@ -82,8 +86,18 @@ def find_thread_capabilities(
# matches found at the sequence scope.
sequence_matches: MatchResults = collections.defaultdict(list)

# We match sequences using a sliding window of calls with size SEQUENCE_SIZE.
#
# For each call, we consider the window of SEQUENCE_SIZE calls leading up to it,
# merging all their features and doing a match.
# Here's the primary data structure: a deque of those features found in the prior calls.
# We'll append to it, and as it grows larger than SEQUENCE_SIZE, the oldest items are removed.
sequence: collections.deque[FeatureSet] = collections.deque(maxlen=SEQUENCE_SIZE)

# the names of rules matched at the last sequence,
# so that we can deduplicate long strings of the same match.
last_sequence_matches: set[str] = set()

for ch in extractor.get_calls(ph, th):
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in call_capabilities.features.items():
Expand All @@ -92,16 +106,35 @@ def find_thread_capabilities(
for rule_name, res in call_capabilities.matches.items():
call_matches[rule_name].extend(res)

#
# sequence scope matching
#
# as we add items to the end of the deque, the oldest items will overflow and get dropped.
sequence.append(call_capabilities.features)
# collect all the features seen across the last SEQUENCE_SIZE calls,
# and match against them.
sequence_features: FeatureSet = collections.defaultdict(set)
for call in sequence:
for feature, vas in call.items():
sequence_features[feature].update(vas)

_, smatches = ruleset.match(Scope.SEQUENCE, sequence_features, ch.address)
for rule_name, res in smatches.items():
if rule_name in last_sequence_matches:
# don't emit match results for rules seen during the immediately preceding sequence.
#
# This means that we won't emit duplicate matches when there are multiple sequences
# that overlap a single matching event.
# It also handles the case of a tight loop containing matched logic;
# only the first match will be recorded.
#
# In theory, this means the result document doesn't have *every* possible match location,
# but in practice, humans will only be interested in the first handful anyway.
continue
sequence_matches[rule_name].extend(res)

last_sequence_matches = set(smatches.keys())

for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()):
features[feature].add(va)

Expand Down
14 changes: 10 additions & 4 deletions tests/test_dynamic_sequence_scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def test_dynamic_sequence_scope():
assert 12 in get_call_ids(capabilities.matches[r.name])


# show the sequence is only 5 calls long, and doesn't match beyond that 5-tuple.
# show that when the sequence is only 5 calls long (for example), it doesn't match beyond that 5-tuple.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
Expand Down Expand Up @@ -168,7 +168,13 @@ def test_dynamic_sequence_scope2():
r = capa.rules.Rule.from_yaml(rule)
ruleset = capa.rules.RuleSet([r])

capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
# patch SEQUENCE_SIZE since we may use a much larger value in the real world.
from pytest import MonkeyPatch

with MonkeyPatch.context() as m:
m.setattr(capa.capabilities.dynamic, "SEQUENCE_SIZE", 5)
capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)

assert r.name not in capabilities.matches


Expand Down Expand Up @@ -215,7 +221,6 @@ def test_dynamic_sequence_example():


# show how sequences that overlap a single event are handled.
# TODO(williballenthin): but I think we really just want one match for this, not copies of the same thing.
#
# proc: 0000A65749F5902C4D82.exe (ppid=2456, pid=3052)
# thread: 3064
Expand Down Expand Up @@ -252,4 +257,5 @@ def test_dynamic_sequence_multiple_sequences_overlapping_single_event():

capabilities = capa.capabilities.dynamic.find_dynamic_capabilities(ruleset, extractor, disable_progress=True)
assert r.name in capabilities.matches
assert [11, 12, 13, 14, 15] == list(get_call_ids(capabilities.matches[r.name]))
# we only match the first overlapping sequence
assert [11] == list(get_call_ids(capabilities.matches[r.name]))

0 comments on commit b10d591

Please sign in to comment.