diff --git a/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.py b/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.py index b7172bedb967..acf391a85d64 100644 --- a/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.py +++ b/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.py @@ -1,184 +1,178 @@ -from enum import Enum -from pydantic import BaseModel, AnyUrl, Json # pylint: disable=no-name-in-module from CommonServerPython import * - +from http import HTTPStatus +from typing import cast VENDOR = "okta" PRODUCT = "okta" +FETCH_LIMIT = 1000 +FETCH_TIME_LIMIT = 60 -class Method(str, Enum): - """ - A list that represent the types of http request available - """ - GET = 'GET' - POST = 'POST' - PUT = 'PUT' - HEAD = 'HEAD' - PATCH = 'PATCH' - DELETE = 'DELETE' - - -class ReqParams(BaseModel): # pragma: no cover - """ - A class that stores the request query params - """ - since: str - sortOrder: Optional[str] = 'ASCENDING' - limit: str = '1000' - - def set_since_value(self, since: str) -> None: # pragma: no cover - self.since = since +class Client(BaseClient): + def __init__(self, base_url, api_key, verify=True, proxy=False): + headers = {"Accept": "application/json", + "Content-Type": "application/json", + "Authorization": f"SSWS {api_key}" + } + super().__init__(base_url=base_url, headers=headers, verify=verify, proxy=proxy) -class Request(BaseModel): # pragma: no cover - """ - A class that stores a request configuration - """ - method: Method = Method.GET - url: AnyUrl - headers: Optional[Union[Json[dict], dict]] - params: Optional[ReqParams] - verify: bool = True - data: Optional[str] = None + def get_events(self, since: int, limit: int = FETCH_LIMIT): + params = { + "sortOrder": "ASCENDING", + "since": since, + "limit": limit, + } + return self._http_request(url_suffix='/api/v1/logs', method='GET', headers=self._headers, params=params) -class Client: +def get_events_command(client: Client, total_events_to_fetch, since, + last_object_ids: List[str] = None) -> tuple[List[dict], int]: # pragma: no cover """ - A class for the client request handling + Fetches events from the okta api until the total_events_to_fetch is reached or no more events are available. + if 429:TOO_MANY_REQUESTS is returned, will return the stored_events so far and the x-rate-limit-reset + from the response headers. + Args: + client (Client): the okta client + total_events_to_fetch: the total number of events to fetch + since: start fetch from this timestamp + last_object_ids List[str]: list of uuids of the last fetched events. + Returns: + tuple[List[dict], int]: + List[dict]: list of events fetched, + int: x-rate-limit-reset: time in seconds until API can be called again. """ - - def __init__(self, request: Request): # pragma: no cover - self.request = request - - def call(self, requests=requests) -> requests.Response: # pragma: no cover + stored_events: list = [] + num_of_events_to_fetch = FETCH_LIMIT if total_events_to_fetch > FETCH_LIMIT else total_events_to_fetch + demisto.debug(f"num of events to fetch: {num_of_events_to_fetch} since: {since}") + while len(stored_events) < total_events_to_fetch: + demisto.debug(f"stored_events collected: {len(stored_events)}") try: - response = requests.request(**self.request.dict()) - response.raise_for_status() - return response + events = client.get_events(since=since, limit=num_of_events_to_fetch) # type: ignore + if events: + demisto.debug(f'received {len(events)} number of events.') + since = events[-1]['published'] + if last_object_ids: + events = remove_duplicates(events, last_object_ids) # type: ignore + if not events: + demisto.debug('Events are empty after dedup will break.') + break + stored_events.extend(events) + if len(events) < num_of_events_to_fetch: + demisto.debug(f"Number of events collected is smaller than: {num_of_events_to_fetch} will break.") + break + else: + demisto.debug('Didnt receive any events from the api.') + break + except DemistoException as exc: + msg = f'something went wrong: {exc}' + demisto.debug(msg) + if type(exc.res) is not requests.models.Response: + raise + res: requests.models.Response = exc.res + status_code: int = res.status_code + if status_code == HTTPStatus.TOO_MANY_REQUESTS.value: + demisto.debug(f'fetch-events Got 429. okta rate limit headers:\n \ + x-rate-limit-remaining: {res.headers["x-rate-limit-remaining"]}\n \ + x-rate-limit-reset: {res.headers["x-rate-limit-reset"]}\n') + return stored_events, int(res.headers['x-rate-limit-reset']) + return stored_events, 0 except Exception as exc: - msg = f'something went wrong with the http call {exc}' - LOG(msg) - raise DemistoException(msg) from exc - - def set_next_run_filter(self, after: str): - self.request.params.set_since_value(after) # type: ignore + demisto.error(f'Unexpected error.\n{traceback.format_exc()}') + if len(stored_events) == 0: + raise exc + return stored_events, 0 + return stored_events, 0 -class GetEvents: +def remove_duplicates(events: list, ids: list) -> list: """ - A class to handle the flow of the integration + Remove object duplicates by the uuid of the object """ + return [event for event in events if event['uuid'] not in ids] - def __init__(self, client: Client) -> None: - self.client = client - - def make_api_call(self): - limit_tmp = int(self.client.request.params.limit) # type: ignore - if limit_tmp > 1000: - self.client.request.params.limit = '1000' # type: ignore - response = self.client.call() - events: list = response.json() - self.client.request.params.limit = str(limit_tmp - len(events)) # type: ignore - return events - - def _iter_events(self, last_object_ids: list) -> None: # type: ignore # pragma: no cover - """ - Function that responsible for the iteration over the events returned from the Okta api - """ - - events: list = self.make_api_call() # type: ignore - if last_object_ids: - events = GetEvents.remove_duplicates(events, last_object_ids) - while True: - yield events - last = events[-1] - self.client.set_next_run_filter(last['published']) - events: list = self.make_api_call() # type: ignore - try: - assert events - except (IndexError, AssertionError): - LOG('empty list, breaking') - break - - def aggregated_results(self, last_object_ids: List[str] = None) -> List[dict]: # pragma: no cover - """ - Function to group the events returned from the api - """ - stored_events = [] - for events in self._iter_events(last_object_ids): # type: ignore - stored_events.extend(events) - if int(self.client.request.params.limit) == 0 or len(events) == 0: # type: ignore - break - return stored_events - - @staticmethod - def get_last_run(events: List[dict], last_run_after) -> dict: - """ - Get the info from the last run, it returns the time to query from and a list of ids to prevent duplications - """ - - ids = [] - # gets the last event time - last_time = events[-1].get('published') if events else last_run_after - for event in reversed(events): - if event.get('published') != last_time: - break - ids.append(event.get('uuid')) - last_time = datetime.strptime(str(last_time).lower().replace('z', ''), '%Y-%m-%dt%H:%M:%S.%f') - return {'after': last_time.isoformat(), 'ids': ids} - @staticmethod - def remove_duplicates(events: list, ids: list) -> list: - """ - Remove object duplicates by the uuid of the object - """ - return [event for event in events if event['uuid'] not in ids] +def get_last_run(events: List[dict], last_run_after) -> dict: + """ + Get the info from the last run, it returns the time to query from and a list of ids to prevent duplications + """ + ids = [] + # gets the last event time + last_time = events[-1].get('published') if events else last_run_after + for event in reversed(events): + if event.get('published') != last_time: + break + ids.append(event.get('uuid')) + last_time = datetime.strptime(str(last_time).lower().replace('z', ''), '%Y-%m-%dt%H:%M:%S.%f') + return {'after': last_time.isoformat(), 'ids': ids} + + +def fetch_events(client: Client, + start_time_epoch: int, + events_limit: int, + last_run_after, + last_object_ids: List[str] = None) -> List[dict]: + while True: + events, epoch_time_to_continue_fetch = get_events_command(client=client, + total_events_to_fetch=events_limit, + since=last_run_after, + last_object_ids=last_object_ids) + if epoch_time_to_continue_fetch == 0: + break + + sleep_time = abs(epoch_time_to_continue_fetch - start_time_epoch) + if sleep_time and sleep_time < FETCH_TIME_LIMIT: + demisto.debug(f'Will try fetch again in: {sleep_time},\ + as a result of 429 Too Many Requests HTTP status.') + time.sleep(sleep_time) + else: + break + return events def main(): # pragma: no cover try: - demisto_params = demisto.params() | demisto.args() - events_limit = int(demisto_params.get('limit', 2000)) - should_push_events = argToBoolean(demisto_params.get('should_push_events', 'false')) - after = dateparser.parse(demisto_params['after'].strip()) + start_time_epoch = int(time.time()) + demisto_params = demisto.params() + demisto_args = demisto.args() + events_limit = int(demisto_params.get('limit', 1000)) + demisto.debug(f'max_events_to_fetch={events_limit}') api_key = demisto_params['api_key']['password'] - demisto_params['headers'] = {"Accept": "application/json", "Content-Type": "application/json", - "Authorization": f"SSWS {api_key}"} - demisto_params['url'] = urljoin(demisto_params['url'], '/api/v1/logs') - last_run = demisto.getLastRun() - last_object_ids = last_run.get('ids') - if 'after' not in last_run: - last_run_after = after.isoformat() # type: ignore - else: - last_run_after = last_run['after'] - demisto_params['params'] = ReqParams(**demisto_params, since=last_run_after) - - request = Request(**demisto_params) - - client = Client(request) - - get_events = GetEvents(client) - + verify_certificate = not demisto_params.get('insecure', True) + base_url = demisto_params['url'] + client = Client(base_url=base_url, api_key=api_key, verify=verify_certificate) command = demisto.command() + demisto.debug(f'Command being called is {command}') + after: datetime if command == 'test-module': - get_events.aggregated_results() + after = cast(datetime, dateparser.parse('1 hour')) + get_events_command(client, events_limit, since=after.isoformat()) demisto.results('ok') if command == 'okta-get-events': - events = get_events.aggregated_results(last_object_ids=last_object_ids) + after = cast(datetime, dateparser.parse(demisto_args.get('from_date').strip())) + events, _ = get_events_command(client, events_limit, since=after.isoformat()) command_results = CommandResults( readable_output=tableToMarkdown('Okta Logs', events, headerTransform=pascalToSpace), raw_response=events, ) return_results(command_results) - + should_push_events = argToBoolean(demisto_args.get('should_push_events', 'false')) if should_push_events: send_events_to_xsiam(events[:events_limit], vendor=VENDOR, product=PRODUCT) elif command == 'fetch-events': - events = get_events.aggregated_results(last_object_ids=last_object_ids) + after = cast(datetime, dateparser.parse(demisto_params['after'].strip())) + last_run = demisto.getLastRun() + last_object_ids = last_run.get('ids') + if 'after' not in last_run: + last_run_after = after.isoformat() # type: ignore + else: + last_run_after = last_run['after'] + events = fetch_events(client, start_time_epoch, events_limit, + last_run_after=last_run_after, last_object_ids=last_object_ids) + demisto.debug(f'sending_events_to_xsiam: {len(events)}') send_events_to_xsiam(events[:events_limit], vendor=VENDOR, product=PRODUCT) - demisto.setLastRun(GetEvents.get_last_run(events, last_run_after)) + demisto.setLastRun(get_last_run(events, last_run_after)) except Exception as e: return_error(f'Failed to execute {demisto.command()} command. Error: {str(e)}') diff --git a/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.yml b/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.yml index bc9e48932f1d..7fd75c9680a7 100644 --- a/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.yml +++ b/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.yml @@ -67,9 +67,13 @@ script: - 'True' - 'False' required: true + - defaultValue: '1 day' + description: "Specifies the start time of the search. This filter is optional. Default is 1 day. Syntax: start_time=YYYY-MM-DDTHH:mm or '1 hour/day/month'." + name: from_date + required: false description: Manual command to fetch events and display them. name: okta-get-events - dockerimage: demisto/fastapi:1.0.0.79360 + dockerimage: demisto/fastapi:1.0.0.80124 isfetchevents: true subtype: python3 marketplaces: diff --git a/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector_test.py b/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector_test.py index 56babaf86b6c..99c6edb77a02 100644 --- a/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector_test.py +++ b/Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector_test.py @@ -1,33 +1,41 @@ -from OktaEventCollector import ReqParams, Client, Request, GetEvents, Method +from OktaEventCollector import Client, remove_duplicates, get_last_run, get_events_command, main import pytest +from unittest.mock import MagicMock +from freezegun import freeze_time +import demistomock as demisto + +id1_pub = [[{'uuid': 'a5b57ec5feaa', 'published': '2022-04-17T12:32:36.667'}]] +id2_pub = [{'uuid': 'a5b57ec5febb', 'published': '2022-04-17T12:32:36.667'}] +id3_pub = [{'uuid': 'a5b57ec5fecc', 'published': '2022-04-17T12:32:36.667'}] +id4_pub = [{'uuid': 'a5b57ec5fedd', 'published': '2022-04-17T12:32:36.667'}] -req_params = ReqParams(since='', sortOrder='ASCENDING', limit='5') -request = Request(method=Method.GET, url='https://testurl.com', headers={}, params=req_params) -client = Client(request) -get_events = GetEvents(client) id1 = {'uuid': 'a5b57ec5febb'} id2 = {'uuid': 'a5b57ec5fecc'} id3 = {'uuid': 'a12f3c5d77f3'} id4 = {'uuid': 'a12f3c5dxxxx'} -class MockResponse: - def __init__(self, data): - self.data = data +@pytest.fixture +def dummy_client(mocker): + """ + A dummy client fixture for testing. + """ + events = [id1_pub, id2_pub, id3_pub, id4_pub] - def json(self): - return self.data + client = Client('base_url', 'api_key') + mocker.patch.object(client, 'get_events', side_effect=events) + return client @pytest.mark.parametrize("events,ids,result", [ ([id1, id2, id3], ['a12f3c5d77f3'], [id1, id2]), ([id1, id2, id3], ['a12f3c5dxxxx'], [id1, id2, id3]), - ([], ['a12f3c5d77f3'], []), + ([id1], ['a5b57ec5febb'], []), ([{'uuid': 0}, {'uuid': 1}, {'uuid': 2}, {'uuid': 3}, {'uuid': 4}, {'uuid': 5}, {'uuid': 6}, {'uuid': 7}, {'uuid': 8}, {'uuid': 9}], [0, 4, 7, 9], [{'uuid': 1}, {'uuid': 2}, {'uuid': 3}, {'uuid': 5}, {'uuid': 6}, {'uuid': 8}])]) def test_remove_duplicates(events, ids, result): - assert get_events.remove_duplicates(events, ids) == result + assert remove_duplicates(events, ids) == result @pytest.mark.parametrize("events,last_run_after,result", [ @@ -53,18 +61,73 @@ def test_remove_duplicates(events, ids, result): '2022-04-17T12:31:36.667', {'after': '2022-04-17T12:31:36.667000', 'ids': []})]) def test_get_last_run(events, last_run_after, result): - assert get_events.get_last_run(events, last_run_after) == result + assert get_last_run(events, last_run_after) == result + + +def test_get_events_success(dummy_client, mocker): + mock_remove_duplicates = MagicMock() + mock_remove_duplicates.return_value = [{'id': 1, + 'published': '2022-04-17T12:32:36.667'}] + mocker.patch('OktaEventCollector.remove_duplicates', mock_remove_duplicates) + mocker.patch.object(dummy_client, 'get_events', side_effect=id1_pub) + events, epoch = get_events_command(dummy_client, 1, 'since', ['id1']) + assert len(events) == 1 + assert epoch == 0 + + +def test_get_events_no_events(dummy_client, mocker): + mocker.patch.object(dummy_client, 'get_events', return_value=None) + events, epoch = get_events_command(dummy_client, 1, 'since') + assert len(events) == 0 + assert epoch == 0 + + +@freeze_time('2022-04-17T12:32:36.667Z') +def test_429_too_many_requests(mocker, requests_mock): + mock_events = [ + { + 'uuid': 1, + 'published': '2022-04-17T14:00:00.000Z' + }, + { + 'uuid': 2, + 'published': '2022-04-17T14:00:01.000Z' + }, + { + 'uuid': 3, + 'published': '2022-04-17T14:00:02.000Z' + }, + { + 'uuid': 4, + 'published': '2022-04-17T14:00:03.000Z' + } + ] + requests_mock.get( + 'https://testurl.com/api/v1/logs?since=2022-04-17T12%3A32%3A36.667000%2B00%3A00&sortOrder=ASCENDING&limit=5', + json=mock_events) + requests_mock.get('https://testurl.com/api/v1/logs?since=2022-04-17T14%3A00%3A03.000Z&sortOrder=ASCENDING&limit=5', + status_code=429, + reason='Too many requests', + headers={ + 'x-rate-limit-remaining': '0', + 'x-rate-limit-reset': '1698343702', + }) -@pytest.mark.parametrize("time", ['2022-04-17T12:32:36.667)']) -def test_set_since_value(time): - req_params.set_since_value(time) - assert req_params.since == time + mocker.patch.object(demisto, 'command', return_value='fetch-events') + mocker.patch.object(demisto, 'getLastRun', return_value={}) + mocker.patch.object(demisto, 'params', return_value={ + 'url': 'https://testurl.com', + 'api_key': { + 'password': 'TESTAPIKEY' + }, + 'limit': 5, + 'after': '2022-04-17T12:32:36.667Z', + 'proxy': False, + 'verify': False + }) + send_events_to_xsiam_mock = mocker.patch('OktaEventCollector.send_events_to_xsiam', return_value={}) + main() -def test_make_api_call(mocker): - mock_res = MockResponse([{1}, {1}, {1}, {1}, {1}]) - mocker.patch.object(client, 'call', return_value=mock_res) - assert get_events.make_api_call() == [{1}, {1}, {1}, {1}, {1}] - mock_res.data = [{1}, {1}, {1}, {1}, {1}, {1}, {1}, {1}, {1}, {1}] - assert get_events.make_api_call() == [{1}, {1}, {1}, {1}, {1}, {1}, {1}, {1}, {1}, {1}] + send_events_to_xsiam_mock.assert_called_once_with(mock_events, vendor='okta', product='okta') diff --git a/Packs/Okta/ReleaseNotes/3_2_3.md b/Packs/Okta/ReleaseNotes/3_2_3.md new file mode 100644 index 000000000000..f1ec71f00fa0 --- /dev/null +++ b/Packs/Okta/ReleaseNotes/3_2_3.md @@ -0,0 +1,9 @@ + +#### Integrations + +##### Okta Event Collector +- Updated the Docker image to: *demisto/fastapi:1.0.0.80124*. + + +- Updated the Okta Event Collector integration to handle 429 errors. + diff --git a/Packs/Okta/pack_metadata.json b/Packs/Okta/pack_metadata.json index 42ad22e4a81f..ccf4c76978fa 100644 --- a/Packs/Okta/pack_metadata.json +++ b/Packs/Okta/pack_metadata.json @@ -2,7 +2,7 @@ "name": "Okta", "description": "Integration with Okta's cloud-based identity management service.", "support": "xsoar", - "currentVersion": "3.2.2", + "currentVersion": "3.2.3", "author": "Cortex XSOAR", "url": "https://www.paloaltonetworks.com/cortex", "email": "",