Skip to content

Commit

Permalink
Linting
Browse files Browse the repository at this point in the history
  • Loading branch information
mgmacias95 committed Dec 2, 2024
1 parent 75f2e90 commit 058a14f
Show file tree
Hide file tree
Showing 13 changed files with 394 additions and 405 deletions.
36 changes: 21 additions & 15 deletions examples/hunting_notifications_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,27 @@ async def get_network_infrastructure(self):
contacted_domains = relationships["contacted_domains"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]
contacted_urls = relationships["contacted_urls"]["data"]
await self.queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
})
await self.queue.put({
"contacted_addresses": contacted_ips,
"type": "ips",
"file": file_hash,
})
await self.queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
})
await self.queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_ips,
"type": "ips",
"file": file_hash,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
}
)
self.networking_infrastructure[file_hash]["domains"] = contacted_domains
self.networking_infrastructure[file_hash]["ips"] = contacted_ips
self.networking_infrastructure[file_hash]["urls"] = contacted_urls
Expand Down
36 changes: 21 additions & 15 deletions examples/intelligence_search_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,21 +94,27 @@ async def get_network(self):
contacted_urls = relationships["contacted_urls"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]

await self.queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": checksum,
})
await self.queue.put({
"contacted_addresses": contacted_ips,
"type": "ips",
"file": checksum,
})
await self.queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": checksum,
})
await self.queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": checksum,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_ips,
"type": "ips",
"file": checksum,
}
)
await self.queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": checksum,
}
)

self.networking_infrastructure[checksum]["domains"] = contacted_domains
self.networking_infrastructure[checksum]["ips"] = contacted_ips
Expand Down
9 changes: 5 additions & 4 deletions examples/livehunt_network_watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
RULESET_LINK = "https://www.virustotal.com/yara-editor/livehunt/"

EMPTY_DOMAIN_LIST_MSG = (
"* Empty domain list, use --add-domain domain.tld or bulk operations to"
" register them"
"* Empty domain list, use --add-domain domain.tld or bulk operations to"
" register them"
)


Expand Down Expand Up @@ -247,8 +247,9 @@ async def main():
return

rulesets = await get_rulesets()
if (not rulesets and
not (args.add_domain or args.bulk_append or args.bulk_replace)):
if not rulesets and not (
args.add_domain or args.bulk_append or args.bulk_replace
):
print(EMPTY_DOMAIN_LIST_MSG)
sys.exit(1)

Expand Down
127 changes: 59 additions & 68 deletions examples/private_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,77 +13,68 @@

console = Console()


async def scan_file_private(
api_key: str,
file_path: Path,
wait: bool = False
api_key: str, file_path: Path, wait: bool = False
) -> None:
"""
Scan a file privately on VirusTotal.
Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task(
"Scanning file...",
total=None if wait else 1
)

analysis = await client.scan_file_private_async(
str(file_path),
wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, 'stats'):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e:
console.print(f"[red]Error: {e}[/red]")
"""
Scan a file privately on VirusTotal.
Args:
api_key: VirusTotal API key
file_path: Path to file to scan
wait: Wait for scan completion
"""
async with vt.Client(api_key) as client:
try:
with Progress() as progress:
task = progress.add_task("Scanning file...", total=None if wait else 1)

analysis = await client.scan_file_private_async(
str(file_path), wait_for_completion=wait
)

progress.update(task, advance=1)

console.print("\n[green]Scan submitted successfully[/green]")
console.print(f"Analysis ID: {analysis.id}")

if wait:
console.print(f"\nScan Status: {analysis.status}")
if hasattr(analysis, "stats"):
console.print("Detection Stats:")
for k, v in analysis.stats.items():
console.print(f" {k}: {v}")

except vt.error.APIError as e:
console.print(f"[red]API Error: {e}[/red]")
except Exception as e:
console.print(f"[red]Error: {e}[/red]")


def main():
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait",
action="store_true",
help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(
args.apikey,
file_path,
args.wait
))
parser = argparse.ArgumentParser(
description="Scan file privately using VirusTotal API"
)
parser.add_argument("--apikey", help="VirusTotal API key")
parser.add_argument("--file_path", help="Path to file to scan")
parser.add_argument(
"--wait", action="store_true", help="Wait for scan completion"
)

args = parser.parse_args()
file_path = Path(args.file_path)

if not file_path.exists():
console.print(f"[red]Error: File {file_path} not found[/red]")
sys.exit(1)

if not file_path.is_file():
console.print(f"[red]Error: {file_path} is not a file[/red]")
sys.exit(1)

asyncio.run(scan_file_private(args.apikey, file_path, args.wait))


if __name__ == "__main__":
main()
main()
24 changes: 14 additions & 10 deletions examples/retrohunt_to_network_infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,19 +87,23 @@ async def get_network_infrastructure(self, file_obj):
contacted_domains = relationships["contacted_domains"]["data"]
contacted_ips = relationships["contacted_ips"]["data"]
contacted_urls = relationships["contacted_urls"]["data"]
await self.networking_queue.put({
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
})
await self.networking_queue.put(
{
"contacted_addresses": contacted_domains,
"type": "domains",
"file": file_hash,
}
)
await self.networking_queue.put(
{"contacted_addresses": contacted_ips, "type": "ips", "file": file_hash}
)
await self.networking_queue.put({
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
})
await self.networking_queue.put(
{
"contacted_addresses": contacted_urls,
"type": "urls",
"file": file_hash,
}
)
self.networking_infrastructure[file_hash]["domains"] = contacted_domains
self.networking_infrastructure[file_hash]["ips"] = contacted_ips
self.networking_infrastructure[file_hash]["urls"] = contacted_urls
Expand Down
Loading

0 comments on commit 058a14f

Please sign in to comment.