Skip to content

Commit

Permalink
implemented ids_policy module
Browse files Browse the repository at this point in the history
  • Loading branch information
ansibleguy committed Dec 25, 2023
1 parent 1a366ae commit cc2cc13
Show file tree
Hide file tree
Showing 14 changed files with 598 additions and 117 deletions.
186 changes: 87 additions & 99 deletions README.md

Large diffs are not rendered by default.

89 changes: 86 additions & 3 deletions docs/source/modules/ids.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,27 @@ ansibleguy.opnsense.ids_user_rule
"enabled","boolean","false","true","\-","En- or disable the rule"
"reload","boolean","false","true","\-", .. include:: ../_include/param_reload.rst

Usage
*****

TBD
ansibleguy.opnsense.ids_policy
==============================

.. csv-table:: Definition
:header: "Parameter", "Type", "Required", "Default", "Aliases", "Comment"
:widths: 15 10 10 10 10 45

"name","string","true","\-","description, desc","Unique policy name"
"priority","integer","false","0","prio","Policies are processed on a first match basis a lower number means more important"
"rulesets","list","false","\-","rs","Rulesets this policy applies to (all when none selected). Rulesets must be enabled beforehand!"
"action","list","false","\-","a","One or multiple of: 'disable', 'alert', 'drop'. Rule configured action"
"new_action","string","false","alert","na","One or multiple of: 'default', 'disable', 'alert', 'drop'. Action to perform when filter policy applies"
"rules","dictionary","false","\-","\-","Key-value pairs of policy-rules as provided by the enabled rulesets. Values must be string or lists. Example: '{\"rules\": {\"signature_severity\": [\"Minor\", \"Major\"], \"tag\": \"Dshield\"}}'"

Info
****

.. warning::

The :code:`list` module will not return all details of the existing entries `as the current implementation does not scale well <https://github.com/opnsense/core/issues/7094>`_.

Examples
********
Expand Down Expand Up @@ -378,3 +395,69 @@ ansibleguy.opnsense.ids_user_rule
- name: Printing Rules
ansible.builtin.debug:
var: existing_rules.data
ansibleguy.opnsense.ids_policy
==============================

.. code-block:: yaml
- hosts: localhost
gather_facts: false
module_defaults:
group/ansibleguy.opnsense.all:
firewall: 'opnsense.template.ansibleguy.net'
api_credential_file: '/home/guy/.secret/opn.key'
ansibleguy.opnsense.list:
target: 'ids_policy'
tasks:
- name: Example
ansibleguy.opnsense.ids_policy:
name: 'Example'
# priority: 0
# rulesets: []
# action: []
# new_action: 'alert'
# rules: {}
# enabled: true
# reload: true
# debug: false
- name: Adding
ansibleguy.opnsense.ids_policy:
name: 'ANSIBLE_TEST_1_1'
priority: 1
rulesets: 'ET open/drop'
action: ['drop']
new_action: 'alert'
rules:
classtype: ['misc-attack', 'bad-unknown']
signature_severity: 'Minor'
- name: Disabling
ansibleguy.opnsense.ids_policy:
name: 'ANSIBLE_TEST_1_1'
priority: 1
rulesets: 'ET open/drop'
action: ['drop']
new_action: 'alert'
rules:
classtype: ['misc-attack', 'bad-unknown']
signature_severity: 'Minor'
enabled: false
- name: Removing
ansibleguy.opnsense.ids_policy:
name: 'ANSIBLE_TEST_1_1'
state: 'absent'
- name: Listing Policies
ansibleguy.opnsense.list:
# target: 'ids_policy'
register: existing_policies
- name: Printing Policies
ansible.builtin.debug:
var: existing_policies.data
4 changes: 2 additions & 2 deletions plugins/module_utils/base/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ def _set_existing(self) -> None:
if _existing is not None and len(_existing) > 0:
self.e = _existing

def _simplify_existing(self, existing: dict) -> dict:
def simplify_existing(self, existing: dict) -> dict:
translate, typing, bool_invert, value_map = {}, {}, [], {}

if hasattr(self.i, self.ATTR_TRANSLATE):
Expand Down Expand Up @@ -595,7 +595,7 @@ def _call_simple(self) -> Callable:
if hasattr(self.i, '_simplify_existing'):
return self.i._simplify_existing

return self._simplify_existing
return self.simplify_existing

def _call_search(self) -> (list, dict):
if hasattr(self.i, '_search_call'):
Expand Down
3 changes: 1 addition & 2 deletions plugins/module_utils/main/bind_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,10 @@ def check(self) -> None:
'You may have to create it before managing its records.'
)

# pylint: disable=W0212
self.existing = get_multiple_matching(
module=self.m, existing_items=self.existing_entries,
compare_item=self.p, match_fields=self.p['match_fields'],
simplify_func=self.b._simplify_existing,
simplify_func=self.b.simplify_existing,
)

self.exists_rr = len(self.existing) > 1
Expand Down
3 changes: 1 addition & 2 deletions plugins/module_utils/main/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ def _build_all_available_cmds(self, raw_cmds: dict):
self.available_commands.append(cmd)

def _simplify_existing(self, existing: dict) -> dict:
# pylint: disable=W0212
simple = self.b._simplify_existing(existing)
simple = self.b.simplify_existing(existing)
simple.pop('origin')
self._build_all_available_cmds(existing['command'])
return simple
3 changes: 1 addition & 2 deletions plugins/module_utils/main/ids_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ def _search_call(self) -> dict:
**self.call_cnf, **{'command': self.CMDS['search']}
})[self.API_KEY_1][self.API_KEY]

# pylint: disable=W0212
simple = self.b._simplify_existing(settings)
simple = self.b.simplify_existing(settings)

try:
# resolve schedule/cron name to uuid
Expand Down
177 changes: 177 additions & 0 deletions plugins/module_utils/main/ids_policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from ansible.module_utils.basic import AnsibleModule

from ansible_collections.ansibleguy.opnsense.plugins.module_utils.base.api import \
Session
from ansible_collections.ansibleguy.opnsense.plugins.module_utils.base.cls import BaseModule
from ansible_collections.ansibleguy.opnsense.plugins.module_utils.main.ids_ruleset import Ruleset
from ansible_collections.ansibleguy.opnsense.plugins.module_utils.helper.main import \
is_unset, is_true, ensure_list


class Policy(BaseModule):
FIELD_ID = 'description'
CMDS = {
'add': 'addPolicy',
'del': 'delPolicy',
'set': 'setPolicy',
'search': 'searchPolicy',
'detail': 'getPolicy',
'toggle': 'togglePolicy',
}
API_KEY = 'policy'
API_KEY_PATH = f'policies.{API_KEY}'
API_MOD = 'ids'
API_CONT = 'settings'
API_CONT_REL = 'service'
API_CMD_REL = 'reconfigure'
FIELDS_CHANGE = ['priority', 'action', 'rulesets', 'new_action']
FIELDS_ALL = ['enabled', FIELD_ID]
FIELDS_ALL.extend(FIELDS_CHANGE)
FIELDS_TRANSLATE = {
'priority': 'prio',
}
FIELDS_TRANSLATE_SPECIAL = {
'rules': 'content',
}
FIELDS_TYPING = {
'bool': ['enabled'],
'select': ['new_action'],
'list': ['rulesets', 'action'],
'int': ['priority'],
}
FIELDS_IGNORE = ['content']
EXIST_ATTR = 'policy'
QUERY_MAX_RULES = 5000

def __init__(self, module: AnsibleModule, result: dict, session: Session = None):
BaseModule.__init__(self=self, m=module, r=result, s=session)
self.policy = {}
self.exists = False
self.enabled_rulesets = {}
self.ruleset_names = {}

def check(self) -> None:
self._search_call()
if self.p['state'] == 'present' and not is_unset(self.p['rulesets']):
if len(self.enabled_rulesets) == 0:
self._search_rulesets()

if len(self.enabled_rulesets) == 0:
self.m.fail_json("You need to enable rulesets before referencing them!")

ruleset_uuids = []
for ruleset in self.p['rulesets']:
found = False
for enabled_ruleset, uuid in self.enabled_rulesets.items():
if enabled_ruleset == ruleset:
found = True
ruleset_uuids.append(uuid)

if not found:
self.m.fail_json(
f"The ruleset '{ruleset}' was not found! "
"You need to enable a ruleset before referencing it. "
f"Enabled ones are: {list(self.enabled_rulesets.keys())}"
)

ruleset_uuids.sort()
self.p['rulesets'] = ruleset_uuids

self.r['diff']['after'] = self.b.build_diff(data=self.p)

def get_existing(self) -> list:
return self._search_call()

def _search_call(self) -> list:
# NOTE: workaround for issue with incomplete response-data from 'get' endpoint:
# https://github.com/opnsense/core/issues/7094
existing = self.s.post(cnf={
**self.call_cnf,
'command': self.CMDS['search'],
'data': {'current': 1, 'rowCount': self.QUERY_MAX_RULES, 'sort': self.FIELD_ID, 'searchPhrase': ''},
})['rows']

if self.FIELD_ID in self.p: # list module
for policy in existing:
if policy[self.FIELD_ID] == self.p[self.FIELD_ID]:
self.exists = True
self.call_cnf['params'] = [policy['uuid']]
raw_policy = self.s.get(cnf={
**self.call_cnf,
'command': self.CMDS['detail'],
})[self.API_KEY]
self.policy['rules'] = self._parse_rules(raw_policy)
self.policy = self.b.simplify_existing(raw_policy)
self.enabled_rulesets = self._format_ruleset(raw_policy['rulesets'])
self.policy['uuid'] = policy['uuid']
if 'content' in self.policy:
self.policy.pop('content')

self.r['diff']['before'] = self.policy

return existing

@staticmethod
def _parse_rules(raw_policy: dict) -> dict:
parsed = {}

if 'content' not in raw_policy:
return parsed

for key_value, values in raw_policy['content'].items():
if is_true(values['selected']):
key, value = key_value.split('.', 1)
if key in parsed:
parsed[key].append(value)
else:
parsed[key] = [value]

return parsed

def _build_request(self) -> dict:
raw_request = self.b.build_request(ignore_fields=['rules'])

# formatting dynamic rules
# example: 'policy_content_affected_product: "affected_product.Adobe_Flash,affected_product.Adobe_Reader"'
raw_request_rules = {}
raw_request_content = []
for key, values in self.p['rules'].items():
fmt_values = [f'{key}.{value}' for value in ensure_list(values)]
raw_request_rules[f'policy_content_{key}'] = self.b.RESP_JOIN_CHAR.join(fmt_values)
raw_request_content.extend(fmt_values)

raw_request[self.API_KEY]['content'] = self.b.RESP_JOIN_CHAR.join(raw_request_content)

return {
**raw_request,
**raw_request_rules,
}

def _search_rulesets(self):
# check if any ruleset is enabled before creating a new policy
self.enabled_rulesets = self._format_ruleset(
self.s.get(cnf={
**self.call_cnf,
'command': self.CMDS['detail'],
})[self.API_KEY]['rulesets']
)

def _search_ruleset_names(self):
ruleset_details = self.s.get(cnf={
**self.call_cnf,
'command': Ruleset.CMDS['search'],
})['rows']

for ruleset in ruleset_details:
self.ruleset_names[ruleset[Ruleset.FIELD_PK]] = ruleset[Ruleset.FIELD_ID]

def _format_ruleset(self, rulesets: dict) -> dict:
if len(self.ruleset_names) == 0:
self._search_ruleset_names()

formatted = {}

for uuid, ruleset in rulesets.items():
formatted[self.ruleset_names[ruleset['value']]] = uuid

return formatted
3 changes: 1 addition & 2 deletions plugins/module_utils/main/ids_user_rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ def _search_call(self) -> list:
if rule[self.FIELD_ID] == self.p[self.FIELD_ID]:
self.exists = True
self.call_cnf['params'] = [rule['uuid']]
# pylint: disable=W0212
self.rule = self.b._simplify_existing(
self.rule = self.b.simplify_existing(
self.s.get(cnf={
**self.call_cnf,
'command': self.CMDS['detail'],
Expand Down
3 changes: 1 addition & 2 deletions plugins/module_utils/main/webproxy_general.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ def _search_call(self) -> dict:
**self.call_cnf, **{'command': self.CMDS['search']}
})[self.API_KEY_1][self.API_KEY]

# pylint: disable=W0212
simple = self.b._simplify_existing(settings)
simple = self.b.simplify_existing(settings)

simple['log'] = is_true(
settings['logging']['enable'][self.FIELDS_TRANSLATE_SPECIAL['log']]
Expand Down
Loading

0 comments on commit cc2cc13

Please sign in to comment.