From: Bill Janssen Date: Mon, 8 Sep 2008 16:45:19 +0000 (+0000) Subject: fixes from issue 3162 for SSL module X-Git-Tag: v3.0rc1~62 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=58afe4c194b2934beda995840d3f5b3a0aafc3fa;p=python fixes from issue 3162 for SSL module --- diff --git a/Lib/ssl.py b/Lib/ssl.py index aa301295ae..cd54437a0c 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -284,6 +284,14 @@ class SSLSocket(socket): else: return socket.recvfrom(self, addr, buflen, flags) + def recvfrom_into(self, buffer, nbytes=None, flags=0): + self._checkClosed() + if self._sslobj: + raise ValueError("recvfrom_into not allowed on instances of %s" % + self.__class__) + else: + return socket.recvfrom_into(self, buffer, nbytes, flags) + def pending(self): self._checkClosed() if self._sslobj: diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 1ad6f0297f..9d59a26cc4 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -781,6 +781,9 @@ else: def testMalformedCert(self): badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, "badcert.pem")) + def testWrongCert(self): + badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, + "wrongcert.pem")) def testMalformedKey(self): badCertTest(os.path.join(os.path.dirname(__file__) or os.curdir, "badkey.pem")) @@ -1033,6 +1036,129 @@ else: server.stop() server.join() + def testAllRecvAndSendMethods(self): + + if support.verbose: + sys.stdout.write("\n") + + server = ThreadedEchoServer(CERTFILE, + certreqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1, + cacerts=CERTFILE, + chatty=True, + connectionchatty=False) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + try: + s = ssl.wrap_socket(socket.socket(), + server_side=False, + certfile=CERTFILE, + ca_certs=CERTFILE, + cert_reqs=ssl.CERT_NONE, + ssl_version=ssl.PROTOCOL_TLSv1) + s.connect((HOST, server.port)) + except ssl.SSLError as x: + raise support.TestFailed("Unexpected SSL error: " + str(x)) + except Exception as x: + raise support.TestFailed("Unexpected exception: " + str(x)) + else: + # helper methods for standardising recv* method signatures + def _recv_into(): + b = bytearray(b"\0"*100) + count = s.recv_into(b) + return b[:count] + + def _recvfrom_into(): + b = bytearray(b"\0"*100) + count, addr = s.recvfrom_into(b) + return b[:count] + + # (name, method, whether to expect success, *args) + send_methods = [ + ('send', s.send, True, []), + ('sendto', s.sendto, False, ["some.address"]), + ('sendall', s.sendall, True, []), + ] + recv_methods = [ + ('recv', s.recv, True, []), + ('recvfrom', s.recvfrom, False, ["some.address"]), + ('recv_into', _recv_into, True, []), + ('recvfrom_into', _recvfrom_into, False, []), + ] + data_prefix = "PREFIX_" + + for meth_name, send_meth, expect_success, args in send_methods: + indata = data_prefix + meth_name + try: + send_meth(indata.encode('ASCII', 'strict'), *args) + outdata = s.read() + outdata = str(outdata, 'ASCII', 'strict') + if outdata != indata.lower(): + raise support.TestFailed( + "While sending with <<{name:s}>> bad data " + "<<{outdata:s}>> ({nout:d}) received; " + "expected <<{indata:s}>> ({nin:d})\n".format( + name=meth_name, outdata=repr(outdata[:20]), + nout=len(outdata), + indata=repr(indata[:20]), nin=len(indata) + ) + ) + except ValueError as e: + if expect_success: + raise support.TestFailed( + "Failed to send with method <<{name:s}>>; " + "expected to succeed.\n".format(name=meth_name) + ) + if not str(e).startswith(meth_name): + raise support.TestFailed( + "Method <<{name:s}>> failed with unexpected " + "exception message: {exp:s}\n".format( + name=meth_name, exp=e + ) + ) + + for meth_name, recv_meth, expect_success, args in recv_methods: + indata = data_prefix + meth_name + try: + s.send(indata.encode('ASCII', 'strict')) + outdata = recv_meth(*args) + outdata = str(outdata, 'ASCII', 'strict') + if outdata != indata.lower(): + raise support.TestFailed( + "While receiving with <<{name:s}>> bad data " + "<<{outdata:s}>> ({nout:d}) received; " + "expected <<{indata:s}>> ({nin:d})\n".format( + name=meth_name, outdata=repr(outdata[:20]), + nout=len(outdata), + indata=repr(indata[:20]), nin=len(indata) + ) + ) + except ValueError as e: + if expect_success: + raise support.TestFailed( + "Failed to receive with method <<{name:s}>>; " + "expected to succeed.\n".format(name=meth_name) + ) + if not str(e).startswith(meth_name): + raise support.TestFailed( + "Method <<{name:s}>> failed with unexpected " + "exception message: {exp:s}\n".format( + name=meth_name, exp=e + ) + ) + # consume data + s.read() + + s.write("over\n".encode("ASCII", "strict")) + s.close() + finally: + server.stop() + server.join() + + def test_main(verbose=False): if skip_expected: raise support.TestSkipped("No SSL support")