import unittest, os, errno
from ctypes import *
from ctypes.util import find_library
-import threading
+try:
+ import threading
+except ImportError:
+ threading = None
class Test(unittest.TestCase):
def test_open(self):
libc_name = find_library("c")
- if libc_name is not None:
- libc = CDLL(libc_name, use_errno=True)
- if os.name == "nt":
- libc_open = libc._open
- else:
- libc_open = libc.open
+ if libc_name is None:
+ raise unittest.SkipTest("Unable to find C library")
+ libc = CDLL(libc_name, use_errno=True)
+ if os.name == "nt":
+ libc_open = libc._open
+ else:
+ libc_open = libc.open
- libc_open.argtypes = c_char_p, c_int
+ libc_open.argtypes = c_char_p, c_int
- self.assertEqual(libc_open("", 0), -1)
- self.assertEqual(get_errno(), errno.ENOENT)
-
- self.assertEqual(set_errno(32), errno.ENOENT)
- self.assertEqual(get_errno(), 32)
+ self.assertEqual(libc_open("", 0), -1)
+ self.assertEqual(get_errno(), errno.ENOENT)
+ self.assertEqual(set_errno(32), errno.ENOENT)
+ self.assertEqual(get_errno(), 32)
+ if threading:
def _worker():
set_errno(0)
self.assertEqual(get_errno(), 32)
set_errno(0)
- if os.name == "nt":
-
- def test_GetLastError(self):
- dll = WinDLL("kernel32", use_last_error=True)
- GetModuleHandle = dll.GetModuleHandleA
- GetModuleHandle.argtypes = [c_wchar_p]
+ @unittest.skipUnless(os.name == "nt", 'Test specific to Windows')
+ def test_GetLastError(self):
+ dll = WinDLL("kernel32", use_last_error=True)
+ GetModuleHandle = dll.GetModuleHandleA
+ GetModuleHandle.argtypes = [c_wchar_p]
- self.assertEqual(0, GetModuleHandle("foo"))
- self.assertEqual(get_last_error(), 126)
+ self.assertEqual(0, GetModuleHandle("foo"))
+ self.assertEqual(get_last_error(), 126)
- self.assertEqual(set_last_error(32), 126)
- self.assertEqual(get_last_error(), 32)
+ self.assertEqual(set_last_error(32), 126)
+ self.assertEqual(get_last_error(), 32)
- def _worker():
- set_last_error(0)
+ def _worker():
+ set_last_error(0)
- dll = WinDLL("kernel32", use_last_error=False)
- GetModuleHandle = dll.GetModuleHandleW
- GetModuleHandle.argtypes = [c_wchar_p]
- GetModuleHandle("bar")
+ dll = WinDLL("kernel32", use_last_error=False)
+ GetModuleHandle = dll.GetModuleHandleW
+ GetModuleHandle.argtypes = [c_wchar_p]
+ GetModuleHandle("bar")
- self.assertEqual(get_last_error(), 0)
+ self.assertEqual(get_last_error(), 0)
- t = threading.Thread(target=_worker)
- t.start()
- t.join()
+ t = threading.Thread(target=_worker)
+ t.start()
+ t.join()
- self.assertEqual(get_last_error(), 32)
+ self.assertEqual(get_last_error(), 32)
- set_last_error(0)
+ set_last_error(0)
if __name__ == "__main__":
unittest.main()
# 3. This notice may not be removed or altered from any source distribution.
import unittest
-import threading
import sqlite3 as sqlite
+try:
+ import threading
+except ImportError:
+ threading = None
class ModuleTests(unittest.TestCase):
def CheckAPILevel(self):
except TypeError:
pass
+@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
def setUp(self):
self.con = sqlite.connect(":memory:")
"""This test case provides support for checking forking and wait behavior.
-To test different wait behavior, overrise the wait_impl method.
+To test different wait behavior, override the wait_impl method.
We want fork1() semantics -- only the forking thread survives in the
child after a fork().
active threads survive in the child after a fork(); this is an error.
"""
-import os, sys, time, _thread, unittest
+import os, sys, time, unittest
+import test.support as support
+_thread = support.import_module('_thread')
LONGSLEEP = 2
SHORTSLEEP = 0.5
tests = iter(selected)
if use_mp:
- from threading import Thread
+ try:
+ from threading import Thread
+ except ImportError:
+ print("Multiprocess option requires thread support")
+ sys.exit(2)
from queue import Queue
from subprocess import Popen, PIPE
debug_output_pat = re.compile(r"\[\d+ refs\]$")
import re
import imp
import time
+try:
+ import _thread
+except ImportError:
+ _thread = None
__all__ = [
"Error", "TestFailed", "ResourceDenied", "import_module",
# at the end of a test run.
def threading_setup():
- import _thread
- return _thread._count(),
+ if _thread:
+ return _thread._count(),
+ else:
+ return 1,
def threading_cleanup(nb_threads):
- import _thread
-
+ if not _thread:
+ return
_MAX_COUNT = 10
for count in range(_MAX_COUNT):
n = _thread._count()
# XXX print a warning in case of failure?
def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ if not _thread:
+ return func
+
@functools.wraps(func)
def decorator(*args):
key = threading_setup()
# If this fails, the test will be skipped.
thread = support.import_module('_thread')
-import asyncore, asynchat, socket, threading, time
+import asyncore, asynchat, socket, time
import unittest
import sys
+try:
+ import threading
+except ImportError:
+ threading = None
HOST = support.HOST
SERVER_QUIT = b'QUIT\n'
-class echo_server(threading.Thread):
- # parameter to determine the number of bytes passed back to the
- # client each send
- chunk_size = 1
-
- def __init__(self, event):
- threading.Thread.__init__(self)
- self.event = event
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.port = support.bind_port(self.sock)
- # This will be set if the client wants us to wait before echoing data
- # back.
- self.start_resend_event = None
-
- def run(self):
- self.sock.listen(1)
- self.event.set()
- conn, client = self.sock.accept()
- self.buffer = b""
- # collect data until quit message is seen
- while SERVER_QUIT not in self.buffer:
- data = conn.recv(1)
- if not data:
- break
- self.buffer = self.buffer + data
-
- # remove the SERVER_QUIT message
- self.buffer = self.buffer.replace(SERVER_QUIT, b'')
-
- if self.start_resend_event:
- self.start_resend_event.wait()
-
- # re-send entire set of collected data
- try:
- # this may fail on some tests, such as test_close_when_done, since
- # the client closes the channel when it's done sending
- while self.buffer:
- n = conn.send(self.buffer[:self.chunk_size])
- time.sleep(0.001)
- self.buffer = self.buffer[n:]
- except:
- pass
-
- conn.close()
- self.sock.close()
-
-class echo_client(asynchat.async_chat):
-
- def __init__(self, terminator, server_port):
- asynchat.async_chat.__init__(self)
- self.contents = []
- self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect((HOST, server_port))
- self.set_terminator(terminator)
- self.buffer = b""
-
- def handle_connect(self):
- pass
-
- if sys.platform == 'darwin':
- # select.poll returns a select.POLLHUP at the end of the tests
- # on darwin, so just ignore it
- def handle_expt(self):
- pass
-
- def collect_incoming_data(self, data):
- self.buffer += data
-
- def found_terminator(self):
- self.contents.append(self.buffer)
- self.buffer = b""
-
-
-def start_echo_server():
- event = threading.Event()
- s = echo_server(event)
- s.start()
- event.wait()
- event.clear()
- time.sleep(0.01) # Give server time to start accepting.
- return s, event
+if threading:
+ class echo_server(threading.Thread):
+ # parameter to determine the number of bytes passed back to the
+ # client each send
+ chunk_size = 1
+
+ def __init__(self, event):
+ threading.Thread.__init__(self)
+ self.event = event
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.port = support.bind_port(self.sock)
+ # This will be set if the client wants us to wait before echoing data
+ # back.
+ self.start_resend_event = None
+
+ def run(self):
+ self.sock.listen(1)
+ self.event.set()
+ conn, client = self.sock.accept()
+ self.buffer = b""
+ # collect data until quit message is seen
+ while SERVER_QUIT not in self.buffer:
+ data = conn.recv(1)
+ if not data:
+ break
+ self.buffer = self.buffer + data
+
+ # remove the SERVER_QUIT message
+ self.buffer = self.buffer.replace(SERVER_QUIT, b'')
+
+ if self.start_resend_event:
+ self.start_resend_event.wait()
+
+ # re-send entire set of collected data
+ try:
+ # this may fail on some tests, such as test_close_when_done, since
+ # the client closes the channel when it's done sending
+ while self.buffer:
+ n = conn.send(self.buffer[:self.chunk_size])
+ time.sleep(0.001)
+ self.buffer = self.buffer[n:]
+ except:
+ pass
+
+ conn.close()
+ self.sock.close()
+
+ class echo_client(asynchat.async_chat):
+
+ def __init__(self, terminator, server_port):
+ asynchat.async_chat.__init__(self)
+ self.contents = []
+ self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.connect((HOST, server_port))
+ self.set_terminator(terminator)
+ self.buffer = b""
+
+ def handle_connect(self):
+ pass
+
+ if sys.platform == 'darwin':
+ # select.poll returns a select.POLLHUP at the end of the tests
+ # on darwin, so just ignore it
+ def handle_expt(self):
+ pass
+
+ def collect_incoming_data(self, data):
+ self.buffer += data
+
+ def found_terminator(self):
+ self.contents.append(self.buffer)
+ self.buffer = b""
+
+ def start_echo_server():
+ event = threading.Event()
+ s = echo_server(event)
+ s.start()
+ event.wait()
+ event.clear()
+ time.sleep(0.01) # Give server time to start accepting.
+ return s, event
+@unittest.skipUnless(threading, 'Threading required for this test.')
class TestAsynchat(unittest.TestCase):
usepoll = False
import select
import os
import socket
-import threading
import sys
import time
from io import BytesIO
from io import StringIO
+try:
+ import threading
+except ImportError:
+ threading = None
+
HOST = support.HOST
class dummysocket:
def tearDown(self):
asyncore.close_all()
+ @unittest.skipUnless(threading, 'Threading required for this test.')
@support.reap_threads
def test_send(self):
evt = threading.Event()
import os
import subprocess
import sys
-import threading
+
+try:
+ import threading
+except ImportError:
+ threading = None
# Skip tests if the bz2 module doesn't exist.
bz2 = support.import_module('bz2')
-
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor
has_cmdline_bunzip2 = sys.platform not in ("win32", "os2emx")
else:
self.fail("1/0 didn't raise an exception")
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def testThreading(self):
# Using a BZ2File from several threads doesn't deadlock (issue #7205).
data = b"1" * 2**20
import time
import random
import unittest
-import threading
from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
import _testcapi
self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
+@unittest.skipUnless(threading, 'Threading required for this test.')
class TestPendingCalls(unittest.TestCase):
def pendingcalls_submit(self, l, n):
raise support.TestFailed(
"Couldn't find main thread correctly in the list")
- try:
- _testcapi._test_thread_state
- have_thread_state = True
- except AttributeError:
- have_thread_state = False
-
- if have_thread_state:
+ if threading:
import _thread
import time
TestThreadState()
- import threading
t = threading.Thread(target=TestThreadState)
t.start()
t.join()
import cmd
import sys
-import trace
import re
from io import StringIO
+from test import support
class samplecmdclass(cmd.Cmd):
"""
return True
def test_main(verbose=None):
- from test import support, test_cmd
+ from test import test_cmd
support.run_doctest(test_cmd, verbose)
def test_coverage(coverdir):
- import trace
+ trace = support.import_module('trace')
tracer=trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix,],
trace=0, count=1)
tracer.run('reload(cmd);test_main()')
import sys
import tempfile
import unittest
-import threading
from contextlib import * # Tests __all__
from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
class ContextManagerTestCase(unittest.TestCase):
finally:
support.unlink(tfn)
+@unittest.skipUnless(threading, 'Threading required for this test.')
class LockContextTestCase(unittest.TestCase):
def boilerPlate(self, lock, locked):
from test import test_doctest
support.run_doctest(test_doctest, verbosity=True)
-import trace, sys, re, io
+import sys, re, io
+
def test_coverage(coverdir):
+ trace = support.import_module('trace')
tracer = trace.Trace(ignoredirs=[sys.prefix, sys.exec_prefix,],
trace=0, count=1)
tracer.run('test_main()')
import http.client
import sys
from test import support
-import threading
+threading = support.import_module('threading')
import time
import socket
import unittest
import signal
import sys
import time
-import threading
from test.fork_wait import ForkWait
-from test.support import run_unittest, reap_children, get_attribute
+from test.support import run_unittest, reap_children, get_attribute, import_module
+threading = import_module('threading')
# Skip test if fork does not exist.
get_attribute(os, 'fork')
# environment
import ftplib
-import threading
import asyncore
import asynchat
import socket
from unittest import TestCase
from test import support
from test.support import HOST
+threading = support.import_module('threading')
# the dummy data returned by server over the data channel when
# RETR, LIST and NLST commands are issued
m = hashlib.md5(b'x' * gil_minsize)
self.assertEquals(m.hexdigest(), 'cfb767f225d58469c5de3632a8803958')
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ @support.reap_threads
def test_threaded_hashing(self):
- if not threading:
- raise unittest.SkipTest('No threading module.')
-
# Updating the same hash object from several threads at once
# using data chunk sizes containing the same byte sequences.
#
self.assertEqual(expected_hash, hasher.hexdigest())
-@support.reap_threads
def test_main():
support.run_unittest(HashLibTestCase)
import urllib.parse
import http.client
import tempfile
-import threading
import unittest
from test import support
+threading = support.import_module('threading')
class NoLogRequestHandler:
def log_message(self, *args):
import sys
import time
import array
-import threading
import random
import unittest
import weakref
import codecs
import io # C implementation of io
import _pyio as pyio # Python implementation of io
+try:
+ import threading
+except ImportError:
+ threading = None
def _default_chunk_size():
self.assertEquals(b"abcdefg", bufio.read())
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
try:
# Write out many bytes with exactly the same number of 0's,
with self.open(support.TESTFN, "rb", buffering=0) as f:
self.assertEqual(f.read(), b"abc")
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads(self):
try:
# Write out many bytes from many threads and test they were
with self.open(support.TESTFN, "w", errors="replace") as f:
self.assertEqual(f.errors, "replace")
-
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_threads_write(self):
# Issue6750: concurrent writes could duplicate data
event = threading.Event()
from test.support import captured_stdout, run_with_locale, run_unittest,\
find_unused_port
import textwrap
-import threading
import unittest
import warnings
import weakref
+try:
+ import threading
+except ImportError:
+ threading = None
class BaseTest(unittest.TestCase):
self.server_close()
+@unittest.skipUnless(threading, 'Threading required for this test.')
class SocketHandlerTest(BaseTest):
"""Test for SocketHandler objects."""
def test_config13_failure(self):
self.assertRaises(Exception, self.apply_config, self.config13)
+ @unittest.skipUnless(threading, 'listen() needs threading to work')
def setup_via_listener(self, text):
text = text.encode("utf-8")
port = find_unused_port()
#
import unittest
-import threading
import queue as pyqueue
import time
import io
_multiprocessing = test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
+# import threading after _multiprocessing to raise a more revelant error
+# message: "No module named _multiprocessing". _multiprocessing is not compiled
+# without thread support.
+import threading
import multiprocessing.dummy
import multiprocessing.connection
# a real test suite
import poplib
-import threading
import asyncore
import asynchat
import socket
from unittest import TestCase
from test import support as test_support
+threading = test_support.import_module('threading')
HOST = test_support.HOST
PORT = 0
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import queue
-import threading
import time
import unittest
from test import support
+threading = support.import_module('threading')
QUEUE_SIZE = 5
import asyncore
import email.utils
import socket
-import threading
import smtpd
import smtplib
import io
import time
import select
-from unittest import TestCase
+import unittest
from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
HOST = support.HOST
if sys.platform == 'darwin':
serv.close()
evt.set()
-class GeneralTests(TestCase):
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class GeneralTests(unittest.TestCase):
def setUp(self):
self._threads = support.threading_setup()
# test server times out, causing the test to fail.
# Test behavior of smtpd.DebuggingServer
-class DebuggingServerTests(TestCase):
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class DebuggingServerTests(unittest.TestCase):
def setUp(self):
# temporarily replace sys.stdout to capture DebuggingServer output
self.assertEqual(self.output.getvalue(), mexpect)
-class NonConnectingTests(TestCase):
+class NonConnectingTests(unittest.TestCase):
def testNotConnected(self):
# Test various operations on an unconnected SMTP object that
# test response of client to a non-successful HELO message
-class BadHELOServerTests(TestCase):
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class BadHELOServerTests(unittest.TestCase):
def setUp(self):
self.old_stdout = sys.stdout
# Test various SMTP & ESMTP commands/behaviors that require a simulated server
# (i.e., something with more features than DebuggingServer)
-class SMTPSimTests(TestCase):
+@unittest.skipUnless(threading, 'Threading required for this test.')
+class SMTPSimTests(unittest.TestCase):
def setUp(self):
self._threads = support.threading_setup()
import io
import socket
import select
-import _thread as thread
-import threading
import time
import traceback
import queue
HOST = support.HOST
MSG = b'Michael Gilfix was here\n'
+try:
+ import _thread as thread
+ import threading
+except ImportError:
+ thread = None
+ threading = None
+
class SocketTCPTest(unittest.TestCase):
def setUp(self):
s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
+@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicTCPTest(SocketConnectedTest):
def __init__(self, methodName='runTest'):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
def __init__(self, methodName='runTest'):
def _testRecvFromNegative(self):
self.cli.sendto(MSG, 0, (HOST, self.port))
+@unittest.skipUnless(thread, 'Threading required for this test.')
class TCPCloserTest(ThreadedTCPSocketTest):
def testClose(self):
self.cli.connect((HOST, self.port))
time.sleep(1.0)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
def __init__(self, methodName='runTest'):
msg = self.cli.recv(1024)
self.assertEqual(msg, MSG)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class NonBlockingTCPTests(ThreadedTCPSocketTest):
def __init__(self, methodName='runTest'):
time.sleep(0.1)
self.cli.send(MSG)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class FileObjectClassTestCase(SocketConnectedTest):
"""Unit tests for the object returned by socket.makefile()
lambda: socket.create_connection((HOST, port))
)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
def __init__(self, methodName='runTest'):
self.assertRaises(socket.error, s.bind, address)
+@unittest.skipUnless(thread, 'Threading required for this test.')
class BufferIOTest(SocketConnectedTest):
"""
Test the buffer versions of socket.recv() and socket.send().
import signal
import socket
import tempfile
-import threading
import unittest
import socketserver
import test.support
from test.support import reap_children, reap_threads, verbose
+try:
+ import threading
+except ImportError:
+ threading = None
test.support.requires("network")
self.assertEquals(server.server_address, server.socket.getsockname())
return server
+ @unittest.skipUnless(threading, 'Threading required for this test.')
@reap_threads
def run_server(self, svrcls, hdlrbase, testfunc):
server = self.make_server(self.pickaddr(svrcls.address_family),
# strings to intern in test_intern()
numruns = 0
+try:
+ import threading
+except ImportError:
+ threading = None
class SysModuleTest(unittest.TestCase):
sys.setcheckinterval(n)
self.assertEquals(sys.getcheckinterval(), n)
+ @unittest.skipUnless(threading, 'Threading required for this test.')
def test_switchinterval(self):
self.assertRaises(TypeError, sys.setswitchinterval)
self.assertRaises(TypeError, sys.setswitchinterval, "a")
import socket
import select
-import threading
import telnetlib
import time
import contextlib
from unittest import TestCase
from test import support
+threading = support.import_module('threading')
HOST = support.HOST
import unittest
import random
from test import support
-import _thread as thread
+thread = support.import_module('_thread')
import time
import sys
import weakref
# complains several times about module random having no attribute
# randrange, and then Python hangs.
-import _thread as thread
import unittest
-from test.support import verbose, TestFailed
+from test.support import verbose, TestFailed, import_module
+thread = import_module('_thread')
critical_section = thread.allocate_lock()
done = thread.allocate_lock()
NUM_THREADS = 20
FILES_PER_THREAD = 50
-import _thread as thread # If this fails, we can't test this module
-import threading
import tempfile
-from test.support import threading_setup, threading_cleanup, run_unittest
+from test.support import threading_setup, threading_cleanup, run_unittest, import_module
+threading = import_module('threading')
import unittest
import io
from traceback import print_exc
import random
import re
import sys
-import threading
-import _thread
+_thread = test.support.import_module('_thread')
+threading = test.support.import_module('threading')
import time
import unittest
import weakref
import unittest
from doctest import DocTestSuite
from test import support
-import threading
+threading = support.import_module('threading')
import weakref
import gc
"""PyUnit testing that threads honor our signal semantics"""
import unittest
-import _thread as thread
import signal
import os
import sys
-from test.support import run_unittest
+from test.support import run_unittest, import_module
+thread = import_module('_thread')
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
#!/usr/bin/env python3
import email
-import threading
import urllib.parse
import urllib.request
import http.server
import unittest
import hashlib
from test import support
+threading = support.import_module('threading')
# Loopback http server infrastructure
import unittest
import xmlrpc.client as xmlrpclib
import xmlrpc.server
-import threading
import http.client
import socket
import os
import contextlib
from test import support
+try:
+ import threading
+except ImportError:
+ threading = None
+
alist = [{'astring': 'foo@bar.baz.spam',
'afloat': 7283.43,
'anint': 2**20,
return make_request_and_skip
return decorator
+@unittest.skipUnless(threading, 'Threading required for this test.')
class BaseServerTestCase(unittest.TestCase):
requestHandler = None
request_count = 1
threadFunc = staticmethod(http_server)
+
def setUp(self):
# enable traceback reporting
xmlrpc.server.SimpleXMLRPCServer._send_traceback_header = True
connection.putheader("Content-Encoding", "gzip")
return xmlrpclib.Transport.send_content(self, connection, body)
+ def setUp(self):
+ BaseServerTestCase.setUp(self)
+
def test_gzip_request(self):
t = self.Transport()
t.encode_threshold = None
#Test special attributes of the ServerProxy object
class ServerProxyTestCase(unittest.TestCase):
+ def setUp(self):
+ unittest.TestCase.setUp(self)
+ if threading:
+ self.url = URL
+ else:
+ # Without threading, http_server() and http_multi_server() will not
+ # be executed and URL is still equal to None. 'http://' is a just
+ # enough to choose the scheme (HTTP)
+ self.url = 'http://'
+
def test_close(self):
- p = xmlrpclib.ServerProxy(URL)
+ p = xmlrpclib.ServerProxy(self.url)
self.assertEqual(p('close')(), None)
def test_transport(self):
t = xmlrpclib.Transport()
- p = xmlrpclib.ServerProxy(URL, transport=t)
+ p = xmlrpclib.ServerProxy(self.url, transport=t)
self.assertEqual(p('transport'), t)
# This is a contrived way to make a failure occur on the server side
return super().get(key, failobj)
+@unittest.skipUnless(threading, 'Threading required for this test.')
class FailingServerTestCase(unittest.TestCase):
def setUp(self):
self.evt = threading.Event()
Tests
-----
+- Issue #7449: Fix many tests to support Python compiled without thread
+ support. Patches written by Jerry Seutter.
+
- Issue #8108: test_ftplib's non-blocking SSL server now has proper handling
of SSL shutdowns.