cpython/Lib/test/test_concurrent_futures/test_shutdown.py

import signal
import sys
import threading
import time
import unittest
from concurrent import futures

from test import support
from test.support.script_helper import assert_python_ok

from .util import (
    BaseTestCase, ThreadPoolMixin, ProcessPoolForkMixin,
    ProcessPoolForkserverMixin, ProcessPoolSpawnMixin,
    create_executor_tests, setup_module)


def sleep_and_print(t, msg):
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


class ExecutorShutdownTest:
    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        assert self.worker_count <= 5, "test needs few workers"
        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
        self.executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With few workers, many of
        # the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreater(len(cancelled), 20)

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0)

    def test_hang_gh83386(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://github.com/python/cpython/issues/83386.
        """
        if self.executor_type == futures.ProcessPoolExecutor:
            raise unittest.SkipTest(
                "Hangs, see https://github.com/python/cpython/issues/83386")

        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                if {context!r}: multiprocessing.set_start_method({context!r})
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, 'ctx', None)))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_hang_gh94440(self):
        """shutdown(wait=True) doesn't hang when a future was submitted and
        quickly canceled right before shutdown.

        See https://github.com/python/cpython/issues/94440.
        """
        if not hasattr(signal, 'alarm'):
            raise unittest.SkipTest(
                "Tested platform does not support the alarm signal")

        def timeout(_signum, _frame):
            raise RuntimeError("timed out waiting for shutdown")

        kwargs = {}
        if getattr(self, 'ctx', None):
            kwargs['mp_context'] = self.get_context()
        executor = self.executor_type(max_workers=1, **kwargs)
        executor.submit(int).result()
        old_handler = signal.signal(signal.SIGALRM, timeout)
        try:
            signal.alarm(5)
            executor.submit(int).cancel()
            executor.shutdown(wait=True)
        finally:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, old_handler)


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])


    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures.test_shutdown import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        # gh-116682: stdout may be empty if shutdown happens before task
        # starts executing.
        self.assertIn(out.strip(), [b"apple", b""])


class ProcessPoolShutdownTest(ExecutorShutdownTest):
    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        if mp_context.get_start_method(allow_none=False) == "fork":
            # fork pre-spawns, not on demand.
            expected_num_processes = self.worker_count
        else:
            expected_num_processes = 3

        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), expected_num_processes)
        for _ in range(3):
            sem.release()
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.
        assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])


create_executor_tests(globals(), ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def setUpModule():
    setup_module()


if __name__ == "__main__":
    unittest.main()