cpython/Lib/test/lock_tests.py

"""
Various tests for synchronization primitives.
"""

import gc
import sys
import time
from _thread import start_new_thread, TIMEOUT_MAX
import threading
import unittest
import weakref

from test import support
from test.support import threading_helper


requires_fork = unittest.skipUnless(support.has_fork_support,
                                    "platform doesn't support fork "
                                     "(no _at_fork_reinit method)")


def wait_threads_blocked(nthread):
    # Arbitrary sleep to wait until N threads are blocked,
    # like waiting for a lock.
    time.sleep(0.010 * nthread)


class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, func, nthread, wait_before_exit=False):
        """
        Construct a bunch of `nthread` threads running the same function `func`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.func = func
        self.nthread = nthread
        self.started = []
        self.finished = []
        self.exceptions = []
        self._can_exit = not wait_before_exit
        self._wait_thread = None

    def task(self):
        tid = threading.get_ident()
        self.started.append(tid)
        try:
            self.func()
        except BaseException as exc:
            self.exceptions.append(exc)
        finally:
            self.finished.append(tid)
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if self._can_exit:
                    break

    def __enter__(self):
        self._wait_thread = threading_helper.wait_threads_exit(support.SHORT_TIMEOUT)
        self._wait_thread.__enter__()

        try:
            for _ in range(self.nthread):
                start_new_thread(self.task, ())
        except:
            self._can_exit = True
            raise

        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if len(self.started) >= self.nthread:
                break

        return self

    def __exit__(self, exc_type, exc_value, traceback):
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if len(self.finished) >= self.nthread:
                break

        # Wait until threads completely exit according to _thread._count()
        self._wait_thread.__exit__(None, None, None)

        # Break reference cycle
        exceptions = self.exceptions
        self.exceptions = None
        if exceptions:
            raise ExceptionGroup(f"{self.func} threads raised exceptions",
                                 exceptions)

    def do_finish(self):
        self._can_exit = True


class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._threads = threading_helper.threading_setup()

    def tearDown(self):
        threading_helper.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        # The waiting and/or time.monotonic() can be imprecise, which
        # is why comparing to the expected value would sometimes fail
        # (especially under Windows).
        self.assertGreaterEqual(actual, expected * 0.6)
        # Test nothing insane happened
        self.assertLess(actual, expected * 10.0)


class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.
    """

    def wait_phase(self, phase, expected):
        for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
            if len(phase) >= expected:
                break
        self.assertEqual(len(phase), expected)

    def test_constructor(self):
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        with Bunch(f, 1):
            pass
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.acquire()
            lock.release()

        N = 5
        with Bunch(f, N) as bunch:
            # Threads block on lock.acquire()
            wait_threads_blocked(N)
            self.assertEqual(len(bunch.finished), 0)

            # Threads unblocked
            lock.release()

        self.assertEqual(len(bunch.finished), N)

    def test_with(self):
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()

        def with_lock(err=None):
            with lock:
                if err is not None:
                    raise err

        # Acquire the lock, do nothing, with releases the lock
        with lock:
            pass

        # Check that the lock is unacquired
        with Bunch(f, 1):
            pass

        # Acquire the lock, raise an exception, with releases the lock
        with self.assertRaises(TypeError):
            with lock:
                raise TypeError

        # Check that the lock is unacquired even if after an exception
        # was raised in the previous "with lock:" block
        with Bunch(f, 1):
            pass

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()

        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        with Bunch(f, 15):
            pass

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, False, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        with Bunch(f, 1):
            pass
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        gc.collect()  # For PyPy or other GCs.
        self.assertIsNone(ref())


class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        phase = []

        def f():
            lock.acquire()
            phase.append(None)
            lock.acquire()
            phase.append(None)

        with threading_helper.wait_threads_exit():
            # Thread blocked on lock.acquire()
            start_new_thread(f, ())
            self.wait_phase(phase, 1)

            # Thread unblocked
            lock.release()
            self.wait_phase(phase, 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.release()
        with Bunch(f, 1):
            pass
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        lock = self.locktype()
        lock.acquire()
        self.assertFalse(lock.acquire(timeout=0.01))
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))

    @requires_fork
    def test_at_fork_reinit(self):
        def use_lock(lock):
            # make sure that the lock still works normally
            # after _at_fork_reinit()
            lock.acquire()
            lock.release()

        # unlocked
        lock = self.locktype()
        lock._at_fork_reinit()
        use_lock(lock)

        # locked: _at_fork_reinit() resets the lock to the unlocked state
        lock2 = self.locktype()
        lock2.acquire()
        lock2._at_fork_reinit()
        use_lock(lock2)


class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock._release_save)

    def test_recursion_count(self):
        lock = self.locktype()
        self.assertEqual(0, lock._recursion_count())
        lock.acquire()
        self.assertEqual(1, lock._recursion_count())
        lock.acquire()
        lock.acquire()
        self.assertEqual(3, lock._recursion_count())
        lock.release()
        self.assertEqual(2, lock._recursion_count())
        lock.release()
        lock.release()
        self.assertEqual(0, lock._recursion_count())

        phase = []

        def f():
            lock.acquire()
            phase.append(None)

            self.wait_phase(phase, 2)
            lock.release()
            phase.append(None)

        with threading_helper.wait_threads_exit():
            # Thread blocked on lock.acquire()
            start_new_thread(f, ())
            self.wait_phase(phase, 1)
            self.assertEqual(0, lock._recursion_count())

            # Thread unblocked
            phase.append(None)
            self.wait_phase(phase, 3)
            self.assertEqual(0, lock._recursion_count())

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()

        with Bunch(f, 1, True) as bunch:
            try:
                self.assertRaises(RuntimeError, lock.release)
            finally:
                bunch.do_finish()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        with Bunch(f, 1):
            pass
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())


class EventTests(BaseTestCase):
    """
    Tests for Event objects.
    """

    def test_is_set(self):
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())

        with Bunch(f, N):
            # Threads blocked on first evt.wait()
            wait_threads_blocked(N)
            self.assertEqual(len(results1), 0)

            # Threads unblocked
            evt.set()

        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))

        with Bunch(f, N):
            pass

        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)

        # The event is set
        results1 = []
        results2 = []
        evt.set()
        with Bunch(f, N):
            pass

        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # gh-57711: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        event = self.eventtype()
        results = []
        def f():
            results.append(event.wait(support.LONG_TIMEOUT))

        N = 5
        with Bunch(f, N):
            # Threads blocked on event.wait()
            wait_threads_blocked(N)

            # Threads unblocked
            event.set()
            event.clear()

        self.assertEqual(results, [True] * N)

    @requires_fork
    def test_at_fork_reinit(self):
        # ensure that condition is still using a Lock after reset
        evt = self.eventtype()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))
        evt._at_fork_reinit()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))

    def test_repr(self):
        evt = self.eventtype()
        self.assertRegex(repr(evt), r"<\w+\.Event at .*: unset>")
        evt.set()
        self.assertRegex(repr(evt), r"<\w+\.Event at .*: set>")


class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.
    """

    def test_acquire(self):
        cond = self.condtype()
        # Be default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along then they are.  The main thread therefore issues
        # wait_threads_blocked() statements to try to make sure that it doesn't
        # race ahead of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an implementation
        # detail of Condition Variables in current CPython, but in general, not
        # a guaranteed property of condition variables as a programming
        # construct.  In particular, it is possible that this can no longer
        # be conveniently guaranteed should their implementation ever change.
        ready = []
        results1 = []
        results2 = []
        phase_num = 0
        def f():
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()

            cond.release()
            results1.append((result, phase_num))

            cond.acquire()
            ready.append(phase_num)

            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))

        N = 5
        with Bunch(f, N):
            # first wait, to ensure all workers settle into cond.wait() before
            # we continue. See issues #8799 and #30727.
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= N:
                    break

            ready.clear()
            self.assertEqual(results1, [])

            # Notify 3 threads at first
            count1 = 3
            cond.acquire()
            cond.notify(count1)
            wait_threads_blocked(count1)

            # Phase 1
            phase_num = 1
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) >= count1:
                    break

            self.assertEqual(results1, [(True, 1)] * count1)
            self.assertEqual(results2, [])

            # Wait until awaken workers are blocked on cond.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= count1 :
                    break

            # Notify 5 threads: they might be in their first or second wait
            cond.acquire()
            cond.notify(5)
            wait_threads_blocked(N)

            # Phase 2
            phase_num = 2
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= (N + count1):
                    break

            count2 = N - count1
            self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
            self.assertEqual(results2, [(True, 2)] * count1)

            # Make sure all workers settle into cond.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(ready) >= N:
                    break

            # Notify all threads: they are all in their second wait
            cond.acquire()
            cond.notify_all()
            wait_threads_blocked(N)

            # Phase 3
            phase_num = 3
            cond.release()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results2) >= N:
                    break
            self.assertEqual(results1, [(True, 1)] * count1 + [(True, 2)] * count2)
            self.assertEqual(results2, [(True, 2)] * count1 + [(True, 3)] * count2)

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        timeout = 0.5
        results = []
        def f():
            cond.acquire()
            t1 = time.monotonic()
            result = cond.wait(timeout)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))

        N = 5
        with Bunch(f, N):
            pass
        self.assertEqual(len(results), N)

        for dt, result in results:
            self.assertTimeout(dt, timeout)
            # Note that conceptually (that"s the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        def f():
            with cond:
                result = cond.wait_for(lambda: state == 4)
                self.assertTrue(result)
                self.assertEqual(state, 4)

        with Bunch(f, 1):
            for i in range(4):
                time.sleep(0.010)
                with cond:
                    state += 1
                    cond.notify()

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)

        with Bunch(f, 1):
            # Only increment 3 times, so state == 4 is never reached.
            for i in range(3):
                time.sleep(0.010)
                with cond:
                    state += 1
                    cond.notify()

        self.assertEqual(len(success), 1)


class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.
    """

    def test_constructor(self):
        self.assertRaises(ValueError, self.semtype, value = -1)
        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        sem_value = 7
        sem = self.semtype(sem_value)
        sem.acquire()

        sem_results = []
        results1 = []
        results2 = []
        phase_num = 0

        def func():
            sem_results.append(sem.acquire())
            results1.append(phase_num)

            sem_results.append(sem.acquire())
            results2.append(phase_num)

        def wait_count(count):
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= count:
                    break

        N = 10
        with Bunch(func, N):
            # Phase 0
            count1 = sem_value - 1
            wait_count(count1)
            self.assertEqual(results1 + results2, [0] * count1)

            # Phase 1
            phase_num = 1
            for i in range(sem_value):
                sem.release()
            count2 = sem_value
            wait_count(count1 + count2)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2)

            # Phase 2
            phase_num = 2
            count3 = (sem_value - 1)
            for i in range(count3):
                sem.release()
            wait_count(count1 + count2 + count3)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2 + [2] * count3)
            # The semaphore is still locked
            self.assertFalse(sem.acquire(False))

            # Final release, to let the last thread finish
            count4 = 1
            sem.release()

        self.assertEqual(sem_results,
                         [True] * (count1 + count2 + count3 + count4))

    def test_multirelease(self):
        sem_value = 7
        sem = self.semtype(sem_value)
        sem.acquire()

        results1 = []
        results2 = []
        phase_num = 0
        def func():
            sem.acquire()
            results1.append(phase_num)

            sem.acquire()
            results2.append(phase_num)

        def wait_count(count):
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if len(results1) + len(results2) >= count:
                    break

        with Bunch(func, 10):
            # Phase 0
            count1 = sem_value - 1
            wait_count(count1)
            self.assertEqual(results1 + results2, [0] * count1)

            # Phase 1
            phase_num = 1
            count2 = sem_value
            sem.release(count2)
            wait_count(count1 + count2)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2)

            # Phase 2
            phase_num = 2
            count3 = sem_value - 1
            sem.release(count3)
            wait_count(count1 + count2 + count3)
            self.assertEqual(sorted(results1 + results2),
                             [0] * count1 + [1] * count2 + [2] * count3)
            # The semaphore is still locked
            self.assertFalse(sem.acquire(False))

            # Final release, to let the last thread finish
            sem.release()

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        with Bunch(f, 5):
            pass
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()

        with Bunch(f, 1) as bunch:
            # Thread blocked on sem.acquire()
            wait_threads_blocked(1)
            self.assertFalse(bunch.finished)

            # Thread unblocked
            sem.release()

    def test_with(self):
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()

class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        sem.acquire()
        sem.acquire()
        sem.release()

    def test_repr(self):
        sem = self.semtype(3)
        self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=3>")
        sem.acquire()
        self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=2>")
        sem.release()
        sem.release()
        self.assertRegex(repr(sem), r"<\w+\.Semaphore at .*: value=4>")


class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        self.assertRaises(ValueError, sem.release)
        sem.acquire()
        sem.release()
        self.assertRaises(ValueError, sem.release)

    def test_repr(self):
        sem = self.semtype(3)
        self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=3/3>")
        sem.acquire()
        self.assertRegex(repr(sem), r"<\w+\.BoundedSemaphore at .*: value=2/3>")


class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 2.0

    def setUp(self):
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()

    def run_threads(self, f):
        with Bunch(f, self.N):
            pass

    def multipass(self, results, n):
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_constructor(self):
        self.assertRaises(ValueError, self.barriertype, parties=0)
        self.assertRaises(ValueError, self.barriertype, parties=-1)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
                pass

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                    if self.barrier.n_waiting >= (self.N - 1):
                        break
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)


    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
                pass
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(self.defaultTimeout / 2)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, self.defaultTimeout / 4)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        timeout = 0.100
        barrier = self.barriertype(2, timeout=timeout)
        def f():
            self.assertRaises(threading.BrokenBarrierError,
                              barrier.wait)

        start_time = time.monotonic()
        with Bunch(f, 1):
            pass
        dt = time.monotonic() - start_time
        self.assertGreaterEqual(dt, timeout)

    def test_single_thread(self):
        b = self.barriertype(1)
        b.wait()
        b.wait()

    def test_repr(self):
        barrier = self.barriertype(3)
        timeout = support.LONG_TIMEOUT
        self.assertRegex(repr(barrier), r"<\w+\.Barrier at .*: waiters=0/3>")
        def f():
            barrier.wait(timeout)

        N = 2
        with Bunch(f, N):
            # Threads blocked on barrier.wait()
            for _ in support.sleeping_retry(support.SHORT_TIMEOUT):
                if barrier.n_waiting >= N:
                    break
            self.assertRegex(repr(barrier),
                             r"<\w+\.Barrier at .*: waiters=2/3>")

            # Threads unblocked
            barrier.wait(timeout)

        self.assertRegex(repr(barrier),
                         r"<\w+\.Barrier at .*: waiters=0/3>")

        # Abort the barrier
        barrier.abort()
        self.assertRegex(repr(barrier),
                         r"<\w+\.Barrier at .*: broken>")