Skip to content

Commit

Permalink
Okta event collector refactor (demisto#30631)
Browse files Browse the repository at this point in the history
* refactor in process

* refactor

* refactor

* Update version.
Update RN.
Handle 429

* Finalizing eventCollector.
Added unitTest / refactor

* pylint errors.
bump docker version

* CR changes

* forgot isoformat in test-module

* refactor to the right var name

* Revert changes to Armis.
fix PR validations failures in OktaEventCollector

* fix mypy errors

* fix mypy errors. again

* CR changes to description

* set since for fetch iterations.
Add unit test

* Add unit test.
Add max iterations cap

* If no events remain after dedup, break out of the while loop.

* Fix: break if events is empty.

* Unittest changes. log changes.

* Remove redundant unit test.
Make if statement more concise.

* Add another break condition:
if the number of events fetched is smaller than
the limit passed to the API, break.

* Update docker version

* Remove unused imports.
  • Loading branch information
thefrieddan1 authored Nov 9, 2023
1 parent 95eb285 commit 7a6e1ec
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 167 deletions.
278 changes: 136 additions & 142 deletions Packs/Okta/Integrations/OktaEventCollector/OktaEventCollector.py
Original file line number Diff line number Diff line change
@@ -1,184 +1,178 @@
from enum import Enum
from pydantic import BaseModel, AnyUrl, Json # pylint: disable=no-name-in-module
from CommonServerPython import *

from http import HTTPStatus
from typing import cast
VENDOR = "okta"
PRODUCT = "okta"
FETCH_LIMIT = 1000
FETCH_TIME_LIMIT = 60


class Method(str, Enum):
    """
    The HTTP request methods available to a request configuration.

    The class mixes in ``str``, so every member is also a string whose value
    is the upper-case method name.
    """
    GET = 'GET'
    POST = 'POST'
    PUT = 'PUT'
    HEAD = 'HEAD'
    PATCH = 'PATCH'
    DELETE = 'DELETE'


class ReqParams(BaseModel):  # pragma: no cover
    """
    A pydantic model that stores the query parameters of a request.
    """
    # Value of the ``since`` query parameter; it has no default, so it must be supplied.
    since: str
    sortOrder: Optional[str] = 'ASCENDING'
    # The limit is stored as a string, not as a number.
    limit: str = '1000'

    def set_since_value(self, since: str) -> None:  # pragma: no cover
        """Replace the stored ``since`` value with the given one."""
        self.since = since
class Client(BaseClient):
    """Client whose requests carry an ``SSWS`` authorization header built from the API key."""

    def __init__(self, base_url, api_key, verify=True, proxy=False):
        """
        Prepare the default headers and forward the connection settings to ``BaseClient``.

        Args:
            base_url: passed through to ``BaseClient`` as ``base_url``.
            api_key: token placed in the ``Authorization`` header after the ``SSWS`` scheme.
            verify: passed through to ``BaseClient`` as ``verify``.
            proxy: passed through to ``BaseClient`` as ``proxy``.
        """
        default_headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }
        default_headers["Authorization"] = f"SSWS {api_key}"
        super().__init__(base_url=base_url, headers=default_headers, verify=verify, proxy=proxy)

class Request(BaseModel):  # pragma: no cover
    """
    A pydantic model that stores the configuration of a single HTTP request.
    """
    # Defaults to a GET request.
    method: Method = Method.GET
    url: AnyUrl
    # Either a dict, or (per pydantic's ``Json`` type) a JSON string that decodes to a dict.
    headers: Optional[Union[Json[dict], dict]]
    params: Optional[ReqParams]
    verify: bool = True
    data: Optional[str] = None
def get_events(self, since: str, limit: int = FETCH_LIMIT):
    """
    Issue a GET request to ``/api/v1/logs`` asking for events in ascending order.

    Args:
        since: value sent as the ``since`` query parameter. The callers in this file
            pass a timestamp string (an ISO formatted date or the ``published`` field
            of a previously fetched event), hence the ``str`` annotation.
        limit: value sent as the ``limit`` query parameter. Defaults to FETCH_LIMIT.

    Returns:
        Whatever ``_http_request`` returns for the call.
    """
    params = {
        "sortOrder": "ASCENDING",
        "since": since,
        "limit": limit,
    }
    return self._http_request(url_suffix='/api/v1/logs', method='GET', headers=self._headers, params=params)


class Client:
def get_events_command(client: Client, total_events_to_fetch, since,
                       last_object_ids: List[str] = None) -> tuple[List[dict], int]:  # pragma: no cover
    """
    Fetches events from the okta api until the total_events_to_fetch is reached or no more events are available.
    if 429:TOO_MANY_REQUESTS is returned, will return the stored_events so far and the x-rate-limit-reset
    from the response headers.

    Args:
        client (Client): the okta client
        total_events_to_fetch: the total number of events to fetch
        since: start fetch from this timestamp
        last_object_ids List[str]: list of uuids of the last fetched events.

    Returns:
        tuple[List[dict], int]:
            List[dict]: list of events fetched (never more than total_events_to_fetch),
            int: the value of the x-rate-limit-reset header when a 429 was received
                 (fetch_events compares it with an epoch timestamp), otherwise 0.

    Raises:
        DemistoException: when the request failed without an HTTP response, or failed with a
            non-429 HTTP error before any event was collected.
        Exception: any other error raised before any event was collected.
    """
    stored_events: list = []
    # uuids that must not be stored (again): the ones reported by the previous run plus everything
    # collected during this call. Every page after the first is requested with `since` set to the
    # `published` value of the last received event, so it may start with events already stored.
    # NOTE(review): assumes the API treats `since` as an inclusive lower bound - confirm.
    seen_ids = set(last_object_ids or [])
    num_of_events_to_fetch = min(FETCH_LIMIT, total_events_to_fetch)
    demisto.debug(f"num of events to fetch: {num_of_events_to_fetch} since: {since}")
    while len(stored_events) < total_events_to_fetch:
        demisto.debug(f"stored_events collected: {len(stored_events)}")
        try:
            events = client.get_events(since=since, limit=num_of_events_to_fetch)  # type: ignore
            if not events:
                demisto.debug('Didnt receive any events from the api.')
                break
            # Remember the page size BEFORE dedup: a full page minus its duplicates must not be
            # mistaken for the last (short) page.
            received = len(events)
            demisto.debug(f'received {received} number of events.')
            since = events[-1]['published']
            if seen_ids:
                events = [event for event in events if event['uuid'] not in seen_ids]
            if not events:
                demisto.debug('Events are empty after dedup will break.')
                break
            seen_ids.update(event.get('uuid') for event in events)
            stored_events.extend(events)
            if received < num_of_events_to_fetch:
                demisto.debug(f"Number of events collected is smaller than: {num_of_events_to_fetch} will break.")
                break
        except DemistoException as exc:
            demisto.debug(f'something went wrong: {exc}')
            res = exc.res
            if not isinstance(res, requests.models.Response):
                # No HTTP response attached (e.g. a connection problem): nothing to inspect.
                raise
            if res.status_code == HTTPStatus.TOO_MANY_REQUESTS.value:
                demisto.debug(
                    'fetch-events Got 429. okta rate limit headers:\n '
                    f'x-rate-limit-remaining: {res.headers["x-rate-limit-remaining"]}\n '
                    f'x-rate-limit-reset: {res.headers["x-rate-limit-reset"]}\n'
                )
                return stored_events, int(res.headers['x-rate-limit-reset'])
            if not stored_events:
                # Any other HTTP error with nothing collected must surface to the caller,
                # otherwise e.g. a rejected API key would look like "no events".
                raise
            # Best effort: hand back what was collected before the failure.
            return stored_events, 0
        except Exception:
            demisto.error(f'Unexpected error.\n{traceback.format_exc()}')
            if not stored_events:
                raise
            return stored_events, 0
    # The last page may overshoot the requested total. Returning the surplus would let callers
    # that send only the first `total_events_to_fetch` events still advance their checkpoint
    # past events that were never sent.
    return stored_events[:total_events_to_fetch], 0


class GetEvents:
def remove_duplicates(events: list, ids: list) -> list:
    """
    Remove object duplicates by the uuid of the object.

    Args:
        events: event dicts; each one must contain a 'uuid' key.
        ids: uuids of events that were already handled.

    Returns:
        A new list with the events whose 'uuid' is not in ``ids``, in their original order.
    """
    # Build the lookup once so each membership test is O(1) instead of scanning the list.
    known_ids = set(ids)
    return [event for event in events if event['uuid'] not in known_ids]

def __init__(self, client: Client) -> None:
    """Store the given client on the instance as ``self.client``."""
    self.client = client

def make_api_call(self):
    """
    Perform one API call and update the remaining quota kept in the request params.

    The ``limit`` param is read as the number of events still wanted. A single call never
    asks for more than 1000, and after the call ``limit`` is set to the wanted amount minus
    the number of events just received.

    Returns:
        The decoded JSON body of the response.
    """
    query = self.client.request.params  # type: ignore
    wanted = int(query.limit)
    if wanted > 1000:
        query.limit = '1000'
    fetched: list = self.client.call().json()
    query.limit = str(wanted - len(fetched))
    return fetched

def _iter_events(self, last_object_ids: list) -> None: # type: ignore # pragma: no cover
"""
Function that responsible for the iteration over the events returned from the Okta api
"""

events: list = self.make_api_call() # type: ignore
if last_object_ids:
events = GetEvents.remove_duplicates(events, last_object_ids)
while True:
yield events
last = events[-1]
self.client.set_next_run_filter(last['published'])
events: list = self.make_api_call() # type: ignore
try:
assert events
except (IndexError, AssertionError):
LOG('empty list, breaking')
break

def aggregated_results(self, last_object_ids: List[str] = None) -> List[dict]:  # pragma: no cover
    """
    Collect the batches produced by ``_iter_events`` into a single list.

    Collection stops after the first batch for which the request's ``limit`` param has
    dropped to 0 or which is itself empty.

    Args:
        last_object_ids: forwarded unchanged to ``_iter_events``.

    Returns:
        All events gathered up to the stopping batch (inclusive).
    """
    collected: list = []
    for batch in self._iter_events(last_object_ids):  # type: ignore
        collected += batch
        quota_left = int(self.client.request.params.limit)  # type: ignore
        if not quota_left or not batch:
            break
    return collected

@staticmethod
def get_last_run(events: List[dict], last_run_after) -> dict:
    """
    Get the info from the last run, it returns the time to query from and a list of ids to prevent duplications.

    Args:
        events: the fetched events, oldest first.
        last_run_after: timestamp to fall back to when ``events`` is empty.

    Returns:
        dict: ``{'after': <ISO timestamp without a trailing Z>, 'ids': <uuids of the trailing
        events that share the last 'published' value, newest first>}``.

    Raises:
        ValueError: when the timestamp matches neither supported format.
    """
    # gets the last event time
    last_time = events[-1].get('published') if events else last_run_after
    ids = []
    for event in reversed(events):
        if event.get('published') != last_time:
            break
        ids.append(event.get('uuid'))
    normalized = str(last_time).lower().replace('z', '')
    try:
        parsed = datetime.strptime(normalized, '%Y-%m-%dt%H:%M:%S.%f')
    except ValueError:
        # isoformat() drops the fraction when microseconds are 0, so a timestamp produced by
        # this very function (or by the caller's isoformat()) may have no fractional part.
        parsed = datetime.strptime(normalized, '%Y-%m-%dt%H:%M:%S')
    return {'after': parsed.isoformat(), 'ids': ids}

@staticmethod
def remove_duplicates(events: list, ids: list) -> list:
    """
    Remove object duplicates by the uuid of the object.

    Args:
        events: event dicts; each one must contain a 'uuid' key.
        ids: uuids of events that were already handled.

    Returns:
        A new list with the events whose 'uuid' is not in ``ids``, in their original order.
    """
    # One set build replaces a linear scan of ``ids`` for every event.
    known_ids = set(ids)
    return [event for event in events if event['uuid'] not in known_ids]
def get_last_run(events: List[dict], last_run_after) -> dict:
    """
    Get the info from the last run, it returns the time to query from and a list of ids to prevent duplications.

    Args:
        events: the fetched events, oldest first.
        last_run_after: timestamp to fall back to when ``events`` is empty.

    Returns:
        dict: ``{'after': <ISO timestamp without a trailing Z>, 'ids': <uuids of the trailing
        events that share the last 'published' value, newest first>}``.

    Raises:
        ValueError: when the timestamp matches neither supported format.
    """
    # gets the last event time
    last_time = events[-1].get('published') if events else last_run_after
    ids = []
    for event in reversed(events):
        if event.get('published') != last_time:
            break
        ids.append(event.get('uuid'))
    normalized = str(last_time).lower().replace('z', '')
    try:
        parsed = datetime.strptime(normalized, '%Y-%m-%dt%H:%M:%S.%f')
    except ValueError:
        # isoformat() omits the fraction when microseconds are 0, so the 'after' value stored by
        # a previous run (or built by the caller) can legitimately come without one.
        parsed = datetime.strptime(normalized, '%Y-%m-%dt%H:%M:%S')
    return {'after': parsed.isoformat(), 'ids': ids}


def fetch_events(client: Client,
                 start_time_epoch: int,
                 events_limit: int,
                 last_run_after,
                 last_object_ids: List[str] = None) -> List[dict]:
    """
    Run get_events_command, retrying after a short sleep when it reports a rate limit.

    get_events_command returns the collected events and a number that is 0 unless a 429 was
    received. A non-zero number is compared with ``start_time_epoch``: when the absolute
    difference is non-zero and below FETCH_TIME_LIMIT the function sleeps for that long and
    starts the fetch again from ``last_run_after``; otherwise it gives up and returns what the
    last attempt collected.

    Args:
        client: the okta client.
        start_time_epoch: epoch seconds the rate-limit reset value is compared against.
        events_limit: total number of events to ask get_events_command for.
        last_run_after: timestamp every attempt starts fetching from.
        last_object_ids: uuids forwarded to get_events_command for deduplication.

    Returns:
        The events collected by the last attempt.
    """
    while True:
        collected, rate_limit_reset = get_events_command(client=client,
                                                         total_events_to_fetch=events_limit,
                                                         since=last_run_after,
                                                         last_object_ids=last_object_ids)
        if rate_limit_reset == 0:
            return collected

        wait_seconds = abs(rate_limit_reset - start_time_epoch)
        if not wait_seconds or wait_seconds >= FETCH_TIME_LIMIT:
            return collected

        demisto.debug(f'Will try fetch again in: {wait_seconds},'
                      'as a result of 429 Too Many Requests HTTP status.')
        time.sleep(wait_seconds)


def _parse_start_time(raw_value) -> datetime:
    """Parse a user supplied date expression, raising a readable error when it cannot be parsed."""
    parsed = dateparser.parse(str(raw_value).strip()) if raw_value else None
    if parsed is None:
        raise DemistoException(f'Could not parse a date from: {raw_value!r}')
    return parsed


def main():  # pragma: no cover
    """
    Entry point: build the client from the integration params and run the requested command.

    Handles 'test-module', 'okta-get-events' and 'fetch-events'. Any exception is reported
    through return_error.
    """
    try:
        start_time_epoch = int(time.time())
        demisto_params = demisto.params()
        demisto_args = demisto.args()
        # NOTE(review): only the integration param is consulted here; if okta-get-events also
        # declares a `limit` argument it is ignored - confirm against the yml.
        events_limit = int(demisto_params.get('limit', 1000))
        demisto.debug(f'max_events_to_fetch={events_limit}')
        api_key = demisto_params['api_key']['password']
        # Verify certificates unless the user explicitly opted out. A missing `insecure`
        # param must not silently disable TLS verification.
        verify_certificate = not demisto_params.get('insecure', False)
        base_url = demisto_params['url']
        client = Client(base_url=base_url, api_key=api_key, verify=verify_certificate)
        command = demisto.command()
        demisto.debug(f'Command being called is {command}')
        after: datetime
        if command == 'test-module':
            after = _parse_start_time('1 hour')
            get_events_command(client, events_limit, since=after.isoformat())
            demisto.results('ok')

        elif command == 'okta-get-events':
            after = _parse_start_time(demisto_args.get('from_date'))
            events, _ = get_events_command(client, events_limit, since=after.isoformat())
            command_results = CommandResults(
                readable_output=tableToMarkdown('Okta Logs', events, headerTransform=pascalToSpace),
                raw_response=events,
            )
            return_results(command_results)

            should_push_events = argToBoolean(demisto_args.get('should_push_events', 'false'))
            if should_push_events:
                send_events_to_xsiam(events[:events_limit], vendor=VENDOR, product=PRODUCT)

        elif command == 'fetch-events':
            last_run = demisto.getLastRun()
            last_object_ids = last_run.get('ids')
            if 'after' in last_run:
                last_run_after = last_run['after']
            else:
                # First run: start from the configured `after` param.
                last_run_after = _parse_start_time(demisto_params['after']).isoformat()
            events = fetch_events(client, start_time_epoch, events_limit,
                                  last_run_after=last_run_after, last_object_ids=last_object_ids)
            demisto.debug(f'sending_events_to_xsiam: {len(events)}')
            send_events_to_xsiam(events[:events_limit], vendor=VENDOR, product=PRODUCT)
            demisto.setLastRun(get_last_run(events, last_run_after))

    except Exception as e:
        return_error(f'Failed to execute {demisto.command()} command. Error: {str(e)}')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ script:
- 'True'
- 'False'
required: true
- defaultValue: '1 day'
description: "Specifies the start time of the search. This filter is optional. Default is 1 day. Syntax: start_time=YYYY-MM-DDTHH:mm or '1 hour/day/month'."
name: from_date
required: false
description: Manual command to fetch events and display them.
name: okta-get-events
dockerimage: demisto/fastapi:1.0.0.79360
dockerimage: demisto/fastapi:1.0.0.80124
isfetchevents: true
subtype: python3
marketplaces:
Expand Down
Loading

0 comments on commit 7a6e1ec

Please sign in to comment.