175 lines
7.6 KiB
Diff
175 lines
7.6 KiB
Diff
diff -Naur a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
|
|
--- a/Lib/test/test_ssl.py 2020-05-12 02:03:27.244777454 +0300
|
|
+++ b/Lib/test/test_ssl.py 2020-05-12 03:51:13.148500805 +0300
|
|
@@ -28,6 +28,7 @@
|
|
HOST = support.HOST
|
|
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
|
|
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
|
|
+IS_OPENSSL_1_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 1)
|
|
|
|
|
|
def data_file(*name):
|
|
@@ -76,6 +77,13 @@
|
|
DHFILE = data_file("dh1024.pem")
|
|
BYTES_DHFILE = DHFILE.encode(sys.getfilesystemencoding())
|
|
|
|
+# Not defined in all versions of OpenSSL
|
|
+OP_NO_COMPRESSION = getattr(ssl, "OP_NO_COMPRESSION", 0)
|
|
+OP_SINGLE_DH_USE = getattr(ssl, "OP_SINGLE_DH_USE", 0)
|
|
+OP_SINGLE_ECDH_USE = getattr(ssl, "OP_SINGLE_ECDH_USE", 0)
|
|
+OP_CIPHER_SERVER_PREFERENCE = getattr(ssl, "OP_CIPHER_SERVER_PREFERENCE", 0)
|
|
+
|
|
+
|
|
|
|
def handle_error(prefix):
|
|
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
|
|
@@ -770,13 +778,14 @@
|
|
with self.assertRaisesRegexp(ssl.SSLError, "No cipher can be selected"):
|
|
ctx.set_ciphers("^$:,;?*'dorothyx")
|
|
|
|
- @skip_if_broken_ubuntu_ssl
|
|
+ @unittest.skipIf(IS_OPENSSL_1_1_1, "bpo-36576: fail on OpenSSL 1.1.1")
|
|
def test_options(self):
|
|
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
|
|
# OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
|
|
default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
|
|
- if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
|
|
- default |= ssl.OP_NO_COMPRESSION
|
|
+ # SSLContext also enables these by default
|
|
+ default |= (OP_NO_COMPRESSION | OP_CIPHER_SERVER_PREFERENCE |
|
|
+ OP_SINGLE_DH_USE | OP_SINGLE_ECDH_USE)
|
|
self.assertEqual(default, ctx.options)
|
|
ctx.options |= ssl.OP_NO_TLSv1
|
|
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
|
|
@@ -1148,16 +1157,31 @@
|
|
stats["x509"] += 1
|
|
self.assertEqual(ctx.cert_store_stats(), stats)
|
|
|
|
+
|
|
+ def _assert_context_options(self, ctx):
|
|
+ self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
+ if OP_NO_COMPRESSION != 0:
|
|
+ self.assertEqual(ctx.options & OP_NO_COMPRESSION,
|
|
+ OP_NO_COMPRESSION)
|
|
+ if OP_SINGLE_DH_USE != 0:
|
|
+ self.assertEqual(ctx.options & OP_SINGLE_DH_USE,
|
|
+ OP_SINGLE_DH_USE)
|
|
+ if OP_SINGLE_ECDH_USE != 0:
|
|
+ self.assertEqual(ctx.options & OP_SINGLE_ECDH_USE,
|
|
+ OP_SINGLE_ECDH_USE)
|
|
+ if OP_CIPHER_SERVER_PREFERENCE != 0:
|
|
+ self.assertEqual(ctx.options & OP_CIPHER_SERVER_PREFERENCE,
|
|
+ OP_CIPHER_SERVER_PREFERENCE)
|
|
+
|
|
+
|
|
def test_create_default_context(self):
|
|
ctx = ssl.create_default_context()
|
|
+
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
|
self.assertTrue(ctx.check_hostname)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
- self.assertEqual(
|
|
- ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
|
|
- getattr(ssl, "OP_NO_COMPRESSION", 0),
|
|
- )
|
|
+ self._assert_context_options(ctx)
|
|
+
|
|
|
|
with open(SIGNING_CA) as f:
|
|
cadata = f.read().decode("ascii")
|
|
@@ -1165,40 +1189,24 @@
|
|
cadata=cadata)
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
- self.assertEqual(
|
|
- ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
|
|
- getattr(ssl, "OP_NO_COMPRESSION", 0),
|
|
- )
|
|
+ self._assert_context_options(ctx)
|
|
|
|
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
- self.assertEqual(
|
|
- ctx.options & getattr(ssl, "OP_NO_COMPRESSION", 0),
|
|
- getattr(ssl, "OP_NO_COMPRESSION", 0),
|
|
- )
|
|
- self.assertEqual(
|
|
- ctx.options & getattr(ssl, "OP_SINGLE_DH_USE", 0),
|
|
- getattr(ssl, "OP_SINGLE_DH_USE", 0),
|
|
- )
|
|
- self.assertEqual(
|
|
- ctx.options & getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
|
|
- getattr(ssl, "OP_SINGLE_ECDH_USE", 0),
|
|
- )
|
|
+ self._assert_context_options(ctx)
|
|
|
|
def test__create_stdlib_context(self):
|
|
ctx = ssl._create_stdlib_context()
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
|
self.assertFalse(ctx.check_hostname)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
+ self._assert_context_options(ctx)
|
|
|
|
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
+ self._assert_context_options(ctx)
|
|
|
|
ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
|
|
cert_reqs=ssl.CERT_REQUIRED,
|
|
@@ -1206,12 +1214,12 @@
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
|
|
self.assertTrue(ctx.check_hostname)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
+ self._assert_context_options(ctx)
|
|
|
|
ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
|
|
self.assertEqual(ctx.protocol, ssl.PROTOCOL_SSLv23)
|
|
self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
|
|
- self.assertEqual(ctx.options & ssl.OP_NO_SSLv2, ssl.OP_NO_SSLv2)
|
|
+ self._assert_context_options(ctx)
|
|
|
|
def test__https_verify_certificates(self):
|
|
# Unit test to check the contect factory mapping
|
|
@@ -2765,6 +2773,7 @@
|
|
sock.do_handshake()
|
|
self.assertEqual(cm.exception.errno, errno.ENOTCONN)
|
|
|
|
+ @unittest.skipIf(IS_OPENSSL_1_1_1, "bpo-36576: fail on OpenSSL 1.1.1")
|
|
def test_default_ciphers(self):
|
|
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
|
try:
|
|
@@ -2805,7 +2814,8 @@
|
|
ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
|
|
)
|
|
with ThreadedEchoServer(context=context) as server:
|
|
- with context.wrap_socket(socket.socket()) as s:
|
|
+ s = context.wrap_socket(socket.socket())
|
|
+ with closing(s):
|
|
s.connect((HOST, server.port))
|
|
self.assertIn(s.cipher()[0], [
|
|
'TLS13-AES-256-GCM-SHA384',
|
|
@@ -2814,6 +2824,7 @@
|
|
])
|
|
|
|
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
|
|
+ @unittest.skipIf(IS_OPENSSL_1_1_1, "bpo-36576: fail on OpenSSL 1.1.1")
|
|
def test_default_ecdh_curve(self):
|
|
# Issue #21015: elliptic curve-based Diffie Hellman key exchange
|
|
# should be enabled by default on SSL contexts.
|
|
@@ -2943,6 +2954,7 @@
|
|
self.assertIs(stats['client_alpn_protocol'], None)
|
|
|
|
@unittest.skipUnless(ssl.HAS_ALPN, "ALPN support needed for this test")
|
|
+ @unittest.skipIf(IS_OPENSSL_1_1_1, "bpo-36576: fail on OpenSSL 1.1.1")
|
|
def test_alpn_protocols(self):
|
|
server_protocols = ['foo', 'bar', 'milkshake']
|
|
protocol_tests = [
|