Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: add entity registration and mtz generation #36

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 106 additions & 20 deletions maltego_trx/decorator_registry.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import os
import sys
import typing
import zipfile
from dataclasses import dataclass, field
from itertools import chain
from typing import List, Optional, Dict, Iterable
from typing import List, Optional, Dict, Iterable, Type, Set
from xml.etree import ElementTree
from xml.etree.ElementTree import Element

from maltego_trx.protocol.entities import MaltegoEntityType, MaltegoEntityTypeMeta, generate_meta, EntityFieldType, \
generate_field
from maltego_trx.protocol.mtz import create_entity_type_xml
from maltego_trx.utils import filter_unique, pascal_case_to_title, escape_csv_fields, export_as_csv, serialize_bool, \
name_to_path

Expand Down Expand Up @@ -55,9 +63,14 @@ class TransformRegistry:
global_settings: List[TransformSetting] = field(default_factory=list)
oauth_settings_id: Optional[str] = ""

namespace: str = None

transform_metas: Dict[str, TransformMeta] = field(init=False, default_factory=dict)
transform_settings: Dict[str, List[TransformSetting]] = field(init=False, default_factory=dict)

entity_metas: Dict[str, Type[MaltegoEntityType]] = field(init=False, default_factory=dict)
entity_categories: Set[str] = field(init=False, default_factory=set)

def register_transform(self, display_name: str, input_entity: str, description: str,
settings: List[TransformSetting] = None, output_entities: List[str] = None,
disclaimer: str = ""):
Expand All @@ -83,6 +96,64 @@ def decorated(transform_callable: object):

return decorated

def register_entity(self,
namespace: str = None,
name: str = None,
display_name: str = None,
display_name_plural: str = None,
description: str = None,
category: str = None,
small_icon_resource: str = None,
large_icon_resource: str = None,
allowed_root: bool = True,
conversion_order: int = 2147483647,
visible: bool = True,
base_entities: list[str] = None,
value_field: str = None,
display_value_field: str = None):
# get all arguments
kwargs = locals()
kwargs.pop("self")
kwargs['namespace'] = kwargs['namespace'] or self.namespace

def decorated(cls: Type[MaltegoEntityType]) -> MaltegoEntityType:

default = MaltegoEntityTypeMeta(**kwargs)
meta = getattr(cls, '__meta__', default)
meta = generate_meta(cls.__name__, meta)
cls.__meta__ = meta

if meta.category:
self.entity_categories.add(meta.category)

field_metas = {}
for field_name, field_type in typing.get_type_hints(cls).items():
if field_name.startswith("__"):
continue

sample = None
field_meta = None
if value := getattr(cls, field_name, None):
if isinstance(value, EntityFieldType):
field_meta = value
else:
sample = value

field_meta = generate_field(field_name, sample, field_meta)
setattr(cls, field_name, field_meta)
field_metas[field_name] = field_meta

if not meta.value_field:
meta.value_field = list(field_metas.keys())[0]
if not meta.display_value_field:
meta.display_value_field = meta.value_field

self.entity_metas[meta.id] = cls
cls.__fields__ = field_metas
return cls

return decorated

def write_transforms_config(self, config_path: str = "./transforms.csv", csv_line_limit: int = 100):
"""Exports the collected transform meta data as a csv-file to config_path"""
global_settings_full_names = [gs.id for gs in self.global_settings]
Expand All @@ -93,19 +164,19 @@ def write_transforms_config(self, config_path: str = "./transforms.csv", csv_lin
self.transform_settings.get(transform_name, [])]

transform_row = [
self.owner,
self.author,
transform_meta.disclaimer,
transform_meta.description,
self.version,
transform_name,
transform_meta.display_name + self.display_name_suffix,
os.path.join(self.host_url, "run", transform_name),
transform_meta.input_entity,
";".join(self.oauth_settings_id),
# combine global and transform scoped settings
";".join(chain(meta_settings, global_settings_full_names)),
";".join(self.seed_ids)
self.owner,
self.author,
transform_meta.disclaimer,
transform_meta.description,
self.version,
transform_name,
transform_meta.display_name + self.display_name_suffix,
os.path.join(self.host_url, "run", transform_name),
transform_meta.input_entity,
";".join(self.oauth_settings_id),
# combine global and transform scoped settings
";".join(chain(meta_settings, global_settings_full_names)),
";".join(self.seed_ids)
]

escaped_fields = escape_csv_fields(*transform_row)
Expand All @@ -121,15 +192,30 @@ def write_settings_config(self, config_path: str = "./settings.csv", csv_line_li
csv_lines = []
for setting in unique_settings:
setting_row = [
setting.id,
setting.setting_type,
setting.display_name,
setting.default_value or "",
serialize_bool(setting.optional, 'True', 'False'),
serialize_bool(setting.popup, 'Yes', 'No')
setting.id,
setting.setting_type,
setting.display_name,
setting.default_value or "",
serialize_bool(setting.optional, 'True', 'False'),
serialize_bool(setting.popup, 'Yes', 'No')
]

escaped_fields = escape_csv_fields(*setting_row)
csv_lines.append(",".join(escaped_fields))

export_as_csv(SETTINGS_CSV_HEADER, csv_lines, config_path, csv_line_limit)

def write_mtz(self, mtz_path: str):
with zipfile.ZipFile(mtz_path, "w") as mtz:
for name, entity_meta in self.entity_metas.items():
entity_xml = create_entity_type_xml(entity_meta)
if sys.version_info.minor >= 9:
ElementTree.indent(entity_xml)

entity_xml_str = ElementTree.tostring(entity_xml)
mtz.writestr(f"Entities/{name}.entity", entity_xml_str)

for category in self.entity_categories:
category_xml = Element("EntityCategory", attrib={"name": category})
category_xml_str = ElementTree.tostring(category_xml)
mtz.writestr(f"EntityCategories/{category.lower()}.category", category_xml_str)
85 changes: 85 additions & 0 deletions maltego_trx/protocol/entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
from dataclasses import dataclass, field
from typing import Any

from maltego_trx.utils import pascal_case_to_title


@dataclass
class MaltegoEntityTypeMeta:
namespace: str = None
name: str = None

display_name: str = None
display_name_plural: str = None

description: str = None
category: str = None

small_icon_resource: str = None
large_icon_resource: str = None

allowed_root: bool = True
conversion_order: int = 2147483647
visible: bool = True

base_entities: list[str] = field(default_factory=list)

value_field: str = None
display_value_field: str = None

@property
def id(self):
if self.namespace:
return f"{self.namespace}.{self.name}"

return self.name


@dataclass
class EntityFieldType:
name: str = None
type: str = 'string'
nullable: bool = True
hidden: bool = False
readonly: bool = False
description: str = None
display_name: str = None
sample_value: str = None
evaluator: str = None


class MaltegoEntityType:
__meta__: MaltegoEntityTypeMeta
__fields__: dict[str, EntityFieldType]


def generate_meta(class_name: str, base: MaltegoEntityTypeMeta = None) -> MaltegoEntityTypeMeta:
meta = base or MaltegoEntityTypeMeta()

meta.name = meta.name or class_name

meta.display_name = meta.display_name or pascal_case_to_title(meta.name)
meta.display_name_plural = meta.display_name_plural or meta.display_name + "s"

meta.description = meta.description or f"A {meta.display_name} entity"

return meta


def generate_field(field_name: str, sample: Any = None, base: EntityFieldType = None) -> EntityFieldType:
base = base or EntityFieldType()
field_meta = EntityFieldType()

field_meta.name = base.name or field_name
field_meta.type = base.type or 'string'

field_meta.nullable = base.nullable
field_meta.hidden = base.hidden

field_meta.readonly = base.readonly
field_meta.description = base.description

field_meta.display_name = base.display_name or field_name.replace("_", " ").title()
field_meta.sample_value = base.sample_value or sample

return field_meta
54 changes: 54 additions & 0 deletions maltego_trx/protocol/mtz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Type
from xml.etree.ElementTree import Element
from xml.etree.ElementTree import SubElement

from maltego_trx.protocol.entities import MaltegoEntityType, EntityFieldType
from maltego_trx.utils import serialize_bool


def create_entity_type_xml(entity: Type[MaltegoEntityType]) -> Element:
meta = entity.__meta__
entity_xml = Element("MaltegoEntity", attrib={
"id": meta.id,
"displayName": meta.display_name,
"displayNamePlural": meta.display_name_plural,
"description": meta.description,
"category": meta.category or "",
"smallIconResource": meta.small_icon_resource or "",
"largeIconResource": meta.large_icon_resource or "",
"allowedRoot": serialize_bool(meta.allowed_root),
"conversionOrder": str(meta.conversion_order),
"visible": serialize_bool(meta.visible)
})

if meta.base_entities:
base_entities_xml = SubElement(entity_xml, "BaseEntities")
for base_entity in meta.base_entities:
base_entity_xml = SubElement(base_entities_xml, "BaseEntity")
base_entity_xml.text = base_entity

properties_xml = SubElement(entity_xml, "Properties",
attrib={"value": meta.value_field,
"displayValue": meta.display_value_field})

SubElement(properties_xml, "Groups")
fields_xml = SubElement(properties_xml, "Fields")

field_meta: EntityFieldType
for name, field_meta in entity.__fields__.items():
field_xml = SubElement(fields_xml, "Field", attrib={
"name": field_meta.name,
"type": field_meta.type,
"nullable": serialize_bool(field_meta.nullable),
"hidden": serialize_bool(field_meta.hidden),
"readonly": serialize_bool(field_meta.readonly),
"description": field_meta.description or "",
"evaluator": field_meta.evaluator or "",
"displayName": field_meta.display_name,
})

sample = SubElement(field_xml, "SampleValue")
sample.text = field_meta.sample_value or ""

return entity_xml

6 changes: 3 additions & 3 deletions maltego_trx/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import math
import re
from typing import TypeVar, Callable, Hashable, Iterable, Generator, List, Sequence
from typing import TypeVar, Callable, Hashable, Iterable, Generator, Sequence

import math
from six import text_type, binary_type


Expand Down Expand Up @@ -111,5 +111,5 @@ def export_as_csv(header: str, lines: Sequence[str], export_file_path: str, csv_
csv_file.writelines(map(lambda x: x + "\n", chunk))


def serialize_bool(boolean: bool, serialized_true: str, serialized_false: str) -> str:
def serialize_bool(boolean: bool, serialized_true: str = "true", serialized_false: str = "false") -> str:
return serialized_true if boolean else serialized_false