-
Notifications
You must be signed in to change notification settings - Fork 0
/
cve_rce.py
163 lines (136 loc) · 5.74 KB
/
cve_rce.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# -*- coding:utf-8 -*-
# 作者: 沉墨
# 说明: 项目改写于yhy0师傅的github监测脚本,只支持钉钉
# 原项目地址: https://github.com/yhy0/github-cve-monitor
# 改写说明:
# - 红队工具监测
# + rce信息监测
# + 扩大cve监测面
# 使用: 填写load_config配置文件即可
import requests, re, time
import datetime
from collections import OrderedDict
import sqlite3
import dingtalkchatbot.chatbot as cb
def load_config():
github_token = r""
dingding_webhook = r""
dingding_secretKey = "h"
app_name = "dingding"
black_user = "zeroChen00" # 不监测的用户,默认自己
return app_name, github_token, dingding_webhook, dingding_secretKey, black_user
github_headers = {
'Authorization': "token {}".format(load_config()[1])
}
#初始化创建数据库
def create_database():
conn = sqlite3.connect('data.db')
cur = conn.cursor()
try:
cur.execute('''CREATE TABLE IF NOT EXISTS cr_monitor
(cr_name varchar(255),
pushed_at varchar(255),
cr_url varchar(255));''')
print("成功创建监控表")
except Exception as e:
print("创建监控表失败!报错:{}".format(e))
conn.close()
# 获取当天更新的CVE和RCE信息
def getNews():
today_cr_info_tmp = []
try:
api = "https://api.github.com/search/repositories?q=CVE|RCE|POC|EXP&sort=updated"
json_str = requests.get(api, headers=github_headers, timeout=10).json()
n = len(json_str['items'])
today_date = datetime.date.today()
for i in range(0, n):
cr_url = json_str['items'][i]['html_url']
if cr_url.split("/")[-2] not in load_config()[4]:
try:
cr_name_tmp = json_str['items'][i]['name'].upper()
cr_name = re.findall('CVE\-\d+\-\d+|.*rce.*|.*rce|.*poc.*|.*poc|.*exp.*|.*exp', cr_name_tmp, re.I)[0].upper()
pushed_at_tmp = json_str['items'][i]['pushed_at']
pushed_at = re.findall('\d{4}-\d{2}-\d{2}', pushed_at_tmp)[0]
if pushed_at == str(today_date):
today_cr_info_tmp.append({"cr_name": cr_name, "cr_url": cr_url, "pushed_at": pushed_at})
else:
print("[-] 更新日期非今天".format(cr_name, pushed_at))
except Exception as e:
pass
else:
pass
today_cr_info = OrderedDict()
for item in today_cr_info_tmp:
today_cr_info.setdefault(item['cr_name'], {**item, })
today_cr_info = list(today_cr_info.values())
return today_cr_info
except Exception as e:
print(e, "github链接不通")
return '', '', ''
#获取到的CVE信息插入到数据库
def cr_insert_into_sqlite3(data):
conn = sqlite3.connect('data.db')
print("cr_insert_into_sqlite3 函数 打开数据库成功!")
cur = conn.cursor()
for i in range(len(data)):
try:
cr_name = re.findall('CVE\-\d+\-\d+|.*rce.*|.*rce|.*poc.*|.*poc|.*exp.*|.*exp', data[i]['cr_name'], re.I)[0].upper()
cur.execute("INSERT INTO cr_monitor (cr_name,pushed_at,cr_url) VALUES ('{}', '{}', '{}')".format(cr_name, data[i]['pushed_at'], data[i]['cr_url']))
print("cr_insert_into_sqlite3 函数: {}插入数据成功!".format(cr_name))
except Exception as e:
pass
conn.commit()
conn.close()
# 查询数据库里是否存在该CVE或RCE
def query_cr_info_database(cr_name):
conn = sqlite3.connect('data.db')
cur = conn.cursor()
sql_grammar = "SELECT cr_name FROM cr_monitor WHERE cr_name = '{}';".format(cr_name)
cursor = cur.execute(sql_grammar)
return len(list(cursor))
#获取不存在数据库里的CVE或RCE信息
def get_today_cr_info(today_cr_info_data):
today_all_cr_info = []
for i in range(len(today_cr_info_data)):
try:
today_cr_name = re.findall('CVE\-\d+\-\d+|.*rce.*|.*rce|.*poc.*|.*poc|.*exp.*|.*exp', today_cr_info_data[i]['cr_name'], re.I)[0].upper()
Verify = query_cr_info_database(today_cr_name.upper())
if Verify == 0:
print("[+] 数据库里不存在{}".format(today_cr_name.upper()))
today_all_cr_info.append(today_cr_info_data[i])
else:
print("[-] 数据库里存在{}".format(today_cr_name.upper()))
except Exception as e:
pass
return today_all_cr_info
# 钉钉
def dingding(text, msg, webhook, secretKey):
ding = cb.DingtalkChatbot(webhook, secret=secretKey)
ding.send_text(msg='{}\r\n{}'.format(text, msg), is_at_all=False)
#发送CVE/RCE信息到钉钉
def sendNews(data):
try:
text = r'有新的CVE/RCE资讯送达!'
for i in range(len(data)):
try:
cr_name = re.findall('CVE\-\d+\-\d+|.*rce.*|.*rce|.*poc.*|.*poc|.*exp.*|.*exp', data[i]['cr_name'], re.I)[0].upper()
body = cr_name + "\r\n" + "Github地址:" + str(data[i]['cr_url'])
dingding(text, body, load_config()[2], load_config()[3])
print("钉钉 发送 CVE/RCE 成功")
except IndexError:
pass
except Exception as e:
print("sendNews 函数 error:{}".format(e))
if __name__ == '__main__':
print("github cve 和 rce 资讯监控中 ...")
#初始化部分
create_database()
while True:
try:
cr_data = getNews()
today_cr_data = get_today_cr_info(cr_data)
sendNews(today_cr_data)
cr_insert_into_sqlite3(today_cr_data)
time.sleep(3 * 60)
except Exception as e:
print("main函数 try循环 遇到错误-->{}".format(e))