diff --git a/maltego_trx/decorator_registry.py b/maltego_trx/decorator_registry.py index 31694d8..9b476da 100644 --- a/maltego_trx/decorator_registry.py +++ b/maltego_trx/decorator_registry.py @@ -1,8 +1,16 @@ import os +import sys +import typing +import zipfile from dataclasses import dataclass, field from itertools import chain -from typing import List, Optional, Dict, Iterable +from typing import List, Optional, Dict, Iterable, Type, Set +from xml.etree import ElementTree +from xml.etree.ElementTree import Element +from maltego_trx.protocol.entities import MaltegoEntityType, MaltegoEntityTypeMeta, generate_meta, EntityFieldType, \ + generate_field +from maltego_trx.protocol.mtz import create_entity_type_xml from maltego_trx.utils import filter_unique, pascal_case_to_title, escape_csv_fields, export_as_csv, serialize_bool, \ name_to_path @@ -55,9 +63,14 @@ class TransformRegistry: global_settings: List[TransformSetting] = field(default_factory=list) oauth_settings_id: Optional[str] = "" + namespace: str = None + transform_metas: Dict[str, TransformMeta] = field(init=False, default_factory=dict) transform_settings: Dict[str, List[TransformSetting]] = field(init=False, default_factory=dict) + entity_metas: Dict[str, Type[MaltegoEntityType]] = field(init=False, default_factory=dict) + entity_categories: Set[str] = field(init=False, default_factory=set) + def register_transform(self, display_name: str, input_entity: str, description: str, settings: List[TransformSetting] = None, output_entities: List[str] = None, disclaimer: str = ""): @@ -83,6 +96,64 @@ def decorated(transform_callable: object): return decorated + def register_entity(self, + namespace: str = None, + name: str = None, + display_name: str = None, + display_name_plural: str = None, + description: str = None, + category: str = None, + small_icon_resource: str = None, + large_icon_resource: str = None, + allowed_root: bool = True, + conversion_order: int = 2147483647, + visible: bool = True, + base_entities: list[str] = None, + value_field: str = None, + display_value_field: str = None): + # get all arguments + kwargs = locals() + kwargs.pop("self") + kwargs['namespace'] = kwargs['namespace'] or self.namespace + + def decorated(cls: Type[MaltegoEntityType]) -> MaltegoEntityType: + + default = MaltegoEntityTypeMeta(**kwargs) + meta = getattr(cls, '__meta__', default) + meta = generate_meta(cls.__name__, meta) + cls.__meta__ = meta + + if meta.category: + self.entity_categories.add(meta.category) + + field_metas = {} + for field_name, field_type in typing.get_type_hints(cls).items(): + if field_name.startswith("__"): + continue + + sample = None + field_meta = None + if value := getattr(cls, field_name, None): + if isinstance(value, EntityFieldType): + field_meta = value + else: + sample = value + + field_meta = generate_field(field_name, sample, field_meta) + setattr(cls, field_name, field_meta) + field_metas[field_name] = field_meta + + if not meta.value_field: + meta.value_field = list(field_metas.keys())[0] + if not meta.display_value_field: + meta.display_value_field = meta.value_field + + self.entity_metas[meta.id] = cls + cls.__fields__ = field_metas + return cls + + return decorated + def write_transforms_config(self, config_path: str = "./transforms.csv", csv_line_limit: int = 100): """Exports the collected transform meta data as a csv-file to config_path""" global_settings_full_names = [gs.id for gs in self.global_settings] @@ -93,19 +164,19 @@ def write_transforms_config(self, config_path: str = "./transforms.csv", csv_lin self.transform_settings.get(transform_name, [])] transform_row = [ - self.owner, - self.author, - transform_meta.disclaimer, - transform_meta.description, - self.version, - transform_name, - transform_meta.display_name + self.display_name_suffix, - os.path.join(self.host_url, "run", transform_name), - transform_meta.input_entity, - ";".join(self.oauth_settings_id), - # combine global and transform scoped settings - ";".join(chain(meta_settings, global_settings_full_names)), - ";".join(self.seed_ids) + self.owner, + self.author, + transform_meta.disclaimer, + transform_meta.description, + self.version, + transform_name, + transform_meta.display_name + self.display_name_suffix, + os.path.join(self.host_url, "run", transform_name), + transform_meta.input_entity, + ";".join(self.oauth_settings_id), + # combine global and transform scoped settings + ";".join(chain(meta_settings, global_settings_full_names)), + ";".join(self.seed_ids) ] escaped_fields = escape_csv_fields(*transform_row) @@ -121,15 +192,30 @@ def write_settings_config(self, config_path: str = "./settings.csv", csv_line_li csv_lines = [] for setting in unique_settings: setting_row = [ - setting.id, - setting.setting_type, - setting.display_name, - setting.default_value or "", - serialize_bool(setting.optional, 'True', 'False'), - serialize_bool(setting.popup, 'Yes', 'No') + setting.id, + setting.setting_type, + setting.display_name, + setting.default_value or "", + serialize_bool(setting.optional, 'True', 'False'), + serialize_bool(setting.popup, 'Yes', 'No') ] escaped_fields = escape_csv_fields(*setting_row) csv_lines.append(",".join(escaped_fields)) export_as_csv(SETTINGS_CSV_HEADER, csv_lines, config_path, csv_line_limit) + + def write_mtz(self, mtz_path: str): + with zipfile.ZipFile(mtz_path, "w") as mtz: + for name, entity_meta in self.entity_metas.items(): + entity_xml = create_entity_type_xml(entity_meta) + if sys.version_info.minor >= 9: + ElementTree.indent(entity_xml) + + entity_xml_str = ElementTree.tostring(entity_xml) + mtz.writestr(f"Entities/{name}.entity", entity_xml_str) + + for category in self.entity_categories: + category_xml = Element("EntityCategory", attrib={"name": category}) + category_xml_str = ElementTree.tostring(category_xml) + mtz.writestr(f"EntityCategories/{category.lower()}.category", category_xml_str) diff --git a/maltego_trx/protocol/entities.py b/maltego_trx/protocol/entities.py new file mode 100644 index 0000000..6902cba --- /dev/null +++ b/maltego_trx/protocol/entities.py @@ -0,0 +1,85 @@ +from dataclasses import dataclass, field +from typing import Any + +from maltego_trx.utils import pascal_case_to_title + + +@dataclass +class MaltegoEntityTypeMeta: + namespace: str = None + name: str = None + + display_name: str = None + display_name_plural: str = None + + description: str = None + category: str = None + + small_icon_resource: str = None + large_icon_resource: str = None + + allowed_root: bool = True + conversion_order: int = 2147483647 + visible: bool = True + + base_entities: list[str] = field(default_factory=list) + + value_field: str = None + display_value_field: str = None + + @property + def id(self): + if self.namespace: + return f"{self.namespace}.{self.name}" + + return self.name + + +@dataclass +class EntityFieldType: + name: str = None + type: str = 'string' + nullable: bool = True + hidden: bool = False + readonly: bool = False + description: str = None + display_name: str = None + sample_value: str = None + evaluator: str = None + + +class MaltegoEntityType: + __meta__: MaltegoEntityTypeMeta + __fields__: dict[str, EntityFieldType] + + +def generate_meta(class_name: str, base: MaltegoEntityTypeMeta = None) -> MaltegoEntityTypeMeta: + meta = base or MaltegoEntityTypeMeta() + + meta.name = meta.name or class_name + + meta.display_name = meta.display_name or pascal_case_to_title(meta.name) + meta.display_name_plural = meta.display_name_plural or meta.display_name + "s" + + meta.description = meta.description or f"A {meta.display_name} entity" + + return meta + + +def generate_field(field_name: str, sample: Any = None, base: EntityFieldType = None) -> EntityFieldType: + base = base or EntityFieldType() + field_meta = EntityFieldType() + + field_meta.name = base.name or field_name + field_meta.type = base.type or 'string' + + field_meta.nullable = base.nullable + field_meta.hidden = base.hidden + + field_meta.readonly = base.readonly + field_meta.description = base.description + + field_meta.display_name = base.display_name or field_name.replace("_", " ").title() + field_meta.sample_value = base.sample_value or sample + + return field_meta diff --git a/maltego_trx/protocol/mtz.py b/maltego_trx/protocol/mtz.py new file mode 100644 index 0000000..a59395d --- /dev/null +++ b/maltego_trx/protocol/mtz.py @@ -0,0 +1,54 @@ +from typing import Type +from xml.etree.ElementTree import Element +from xml.etree.ElementTree import SubElement + +from maltego_trx.protocol.entities import MaltegoEntityType, EntityFieldType +from maltego_trx.utils import serialize_bool + + +def create_entity_type_xml(entity: Type[MaltegoEntityType]) -> Element: + meta = entity.__meta__ + entity_xml = Element("MaltegoEntity", attrib={ + "id": meta.id, + "displayName": meta.display_name, + "displayNamePlural": meta.display_name_plural, + "description": meta.description, + "category": meta.category or "", + "smallIconResource": meta.small_icon_resource or "", + "largeIconResource": meta.large_icon_resource or "", + "allowedRoot": serialize_bool(meta.allowed_root), + "conversionOrder": str(meta.conversion_order), + "visible": serialize_bool(meta.visible) + }) + + if meta.base_entities: + base_entities_xml = SubElement(entity_xml, "BaseEntities") + for base_entity in meta.base_entities: + base_entity_xml = SubElement(base_entities_xml, "BaseEntity") + base_entity_xml.text = base_entity + + properties_xml = SubElement(entity_xml, "Properties", + attrib={"value": meta.value_field, + "displayValue": meta.display_value_field}) + + SubElement(properties_xml, "Groups") + fields_xml = SubElement(properties_xml, "Fields") + + field_meta: EntityFieldType + for name, field_meta in entity.__fields__.items(): + field_xml = SubElement(fields_xml, "Field", attrib={ + "name": field_meta.name, + "type": field_meta.type, + "nullable": serialize_bool(field_meta.nullable), + "hidden": serialize_bool(field_meta.hidden), + "readonly": serialize_bool(field_meta.readonly), + "description": field_meta.description or "", + "evaluator": field_meta.evaluator or "", + "displayName": field_meta.display_name, + }) + + sample = SubElement(field_xml, "SampleValue") + sample.text = field_meta.sample_value or "" + + return entity_xml + diff --git a/maltego_trx/utils.py b/maltego_trx/utils.py index 272d87d..cebc82c 100644 --- a/maltego_trx/utils.py +++ b/maltego_trx/utils.py @@ -1,7 +1,7 @@ +import math import re -from typing import TypeVar, Callable, Hashable, Iterable, Generator, List, Sequence +from typing import TypeVar, Callable, Hashable, Iterable, Generator, Sequence -import math from six import text_type, binary_type @@ -111,5 +111,5 @@ def export_as_csv(header: str, lines: Sequence[str], export_file_path: str, csv_ csv_file.writelines(map(lambda x: x + "\n", chunk)) -def serialize_bool(boolean: bool, serialized_true: str, serialized_false: str) -> str: +def serialize_bool(boolean: bool, serialized_true: str = "true", serialized_false: str = "false") -> str: return serialized_true if boolean else serialized_false