]> granicus.if.org Git - python/commitdiff
Issue #19422: Explicitly disallow non-SOCK_STREAM sockets in the ssl module, rather...
authorAntoine Pitrou <solipsis@pitrou.net>
Sat, 28 Dec 2013 16:30:51 +0000 (17:30 +0100)
committerAntoine Pitrou <solipsis@pitrou.net>
Sat, 28 Dec 2013 16:30:51 +0000 (17:30 +0100)
1  2 
Doc/library/ssl.rst
Lib/ssl.py
Lib/test/test_ssl.py
Misc/NEWS

Simple merge
diff --cc Lib/ssl.py
index 052a118abb4a7beedd5a795cb2320ee24762cffc,cd8d6b4c9eec0239a46b5ccf49fec9fb75ac8f0e..4408e7b74364803d78bd54ff5e5a75b77ce12757
@@@ -136,20 -108,10 +136,21 @@@ except ImportError
  else:
      _PROTOCOL_NAMES[PROTOCOL_SSLv2] = "SSLv2"
  
 +try:
 +    from _ssl import PROTOCOL_TLSv1_1, PROTOCOL_TLSv1_2
 +except ImportError:
 +    pass
 +else:
 +    _PROTOCOL_NAMES[PROTOCOL_TLSv1_1] = "TLSv1.1"
 +    _PROTOCOL_NAMES[PROTOCOL_TLSv1_2] = "TLSv1.2"
 +
 +if sys.platform == "win32":
 +    from _ssl import enum_certificates, enum_crls
 +
  from socket import getnameinfo as _getnameinfo
 -from socket import error as socket_error
 +from socket import SHUT_RDWR as _SHUT_RDWR
  from socket import socket, AF_INET, SOCK_STREAM, create_connection
+ from socket import SOL_SOCKET, SO_TYPE
  import base64        # for DER-to-PEM translation
  import traceback
  import errno
index 6eb88d729eee1eadebd70f2d47ebef31efe35c60,104a1edc6c95add0ea16edd7bbc7d8a2d2381e38..14d3cc1e96c76d32ef021206f2a58f9368208162
@@@ -524,114 -493,17 +524,125 @@@ class BasicSocketTests(unittest.TestCas
              support.gc_collect()
          self.assertIn(r, str(cm.warning.args[0]))
  
 +    def test_get_default_verify_paths(self):
 +        paths = ssl.get_default_verify_paths()
 +        self.assertEqual(len(paths), 6)
 +        self.assertIsInstance(paths, ssl.DefaultVerifyPaths)
 +
 +        with support.EnvironmentVarGuard() as env:
 +            env["SSL_CERT_DIR"] = CAPATH
 +            env["SSL_CERT_FILE"] = CERTFILE
 +            paths = ssl.get_default_verify_paths()
 +            self.assertEqual(paths.cafile, CERTFILE)
 +            self.assertEqual(paths.capath, CAPATH)
 +
 +    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
 +    def test_enum_certificates(self):
 +        self.assertTrue(ssl.enum_certificates("CA"))
 +        self.assertTrue(ssl.enum_certificates("ROOT"))
 +
 +        self.assertRaises(TypeError, ssl.enum_certificates)
 +        self.assertRaises(WindowsError, ssl.enum_certificates, "")
 +
 +        trust_oids = set()
 +        for storename in ("CA", "ROOT"):
 +            store = ssl.enum_certificates(storename)
 +            self.assertIsInstance(store, list)
 +            for element in store:
 +                self.assertIsInstance(element, tuple)
 +                self.assertEqual(len(element), 3)
 +                cert, enc, trust = element
 +                self.assertIsInstance(cert, bytes)
 +                self.assertIn(enc, {"x509_asn", "pkcs_7_asn"})
 +                self.assertIsInstance(trust, (set, bool))
 +                if isinstance(trust, set):
 +                    trust_oids.update(trust)
 +
 +        serverAuth = "1.3.6.1.5.5.7.3.1"
 +        self.assertIn(serverAuth, trust_oids)
 +
 +    @unittest.skipUnless(sys.platform == "win32", "Windows specific")
 +    def test_enum_crls(self):
 +        self.assertTrue(ssl.enum_crls("CA"))
 +        self.assertRaises(TypeError, ssl.enum_crls)
 +        self.assertRaises(WindowsError, ssl.enum_crls, "")
 +
 +        crls = ssl.enum_crls("CA")
 +        self.assertIsInstance(crls, list)
 +        for element in crls:
 +            self.assertIsInstance(element, tuple)
 +            self.assertEqual(len(element), 2)
 +            self.assertIsInstance(element[0], bytes)
 +            self.assertIn(element[1], {"x509_asn", "pkcs_7_asn"})
 +
 +
 +    def test_asn1object(self):
 +        expected = (129, 'serverAuth', 'TLS Web Server Authentication',
 +                    '1.3.6.1.5.5.7.3.1')
 +
 +        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
 +        self.assertEqual(val, expected)
 +        self.assertEqual(val.nid, 129)
 +        self.assertEqual(val.shortname, 'serverAuth')
 +        self.assertEqual(val.longname, 'TLS Web Server Authentication')
 +        self.assertEqual(val.oid, '1.3.6.1.5.5.7.3.1')
 +        self.assertIsInstance(val, ssl._ASN1Object)
 +        self.assertRaises(ValueError, ssl._ASN1Object, 'serverAuth')
 +
 +        val = ssl._ASN1Object.fromnid(129)
 +        self.assertEqual(val, expected)
 +        self.assertIsInstance(val, ssl._ASN1Object)
 +        self.assertRaises(ValueError, ssl._ASN1Object.fromnid, -1)
 +        with self.assertRaisesRegex(ValueError, "unknown NID 100000"):
 +            ssl._ASN1Object.fromnid(100000)
 +        for i in range(1000):
 +            try:
 +                obj = ssl._ASN1Object.fromnid(i)
 +            except ValueError:
 +                pass
 +            else:
 +                self.assertIsInstance(obj.nid, int)
 +                self.assertIsInstance(obj.shortname, str)
 +                self.assertIsInstance(obj.longname, str)
 +                self.assertIsInstance(obj.oid, (str, type(None)))
 +
 +        val = ssl._ASN1Object.fromname('TLS Web Server Authentication')
 +        self.assertEqual(val, expected)
 +        self.assertIsInstance(val, ssl._ASN1Object)
 +        self.assertEqual(ssl._ASN1Object.fromname('serverAuth'), expected)
 +        self.assertEqual(ssl._ASN1Object.fromname('1.3.6.1.5.5.7.3.1'),
 +                         expected)
 +        with self.assertRaisesRegex(ValueError, "unknown object 'serverauth'"):
 +            ssl._ASN1Object.fromname('serverauth')
 +
 +    def test_purpose_enum(self):
 +        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.1')
 +        self.assertIsInstance(ssl.Purpose.SERVER_AUTH, ssl._ASN1Object)
 +        self.assertEqual(ssl.Purpose.SERVER_AUTH, val)
 +        self.assertEqual(ssl.Purpose.SERVER_AUTH.nid, 129)
 +        self.assertEqual(ssl.Purpose.SERVER_AUTH.shortname, 'serverAuth')
 +        self.assertEqual(ssl.Purpose.SERVER_AUTH.oid,
 +                              '1.3.6.1.5.5.7.3.1')
 +
 +        val = ssl._ASN1Object('1.3.6.1.5.5.7.3.2')
 +        self.assertIsInstance(ssl.Purpose.CLIENT_AUTH, ssl._ASN1Object)
 +        self.assertEqual(ssl.Purpose.CLIENT_AUTH, val)
 +        self.assertEqual(ssl.Purpose.CLIENT_AUTH.nid, 130)
 +        self.assertEqual(ssl.Purpose.CLIENT_AUTH.shortname, 'clientAuth')
 +        self.assertEqual(ssl.Purpose.CLIENT_AUTH.oid,
 +                              '1.3.6.1.5.5.7.3.2')
 +
+     def test_unsupported_dtls(self):
+         s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+         self.addCleanup(s.close)
+         with self.assertRaises(NotImplementedError) as cx:
+             ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
+         self.assertEqual(str(cx.exception), "only stream sockets are supported")
+         ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+         with self.assertRaises(NotImplementedError) as cx:
+             ctx.wrap_socket(s)
+         self.assertEqual(str(cx.exception), "only stream sockets are supported")
  
  class ContextTests(unittest.TestCase):
  
diff --cc Misc/NEWS
index c6f27f0622f12000c825e310689a1b5f2f4376d9,b7924cb4bb1c9a805c7a24665ebde1c9fb872d76..57da667ded4da4920620c7e302d0c88805770147
+++ b/Misc/NEWS
@@@ -44,8 -29,13 +44,11 @@@ Core and Builtin
  Library
  -------
  
 -- Issue #18116: getpass was always getting an error when testing /dev/tty,
 -  and thus was always falling back to stdin, and would then raise an exception
 -  if stdin could not be used (such as /dev/null).  It also leaked an open file.
 -  All of these issues are now fixed.
+ - Issue #19422: Explicitly disallow non-SOCK_STREAM sockets in the ssl
+   module, rather than silently let them emit clear text data.
 +- Issue #20046: Locale alias table no longer contains entities which can be
 +  calculated.  Generalized support of the euro modifier.
  
  - Issue #20027: Fixed locale aliases for devanagari locales.