cpython/Lib/test/test_concurrent_futures/test_as_completed.py

import itertools
import time
import unittest
import weakref
from concurrent import futures
from concurrent.futures._base import (
    CANCELLED_AND_NOTIFIED, FINISHED, Future)

from test import support

from .util import (
    PENDING_FUTURE, RUNNING_FUTURE,
    CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE, SUCCESSFUL_FUTURE,
    create_future, create_executor_tests, setup_module)


def mul(x, y):
    return x * y


class AsCompletedTests:
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_future_times_out(self):
        """Test ``futures.as_completed`` timing out before
        completing it's final future."""
        already_completed = {CANCELLED_AND_NOTIFIED_FUTURE,
                             EXCEPTION_FUTURE,
                             SUCCESSFUL_FUTURE}

        # Windows clock resolution is around 15.6 ms
        short_timeout = 0.100
        for timeout in (0, short_timeout):
            with self.subTest(timeout):

                completed_futures = set()
                future = self.executor.submit(time.sleep, short_timeout * 10)

                try:
                    for f in futures.as_completed(
                        already_completed | {future},
                        timeout
                    ):
                        completed_futures.add(f)
                except futures.TimeoutError:
                    pass

                # Check that ``future`` wasn't completed.
                self.assertEqual(completed_futures, already_completed)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(globals(), AsCompletedTests)


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()