From f8b8b2ba6a552fa58a2d0f1dc0a9a844d44475fa Mon Sep 17 00:00:00 2001 From: Francisco Santos Date: Fri, 24 Nov 2023 14:40:56 +0100 Subject: [PATCH] Netwatch example: bulk domain updates (#169) --- examples/livehunt_network_watch.py | 104 +++++++++++++++++--- examples/netwatch_templates/domain.yara | 2 +- examples/netwatch_templates/file.yara | 4 +- examples/netwatch_templates/ip_address.yara | 2 +- examples/netwatch_templates/url.yara | 2 +- 5 files changed, 93 insertions(+), 21 deletions(-) diff --git a/examples/livehunt_network_watch.py b/examples/livehunt_network_watch.py index 5cce2b6..3a4f8c1 100644 --- a/examples/livehunt_network_watch.py +++ b/examples/livehunt_network_watch.py @@ -39,6 +39,11 @@ RULESET_ENTITY = ("file", "url", "domain", "ip_address") RULESET_LINK = "https://www.virustotal.com/yara-editor/livehunt/" +EMPTY_DOMAIN_LIST_MSG = ( + "* Empty domain list, use --add-domain domain.tld or bulk operations to" + " register them" +) + def extract_domains_from_rule(rules): """Extract the domain list from the comment of a yara rule.""" @@ -80,11 +85,22 @@ def render_template(entity, domains): template += "\n" kind_template = os.path.join(TEMPLATE_DIR, entity + ".yara") + escaped_domains = {} with open(kind_template, encoding="utf-8") as f: rule_block = f.read() for domain in domains: - domain_escaped = re.compile(r"[^\w\d]").sub("_", domain) + domain_escaped = domain.lower() + domain_escaped = re.compile(r"[^[a-z\d]").sub("_", domain_escaped) + domain_escaped = re.compile(r"(_(?i:_)+)").sub("_", domain_escaped) + + if not domain_escaped in escaped_domains: + escaped_domains[domain_escaped] = 0 + escaped_domains[domain_escaped] += 1 + + if escaped_domains[domain_escaped] > 1: + domain_escaped = f"{domain_escaped}_{escaped_domains[domain_escaped]}" + template += rule_block.replace("${domain}", domain).replace( "${domain_escaped}", domain_escaped ) @@ -118,13 +134,20 @@ async def upload_rulesets(queue): ) try: # Fix for https://github.com/VirusTotal/vt-py/issues/155 issue. - await client.patch_async( + result = await client.patch_async( path="/intelligence/hunting_rulesets/" + task.get("id"), json_data={"data": ruleset.to_dict()}, ) - print(f'Ruleset {name} [{RULESET_LINK}{task["id"]}] updated.') except vt.error.APIError as e: print(f"Error updating {name}: {e}") + sys.exit(1) + + response = await result.json_async() + if response.get("error") is not None: + print(f"{name}: {response}") + sys.exit(1) + + print(f'Ruleset {name} [{RULESET_LINK}{task["id"]}] updated.') else: ruleset = vt.Object( @@ -141,13 +164,34 @@ async def upload_rulesets(queue): result = await client.post_object_async( path="/intelligence/hunting_rulesets", obj=ruleset ) - print(f"Ruleset {name} [{RULESET_LINK}{result.id}] created.") except vt.error.APIError as e: print(f"Error saving {name}: {e}") + sys.exit(1) + + response = await result.json_async() + if response.get("error") is not None: + print(f"{name}: {response}") + sys.exit(1) + + print(f"Ruleset {name} [{RULESET_LINK}{result.id}] created.") queue.task_done() +def load_bulk_file_domains(filename): + if not os.path.isfile(filename): + print(f"Error: File {filename} does not exists.") + sys.exit(1) + + domains = [] + with open(filename, encoding="utf-8") as bulk_file: + for line in bulk_file.read().split("\n"): + if not line: + continue + domains.append(line) + return domains + + async def main(): parser = argparse.ArgumentParser( description=( @@ -160,9 +204,27 @@ async def main(): action="store_true", help="List current monitored domains.", ) - parser.add_argument("-a", "--add-domain", help="Add a domain to the list.") parser.add_argument( - "-d", "--delete-domain", help="Remove a domain from the list." + "-a", + "--add-domain", + action="append", + type=str, + help="Add a domain to the list.", + ) + parser.add_argument( + "-d", + "--delete-domain", + action="append", + type=str, + help="Remove a domain from the list.", + ) + parser.add_argument( + "--bulk-append", + help="Add a list of domains from an input file.", + ) + parser.add_argument( + "--bulk-replace", + help="Replace the remote list with a new list from a file.", ) parser.add_argument( "--workers", @@ -185,14 +247,15 @@ async def main(): return rulesets = await get_rulesets() - if not rulesets and not args.add_domain: - print("* Empty domain list, use --add-domain domain.tld to register one") + if (not rulesets and + not (args.add_domain or args.bulk_append or args.bulk_replace)): + print(EMPTY_DOMAIN_LIST_MSG) sys.exit(1) domains = rulesets.get("url", {}).get("domains", []) if args.list: if not domains: - print("* Empty domain list, use --add-domain domain.tld to register one") + print(EMPTY_DOMAIN_LIST_MSG) sys.exit(0) print("Currently monitored domains:") @@ -201,16 +264,25 @@ async def main(): sys.exit(0) new_domain_list = copy.copy(domains) - if args.add_domain: - new_domain_list.append(args.add_domain) + if args.bulk_replace: + new_domain_list = load_bulk_file_domains(args.bulk_replace) - if args.delete_domain: - if not args.delete_domain in new_domain_list: - print(f"* {args.delete_domain} not in list") - sys.exit(1) - new_domain_list.remove(args.delete_domain) + elif args.bulk_append: + new_domain_list += load_bulk_file_domains(args.bulk_append) + + else: + if args.add_domain: + new_domain_list += args.add_domain + + if args.delete_domain: + for deleted_domain in args.delete_domain: + if not deleted_domain in new_domain_list: + print(f"* {deleted_domain} not in list") + sys.exit(1) + new_domain_list.remove(deleted_domain) new_domain_list = list(set(new_domain_list)) + new_domain_list = [domain.lower() for domain in new_domain_list] new_domain_list.sort() if new_domain_list != domains: diff --git a/examples/netwatch_templates/domain.yara b/examples/netwatch_templates/domain.yara index 67a131e..e6500bf 100644 --- a/examples/netwatch_templates/domain.yara +++ b/examples/netwatch_templates/domain.yara @@ -1,4 +1,4 @@ -rule network_watch_${domain_escaped} : ${domain_escaped} { +rule network_watch_${domain_escaped} : domain_${domain_escaped} { meta: description = "Monitor new subdomains for ${domain}" target_entity = "domain" diff --git a/examples/netwatch_templates/file.yara b/examples/netwatch_templates/file.yara index b32ef09..831acbc 100644 --- a/examples/netwatch_templates/file.yara +++ b/examples/netwatch_templates/file.yara @@ -1,4 +1,4 @@ -rule network_watch_${domain_escaped} : ${domain_escaped} { +rule network_watch_${domain_escaped} : domain_${domain_escaped} { meta: description = "New files downloaded from ${domain}" target_entity = "file" @@ -8,7 +8,7 @@ condition: } -rule network_watch_contact_${domain_escaped} : ${domain_escaped} { +rule network_watch_contact_${domain_escaped} : domain_${domain_escaped} { meta: description = "New files contacting ${domain}" target_entity = "file" diff --git a/examples/netwatch_templates/ip_address.yara b/examples/netwatch_templates/ip_address.yara index d3bf76b..e2d06a2 100644 --- a/examples/netwatch_templates/ip_address.yara +++ b/examples/netwatch_templates/ip_address.yara @@ -1,4 +1,4 @@ -rule network_watch_${domain_escaped} : ${domain_escaped} { +rule network_watch_${domain_escaped} : domain_${domain_escaped} { meta: description = "New IP addresses resolving domain ${domain} or its subdomains" target_entity = "ip_address" diff --git a/examples/netwatch_templates/url.yara b/examples/netwatch_templates/url.yara index 75863d9..3ac2099 100644 --- a/examples/netwatch_templates/url.yara +++ b/examples/netwatch_templates/url.yara @@ -1,4 +1,4 @@ -rule network_watch_${domain_escaped} : ${domain_escaped} { +rule network_watch_${domain_escaped} : domain_${domain_escaped} { meta: description = "Monitor new URLs in ${domain}" target_entity = "url"