-
Notifications
You must be signed in to change notification settings - Fork 47
/
phishpedia.py
195 lines (159 loc) · 7.87 KB
/
phishpedia.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import time
from datetime import datetime
import argparse
import os
import torch
import cv2
from configs import load_config
from logo_recog import pred_rcnn, vis
from logo_matching import check_domain_brand_inconsistency
# from text_recog import check_email_credential_taking
# import pickle
from tqdm import tqdm
import re
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'
def result_file_write(f, folder, url, phish_category, pred_target, matched_domain, siamese_conf, logo_recog_time,
logo_match_time):
f.write(folder + "\t")
f.write(url + "\t")
f.write(str(phish_category) + "\t")
f.write(str(pred_target) + "\t") # write top1 prediction only
f.write(str(matched_domain) + "\t")
f.write(str(siamese_conf) + "\t")
f.write(str(round(logo_recog_time, 4)) + "\t")
f.write(str(round(logo_match_time, 4)) + "\n")
class PhishpediaWrapper:
_caller_prefix = "PhishpediaWrapper"
_DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
def __init__(self):
self._load_config()
self._to_device()
def _load_config(self):
self.ELE_MODEL, self.SIAMESE_THRE, self.SIAMESE_MODEL, \
self.LOGO_FEATS, self.LOGO_FILES, \
self.DOMAIN_MAP_PATH = load_config()
print(f'Length of reference list = {len(self.LOGO_FEATS)}')
def _to_device(self):
self.SIAMESE_MODEL.to(self._DEVICE)
# def simple_input_box_regex(self, html_path):
# with open(html_path, 'r', encoding='ISO-8859-1') as f:
# page = f.read()
# tree = html.fromstring(page)
# if tree is None: # parsing into tree failed
# return False
# ## filter out search boxes
# inputs = tree.xpath(
# './/input[not(@type="hidden") and not(contains(@name, "search"))'
# ' and not(contains(@placeholder, "search"))]'
# )
# search_pattern = re.compile(r'\b(search|query|find|keyword)\b', re.IGNORECASE)
# sensitive_inputs = [
# inp for inp in inputs
# if not search_pattern.search(inp.get('name', '') + inp.get('placeholder', ''))
# ]
# ## a login form will have at least 1 input box
# if len(sensitive_inputs) > 0:
# return True
# return False
'''Phishpedia'''
# @profile
def test_orig_phishpedia(self, url, screenshot_path, html_path):
# 0 for benign, 1 for phish, default is benign
phish_category = 0
pred_target = None
matched_domain = None
siamese_conf = None
plotvis = None
logo_match_time = 0
print("Entering phishpedia")
####################### Step1: Logo detector ##############################################
start_time = time.time()
pred_boxes = pred_rcnn(im=screenshot_path, predictor=self.ELE_MODEL)
logo_recog_time = time.time() - start_time
if pred_boxes is not None:
pred_boxes = pred_boxes.detach().cpu().numpy()
plotvis = vis(screenshot_path, pred_boxes)
# If no element is reported
if pred_boxes is None or len(pred_boxes) == 0:
print('No logo is detected')
return phish_category, pred_target, matched_domain, plotvis, siamese_conf, pred_boxes, logo_recog_time, logo_match_time
######################## Step2: Siamese (Logo matcher) ########################################
start_time = time.time()
pred_target, matched_domain, matched_coord, siamese_conf = check_domain_brand_inconsistency(
logo_boxes=pred_boxes,
domain_map_path=self.DOMAIN_MAP_PATH,
model=self.SIAMESE_MODEL,
logo_feat_list=self.LOGO_FEATS,
file_name_list=self.LOGO_FILES,
url=url,
shot_path=screenshot_path,
similarity_threshold=self.SIAMESE_THRE,
topk=1)
logo_match_time = time.time() - start_time
if pred_target is None:
print('Did not match to any brand, report as benign')
return phish_category, pred_target, matched_domain, plotvis, siamese_conf, pred_boxes, logo_recog_time, logo_match_time
######################## Step3: Simple input box check ###############
# has_input_box = self.simple_input_box_regex(html_path=html_path)
# if not has_input_box:
# print('No input box')
# return phish_category, pred_target, matched_domain, plotvis, siamese_conf, pred_boxes, logo_recog_time, logo_match_time
# else:
print('Match to Target: {} with confidence {:.4f}'.format(pred_target, siamese_conf))
phish_category = 1
# Visualize, add annotations
cv2.putText(plotvis, "Target: {} with confidence {:.4f}".format(pred_target, siamese_conf),
(int(matched_coord[0] + 20), int(matched_coord[1] + 20)),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 0), 2)
return phish_category, pred_target, matched_domain, plotvis, siamese_conf, pred_boxes, logo_recog_time, logo_match_time
if __name__ == '__main__':
'''update domain map'''
# with open('./lib/phishpedia/models/domain_map.pkl', "rb") as handle:
# domain_map = pickle.load(handle)
#
# domain_map['weibo'] = ['sina', 'weibo']
#
# with open('./lib/phishpedia/models/domain_map.pkl', "wb") as handle:
# pickle.dump(domain_map, handle)
# exit()
'''run'''
today = datetime.now().strftime('%Y%m%d')
parser = argparse.ArgumentParser()
parser.add_argument("--folder", required=True, type=str)
parser.add_argument("--output_txt", default=f'{today}_results.txt', help="Output txt path")
args = parser.parse_args()
request_dir = args.folder
phishpedia_cls = PhishpediaWrapper()
result_txt = args.output_txt
os.makedirs(request_dir, exist_ok=True)
for folder in tqdm(os.listdir(request_dir)):
html_path = os.path.join(request_dir, folder, "html.txt")
screenshot_path = os.path.join(request_dir, folder, "shot.png")
info_path = os.path.join(request_dir, folder, 'info.txt')
if not os.path.exists(screenshot_path):
continue
if not os.path.exists(html_path):
html_path = os.path.join(request_dir, folder, "index.html")
with open(info_path, 'r') as file:
url = file.read()
if os.path.exists(result_txt):
with open(result_txt, 'r', encoding='ISO-8859-1') as file:
if url in file.read():
continue
_forbidden_suffixes = r"\.(mp3|wav|wma|ogg|mkv|zip|tar|xz|rar|z|deb|bin|iso|csv|tsv|dat|txt|css|log|xml|sql|mdb|apk|bat|exe|jar|wsf|fnt|fon|otf|ttf|ai|bmp|gif|ico|jp(e)?g|png|ps|psd|svg|tif|tiff|cer|rss|key|odp|pps|ppt|pptx|c|class|cpp|cs|h|java|sh|swift|vb|odf|xlr|xls|xlsx|bak|cab|cfg|cpl|cur|dll|dmp|drv|icns|ini|lnk|msi|sys|tmp|3g2|3gp|avi|flv|h264|m4v|mov|mp4|mp(e)?g|rm|swf|vob|wmv|doc(x)?|odt|rtf|tex|wks|wps|wpd)$"
if re.search(_forbidden_suffixes, url, re.IGNORECASE):
continue
phish_category, pred_target, matched_domain, \
plotvis, siamese_conf, pred_boxes, \
logo_recog_time, logo_match_time = phishpedia_cls.test_orig_phishpedia(url, screenshot_path, html_path)
try:
with open(result_txt, "a+", encoding='ISO-8859-1') as f:
result_file_write(f, folder, url, phish_category, pred_target, matched_domain, siamese_conf,
logo_recog_time, logo_match_time)
except UnicodeError:
with open(result_txt, "a+", encoding='utf-8') as f:
result_file_write(f, folder, url, phish_category, pred_target, matched_domain, siamese_conf,
logo_recog_time, logo_match_time)
if phish_category:
os.makedirs(os.path.join(request_dir, folder), exist_ok=True)
cv2.imwrite(os.path.join(request_dir, folder, "predict.png"), plotvis)