Skip to content

Commit

Permalink
Merge pull request #35 from paterva/feat/local-mtz
Browse files Browse the repository at this point in the history
Feature: mtz config generation for local transforms
  • Loading branch information
felixatmaltego authored Aug 10, 2022
2 parents 4022240 + 4d8d005 commit b0c3eae
Show file tree
Hide file tree
Showing 11 changed files with 836 additions and 221 deletions.
33 changes: 33 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
[![Sonatype Jake](https://github.com/paterva/maltego-trx/actions/workflows/sonatype-jack.yml/badge.svg)](https://github.com/paterva/maltego-trx/actions/workflows/sonatype-jack.yml)

## Release Notes

__1.6.0__: Automatically generate am `.mtz` for your local transforms

__1.5.2__: Add logging output for invalid / missing params in xml serialization

__1.5.1__: Add ignored files to starter and use README for pypi
Expand Down Expand Up @@ -305,6 +308,36 @@ registry.write_settings_config()
handle_run(__name__, sys.argv, application)
```

### Generating an `.mtz` config with your local Transforms

Since `maltego-trx>=1.6.0` you can generate an `.mtz` config file with your local transforms.

If you're already using the `TransformRegistry`, just invoke the `write_local_config()` method.

```python
# project.py

registry.write_local_mtz()
```

This will create a file called `local.mtz` in the current directory. You can then import this file into Maltego and
start using your local transforms faster. Just remember that settings are not passed to local transforms.

The method takes in the same arguments as the interface in the Maltego client.
If you are using a `virtualenv` environment, you might want to change the `command` argument to use that.

```bash
# project.py

registry.write_local_mtz(
mtz_path: str = "./local.mtz", # path to the local .mtz file
working_dir: str = ".",
command: str = "python3", # for a venv you might want to use `./venv/bin/python3`
params: str = "project.py",
debug: bool = True
)
```

## Legacy Transforms

[Documentation](https://docs.maltego.com/support/solutions/articles/15000018299-porting-old-trx-transforms-to-the-latest-version)
Expand Down
2 changes: 1 addition & 1 deletion maltego_trx/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = "1.5.2"
VERSION = "1.6.0"
238 changes: 186 additions & 52 deletions maltego_trx/decorator_registry.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,41 @@
import logging
import os
import zipfile
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import chain
from typing import List, Optional, Dict, Iterable
from typing import List, Optional, Dict, Iterable, Tuple

from maltego_trx.utils import filter_unique, pascal_case_to_title, escape_csv_fields, export_as_csv, serialize_bool, \
name_to_path
from maltego_trx.mtz import (
create_local_server_xml,
create_settings_xml,
create_transform_xml,
create_transform_set_xml,
)
from maltego_trx.utils import (
filter_unique,
pascal_case_to_title,
escape_csv_fields,
export_as_csv,
serialize_bool,
name_to_path,
serialize_xml,
)

TRANSFORMS_CSV_HEADER = "Owner,Author,Disclaimer,Description,Version," \
"Name,UIName,URL,entityName," \
"oAuthSettingId,transformSettingIDs,seedIDs"
TRANSFORMS_CSV_HEADER = (
"Owner,Author,Disclaimer,Description,Version,"
"Name,UIName,URL,entityName,"
"oAuthSettingId,transformSettingIDs,seedIDs"
)
SETTINGS_CSV_HEADER = "Name,Type,Display,DefaultValue,Optional,Popup"


@dataclass(frozen=True)
class TransformSet:
name: str
description: str


@dataclass()
class TransformMeta:
class_name: str
Expand All @@ -20,9 +44,10 @@ class TransformMeta:
description: str
output_entities: List[str]
disclaimer: str
transform_set: TransformSet


@dataclass()
@dataclass(frozen=True)
class TransformSetting:
name: str
display_name: str
Expand All @@ -49,87 +74,196 @@ class TransformRegistry:
host_url: str
seed_ids: List[str]

version: str = '0.1'
version: str = "0.1"
display_name_suffix: str = ""

global_settings: List[TransformSetting] = field(default_factory=list)
oauth_settings_id: Optional[str] = ""

transform_metas: Dict[str, TransformMeta] = field(init=False, default_factory=dict)
transform_settings: Dict[str, List[TransformSetting]] = field(init=False, default_factory=dict)
transform_settings: Dict[str, List[TransformSetting]] = field(
init=False, default_factory=dict
)
transform_sets: Dict[TransformSet, List[str]] = field(
init=False, default_factory=lambda: defaultdict(list)
)

def register_transform(self, display_name: str, input_entity: str, description: str,
settings: List[TransformSetting] = None, output_entities: List[str] = None,
disclaimer: str = ""):
""" This method can be used as a decorator on transform classes. The data will be used to fill out csv config
files to be imported into a TDS.
def register_transform(
self,
display_name: str,
input_entity: str,
description: str,
settings: List[TransformSetting] = None,
output_entities: List[str] = None,
disclaimer: str = "",
transform_set: TransformSet = None,
):
"""This method can be used as a decorator on transform classes. The data will be used to fill out csv config
files to be imported into a TDS.
"""

def decorated(transform_callable: object):
cleaned_transform_name = name_to_path(transform_callable.__name__)
display = display_name or pascal_case_to_title(transform_callable.__name__)

meta = TransformMeta(cleaned_transform_name,
display, input_entity,
description,
output_entities or [],
disclaimer)
meta = TransformMeta(
cleaned_transform_name,
display,
input_entity,
description,
output_entities or [],
disclaimer,
transform_set=transform_set,
)
self.transform_metas[cleaned_transform_name] = meta

if settings:
self.transform_settings[cleaned_transform_name] = settings

if transform_set:
self.transform_sets[transform_set].append(cleaned_transform_name)

return transform_callable

return decorated

def write_transforms_config(self, config_path: str = "./transforms.csv", csv_line_limit: int = 100):
"""Exports the collected transform metadata as a csv-file to config_path"""
def _create_transforms_config(self) -> Iterable[str]:
global_settings_full_names = [gs.id for gs in self.global_settings]

csv_lines = []
for transform_name, transform_meta in self.transform_metas.items():
meta_settings = [setting.id for setting in
self.transform_settings.get(transform_name, [])]
meta_settings = [
setting.id
for setting in self.transform_settings.get(transform_name, [])
]

transform_row = [
self.owner,
self.author,
transform_meta.disclaimer,
transform_meta.description,
self.version,
transform_name,
transform_meta.display_name + self.display_name_suffix,
os.path.join(self.host_url, "run", transform_name),
transform_meta.input_entity,
";".join(self.oauth_settings_id),
# combine global and transform scoped settings
";".join(chain(meta_settings, global_settings_full_names)),
";".join(self.seed_ids)
self.owner,
self.author,
transform_meta.disclaimer,
transform_meta.description,
self.version,
transform_name,
transform_meta.display_name + self.display_name_suffix,
os.path.join(self.host_url, "run", transform_name),
transform_meta.input_entity,
";".join(self.oauth_settings_id),
# combine global and transform scoped settings
";".join(chain(meta_settings, global_settings_full_names)),
";".join(self.seed_ids),
]

escaped_fields = escape_csv_fields(*transform_row)
csv_lines.append(",".join(escaped_fields))
yield ",".join(escaped_fields)

def write_transforms_config(
self, config_path: str = "./transforms.csv", csv_line_limit: int = 100
):
"""Exports the collected transform metadata as a csv-file to config_path"""

export_as_csv(TRANSFORMS_CSV_HEADER, csv_lines, config_path, csv_line_limit)
csv_lines = self._create_transforms_config()

def write_settings_config(self, config_path: str = "./settings.csv", csv_line_limit: int = 100):
"""Exports the collected settings metadata as a csv-file to config_path"""
chained_settings = chain(self.global_settings, *list(self.transform_settings.values()))
unique_settings: Iterable[TransformSetting] = filter_unique(lambda s: s.name, chained_settings)
export_as_csv(
TRANSFORMS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)

def _create_settings_config(self) -> Iterable[str]:
chained_settings = chain(
self.global_settings, *list(self.transform_settings.values())
)
unique_settings: Iterable[TransformSetting] = filter_unique(
lambda s: s.name, chained_settings
)

csv_lines = []
for setting in unique_settings:
setting_row = [
setting.id,
setting.setting_type,
setting.display_name,
setting.default_value or "",
serialize_bool(setting.optional, 'True', 'False'),
serialize_bool(setting.popup, 'Yes', 'No')
setting.id,
setting.setting_type,
setting.display_name,
setting.default_value or "",
serialize_bool(setting.optional, "True", "False"),
serialize_bool(setting.popup, "Yes", "No"),
]

escaped_fields = escape_csv_fields(*setting_row)
csv_lines.append(",".join(escaped_fields))
yield ",".join(escaped_fields)

def write_settings_config(
self, config_path: str = "./settings.csv", csv_line_limit: int = 100
):
"""Exports the collected settings metadata as a csv-file to config_path"""

csv_lines = self._create_settings_config()

export_as_csv(
SETTINGS_CSV_HEADER, tuple(csv_lines), config_path, csv_line_limit
)

def _create_local_mtz(
self,
working_dir: str = ".",
command: str = "python3",
params: str = "project.py",
debug: bool = True,
) -> Iterable[Tuple[str, str]]:
working_dir = os.path.abspath(working_dir)
if self.global_settings:
logging.warning(
f"Settings are not supported with local transforms. "
f"Global settings are: {', '.join(map(lambda s: s.name, self.global_settings))}"
)

"""Creates an .mtz for bulk importing local transforms"""
server_xml = create_local_server_xml(self.transform_metas.keys())

server_xml_str = serialize_xml(server_xml)
yield "Servers/Local.tas", server_xml_str

for name, meta in self.transform_metas.items():
settings_xml = create_settings_xml(
working_dir, command, f"{params} local {name}", debug
)
settings_xml_str = serialize_xml(settings_xml)

tx_settings = self.transform_settings.get(name)
if tx_settings:
logging.warning(
"Settings are not supported with local transforms. "
f"Transform '{meta.display_name}' has: {', '.join(map(lambda s: s.name, tx_settings))}"
)

xml = create_transform_xml(
name,
meta.display_name,
meta.description,
meta.input_entity,
self.author,
)

xml_str = serialize_xml(xml)

yield f"TransformRepositories/Local/{name}.transform", xml_str
yield f"TransformRepositories/Local/{name}.transformsettings", settings_xml_str

for transform_set, transforms in self.transform_sets.items():
set_xml = create_transform_set_xml(
transform_set.name, transform_set.description, transforms
)

set_xml_str = serialize_xml(set_xml)

yield f"TransformSets/{transform_set.name}.set", set_xml_str

def write_local_mtz(
self,
mtz_path: str = "./local.mtz",
working_dir: str = ".",
command: str = "python3",
params: str = "project.py",
debug: bool = True,
):

export_as_csv(SETTINGS_CSV_HEADER, csv_lines, config_path, csv_line_limit)
with zipfile.ZipFile(mtz_path, "w") as mtz:
for path, content in self._create_local_mtz(
working_dir, command, params, debug
):
mtz.writestr(path, content)
Loading

0 comments on commit b0c3eae

Please sign in to comment.