Skip to content

Commit

Permalink
Merge pull request #1822 from mandiant/fix/dynamic-freeze
Browse files Browse the repository at this point in the history
update freeze for dynamic
  • Loading branch information
williballenthin authored Oct 20, 2023
2 parents b8b55f4 + fc4618e commit 62d4b00
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
- remove the `SCOPE_*` constants in favor of the `Scope` enum #1764 @williballenthin
- protobuf: deprecate `RuleMetadata.scope` in favor of `RuleMetadata.scopes` @williballenthin
- protobuf: deprecate `Metadata.analysis` in favor of `Metadata.analysis2` that is dynamic analysis aware @williballenthin
- update freeze format to v3, adding support for dynamic analysis @williballenthin

### New Rules (19)

Expand Down
20 changes: 10 additions & 10 deletions capa/features/extractors/null.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,26 +136,26 @@ def get_processes(self):
assert isinstance(address, ProcessAddress)
yield ProcessHandle(address=address, inner={})

def extract_process_features(self, p):
for addr, feature in self.processes[p.address].features:
def extract_process_features(self, ph):
for addr, feature in self.processes[ph.address].features:
yield feature, addr

def get_threads(self, p):
for address in sorted(self.processes[p].threads.keys()):
def get_threads(self, ph):
for address in sorted(self.processes[ph.address].threads.keys()):
assert isinstance(address, ThreadAddress)
yield ThreadHandle(address=address, inner={})

def extract_thread_features(self, p, t):
for addr, feature in self.processes[p.address].threads[t.address].features:
def extract_thread_features(self, ph, th):
for addr, feature in self.processes[ph.address].threads[th.address].features:
yield feature, addr

def get_calls(self, p, t):
for address in sorted(self.processes[p.address].threads[t.address].calls.keys()):
def get_calls(self, ph, th):
for address in sorted(self.processes[ph.address].threads[th.address].calls.keys()):
assert isinstance(address, DynamicCallAddress)
yield CallHandle(address=address, inner={})

def extract_call_features(self, p, t, call):
for address, feature in self.processes[p.address].threads[t.address].calls[call.address].features:
def extract_call_features(self, ph, th, ch):
for address, feature in self.processes[ph.address].threads[th.address].calls[ch.address].features:
yield feature, address


Expand Down
67 changes: 43 additions & 24 deletions capa/features/freeze/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
"""
import json
import zlib
import logging
from enum import Enum
from typing import List, Tuple, Union
from typing import List, Tuple, Union, Literal

from pydantic import Field, BaseModel, ConfigDict

Expand All @@ -39,6 +40,8 @@

logger = logging.getLogger(__name__)

CURRENT_VERSION = 3


class HashableModel(BaseModel):
model_config = ConfigDict(frozen=True)
Expand Down Expand Up @@ -325,9 +328,10 @@ class Extractor(BaseModel):


class Freeze(BaseModel):
version: int = 2
version: int = CURRENT_VERSION
base_address: Address = Field(alias="base address")
sample_hashes: SampleHashes
flavor: Literal["static", "dynamic"]
extractor: Extractor
features: Features
model_config = ConfigDict(populate_by_name=True)
Expand Down Expand Up @@ -423,9 +427,10 @@ def dumps_static(extractor: StaticFeatureExtractor) -> str:
# Mypy is unable to recognise `global_` as an argument due to alias

freeze = Freeze(
version=3,
version=CURRENT_VERSION,
base_address=Address.from_capa(extractor.get_base_address()),
sample_hashes=extractor.get_sample_hashes(),
flavor="static",
extractor=Extractor(name=extractor.__class__.__name__),
features=features,
) # type: ignore
Expand Down Expand Up @@ -527,24 +532,27 @@ def dumps_dynamic(extractor: DynamicFeatureExtractor) -> str:
base_addr = get_base_addr() if get_base_addr else capa.features.address.NO_ADDRESS

freeze = Freeze(
version=3,
version=CURRENT_VERSION,
base_address=Address.from_capa(base_addr),
sample_hashes=extractor.get_sample_hashes(),
flavor="dynamic",
extractor=Extractor(name=extractor.__class__.__name__),
features=features,
) # type: ignore
# Mypy is unable to recognise `base_address` as an argument due to alias

return freeze.json()
return freeze.model_dump_json()


def loads_static(s: str) -> StaticFeatureExtractor:
"""deserialize a set of features (as a NullStaticFeatureExtractor) from a string."""
freeze = Freeze.model_validate_json(s)
if freeze.version != 3:
if freeze.version != CURRENT_VERSION:
raise ValueError(f"unsupported freeze format version: {freeze.version}")

assert freeze.flavor == "static"
assert isinstance(freeze.features, StaticFeatures)

return null.NullStaticFeatureExtractor(
base_address=freeze.base_address.to_capa(),
sample_hashes=freeze.sample_hashes,
Expand Down Expand Up @@ -573,11 +581,13 @@ def loads_static(s: str) -> StaticFeatureExtractor:

def loads_dynamic(s: str) -> DynamicFeatureExtractor:
"""deserialize a set of features (as a NullDynamicFeatureExtractor) from a string."""
freeze = Freeze.parse_raw(s)
if freeze.version != 3:
freeze = Freeze.model_validate_json(s)
if freeze.version != CURRENT_VERSION:
raise ValueError(f"unsupported freeze format version: {freeze.version}")

assert freeze.flavor == "dynamic"
assert isinstance(freeze.features, DynamicFeatures)

return null.NullDynamicFeatureExtractor(
base_address=freeze.base_address.to_capa(),
sample_hashes=freeze.sample_hashes,
Expand Down Expand Up @@ -605,42 +615,51 @@ def loads_dynamic(s: str) -> DynamicFeatureExtractor:


MAGIC = "capa0000".encode("ascii")
STATIC_MAGIC = MAGIC + "-static".encode("ascii")
DYNAMIC_MAGIC = MAGIC + "-dynamic".encode("ascii")


def dump(extractor: FeatureExtractor) -> bytes:
"""serialize the given extractor to a byte array."""
def dumps(extractor: FeatureExtractor) -> str:
"""serialize the given extractor to a string."""
if isinstance(extractor, StaticFeatureExtractor):
return STATIC_MAGIC + zlib.compress(dumps_static(extractor).encode("utf-8"))
doc = dumps_static(extractor)
elif isinstance(extractor, DynamicFeatureExtractor):
return DYNAMIC_MAGIC + zlib.compress(dumps_dynamic(extractor).encode("utf-8"))
doc = dumps_dynamic(extractor)
else:
raise ValueError("Invalid feature extractor")

return doc


def dump(extractor: FeatureExtractor) -> bytes:
    """
    serialize the given extractor to a byte array.

    the result is the MAGIC header followed by the zlib-compressed,
    UTF-8 encoded string produced by `dumps`.
    """
    payload = dumps(extractor).encode("utf-8")
    return MAGIC + zlib.compress(payload)


def is_freeze(buf: bytes) -> bool:
    """return True when the buffer begins with the freeze MAGIC header."""
    header = buf[: len(MAGIC)]
    return header == MAGIC


def is_static_freeze(buf: bytes) -> bool:
return buf[: len(STATIC_MAGIC)] == STATIC_MAGIC
def loads(s: str):
doc = json.loads(s)

if doc["version"] != CURRENT_VERSION:
raise ValueError(f"unsupported freeze format version: {doc['version']}")

def is_dynamic_freeze(buf: bytes) -> bool:
return buf[: len(DYNAMIC_MAGIC)] == DYNAMIC_MAGIC
if doc["flavor"] == "static":
return loads_static(s)
elif doc["flavor"] == "dynamic":
return loads_dynamic(s)
else:
raise ValueError(f"unsupported freeze format flavor: {doc['flavor']}")


def load(buf: bytes):
"""deserialize a set of features (as a NullFeatureExtractor) from a byte array."""
if not is_freeze(buf):
raise ValueError("missing magic header")
if is_static_freeze(buf):
return loads_static(zlib.decompress(buf[len(STATIC_MAGIC) :]).decode("utf-8"))
elif is_dynamic_freeze(buf):
return loads_dynamic(zlib.decompress(buf[len(DYNAMIC_MAGIC) :]).decode("utf-8"))
else:
raise ValueError("invalid magic header")

s = zlib.decompress(buf[len(MAGIC) :]).decode("utf-8")

return loads(s)


def main(argv=None):
Expand Down
162 changes: 162 additions & 0 deletions tests/test_freeze_dynamic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import textwrap
from typing import List
from pathlib import Path

import fixtures

import capa.main
import capa.rules
import capa.helpers
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.features.freeze
import capa.features.basicblock
import capa.features.extractors.null
import capa.features.extractors.base_extractor
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import (
SampleHashes,
ThreadHandle,
ProcessHandle,
ThreadAddress,
ProcessAddress,
DynamicCallAddress,
DynamicFeatureExtractor,
)

# addresses reused throughout the fixture:
# one process (pid 1), one thread in it (tid 1), and two calls made by that thread.
_PID1 = ProcessAddress(pid=1)
_TID1 = ThreadAddress(_PID1, tid=1)
_CALL1 = DynamicCallAddress(thread=_TID1, id=1)
_CALL2 = DynamicCallAddress(thread=_TID1, id=2)

# hand-built dynamic extractor used by the tests in this module.
# it has a single file-scope feature, no global/process/thread features,
# and call-scope features on both calls.
EXTRACTOR = capa.features.extractors.null.NullDynamicFeatureExtractor(
    base_address=AbsoluteVirtualAddress(0x401000),
    sample_hashes=SampleHashes(
        md5="6eb7ee7babf913d75df3f86c229df9e7",
        sha1="2a082494519acd5130d5120fa48786df7275fdd7",
        sha256="0c7d1a34eb9fd55bedbf37ba16e3d5dd8c1dd1d002479cc4af27ef0f82bb4792",
    ),
    global_features=[],
    file_features=[
        (AbsoluteVirtualAddress(0x402345), capa.features.common.Characteristic("embedded pe")),
    ],
    processes={
        _PID1: capa.features.extractors.null.ProcessFeatures(
            features=[],
            threads={
                _TID1: capa.features.extractors.null.ThreadFeatures(
                    features=[],
                    calls={
                        # first call: CreateFile, with the number 12
                        _CALL1: capa.features.extractors.null.CallFeatures(
                            features=[
                                (_CALL1, capa.features.insn.API("CreateFile")),
                                (_CALL1, capa.features.insn.Number(12)),
                            ],
                        ),
                        # second call: WriteFile
                        _CALL2: capa.features.extractors.null.CallFeatures(
                            features=[
                                (_CALL2, capa.features.insn.API("WriteFile")),
                            ],
                        ),
                    },
                ),
            },
        ),
    },
)


def addresses(s) -> List[Address]:
    """Collect the `address` attribute of each item in `s` and return them sorted ascending."""
    collected = [item.address for item in s]
    collected.sort()
    return collected


def test_null_feature_extractor():
    """
    Check that the hand-built EXTRACTOR enumerates its processes, threads and calls,
    and that a rule requiring `api: CreateFile` matches against it.
    """
    # handles carry only an address here; the inner payload is unused, so None is passed.
    ph = ProcessHandle(ProcessAddress(pid=1), None)
    th = ThreadHandle(ThreadAddress(ProcessAddress(pid=1), tid=1), None)

    assert addresses(EXTRACTOR.get_processes()) == [ProcessAddress(pid=1)]
    assert addresses(EXTRACTOR.get_threads(ph)) == [ThreadAddress(ProcessAddress(pid=1), tid=1)]
    assert addresses(EXTRACTOR.get_calls(ph, th)) == [
        DynamicCallAddress(thread=ThreadAddress(ProcessAddress(pid=1), tid=1), id=1),
        DynamicCallAddress(thread=ThreadAddress(ProcessAddress(pid=1), tid=1), id=2),
    ]

    # the YAML must be nested: `meta` and `features` live under `rule`,
    # `scopes` under `meta`, and the `api` feature under the `and` node.
    rules = capa.rules.RuleSet(
        [
            capa.rules.Rule.from_yaml(
                textwrap.dedent(
                    """
                    rule:
                        meta:
                            name: create file
                            scopes:
                                static: basic block
                                dynamic: call
                        features:
                            - and:
                                - api: CreateFile
                    """
                )
            ),
        ]
    )
    capabilities, _ = capa.main.find_capabilities(rules, EXTRACTOR)
    assert "create file" in capabilities


def compare_extractors(a: DynamicFeatureExtractor, b: DynamicFeatureExtractor):
    """
    Assert that `a` and `b` yield the same file features and, for every
    process/thread/call enumerated from `a`, the same child addresses and features.

    Only the handles produced by `a` are walked; each one is passed to both
    extractors. Process, thread and call features are compared as sorted,
    de-duplicated lists, so ordering and repeats do not matter at those scopes.
    """
    assert list(a.extract_file_features()) == list(b.extract_file_features())

    assert addresses(a.get_processes()) == addresses(b.get_processes())
    for ph in a.get_processes():
        assert addresses(a.get_threads(ph)) == addresses(b.get_threads(ph))

        process_features_a = sorted(set(a.extract_process_features(ph)))
        process_features_b = sorted(set(b.extract_process_features(ph)))
        assert process_features_a == process_features_b

        for th in a.get_threads(ph):
            assert addresses(a.get_calls(ph, th)) == addresses(b.get_calls(ph, th))

            thread_features_a = sorted(set(a.extract_thread_features(ph, th)))
            thread_features_b = sorted(set(b.extract_thread_features(ph, th)))
            assert thread_features_a == thread_features_b

            for ch in a.get_calls(ph, th):
                call_features_a = sorted(set(a.extract_call_features(ph, th, ch)))
                call_features_b = sorted(set(b.extract_call_features(ph, th, ch)))
                assert call_features_a == call_features_b


def test_freeze_str_roundtrip():
    """Serialize EXTRACTOR to a string with `dumps`, reload it with `loads`, and compare."""
    frozen = capa.features.freeze.dumps(EXTRACTOR)
    reanimated = capa.features.freeze.loads(frozen)
    compare_extractors(EXTRACTOR, reanimated)


def test_freeze_bytes_roundtrip():
    """Serialize EXTRACTOR to bytes with `dump`, reload it with `load`, and compare."""
    frozen = capa.features.freeze.dump(EXTRACTOR)
    reanimated = capa.features.freeze.load(frozen)
    compare_extractors(EXTRACTOR, reanimated)


def test_freeze_load_sample(tmpdir):
    """
    Freeze the extractor built from the "d46900" fixture to a file under `tmpdir`,
    load it back from disk, and compare the loaded extractor with the original.
    """
    frozen_path = Path(tmpdir.mkdir("capa").join("test.frz").strpath)

    extractor = fixtures.get_cape_extractor(fixtures.get_data_path_by_name("d46900"))

    frozen_path.write_bytes(capa.features.freeze.dump(extractor))
    null_extractor = capa.features.freeze.load(frozen_path.read_bytes())

    compare_extractors(extractor, null_extractor)

0 comments on commit 62d4b00

Please sign in to comment.