# cpython/Lib/test/test_asyncio/test_ssl.py

# Contains code from https://github.com/MagicStack/uvloop/tree/v0.16.0
# SPDX-License-Identifier: PSF-2.0 AND (MIT OR Apache-2.0)
# SPDX-FileCopyrightText: Copyright (c) 2015-2021 MagicStack Inc.  http://magic.io

import asyncio
import contextlib
import gc
import logging
import select
import socket
import sys
import tempfile
import threading
import time
import weakref
import unittest

try:
    import ssl
except ImportError:
    ssl = None

from test import support
from test.test_asyncio import utils as test_utils


# True when the test suite runs on macOS.
MACOS = sys.platform == 'darwin'
# Scale factor for the A_DATA/B_DATA payloads (1024 * BUF_MULTIPLIER bytes);
# a smaller factor is used on macOS.
BUF_MULTIPLIER = 64 if MACOS else 1024


def tearDownModule():
    """Module-level unittest hook: reset the asyncio event loop policy."""
    asyncio.set_event_loop_policy(None)


class MyBaseProto(asyncio.Protocol):
    """Protocol that tracks its lifecycle state and counts received bytes.

    ``state`` moves INITIAL -> CONNECTED -> (EOF ->) CLOSED, and every
    callback asserts that it was invoked from an allowed state.  When a
    loop is passed to the constructor, ``connected`` and ``done`` are
    futures resolved by connection_made() and connection_lost().
    """

    # Class-level defaults, kept when no loop is passed to __init__().
    connected = None
    done = None

    def __init__(self, loop=None):
        self.transport = None
        self.state = 'INITIAL'
        self.nbytes = 0
        if loop is None:
            return
        self.connected = asyncio.Future(loop=loop)
        self.done = asyncio.Future(loop=loop)

    def connection_made(self, transport):
        self.transport = transport
        assert self.state == 'INITIAL', self.state
        self.state = 'CONNECTED'
        waiter = self.connected
        if waiter:
            waiter.set_result(None)

    def data_received(self, data):
        assert self.state == 'CONNECTED', self.state
        self.nbytes = self.nbytes + len(data)

    def eof_received(self):
        assert self.state == 'CONNECTED', self.state
        self.state = 'EOF'

    def connection_lost(self, exc):
        assert self.state in ('CONNECTED', 'EOF'), self.state
        self.state = 'CLOSED'
        waiter = self.done
        if waiter:
            waiter.set_result(None)


class MessageOutFilter(logging.Filter):
    """Logging filter that drops records whose message contains *msg*.

    The match is made against the record's unformatted ``msg`` attribute,
    i.e. the message template before any ``%``-style arguments are applied.
    """

    def __init__(self, msg):
        # Initialise the logging.Filter base state (name/nlen) as well, so
        # the instance is a fully formed Filter.
        super().__init__()
        self.msg = msg

    def filter(self, record):
        """Return False to drop *record* when it contains ``self.msg``."""
        # record.msg may be any object (for example an exception instance),
        # so convert it to text before the substring test instead of
        # raising TypeError from inside the logging machinery.
        return self.msg not in str(record.msg)


@unittest.skipIf(ssl is None, 'No ssl module')
class TestSSL(test_utils.TestCase):

    PAYLOAD_SIZE = 1024 * 100
    TIMEOUT = support.LONG_TIMEOUT

    def setUp(self):
        """Create a new event loop for the test and schedule its closing."""
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)
        self.addCleanup(loop.close)

    def tearDown(self):
        """Flush pending loop callbacks, run cleanups and collect garbage."""
        loop = self.loop
        if not loop.is_closed():
            # Just in case there are transport close callbacks still queued.
            test_utils.run_briefly(loop)

        self.doCleanups()
        support.gc_collect()
        super().tearDown()

    def tcp_server(self, server_prog, *,
                   family=socket.AF_INET,
                   addr=None,
                   timeout=support.SHORT_TIMEOUT,
                   backlog=1,
                   max_clients=10):
        """Return a TestThreadedServer wrapping a new listening socket.

        server_prog and max_clients are forwarded unchanged to
        TestThreadedServer.  When addr is None, an AF_UNIX family gets a
        unique temporary path and any other family gets ('127.0.0.1', 0).
        timeout is applied to the socket and must be a positive number.

        Raises RuntimeError if timeout is None or not positive; errors from
        configuring, binding or listening on the socket propagate after the
        socket has been closed.
        """
        # Validate first, so that a bad timeout cannot leak an open socket.
        if timeout is None:
            raise RuntimeError('timeout is required')
        if timeout <= 0:
            raise RuntimeError('only blocking sockets are supported')

        if addr is None:
            if family == getattr(socket, "AF_UNIX", None):
                # Only the unique name is wanted: the temporary file is
                # removed on leaving the block, before bind() is called.
                with tempfile.NamedTemporaryFile() as tmp:
                    addr = tmp.name
            else:
                addr = ('127.0.0.1', 0)

        sock = socket.socket(family, socket.SOCK_STREAM)
        try:
            sock.settimeout(timeout)
            sock.bind(addr)
            sock.listen(backlog)
        except BaseException:
            # Close the half-configured socket, then re-raise with the
            # original traceback.
            sock.close()
            raise

        return TestThreadedServer(
            self, sock, server_prog, timeout, max_clients)

    def tcp_client(self, client_prog,
                   family=socket.AF_INET,
                   timeout=support.SHORT_TIMEOUT):
        """Return a TestThreadedClient wrapping a new, unconnected socket.

        client_prog is forwarded unchanged to TestThreadedClient.  timeout
        is applied to the socket and must be a positive number.

        Raises RuntimeError if timeout is None or not positive.
        """
        # Validate first, so that a bad timeout cannot leak an open socket.
        if timeout is None:
            raise RuntimeError('timeout is required')
        if timeout <= 0:
            raise RuntimeError('only blocking sockets are supported')

        sock = socket.socket(family, socket.SOCK_STREAM)
        try:
            sock.settimeout(timeout)
        except BaseException:
            sock.close()
            raise

        return TestThreadedClient(
            self, sock, client_prog, timeout)

    def unix_server(self, *args, **kwargs):
        """Call tcp_server() with the socket family fixed to AF_UNIX."""
        unix_family = socket.AF_UNIX
        return self.tcp_server(*args, family=unix_family, **kwargs)

    def unix_client(self, *args, **kwargs):
        """Call tcp_client() with the socket family fixed to AF_UNIX."""
        unix_family = socket.AF_UNIX
        return self.tcp_client(*args, family=unix_family, **kwargs)

    def _create_server_ssl_context(self, certfile, keyfile=None):
        """Return a server-side SSLContext loaded with the given cert chain."""
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        ctx.options = ctx.options | ssl.OP_NO_SSLv2
        ctx.load_cert_chain(certfile, keyfile)
        return ctx

    def _create_client_ssl_context(self, *, disable_verify=True):
        """Return a default client SSLContext with hostname checks disabled.

        With disable_verify true (the default), certificate verification is
        switched off as well (verify_mode = CERT_NONE).
        """
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        if not disable_verify:
            return ctx
        ctx.verify_mode = ssl.CERT_NONE
        return ctx

    @contextlib.contextmanager
    def _silence_eof_received_warning(self):
        """Suppress asyncio's 'has no effect when using ssl' log records."""
        # TODO This warning has to be fixed in asyncio.
        asyncio_logger = logging.getLogger('asyncio')
        msg_filter = MessageOutFilter('has no effect when using ssl')
        asyncio_logger.addFilter(msg_filter)
        try:
            yield
        finally:
            # Always detach the filter again, even if the body raised.
            asyncio_logger.removeFilter(msg_filter)

    def _abort_socket_test(self, ex):
        """Stop the event loop, then fail the test with *ex* as the message.

        The failure is raised even if stopping the loop itself raises.
        """
        try:
            self.loop.stop()
        finally:
            self.fail(ex)

    def new_loop(self):
        """Return a newly created asyncio event loop."""
        return asyncio.new_event_loop()

    def new_policy(self):
        """Return a new instance of asyncio's default event loop policy."""
        return asyncio.DefaultEventLoopPolicy()

    async def wait_closed(self, obj):
        """Await obj.wait_closed(), ignoring connection errors.

        Objects that are not asyncio.StreamWriter instances are ignored.
        """
        if isinstance(obj, asyncio.StreamWriter):
            with contextlib.suppress(BrokenPipeError, ConnectionError):
                await obj.wait_closed()

    def test_create_server_ssl_1(self):
        # An asyncio TLS server (asyncio.start_server) talks to TOTAL_CNT
        # threaded socket clients: each client sends A_DATA, expects b'OK',
        # sends B_DATA and expects b'SPAM'.
        CNT = 0           # number of clients that were successful
        TOTAL_CNT = 25    # total number of clients that test will create
        TIMEOUT = support.LONG_TIMEOUT  # timeout for this test

        A_DATA = b'A' * 1024 * BUF_MULTIPLIER
        B_DATA = b'B' * 1024 * BUF_MULTIPLIER

        sslctx = self._create_server_ssl_context(
            test_utils.ONLYCERT, test_utils.ONLYKEY
        )
        client_sslctx = self._create_client_ssl_context()

        clients = []

        async def handle_client(reader, writer):
            nonlocal CNT

            data = await reader.readexactly(len(A_DATA))
            self.assertEqual(data, A_DATA)
            writer.write(b'OK')

            data = await reader.readexactly(len(B_DATA))
            self.assertEqual(data, B_DATA)
            # writelines() is fed bytes, bytearray and memoryview chunks
            # that together spell b'SPAM'.
            writer.writelines([b'SP', bytearray(b'A'), memoryview(b'M')])

            await writer.drain()
            writer.close()

            CNT += 1

        async def test_client(addr):
            fut = asyncio.Future()

            def prog(sock):
                # Runs outside the event loop, so the outcome is handed
                # back to it with call_soon_threadsafe().
                try:
                    sock.starttls(client_sslctx)
                    sock.connect(addr)
                    sock.send(A_DATA)

                    data = sock.recv_all(2)
                    self.assertEqual(data, b'OK')

                    sock.send(B_DATA)
                    data = sock.recv_all(4)
                    self.assertEqual(data, b'SPAM')

                    sock.close()

                except Exception as ex:
                    self.loop.call_soon_threadsafe(fut.set_exception, ex)
                else:
                    self.loop.call_soon_threadsafe(fut.set_result, None)

            client = self.tcp_client(prog)
            client.start()
            clients.append(client)

            await fut

        async def start_server():
            srv = await asyncio.start_server(
                handle_client,
                '127.0.0.1', 0,
                family=socket.AF_INET,
                ssl=sslctx,
                ssl_handshake_timeout=support.SHORT_TIMEOUT)

            try:
                srv_socks = srv.sockets
                self.assertTrue(srv_socks)

                addr = srv_socks[0].getsockname()

                tasks = [test_client(addr) for _ in range(TOTAL_CNT)]

                await asyncio.wait_for(asyncio.gather(*tasks), TIMEOUT)

            finally:
                self.loop.call_soon(srv.close)
                await srv.wait_closed()

        try:
            with self._silence_eof_received_warning():
                self.loop.run_until_complete(start_server())

            self.assertEqual(CNT, TOTAL_CNT)
        finally:
            # Stop every started client even when the run or the assertion
            # above fails, so that no client is left running.
            for client in clients:
                client.stop()

    def test_create_connection_ssl_1(self):
        # asyncio TLS clients (asyncio.open_connection) talk to a threaded
        # socket server; run once with host/port addressing and once with
        # a pre-connected socket.
        self.loop.set_exception_handler(None)

        CNT = 0
        TOTAL_CNT = 25

        A_DATA = b'A' * 1024 * BUF_MULTIPLIER
        B_DATA = b'B' * 1024 * BUF_MULTIPLIER

        sslctx = self._create_server_ssl_context(
            test_utils.ONLYCERT,
            test_utils.ONLYKEY
        )
        client_sslctx = self._create_client_ssl_context()

        def server(sock):
            sock.starttls(
                sslctx,
                server_side=True)

            data = sock.recv_all(len(A_DATA))
            self.assertEqual(data, A_DATA)
            sock.send(b'OK')

            data = sock.recv_all(len(B_DATA))
            self.assertEqual(data, B_DATA)
            sock.send(b'SPAM')

            sock.close()

        async def exchange(reader, writer):
            # Request/response sequence shared by both client flavours;
            # counts the client as successful once both replies matched.
            nonlocal CNT

            writer.write(A_DATA)
            self.assertEqual(await reader.readexactly(2), b'OK')

            writer.write(B_DATA)
            self.assertEqual(await reader.readexactly(4), b'SPAM')

            CNT += 1

            writer.close()
            await self.wait_closed(writer)

        async def client(addr):
            reader, writer = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='',
                ssl_handshake_timeout=support.SHORT_TIMEOUT)

            await exchange(reader, writer)

        async def client_sock(addr):
            sock = socket.socket()
            try:
                sock.connect(addr)
                reader, writer = await asyncio.open_connection(
                    sock=sock,
                    ssl=client_sslctx,
                    server_hostname='')
            except BaseException:
                # Connecting failed: close the socket instead of leaking it.
                sock.close()
                raise

            await exchange(reader, writer)
            sock.close()

        def run(coro):
            nonlocal CNT
            CNT = 0

            async def _gather(*tasks):
                # trampoline
                return await asyncio.gather(*tasks)

            with self.tcp_server(server,
                                 max_clients=TOTAL_CNT,
                                 backlog=TOTAL_CNT) as srv:
                tasks = [coro(srv.addr) for _ in range(TOTAL_CNT)]

                self.loop.run_until_complete(_gather(*tasks))

            self.assertEqual(CNT, TOTAL_CNT)

        with self._silence_eof_received_warning():
            run(client)

        with self._silence_eof_received_warning():
            run(client_sock)

    def test_create_connection_ssl_slow_handshake(self):
        # The server below never performs a TLS handshake, so the client's
        # open_connection() is expected to fail with ConnectionAbortedError
        # ("SSL handshake ... is taking longer").

        # silence error logger
        self.loop.set_exception_handler(lambda *args: None)

        client_sslctx = self._create_client_ssl_context()

        def server(sock):
            try:
                with contextlib.suppress(ConnectionAbortedError):
                    sock.recv_all(1024 * 1024)
            finally:
                sock.close()

        async def client(addr):
            _reader, stream = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='',
                ssl_handshake_timeout=1.0)
            stream.close()
            await self.wait_closed(stream)

        expected_message = r'SSL handshake.*is taking longer'

        with self.tcp_server(server, max_clients=1, backlog=1) as srv:
            with self.assertRaisesRegex(ConnectionAbortedError,
                                        expected_message):
                self.loop.run_until_complete(client(srv.addr))

    def test_create_connection_ssl_failed_certificate(self):
        # A client context that keeps certificate verification enabled is
        # expected to fail open_connection() with
        # ssl.SSLCertVerificationError.

        # silence error logger
        self.loop.set_exception_handler(lambda *args: None)

        sslctx = self._create_server_ssl_context(
            test_utils.ONLYCERT,
            test_utils.ONLYKEY
        )
        # disable_verify=False: verify_mode is left at the default of
        # ssl.create_default_context().
        client_sslctx = self._create_client_ssl_context(disable_verify=False)

        def server(sock):
            try:
                sock.starttls(
                    sslctx,
                    server_side=True)
                # NOTE(review): connect() is called without an address on
                # the server-side socket; presumably this line is only
                # reached if the handshake does not fail first -- confirm
                # the intent against the socket wrapper's implementation.
                sock.connect()
            except (ssl.SSLError, OSError):
                pass
            finally:
                sock.close()

        async def client(addr):
            reader, writer = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='',
                ssl_handshake_timeout=support.SHORT_TIMEOUT)
            writer.close()
            await self.wait_closed(writer)

        with self.tcp_server(server,
                             max_clients=1,
                             backlog=1) as srv:

            with self.assertRaises(ssl.SSLCertVerificationError):
                self.loop.run_until_complete(client(srv.addr))

    def test_ssl_handshake_timeout(self):
        # bpo-29970: Check that a connection is aborted if handshake is not
        # completed in timeout period, instead of remaining open indefinitely
        client_sslctx = test_utils.simple_client_sslcontext()

        # Collect every context passed to the loop's exception handler;
        # the list is asserted to be empty at the end of the test.
        messages = []
        self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

        server_side_aborted = False

        def server(sock):
            # Never performs a TLS handshake; records whether reading from
            # the peer ended with ConnectionAbortedError.
            nonlocal server_side_aborted
            try:
                sock.recv_all(1024 * 1024)
            except ConnectionAbortedError:
                server_side_aborted = True
            finally:
                sock.close()

        async def client(addr):
            # The wait_for() limit (0.5) is much shorter than
            # ssl_handshake_timeout (10.0), so wait_for() is the timeout
            # that is expected to fire.
            await asyncio.wait_for(
                self.loop.create_connection(
                    asyncio.Protocol,
                    *addr,
                    ssl=client_sslctx,
                    server_hostname='',
                    ssl_handshake_timeout=10.0),
                0.5)

        with self.tcp_server(server,
                             max_clients=1,
                             backlog=1) as srv:

            with self.assertRaises(asyncio.TimeoutError):
                self.loop.run_until_complete(client(srv.addr))

        self.assertTrue(server_side_aborted)

        # Python issue #23197: cancelling a handshake must not raise an
        # exception or log an error, even if the handshake failed
        self.assertEqual(messages, [])

    def test_ssl_handshake_connection_lost(self):
        # #246: make sure that no connection_lost() is called before
        # connection_made() is called first

        client_sslctx = test_utils.simple_client_sslcontext()

        # silence error logger
        self.loop.set_exception_handler(lambda loop, ctx: None)

        connection_made_called = False
        connection_lost_called = False

        def server(sock):
            sock.recv(1024)
            # break the connection during handshake
            sock.close()

        class ClientProto(asyncio.Protocol):
            # Only records which callbacks were invoked.
            def connection_made(self, transport):
                nonlocal connection_made_called
                connection_made_called = True

            def connection_lost(self, exc):
                nonlocal connection_lost_called
                connection_lost_called = True

        async def client(addr):
            await self.loop.create_connection(
                ClientProto,
                *addr,
                ssl=client_sslctx,
                server_hostname='')

        with self.tcp_server(server,
                             max_clients=1,
                             backlog=1) as srv:

            with self.assertRaises(ConnectionResetError):
                self.loop.run_until_complete(client(srv.addr))

        # The handshake never completed, so neither callback may have run.
        if connection_lost_called:
            if connection_made_called:
                self.fail("unexpected call to connection_lost()")
            else:
                self.fail("unexpected call to connection_lost() without "
                          "calling connection_made()")
        elif connection_made_called:
            self.fail("unexpected call to connection_made()")

    def test_ssl_connect_accepted_socket(self):
        # Run the test_connect_accepted_socket() scenario with TLS enabled
        # on both the accepted (server) socket and the client socket.
        server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        server_context.load_cert_chain(test_utils.ONLYCERT, test_utils.ONLYKEY)
        if hasattr(server_context, 'check_hostname'):
            server_context.check_hostname = False
        server_context.verify_mode = ssl.CERT_NONE

        # The client side needs a client-protocol context: it is used to
        # wrap the connecting socket in test_connect_accepted_socket().
        client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        if hasattr(client_context, 'check_hostname'):
            # Must be disabled before verify_mode is set to CERT_NONE.
            client_context.check_hostname = False
        client_context.verify_mode = ssl.CERT_NONE

        # Actually exercise the contexts; without this call the test built
        # them and asserted nothing.
        self.test_connect_accepted_socket(server_context, client_context)

    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
        # Accept a connection on a plain listening socket and hand it to
        # loop.connect_accepted_socket(); a client thread sends `message`
        # and expects `expected_response` back.  server_ssl/client_ssl are
        # optional SSL contexts for the two ends (None means plain TCP).
        loop = self.loop

        class MyProto(MyBaseProto):

            def connection_lost(self, exc):
                super().connection_lost(exc)
                # Ends the loop.run_forever() call below.
                loop.call_soon(loop.stop)

            def data_received(self, data):
                super().data_received(data)
                self.transport.write(expected_response)

        lsock = socket.socket(socket.AF_INET)
        lsock.bind(('127.0.0.1', 0))
        lsock.listen(1)
        addr = lsock.getsockname()

        message = b'test data'
        response = None
        expected_response = b'roger'

        def client():
            # Thread target: failures are only printed here; they surface
            # through the assertion on `response` further down.
            nonlocal response
            try:
                csock = socket.socket(socket.AF_INET)
                if client_ssl is not None:
                    csock = client_ssl.wrap_socket(csock)
                csock.connect(addr)
                csock.sendall(message)
                response = csock.recv(99)
                csock.close()
            except Exception as exc:
                print(
                    "Failure in client thread in test_connect_accepted_socket",
                    exc)

        thread = threading.Thread(target=client, daemon=True)
        thread.start()

        # Blocking accept, performed before the event loop starts running.
        conn, _ = lsock.accept()
        proto = MyProto(loop=loop)
        proto.loop = loop

        # ssl_handshake_timeout is only passed when TLS is in use.
        extras = {}
        if server_ssl:
            extras = dict(ssl_handshake_timeout=support.SHORT_TIMEOUT)

        f = loop.create_task(
            loop.connect_accepted_socket(
                (lambda: proto), conn, ssl=server_ssl,
                **extras))
        # Runs until MyProto.connection_lost() schedules loop.stop().
        loop.run_forever()
        conn.close()
        lsock.close()

        thread.join(1)
        self.assertFalse(thread.is_alive())
        self.assertEqual(proto.state, 'CLOSED')
        self.assertEqual(proto.nbytes, len(message))
        self.assertEqual(response, expected_response)
        tr, _ = f.result()

        if server_ssl:
            self.assertIn('SSL', tr.__class__.__name__)

        tr.close()
        # let it close
        self.loop.run_until_complete(asyncio.sleep(0.1))

    def test_start_tls_client_corrupted_ssl(self):
        # The server injects plaintext into an established TLS stream; the
        # client's next read is expected to raise ssl.SSLError.
        self.loop.set_exception_handler(lambda loop, ctx: None)

        sslctx = test_utils.simple_server_sslcontext()
        client_sslctx = test_utils.simple_client_sslcontext()

        def server(sock):
            # Duplicate of the socket taken before starttls(), used below
            # to send bytes that are not part of the TLS session.
            orig_sock = sock.dup()
            try:
                sock.starttls(
                    sslctx,
                    server_side=True)
                sock.sendall(b'A\n')
                sock.recv_all(1)
                orig_sock.send(b'please corrupt the SSL connection')
            except ssl.SSLError:
                pass
            finally:
                sock.close()
                orig_sock.close()

        async def client(addr):
            reader, writer = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='')

            self.assertEqual(await reader.readline(), b'A\n')
            # The server waits for one byte before corrupting the stream.
            writer.write(b'B')
            with self.assertRaises(ssl.SSLError):
                await reader.readline()
            writer.close()
            try:
                await self.wait_closed(writer)
            except ssl.SSLError:
                pass
            return 'OK'

        with self.tcp_server(server,
                             max_clients=1,
                             backlog=1) as srv:

            res = self.loop.run_until_complete(client(srv.addr))

        self.assertEqual(res, 'OK')

    def test_start_tls_client_reg_proto_1(self):
        # Upgrade an established plain connection with loop.start_tls() on
        # the client side while using a regular asyncio.Protocol.
        HELLO_MSG = b'1' * self.PAYLOAD_SIZE

        server_context = test_utils.simple_server_sslcontext()
        client_context = test_utils.simple_client_sslcontext()

        def serve(sock):
            sock.settimeout(self.TIMEOUT)

            # First message arrives before the TLS upgrade.
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            sock.starttls(server_context, server_side=True)

            # Everything from here on goes through the upgraded socket.
            sock.sendall(b'O')
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            sock.unwrap()
            sock.close()

        class ClientProto(asyncio.Protocol):
            def __init__(self, on_data, on_eof):
                self.on_data = on_data
                self.on_eof = on_eof
                self.con_made_cnt = 0

            # The instance parameter is named `proto` so that `self` still
            # refers to the enclosing test case inside this method.
            def connection_made(proto, tr):
                proto.con_made_cnt += 1
                # Ensure connection_made gets called only once.
                self.assertEqual(proto.con_made_cnt, 1)

            def data_received(self, data):
                self.on_data.set_result(data)

            def eof_received(self):
                self.on_eof.set_result(True)

        async def client(addr):
            await asyncio.sleep(0.5)

            on_data = self.loop.create_future()
            on_eof = self.loop.create_future()

            tr, proto = await self.loop.create_connection(
                lambda: ClientProto(on_data, on_eof), *addr)

            tr.write(HELLO_MSG)
            new_tr = await self.loop.start_tls(tr, proto, client_context)

            self.assertEqual(await on_data, b'O')
            new_tr.write(HELLO_MSG)
            await on_eof

            new_tr.close()

        with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
            self.loop.run_until_complete(
                asyncio.wait_for(client(srv.addr),
                                 timeout=support.SHORT_TIMEOUT))

    def test_create_connection_memory_leak(self):
        # After a complete TLS session made with loop.create_connection(),
        # the client SSL context must no longer be referenced by anything.
        HELLO_MSG = b'1' * self.PAYLOAD_SIZE

        server_context = self._create_server_ssl_context(
            test_utils.ONLYCERT, test_utils.ONLYKEY)
        client_context = self._create_client_ssl_context()

        def serve(sock):
            sock.settimeout(self.TIMEOUT)

            sock.starttls(server_context, server_side=True)

            sock.sendall(b'O')
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            sock.unwrap()
            sock.close()

        class ClientProto(asyncio.Protocol):
            def __init__(self, on_data, on_eof):
                self.on_data = on_data
                self.on_eof = on_eof
                self.con_made_cnt = 0

            # The instance parameter is named `proto` so that `self` still
            # refers to the enclosing test case inside this method.
            def connection_made(proto, tr):
                # XXX: We assume user stores the transport in protocol
                proto.tr = tr
                proto.con_made_cnt += 1
                # Ensure connection_made gets called only once.
                self.assertEqual(proto.con_made_cnt, 1)

            def data_received(self, data):
                self.on_data.set_result(data)

            def eof_received(self):
                self.on_eof.set_result(True)

        async def client(addr):
            await asyncio.sleep(0.5)

            on_data = self.loop.create_future()
            on_eof = self.loop.create_future()

            tr, proto = await self.loop.create_connection(
                lambda: ClientProto(on_data, on_eof), *addr,
                ssl=client_context)

            self.assertEqual(await on_data, b'O')
            tr.write(HELLO_MSG)
            await on_eof

            tr.close()

        with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
            self.loop.run_until_complete(
                asyncio.wait_for(client(srv.addr),
                                 timeout=support.SHORT_TIMEOUT))

        # No garbage is left for SSL client from loop.create_connection, even
        # if user stores the SSLTransport in corresponding protocol instance
        # Rebinding the name to a weak reference drops this function's own
        # strong reference; the weakref then resolves to None only if no
        # other object still references the context at this point.
        client_context = weakref.ref(client_context)
        self.assertIsNone(client_context())

    def test_start_tls_client_buf_proto_1(self):
        # Client-side loop.start_tls() with an asyncio.BufferedProtocol,
        # followed by a switch to a regular asyncio.Protocol through
        # set_protocol() on the upgraded transport.
        HELLO_MSG = b'1' * self.PAYLOAD_SIZE

        server_context = test_utils.simple_server_sslcontext()
        client_context = test_utils.simple_client_sslcontext()

        # Incremented by connection_made() of both client protocols.
        client_con_made_calls = 0

        def serve(sock):
            sock.settimeout(self.TIMEOUT)

            # First message arrives before the TLS upgrade.
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            sock.starttls(server_context, server_side=True)

            # b'O' is awaited by the first (buffered) client protocol.
            sock.sendall(b'O')
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            # b'2' is awaited by the second client protocol.
            sock.sendall(b'2')
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            sock.unwrap()
            sock.close()

        class ClientProtoFirst(asyncio.BufferedProtocol):
            def __init__(self, on_data):
                self.on_data = on_data
                # One-byte receive buffer handed out by get_buffer().
                self.buf = bytearray(1)

            def connection_made(self, tr):
                nonlocal client_con_made_calls
                client_con_made_calls += 1

            def get_buffer(self, sizehint):
                return self.buf

            def buffer_updated(self, nsize):
                assert nsize == 1
                self.on_data.set_result(bytes(self.buf[:nsize]))

            def eof_received(self):
                pass

        class ClientProtoSecond(asyncio.Protocol):
            def __init__(self, on_data, on_eof):
                self.on_data = on_data
                self.on_eof = on_eof
                self.con_made_cnt = 0

            def connection_made(self, tr):
                nonlocal client_con_made_calls
                client_con_made_calls += 1

            def data_received(self, data):
                self.on_data.set_result(data)

            def eof_received(self):
                self.on_eof.set_result(True)

        async def client(addr):
            await asyncio.sleep(0.5)

            on_data1 = self.loop.create_future()
            on_data2 = self.loop.create_future()
            on_eof = self.loop.create_future()

            tr, proto = await self.loop.create_connection(
                lambda: ClientProtoFirst(on_data1), *addr)

            tr.write(HELLO_MSG)
            new_tr = await self.loop.start_tls(tr, proto, client_context)

            self.assertEqual(await on_data1, b'O')
            new_tr.write(HELLO_MSG)

            # Swap in the regular protocol for the rest of the exchange.
            new_tr.set_protocol(ClientProtoSecond(on_data2, on_eof))
            self.assertEqual(await on_data2, b'2')
            new_tr.write(HELLO_MSG)
            await on_eof

            new_tr.close()

            # connection_made() should be called only once -- when
            # we establish connection for the first time. Start TLS
            # doesn't call connection_made() on application protocols.
            self.assertEqual(client_con_made_calls, 1)

        with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
            self.loop.run_until_complete(
                asyncio.wait_for(client(srv.addr),
                                 timeout=self.TIMEOUT))

    def test_start_tls_slow_client_cancel(self):
        # loop.start_tls() against a server that never answers the
        # handshake: wrapping the call in wait_for(..., 0.5) must raise
        # asyncio.TimeoutError.
        HELLO_MSG = b'1' * self.PAYLOAD_SIZE

        client_context = test_utils.simple_client_sslcontext()
        # Resolved by serve() once the plaintext message has been read.
        server_waits_on_handshake = self.loop.create_future()

        def serve(sock):
            sock.settimeout(self.TIMEOUT)

            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            try:
                # The future belongs to the event loop, so its result is
                # set through call_soon_threadsafe().
                self.loop.call_soon_threadsafe(
                    server_waits_on_handshake.set_result, None)
                # No TLS handshake is performed; just keep reading.
                data = sock.recv_all(1024 * 1024)
            except ConnectionAbortedError:
                pass
            finally:
                sock.close()

        class ClientProto(asyncio.Protocol):
            def __init__(self, on_data, on_eof):
                self.on_data = on_data
                self.on_eof = on_eof
                self.con_made_cnt = 0

            # The instance parameter is named `proto` so that `self` still
            # refers to the enclosing test case inside this method.
            def connection_made(proto, tr):
                proto.con_made_cnt += 1
                # Ensure connection_made gets called only once.
                self.assertEqual(proto.con_made_cnt, 1)

            def data_received(self, data):
                self.on_data.set_result(data)

            def eof_received(self):
                self.on_eof.set_result(True)

        async def client(addr):
            await asyncio.sleep(0.5)

            on_data = self.loop.create_future()
            on_eof = self.loop.create_future()

            tr, proto = await self.loop.create_connection(
                lambda: ClientProto(on_data, on_eof), *addr)

            tr.write(HELLO_MSG)

            await server_waits_on_handshake

            with self.assertRaises(asyncio.TimeoutError):
                await asyncio.wait_for(
                    self.loop.start_tls(tr, proto, client_context),
                    0.5)

        with self.tcp_server(serve, timeout=self.TIMEOUT) as srv:
            self.loop.run_until_complete(
                asyncio.wait_for(client(srv.addr),
                                 timeout=support.SHORT_TIMEOUT))

    def test_start_tls_server_1(self):
        # Server-side loop.start_tls(): an asyncio server sends HELLO_MSG
        # in plaintext, upgrades the transport to TLS and then expects the
        # threaded client to send HELLO_MSG back over the upgraded
        # connection.
        HELLO_MSG = b'1' * self.PAYLOAD_SIZE

        server_context = test_utils.simple_server_sslcontext()
        client_context = test_utils.simple_client_sslcontext()

        def client(sock, addr):
            sock.settimeout(self.TIMEOUT)

            sock.connect(addr)
            # Plaintext greeting from the server, read before the upgrade.
            data = sock.recv_all(len(HELLO_MSG))
            self.assertEqual(len(data), len(HELLO_MSG))

            sock.starttls(client_context)
            sock.sendall(HELLO_MSG)

            sock.unwrap()
            sock.close()

        class ServerProto(asyncio.Protocol):
            def __init__(self, on_con, on_eof, on_con_lost):
                self.on_con = on_con
                self.on_eof = on_eof
                self.on_con_lost = on_con_lost
                # Accumulates everything passed to data_received().
                self.data = b''

            def connection_made(self, tr):
                self.on_con.set_result(tr)

            def data_received(self, data):
                self.data += data

            def eof_received(self):
                self.on_eof.set_result(1)

            def connection_lost(self, exc):
                # Forward the outcome to whoever awaits on_con_lost.
                if exc is None:
                    self.on_con_lost.set_result(None)
                else:
                    self.on_con_lost.set_exception(exc)

        async def main(proto, on_con, on_eof, on_con_lost):
            tr = await on_con
            tr.write(HELLO_MSG)

            # Nothing has been received from the client at this point.
            self.assertEqual(proto.data, b'')

            new_tr = await self.loop.start_tls(
                tr, proto, server_context,
                server_side=True,
                ssl_handshake_timeout=self.TIMEOUT)

            await on_eof
            await on_con_lost
            self.assertEqual(proto.data, HELLO_MSG)
            new_tr.close()

        async def run_main():
            on_con = self.loop.create_future()
            on_eof = self.loop.create_future()
            on_con_lost = self.loop.create_future()
            proto = ServerProto(on_con, on_eof, on_con_lost)

            # The same protocol instance is returned for every connection;
            # the test makes exactly one.
            server = await self.loop.create_server(
                lambda: proto, '127.0.0.1', 0)
            addr = server.sockets[0].getsockname()

            with self.tcp_client(lambda sock: client(sock, addr),
                                 timeout=self.TIMEOUT):
                await asyncio.wait_for(
                    main(proto, on_con, on_eof, on_con_lost),
                    timeout=self.TIMEOUT)

            server.close()
            await server.wait_closed()

        self.loop.run_until_complete(run_main())

    def test_create_server_ssl_over_ssl(self):
        # Run TLS inside TLS: the server created by create_server() already
        # speaks TLS (sslctx_1), and every accepted connection is upgraded a
        # second time with loop.start_tls() (sslctx_2).  TOTAL_CNT threaded
        # clients each exchange A_DATA / b'OK' / B_DATA / b'SPAM' through
        # both layers; CNT counts the handlers that ran to completion.
        CNT = 0           # number of clients that were successful
        TOTAL_CNT = 25    # total number of clients that test will create
        TIMEOUT = support.LONG_TIMEOUT  # timeout for this test

        A_DATA = b'A' * 1024 * BUF_MULTIPLIER
        B_DATA = b'B' * 1024 * BUF_MULTIPLIER

        # Context pair 1 is used for the outer TLS layer, pair 2 for the
        # inner one.
        sslctx_1 = self._create_server_ssl_context(
            test_utils.ONLYCERT, test_utils.ONLYKEY)
        client_sslctx_1 = self._create_client_ssl_context()
        sslctx_2 = self._create_server_ssl_context(
            test_utils.ONLYCERT, test_utils.ONLYKEY)
        client_sslctx_2 = self._create_client_ssl_context()

        # Client threads started by test_client(); stopped at the very end.
        clients = []

        async def handle_client(reader, writer):
            # Stream-level handler, invoked once the inner TLS layer is up.
            nonlocal CNT

            data = await reader.readexactly(len(A_DATA))
            self.assertEqual(data, A_DATA)
            writer.write(b'OK')

            data = await reader.readexactly(len(B_DATA))
            self.assertEqual(data, B_DATA)
            # The three pieces (bytes, bytearray, memoryview) add up to
            # b'SPAM', which is what the client asserts on.
            writer.writelines([b'SP', bytearray(b'A'), memoryview(b'M')])

            await writer.drain()
            writer.close()

            CNT += 1

        class ServerProtocol(asyncio.StreamReaderProtocol):
            def connection_made(self, transport):
                # ``transport`` is the outer TLS transport.  The parent's
                # connection_made() is deferred until start_tls() has
                # produced the inner TLS transport.
                # super() is captured here because the zero-argument form
                # cannot be used inside the nested callback below, whose
                # first argument is not the protocol instance.
                super_ = super()
                transport.pause_reading()
                fut = self._loop.create_task(self._loop.start_tls(
                    transport, self, sslctx_2, server_side=True))

                def cb(_):
                    try:
                        tr = fut.result()
                    except Exception as ex:
                        super_.connection_lost(ex)
                    else:
                        super_.connection_made(tr)
                fut.add_done_callback(cb)

        def server_protocol_factory():
            reader = asyncio.StreamReader()
            protocol = ServerProtocol(reader, handle_client)
            return protocol

        async def test_client(addr):
            # Start one blocking client in a helper thread and wait for the
            # outcome it reports through ``fut``.
            fut = asyncio.Future()

            def prog(sock):
                try:
                    sock.connect(addr)
                    # Outer TLS layer, handled by the ssl module's socket
                    # wrapper.
                    sock.starttls(client_sslctx_1)

                    # because wrap_socket() doesn't work correctly on
                    # SSLSocket, we have to do the 2nd level SSL manually
                    incoming = ssl.MemoryBIO()
                    outgoing = ssl.MemoryBIO()
                    sslobj = client_sslctx_2.wrap_bio(incoming, outgoing)

                    def do(func, *args):
                        # Retry ``func`` until it no longer raises
                        # SSLWantReadError, flushing pending outgoing bytes
                        # to the outer socket and feeding received bytes
                        # into the incoming BIO between attempts.
                        while True:
                            try:
                                rv = func(*args)
                                break
                            except ssl.SSLWantReadError:
                                if outgoing.pending:
                                    sock.send(outgoing.read())
                                incoming.write(sock.recv(65536))
                        if outgoing.pending:
                            sock.send(outgoing.read())
                        return rv

                    do(sslobj.do_handshake)

                    do(sslobj.write, A_DATA)
                    data = do(sslobj.read, 2)
                    self.assertEqual(data, b'OK')

                    do(sslobj.write, B_DATA)
                    data = b''
                    # Read until an empty chunk is returned.
                    while True:
                        chunk = do(sslobj.read, 4)
                        if not chunk:
                            break
                        data += chunk
                    self.assertEqual(data, b'SPAM')

                    do(sslobj.unwrap)
                    sock.close()

                except Exception as ex:
                    # This runs in the client thread, so the future is
                    # completed through call_soon_threadsafe().
                    self.loop.call_soon_threadsafe(fut.set_exception, ex)
                    sock.close()
                else:
                    self.loop.call_soon_threadsafe(fut.set_result, None)

            client = self.tcp_client(prog)
            client.start()
            clients.append(client)

            await fut

        async def start_server():
            extras = {}

            srv = await self.loop.create_server(
                server_protocol_factory,
                '127.0.0.1', 0,
                family=socket.AF_INET,
                ssl=sslctx_1,
                **extras)

            try:
                srv_socks = srv.sockets
                self.assertTrue(srv_socks)

                addr = srv_socks[0].getsockname()

                tasks = []
                for _ in range(TOTAL_CNT):
                    tasks.append(test_client(addr))

                await asyncio.wait_for(asyncio.gather(*tasks), TIMEOUT)

            finally:
                # close() is scheduled on the loop rather than called
                # directly; wait_closed() is awaited right after.
                self.loop.call_soon(srv.close)
                await srv.wait_closed()

        with self._silence_eof_received_warning():
            self.loop.run_until_complete(start_server())

        self.assertEqual(CNT, TOTAL_CNT)

        for client in clients:
            client.stop()

    def test_shutdown_cleanly(self):
        # TOTAL_CNT asyncio clients connect over TLS to a blocking threaded
        # server.  Each client sends A_DATA, expects b'OK' back, and must
        # then observe a clean end of stream (read() returning b'') after
        # the server unwraps the TLS layer and closes the socket.
        CNT = 0
        TOTAL_CNT = 25

        A_DATA = b'A' * 1024 * BUF_MULTIPLIER

        sslctx = self._create_server_ssl_context(
            test_utils.ONLYCERT, test_utils.ONLYKEY)
        client_sslctx = self._create_client_ssl_context()

        def server(sock):
            # Blocking server program, run once per accepted connection.
            sock.starttls(
                sslctx,
                server_side=True)

            data = sock.recv_all(len(A_DATA))
            self.assertEqual(data, A_DATA)
            sock.send(b'OK')

            sock.unwrap()

            sock.close()

        async def client(addr):
            nonlocal CNT

            # The handshake timeout is passed directly; the previous
            # ``extras = {}`` placeholder was overwritten before being used.
            reader, writer = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='',
                ssl_handshake_timeout=support.SHORT_TIMEOUT)

            writer.write(A_DATA)
            self.assertEqual(await reader.readexactly(2), b'OK')

            # The server has shut down: nothing but EOF may follow.
            self.assertEqual(await reader.read(), b'')

            CNT += 1

            writer.close()
            await self.wait_closed(writer)

        def run(coro):
            # Run TOTAL_CNT instances of ``coro`` against a fresh threaded
            # server and check that every one of them completed.
            nonlocal CNT
            CNT = 0

            async def _gather(*tasks):
                # gather() is called from inside a coroutine so that it runs
                # under the event loop.
                return await asyncio.gather(*tasks)

            with self.tcp_server(server,
                                 max_clients=TOTAL_CNT,
                                 backlog=TOTAL_CNT) as srv:
                tasks = [coro(srv.addr) for _ in range(TOTAL_CNT)]

                self.loop.run_until_complete(
                    _gather(*tasks))

            self.assertEqual(CNT, TOTAL_CNT)

        with self._silence_eof_received_warning():
            run(client)

    def test_flush_before_shutdown(self):
        # Data written while the SSL protocol is paused must still reach the
        # peer when the writer is closed right afterwards: the server has to
        # receive all CHUNK * SIZE bytes.
        CHUNK = 1024 * 128
        SIZE = 32
        expected_len = CHUNK * SIZE

        sslctx = self._create_server_ssl_context(
            test_utils.ONLYCERT, test_utils.ONLYKEY)
        client_sslctx = self._create_client_ssl_context()

        # Completed from the server thread with the server's outcome.
        server_done = self.loop.create_future()

        def server(sock):
            # Blocking server program; its success or failure is handed to
            # the event loop through ``server_done``.
            try:
                sock.starttls(sslctx, server_side=True)
                self.assertEqual(sock.recv_all(4), b'ping')
                sock.send(b'pong')
                time.sleep(0.5)  # hopefully stuck the TCP buffer
                received = sock.recv_all(expected_len)
                self.assertEqual(len(received), expected_len)
                sock.close()
            except Exception as ex:
                self.loop.call_soon_threadsafe(server_done.set_exception, ex)
            else:
                self.loop.call_soon_threadsafe(server_done.set_result, None)

        async def client(addr):
            reader, writer = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='')
            sslprotocol = writer.transport._ssl_protocol

            writer.write(b'ping')
            self.assertEqual(await reader.readexactly(4), b'pong')

            # Queue the whole payload while writing is paused, close the
            # writer, and only then let the protocol write again.
            block = b'x' * CHUNK
            sslprotocol.pause_writing()
            for _ in range(SIZE):
                writer.write(block)

            writer.close()
            sslprotocol.resume_writing()

            await self.wait_closed(writer)
            with contextlib.suppress(ConnectionResetError):
                self.assertEqual(await reader.read(), b'')
            await server_done

        with self.tcp_server(server) as srv:
            self.loop.run_until_complete(client(srv.addr))

    def test_remote_shutdown_receives_trailing_data(self):
        # When the peer starts shutting down (TLS close_notify, or a TCP
        # half-close) while the client still has CHUNK * SIZE bytes queued
        # in its write backlog, the peer must still receive all of them.
        # The same client coroutine is run against two servers:
        # ``server`` (TLS-level shutdown) and ``eof_server`` (SHUT_WR).
        CHUNK = 1024 * 128
        SIZE = 32

        sslctx = self._create_server_ssl_context(
            test_utils.ONLYCERT,
            test_utils.ONLYKEY
        )
        client_sslctx = self._create_client_ssl_context()
        # Re-created by every client() run; completed from the server
        # thread by the wrapper returned from run().
        future = None

        def server(sock):
            # Drives TLS by hand through a pair of memory BIOs so that
            # close_notify can be sent without waiting for the peer's reply.
            incoming = ssl.MemoryBIO()
            outgoing = ssl.MemoryBIO()
            sslobj = sslctx.wrap_bio(incoming, outgoing, server_side=True)

            # Handshake: shuttle bytes between the socket and the BIOs until
            # do_handshake() stops asking for more input.
            while True:
                try:
                    sslobj.do_handshake()
                except ssl.SSLWantReadError:
                    if outgoing.pending:
                        sock.send(outgoing.read())
                    incoming.write(sock.recv(16384))
                else:
                    if outgoing.pending:
                        sock.send(outgoing.read())
                    break

            # Read the 4-byte greeting.
            while True:
                try:
                    data = sslobj.read(4)
                except ssl.SSLWantReadError:
                    incoming.write(sock.recv(16384))
                else:
                    break

            self.assertEqual(data, b'ping')
            sslobj.write(b'pong')
            sock.send(outgoing.read())

            time.sleep(0.2)  # wait for the peer to fill its backlog

            # send close_notify but don't wait for response
            with self.assertRaises(ssl.SSLWantReadError):
                sslobj.unwrap()
            sock.send(outgoing.read())

            # should receive all data
            data_len = 0
            while True:
                try:
                    chunk = len(sslobj.read(16384))
                    data_len += chunk
                except ssl.SSLWantReadError:
                    incoming.write(sock.recv(16384))
                except ssl.SSLZeroReturnError:
                    # Raised once the peer's close_notify has been read.
                    break

            self.assertEqual(data_len, CHUNK * SIZE)

            # verify that close_notify is received
            sslobj.unwrap()

            sock.close()

        def eof_server(sock):
            # Variant that half-closes the TCP connection instead of sending
            # a TLS close_notify.
            sock.starttls(sslctx, server_side=True)
            self.assertEqual(sock.recv_all(4), b'ping')
            sock.send(b'pong')

            time.sleep(0.2)  # wait for the peer to fill its backlog

            # send EOF
            sock.shutdown(socket.SHUT_WR)

            # should receive all data
            data = sock.recv_all(CHUNK * SIZE)
            self.assertEqual(len(data), CHUNK * SIZE)

            sock.close()

        async def client(addr):
            nonlocal future
            future = self.loop.create_future()

            reader, writer = await asyncio.open_connection(
                *addr,
                ssl=client_sslctx,
                server_hostname='')
            writer.write(b'ping')
            data = await reader.readexactly(4)
            self.assertEqual(data, b'pong')

            # fill write backlog in a hacky way - renegotiation won't help
            for _ in range(SIZE):
                writer.transport._test__append_write_backlog(b'x' * CHUNK)

            # The peer sends no application data after b'pong', so read()
            # may only return b''; a reset or broken pipe is tolerated here.
            try:
                data = await reader.read()
                self.assertEqual(data, b'')
            except (BrokenPipeError, ConnectionResetError):
                pass

            # Propagates any failure raised inside the server thread.
            await future

            writer.close()
            await self.wait_closed(writer)

        def run(meth):
            # Wrap a server program so that its outcome is reported to the
            # event loop through ``future``.
            def wrapper(sock):
                try:
                    meth(sock)
                except Exception as ex:
                    self.loop.call_soon_threadsafe(future.set_exception, ex)
                else:
                    self.loop.call_soon_threadsafe(future.set_result, None)
            return wrapper

        with self.tcp_server(run(server)) as srv:
            self.loop.run_until_complete(client(srv.addr))

        with self.tcp_server(run(eof_server)) as srv:
            self.loop.run_until_complete(client(srv.addr))

    def test_connect_timeout_warning(self):
        # A TLS connection attempt that is refused or times out must not
        # emit a ResourceWarning once garbage has been collected.

        async def test():
            try:
                await asyncio.wait_for(
                    self.loop.create_connection(asyncio.Protocol,
                                                *addr, ssl=True),
                    0.1)
            except (ConnectionRefusedError, asyncio.TimeoutError):
                pass
            else:
                self.fail('TimeoutError is not raised')

        # The socket is created inside the ``with`` statement so that it is
        # closed even if bind() or getsockname() raises.  It is bound but
        # never put into listening mode.
        with socket.socket(socket.AF_INET) as s:
            s.bind(('127.0.0.1', 0))
            addr = s.getsockname()

            # assertWarns() is used "inverted": the expected outcome is its
            # own failure, reported as 'ResourceWarning not triggered'.
            try:
                with self.assertWarns(ResourceWarning) as cm:
                    self.loop.run_until_complete(test())
                    gc.collect()
                    gc.collect()
                    gc.collect()
            except AssertionError as e:
                self.assertEqual(str(e), 'ResourceWarning not triggered')
            else:
                self.fail('Unexpected ResourceWarning: {}'.format(cm.warning))

    def test_handshake_timeout_handler_leak(self):
        # After a TLS connection attempt has been cancelled by wait_for(),
        # nothing may keep the SSLContext alive: once the test drops its own
        # reference, the weak reference must be dead without any help from
        # the garbage collector.

        async def test(ctx):
            try:
                await asyncio.wait_for(
                    self.loop.create_connection(asyncio.Protocol, *addr,
                                                ssl=ctx),
                    0.1)
            except (ConnectionRefusedError, asyncio.TimeoutError):
                pass
            else:
                self.fail('TimeoutError is not raised')

        # The socket is created inside the ``with`` statement so that it is
        # closed even if bind() or listen() raises.  It listens but never
        # calls accept().
        with socket.socket(socket.AF_INET) as s:
            s.bind(('127.0.0.1', 0))
            s.listen(1)
            addr = s.getsockname()

            ctx = ssl.create_default_context()
            self.loop.run_until_complete(test(ctx))
            # Rebinding the name drops the only strong reference held here.
            ctx = weakref.ref(ctx)

        # SSLProtocol should be DECREF to 0
        self.assertIsNone(ctx())

    def test_shutdown_timeout_handler_leak(self):
        # Closing a TLS transport must not leak the client SSLContext: once
        # the test's own reference is dropped and the garbage collector has
        # run, the weak reference to the context has to be dead.
        loop = self.loop

        class Protocol(asyncio.Protocol):
            # Exposes a future that completes when the connection is lost.
            def __init__(self):
                self.fut = asyncio.Future(loop=loop)

            def connection_lost(self, exc):
                self.fut.set_result(None)

        def server(sock):
            # Blocking TLS server: one recv() call, then close.
            server_ctx = self._create_server_ssl_context(
                test_utils.ONLYCERT,
                test_utils.ONLYKEY
            )
            tls_sock = server_ctx.wrap_socket(sock, server_side=True)
            tls_sock.recv(32)
            tls_sock.close()

        async def client(addr, ctx):
            transport, protocol = await loop.create_connection(
                Protocol, *addr, ssl=ctx)
            transport.close()
            await protocol.fut

        with self.tcp_server(server) as srv:
            client_ctx = self._create_client_ssl_context()
            loop.run_until_complete(client(srv.addr, client_ctx))
            ctx_ref = weakref.ref(client_ctx)
            # Drop the only strong reference held by the test itself.
            del client_ctx

        # asyncio has no shutdown timeout, but it ends up with a circular
        # reference loop - not ideal (introduces gc glitches), but at least
        # not leaking
        for _ in range(3):
            gc.collect()

        # SSLProtocol should be DECREF to 0
        self.assertIsNone(ctx_ref())

    def test_shutdown_timeout_handler_not_set(self):
        # The server sends b'hello', waits for b'world', sends b'extra bytes'
        # and then half-closes the connection.  The client pauses reading
        # after the greeting and resumes only once the server has sent EOF;
        # it must still be handed b'extra bytes' and see the connection end
        # without an error.
        loop = self.loop
        eof = asyncio.Event()
        extra = None

        def server(sock):
            sslctx = self._create_server_ssl_context(
                test_utils.ONLYCERT,
                test_utils.ONLYKEY
            )
            sock = sslctx.wrap_socket(sock, server_side=True)
            sock.send(b'hello')
            # The recv() calls are kept outside of ``assert`` statements:
            # under ``python -O`` an assert is removed together with the
            # expression it evaluates, which would silently drop the socket
            # reads this scenario depends on.
            reply = sock.recv(1024)
            self.assertEqual(reply, b'world')
            sock.send(b'extra bytes')
            # sending EOF here
            sock.shutdown(socket.SHUT_WR)
            loop.call_soon_threadsafe(eof.set)
            # make sure we have enough time to reproduce the issue
            tail = sock.recv(1024)
            self.assertEqual(tail, b'')
            sock.close()

        class Protocol(asyncio.Protocol):
            def __init__(self):
                self.fut = asyncio.Future(loop=loop)
                self.transport = None

            def connection_made(self, transport):
                self.transport = transport

            def data_received(self, data):
                nonlocal extra
                if data == b'hello':
                    self.transport.write(b'world')
                    # pause reading would make incoming data stay in the sslobj
                    self.transport.pause_reading()
                else:
                    extra = data

            def connection_lost(self, exc):
                if exc is None:
                    self.fut.set_result(None)
                else:
                    self.fut.set_exception(exc)

        async def client(addr):
            ctx = self._create_client_ssl_context()
            tr, pr = await loop.create_connection(Protocol, *addr, ssl=ctx)
            await eof.wait()
            tr.resume_reading()
            await pr.fut
            tr.close()
            self.assertEqual(extra, b'extra bytes')

        with self.tcp_server(server) as srv:
            loop.run_until_complete(client(srv.addr))


###############################################################################
# Socket Testing Utilities
###############################################################################


class TestSocketWrapper:
    """Proxy around a socket that adds helpers for the test programs.

    Attributes and methods that are not defined on the wrapper itself are
    looked up on the wrapped socket.
    """

    def __init__(self, sock):
        self.__sock = sock

    def recv_all(self, n):
        """Receive exactly *n* bytes from the wrapped socket.

        Returns the received data as a ``bytes`` object.  Raises
        ConnectionAbortedError when recv() returns b'' before *n* bytes
        have been read.
        """
        # Gather the pieces and join them once: repeated ``bytes +=`` copies
        # the whole buffer on every iteration, which is quadratic for the
        # multi-megabyte payloads some tests transfer.
        chunks = []
        remaining = n
        while remaining > 0:
            data = self.recv(remaining)
            if data == b'':
                raise ConnectionAbortedError
            chunks.append(data)
            remaining -= len(data)
        return b''.join(chunks)

    def starttls(self, ssl_context, *,
                 server_side=False,
                 server_hostname=None,
                 do_handshake_on_connect=True):
        """Replace the wrapped socket with a TLS socket layered over it.

        The keyword arguments are forwarded to SSLContext.wrap_socket().
        When *server_side* is true, do_handshake() is additionally called
        on the new TLS socket.
        """

        assert isinstance(ssl_context, ssl.SSLContext)

        ssl_sock = ssl_context.wrap_socket(
            self.__sock, server_side=server_side,
            server_hostname=server_hostname,
            do_handshake_on_connect=do_handshake_on_connect)

        if server_side:
            ssl_sock.do_handshake()

        # NOTE(review): presumably safe because wrap_socket() takes over
        # the underlying descriptor of the plain socket -- confirm.
        self.__sock.close()
        self.__sock = ssl_sock

    def __getattr__(self, name):
        # Only called for attributes not found on the wrapper itself.
        return getattr(self.__sock, name)

    def __repr__(self):
        return '<{} {!r}>'.format(type(self).__name__, self.__sock)


class SocketThread(threading.Thread):
    """Thread that can be used as a context manager.

    Entering the context starts the thread; leaving it calls stop().
    """

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *exc_info):
        self.stop()

    def stop(self):
        """Clear the ``_active`` flag and wait for the thread to finish."""
        self._active = False
        self.join()


class TestThreadedClient(SocketThread):
    """Daemon thread that runs a blocking client program on a socket.

    The program is called with the socket wrapped in a TestSocketWrapper.
    Any exception other than KeyboardInterrupt and SystemExit is handed to
    the owning test through ``_abort_socket_test()``.
    """

    def __init__(self, test, sock, prog, timeout):
        super().__init__(name='test-client', daemon=True)

        self._test = test
        self._prog = prog
        self._sock = sock
        self._timeout = timeout
        self._active = True

    def run(self):
        try:
            self._prog(TestSocketWrapper(self._sock))
        except (KeyboardInterrupt, SystemExit):
            # Never swallow interpreter-level exit requests.
            raise
        except BaseException as ex:
            self._test._abort_socket_test(ex)


class TestThreadedServer(SocketThread):
    """Daemon thread that accepts connections and runs a server program.

    Accepted connections are handled one at a time, in the server thread
    itself, by calling the program with the connection wrapped in a
    TestSocketWrapper.  The thread ends after ``max_clients`` connections,
    when stop() is called, or when the program raises.
    """

    def __init__(self, test, sock, prog, timeout, max_clients):
        super().__init__(name='test-server', daemon=True)

        self._test = test
        self._prog = prog
        self._sock = sock
        self._timeout = timeout
        self._max_clients = max_clients

        self._clients = 0
        self._finished_clients = 0
        self._active = True

        # Wake-up channel: stop() writes to _s2, the select() call in
        # _run() watches _s1.
        self._s1, self._s2 = socket.socketpair()
        self._s1.setblocking(False)

    def stop(self):
        """Wake the accept loop up, then stop and join the thread."""
        try:
            waker = self._s2
            if waker and waker.fileno() != -1:
                with contextlib.suppress(OSError):
                    waker.send(b'stop')
        finally:
            super().stop()

    def run(self):
        try:
            with self._sock as listener:
                listener.setblocking(False)
                self._run()
        finally:
            self._s1.close()
            self._s2.close()

    def _run(self):
        # Accept loop: runs until stopped, woken up through the socket
        # pair, or until max_clients connections have been accepted.
        listener = self._sock
        wakeup = self._s1

        while self._active and self._clients < self._max_clients:
            ready, _, _ = select.select(
                [listener, wakeup], [], [], self._timeout)

            if wakeup in ready:
                return

            if listener not in ready:
                continue

            try:
                conn, _ = listener.accept()
            except BlockingIOError:
                continue
            except socket.timeout:
                if self._active:
                    raise
                return

            self._clients += 1
            conn.settimeout(self._timeout)
            try:
                with conn:
                    self._handle_client(conn)
            except (KeyboardInterrupt, SystemExit):
                raise
            except BaseException as ex:
                self._active = False
                # Re-raise, but notify the owning test on the way out.
                try:
                    raise
                finally:
                    self._test._abort_socket_test(ex)

    def _handle_client(self, sock):
        self._prog(TestSocketWrapper(sock))

    @property
    def addr(self):
        """Address the listening socket is bound to."""
        return self._sock.getsockname()