import itertools
import time
import unittest
import weakref
from concurrent import futures
from concurrent.futures._base import (
CANCELLED_AND_NOTIFIED, FINISHED, Future)
from test import support
from .util import (
PENDING_FUTURE, RUNNING_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
create_future, create_executor_tests, setup_module)
def mul(x, y):
return x * y
class AsCompletedTests:
def test_no_timeout(self):
future1 = self.executor.submit(mul, 2, 21)
future2 = self.executor.submit(mul, 7, 6)
completed = set(futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]))
self.assertEqual(set(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]),
completed)
def test_future_times_out(self):
"""Test ``futures.as_completed`` timing out before
completing it's final future."""
already_completed = {CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE}
# Windows clock resolution is around 15.6 ms
short_timeout = 0.100
for timeout in (0, short_timeout):
with self.subTest(timeout):
completed_futures = set()
future = self.executor.submit(time.sleep, short_timeout * 10)
try:
for f in futures.as_completed(
already_completed | {future},
timeout
):
completed_futures.add(f)
except futures.TimeoutError:
pass
# Check that ``future`` wasn't completed.
self.assertEqual(completed_futures, already_completed)
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.
# Issue #31641: accept arbitrary iterables.
future1 = self.executor.submit(time.sleep, 2)
completed = [
f for f in futures.as_completed(itertools.repeat(future1, 3))
]
self.assertEqual(len(completed), 1)
def test_free_reference_yielded_future(self):
# Issue #14406: Generator should not keep references
# to finished futures.
futures_list = [Future() for _ in range(8)]
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
futures_list.append(create_future(state=FINISHED, result=42))
with self.assertRaises(futures.TimeoutError):
for future in futures.as_completed(futures_list, timeout=0):
futures_list.remove(future)
wr = weakref.ref(future)
del future
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())
futures_list[0].set_result("test")
for future in futures.as_completed(futures_list):
futures_list.remove(future)
wr = weakref.ref(future)
del future
support.gc_collect() # For PyPy or other GCs.
self.assertIsNone(wr())
if futures_list:
futures_list[0].set_result("test")
def test_correct_timeout_exception_msg(self):
futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
RUNNING_FUTURE, SUCCESSFUL_FUTURE]
with self.assertRaises(futures.TimeoutError) as cm:
list(futures.as_completed(futures_list, timeout=0))
self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
create_executor_tests(globals(), AsCompletedTests)
def setUpModule():
setup_module()
if __name__ == "__main__":
unittest.main()