import operator
import contextlib
import copy
+import time
from test import support
from test.support import script_helper
self.cbcalled += 1
+@contextlib.contextmanager
+def collect_in_thread(period=0.0001):
+ """
+ Ensure GC collections happen in a different thread, at a high frequency.
+ """
+ threading = support.import_module('threading')
+ please_stop = False
+
+ def collect():
+ while not please_stop:
+ time.sleep(period)
+ gc.collect()
+
+ with support.disable_gc():
+ t = threading.Thread(target=collect)
+ t.start()
+ try:
+ yield
+ finally:
+ please_stop = True
+ t.join()
+
+
class ReferencesTestCase(TestBase):
def test_basic_ref(self):
dict = weakref.WeakKeyDictionary()
self.assertRegex(repr(dict), '<WeakKeyDictionary at 0x.*>')
+ def test_threaded_weak_valued_setdefault(self):
+ d = weakref.WeakValueDictionary()
+ with collect_in_thread():
+ for i in range(100000):
+ x = d.setdefault(10, RefCycle())
+ self.assertIsNot(x, None) # we never put None in there!
+ del x
+
+ def test_threaded_weak_valued_pop(self):
+ d = weakref.WeakValueDictionary()
+ with collect_in_thread():
+ for i in range(100000):
+ d[10] = RefCycle()
+ x = d.pop(10, 10)
+ self.assertIsNot(x, None) # we never put None in there!
+
+
from test import mapping_tests
class WeakValueDictionaryTestCase(mapping_tests.BasicTestMappingProtocol):
try:
o = self.data.pop(key)()
except KeyError:
+ o = None
+ if o is None:
if args:
return args[0]
- raise
- if o is None:
- raise KeyError(key)
+ else:
+ raise KeyError(key)
else:
return o
def setdefault(self, key, default=None):
try:
- wr = self.data[key]
+ o = self.data[key]()
except KeyError:
+ o = None
+ if o is None:
if self._pending_removals:
self._commit_removals()
self.data[key] = KeyedRef(default, self._remove, key)
return default
else:
- return wr()
+ return o
def update(*args, **kwargs):
if not args:
Library
-------
+- Issue #19542: Fix bugs in WeakValueDictionary.setdefault() and
+ WeakValueDictionary.pop() when a GC collection happens in another
+ thread.
+
- Issue #20191: Fixed a crash in resource.prlimit() when pass a sequence that
doesn't own its elements as limits.