cpython/Lib/test/test_free_threading/test_tokenize.py

import io
import time
import unittest
import tokenize
from functools import partial
from threading import Thread

from test.support import threading_helper


@threading_helper.requires_working_threading()
class TestTokenize(unittest.TestCase):
    def test_tokenizer_iter(self):
        source = io.StringIO("for _ in a:\n  pass")
        it = tokenize._tokenize.TokenizerIter(source.readline, extra_tokens=False)

        tokens = []
        def next_token(it):
            while True:
                try:
                    r = next(it)
                    tokens.append(tokenize.TokenInfo._make(r))
                    time.sleep(0.03)
                except StopIteration:
                    return

        threads = []
        for _ in range(5):
            threads.append(Thread(target=partial(next_token, it)))

        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()

        expected_tokens = [
            tokenize.TokenInfo(type=1, string='for', start=(1, 0), end=(1, 3), line='for _ in a:\n'),
            tokenize.TokenInfo(type=1, string='_', start=(1, 4), end=(1, 5), line='for _ in a:\n'),
            tokenize.TokenInfo(type=1, string='in', start=(1, 6), end=(1, 8), line='for _ in a:\n'),
            tokenize.TokenInfo(type=1, string='a', start=(1, 9), end=(1, 10), line='for _ in a:\n'),
            tokenize.TokenInfo(type=11, string=':', start=(1, 10), end=(1, 11), line='for _ in a:\n'),
            tokenize.TokenInfo(type=4, string='', start=(1, 11), end=(1, 11), line='for _ in a:\n'),
            tokenize.TokenInfo(type=5, string='', start=(2, -1), end=(2, -1), line='  pass'),
            tokenize.TokenInfo(type=1, string='pass', start=(2, 2), end=(2, 6), line='  pass'),
            tokenize.TokenInfo(type=4, string='', start=(2, 6), end=(2, 6), line='  pass'),
            tokenize.TokenInfo(type=6, string='', start=(2, -1), end=(2, -1), line='  pass'),
            tokenize.TokenInfo(type=0, string='', start=(2, -1), end=(2, -1), line='  pass'),
        ]

        tokens.sort()
        expected_tokens.sort()
        self.assertListEqual(tokens, expected_tokens)


if __name__ == "__main__":
    unittest.main()