import time
import unittest
from unittest import mock
-from test.script_helper import assert_python_ok
-from test.support import IPV6_ENABLED, gc_collect
import asyncio
from asyncio import base_events
from asyncio import constants
from asyncio import test_utils
+try:
+ from test.script_helper import assert_python_ok
+ from test import support
+except ImportError:
+ from asyncio import test_support as support
+ from asyncio.test_support import assert_python_ok
MOCK_ANY = mock.ANY
except KeyboardInterrupt:
pass
self.loop.close()
- gc_collect()
+ support.gc_collect()
self.assertFalse(self.loop.call_exception_handler.called)
self.assertRaises(
OSError, self.loop.run_until_complete, coro)
- @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
+ @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_create_datagram_endpoint_no_matching_family(self):
coro = self.loop.create_datagram_endpoint(
asyncio.DatagramProtocol,
import unittest
from unittest import mock
import weakref
-from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
import asyncio
from asyncio import proactor_events
from asyncio import selector_events
from asyncio import test_utils
+try:
+ from test import support # find_unused_port, IPV6_ENABLED, TEST_HOME_DIR
+except ImportError:
+ from asyncio import test_support as support
def data_file(filename):
import sys
import threading
import unittest
-from test import support
from unittest import mock
import asyncio
from asyncio import test_utils
+try:
+ from test import support # gc_collect
+except ImportError:
+ from asyncio import test_support as support
def _fakefunc(f):
-from asyncio import subprocess
-from asyncio import test_utils
-import asyncio
import signal
import sys
import unittest
from unittest import mock
-from test import support
+
+import asyncio
+from asyncio import subprocess
+from asyncio import test_utils
if sys.platform != 'win32':
from asyncio import unix_events
+try:
+ from test import support # PIPE_MAX_SIZE
+except ImportError:
+ from asyncio import test_support as support
# Program blocking
PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
import types
import unittest
import weakref
-from test import support
-from test.script_helper import assert_python_ok
from unittest import mock
+try:
+ from test import support # gc_collect
+ from test.script_helper import assert_python_ok
+except ImportError:
+ from asyncio import test_support as support
+ from asyncio.test_support import assert_python_ok
import asyncio
from asyncio import coroutines
import socket
import sys
-import test.support
import unittest
-from test.support import IPV6_ENABLED
from unittest import mock
+try:
+ from test import support # gc_collect, IPV6_ENABLED
+except ImportError:
+ from asyncio import test_support as support
+
if sys.platform != 'win32':
raise unittest.SkipTest('Windows only')
ssock, csock = windows_utils.socketpair()
self.check_winsocketpair(ssock, csock)
- @unittest.skipUnless(IPV6_ENABLED, 'IPv6 not supported or enabled')
+ @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
def test_winsocketpair_ipv6(self):
ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
self.check_winsocketpair(ssock, csock)
# check garbage collection of p closes handle
del p
- test.support.gc_collect()
+ support.gc_collect()
try:
_winapi.CloseHandle(h)
except OSError as e: