Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsasha committed Nov 25, 2024
1 parent 8203eba commit 2f2e8a0
Showing 1 changed file with 59 additions and 31 deletions.
90 changes: 59 additions & 31 deletions checks/tasks/tls/scans.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ def dane(
chain: List[Certificate],
task,
dane_cb_data,
score_none,
score_none_bogus,
score_failed,
score_validated,
score_none: scoring.Score,
score_none_bogus: scoring.Score,
score_failed: scoring.Score,
score_validated: scoring.Score,
):
"""
Check if there are TLSA records, if they are valid and if a DANE rollover
Expand Down Expand Up @@ -257,20 +257,18 @@ def dane(
)


def is_root_cert(cert):
def is_root_cert(cert: Certificate) -> bool:
"""
Check if the certificate is a root certificate.
"""
digest = cert.fingerprint(hashes.SHA1())
digest = hexlify(digest).decode("ascii")
return digest.upper() in root_fingerprints


def get_common_name(cert):
def get_common_name(cert: Certificate) -> str:
"""
Get the commonName of the certificate.
"""
value = "-"
try:
Expand All @@ -282,10 +280,9 @@ def get_common_name(cert):
return value


def cert_checks(hostname, mode, task, af_ip_pair=None, dane_cb_data=None, *args, **kwargs):
def cert_checks(hostname: str, mode: ChecksMode, task, af_ip_pair=None, dane_cb_data=None, *args, **kwargs):
"""
Perform certificate checks.
Perform certificate checks, such as trust, name match. Also scans the server.
"""
log.info(f"starting cert sslyze scan for {hostname} {af_ip_pair} {mode}")
# cryptography's PolicyBuilder.build_server_verifier is called through sslyze cert chain analyser
Expand Down Expand Up @@ -442,6 +439,9 @@ def _certificate_matches_hostname(certificate: Certificate, server_hostname: str


def check_pubkey(certificates: List[Certificate], mode: ChecksMode):
"""
Check that all provided certificates meet NCSC requirements.
"""
# NCSC guidelines B3-3, B5-1
bad_pubkey = []
phase_out_pubkey = []
Expand Down Expand Up @@ -511,7 +511,6 @@ def check_mail_tls_multiple(server_tuples, task) -> Dict[str, Dict[str, Any]]:
if not scans:
return results
connection_limit = connection_limit_for_scans(scans)
log.info(f"sslyze connection limit set to {connection_limit} for this mail scan")
for all_suites, result, error in run_sslyze(scans, connection_limit=connection_limit):
if error:
log.info(f"sslyze scan for mail failed: {error}")
Expand All @@ -525,6 +524,10 @@ def check_mail_tls_multiple(server_tuples, task) -> Dict[str, Dict[str, Any]]:


def connection_limit_for_scans(scans: List[ServerScanRequest]):
"""
Determine the appropriate connection limit for a mail server.
Sometimes we set this higher, due to anti-spam slowness.
"""
hostnames = [scan.server_location.hostname for scan in scans]
for hostname_substr, limit in MAIL_ALTERNATE_CONNLIMIT_HOST_SUBSTRS.items():
if any([hostname_substr in hostname for hostname in hostnames]):
Expand All @@ -533,14 +536,18 @@ def connection_limit_for_scans(scans: List[ServerScanRequest]):
return 1


def _generate_mail_server_scan_request(server: str) -> Optional[ServerScanRequest]:
def _generate_mail_server_scan_request(mx_hostname: str) -> Optional[ServerScanRequest]:
"""
Generate the scan request (sslyze scan commands) for a mail server.
Includes resolving and determining supported TLS versions.
"""
try:
server_location = ServerNetworkLocation(hostname=server, port=25)
server_location = ServerNetworkLocation(hostname=mx_hostname, port=25)
except ServerHostnameCouldNotBeResolved:
log.info(f"unable to resolve MX host {server}, marking server unreachable")
log.info(f"unable to resolve MX host {mx_hostname}, marking server unreachable")
return None
network_configuration = ServerNetworkConfiguration(
tls_server_name_indication=server,
tls_server_name_indication=mx_hostname,
tls_opportunistic_encryption=ProtocolWithOpportunisticTlsEnum.SMTP,
smtp_ehlo_hostname=settings.SMTP_EHLO_DOMAIN,
network_timeout=SSLYZE_NETWORK_TIMEOUT,
Expand All @@ -553,7 +560,7 @@ def _generate_mail_server_scan_request(server: str) -> Optional[ServerScanReques
)
)
if not supported_tls_versions:
log.info(f"no TLS version support found for MX host {server}, marking server unreachable")
log.info(f"no TLS version support found for MX host {mx_hostname}, marking server unreachable")
return None
scan_commands = SSLYZE_SCAN_COMMANDS | {
SSLYZE_SCAN_COMMANDS_FOR_TLS[tls_version] for tls_version in supported_tls_versions
Expand Down Expand Up @@ -793,6 +800,11 @@ def check_web_tls(url, af_ip_pair=None, *args, **kwargs):
def run_sslyze(
scans: List[ServerScanRequest], connection_limit: int
) -> Generator[Tuple[List[CipherSuitesScanAttempt], ServerScanResult, Optional[TLSException]], None, None]:
"""
Run a set of sslyze scans in parallel.
Starts each scan request at the same time, and yields them as soon as they are finished.
This threading is handled inside sslyze.
"""
log.debug(f"starting sslyze scan for {[scan.server_location for scan in scans]}")
scanner = Scanner(per_server_concurrent_connections_limit=connection_limit, concurrent_server_scans_limit=10)
scanner.queue_scans(scans)
Expand All @@ -813,7 +825,7 @@ def run_sslyze(
)
if suite and suite.result
]
# Error is caught and returned here, as we may be scanning many targets,
# Error is caught and returned here, as we may be running many scans.,
# and don't want to abort all scans for one failure.
try:
raise_sslyze_errors(result)
Expand All @@ -824,6 +836,10 @@ def run_sslyze(


def raise_sslyze_errors(result: ServerScanResult) -> None:
"""
Determine whether the scan result contains any exceptions,
and if it does, raise a TLSException for them.
"""
last_error_trace = None
for scan_result in vars(result.scan_result).values():
error_trace = getattr(scan_result, "error_trace")
Expand All @@ -837,6 +853,11 @@ def raise_sslyze_errors(result: ServerScanResult) -> None:
def test_key_exchange_hash(
server_connectivity_info: ServerConnectivityInfo,
) -> KeyExchangeHashFunctionEvaluation:
"""
Test the SHA2 key exchange per NCSC table 5.
Note that this is not the certificate hash, or TLS cipher hash.
There are few or no hosts that do not meet this requirement.
"""
ssl_connection = server_connectivity_info.get_preconfigured_tls_connection(should_use_legacy_openssl=False)
ssl_connection.ssl_client.set_sigalgs(SIGNATURE_ALGORITHMS_SHA2)

Expand Down Expand Up @@ -864,6 +885,17 @@ def test_cipher_order(
tls_versions: List[TlsVersionEnum],
cipher_evaluation: TLSCipherEvaluation,
) -> TLSCipherOrderEvaluation:
"""
Determine whether there was a cipher order violation.
We require supported ciphers to be ordered good>sufficient>phase out>bad.
Within each level, the order is not significant to us.
This test forms cipher strings of e.g. all supported sufficient followed
by each good, and then expects the server to choose the good cipher.
That assures us that the server prefers each good cipher over any lower cipher.
This is tested at all levels that the server supported.
NCSC B2-5.
"""
cipher_order_violation = []
if (
not cipher_evaluation.ciphers_bad
Expand All @@ -881,7 +913,7 @@ def test_cipher_order(
order_tuples = [
(
cipher_evaluation.ciphers_bad + cipher_evaluation.ciphers_phase_out + cipher_evaluation.ciphers_sufficient,
# Make sure we do not mix in TLS 1.3 ciphers
# Make sure we do not mix in TLS 1.3 ciphers, all TLS 1.3 ciphers are good.
cipher_evaluation.ciphers_good_no_tls13,
),
(cipher_evaluation.ciphers_bad + cipher_evaluation.ciphers_phase_out, cipher_evaluation.ciphers_sufficient),
Expand All @@ -892,20 +924,18 @@ def test_cipher_order(
break
# Sort CHACHA as later in the list, in case SSL_OP_PRIORITIZE_CHACHA is enabled #461
expected_less_preferred.sort(key=lambda c: "CHACHA" in c.name)
print(f"checking server pref against: {[s.name for s in expected_more_preferred_list]}")
for expected_more_preferred in expected_more_preferred_list:
print(
f"evaluating less {[s.name for s in expected_less_preferred]} vs "
f"more {expected_more_preferred.name} TLS {tls_version}"
)
if not expected_less_preferred or not expected_more_preferred:
continue
preferred_suite = find_most_preferred_cipher_suite(
server_connectivity_info, tls_version, expected_less_preferred + [expected_more_preferred]
)
if preferred_suite != expected_more_preferred:
cipher_order_violation = [preferred_suite.name, expected_more_preferred.name]
print(f"break out, got bad order: {cipher_order_violation}")
log.info(
f"found cipher order violation for {server_connectivity_info.server_location.hostname}:"
f" preferred {preferred_suite.name} instead of {expected_more_preferred.name}"
)
break

return TLSCipherOrderEvaluation(
Expand Down Expand Up @@ -935,11 +965,6 @@ def find_most_preferred_cipher_suite(
ssl_connection = server_connectivity_info.get_preconfigured_tls_connection(
override_tls_version=tls_version, should_use_legacy_openssl=False
)
print(
f"openssl s_client -tls1_2 -cipher {':'.join(suite_names)} -connect "
f"{server_connectivity_info.server_location.hostname}:443 # {tls_version.name}"
)
print(f"{ssl_connection.ssl_client.get_cipher_list()=}")
_set_cipher_suite_string(tls_version, ":".join(suite_names), ssl_connection.ssl_client)

try:
Expand All @@ -956,7 +981,6 @@ def find_most_preferred_cipher_suite(
selected_cipher = CipherSuitesRepository.get_cipher_suite_with_openssl_name(
tls_version, ssl_connection.ssl_client.get_current_cipher_name()
)
print(f"from CS {[s.name for s in cipher_suites]} selected {selected_cipher}")
return selected_cipher


Expand All @@ -969,6 +993,10 @@ def _check_cipher_suite_available(tls_version: TlsVersionEnum, cipher_suite: Cip


def check_supported_tls_versions(server_connectivity_info: ServerConnectivityInfo) -> List[TlsVersionEnum]:
"""
Determine which TLS versions are supported.
Providing this info to sslyze improves on the bluntness of the scans.
"""
supported_tls_versions = []
for tls_version in TlsVersionEnum:
requires_legacy_openssl = tls_version != TlsVersionEnum.TLS_1_3
Expand Down

0 comments on commit 2f2e8a0

Please sign in to comment.