chromium/content/test/data/direct_sockets/tcp.js

'use strict';

/**
 * Strict-equality assertion helper.
 *
 * Does nothing when |actual| and |expected| are identical under `===`.
 * Otherwise throws a plain string (not an Error object) that contains the
 * JSON form of both values.
 */
const assertEq = (actual, expected) => {
  if (actual === expected) {
    return;
  }
  // Serialize |expected| first, then |actual|, matching the message order.
  const expectedText = JSON.stringify(expected);
  const actualText = JSON.stringify(actual);
  throw 'Expected ' + expectedText + ', got ' + actualText;
};

/**
 * Writes |requiredBytes| bytes to |writer| as a sequence of growing chunks.
 *
 * The first chunk holds 1 byte, the second 2, and so on; the final chunk is
 * trimmed so the total is exactly |requiredBytes|. Each byte carries its
 * overall position in the output modulo 256.
 *
 * @param {WritableStreamDefaultWriter} writer Destination writer.
 * @param {number} requiredBytes Total number of bytes to write.
 * @returns {Promise<string>} 'write succeeded', or a 'write failed: ...'
 *     message when |writer| is not a WritableStreamDefaultWriter.
 */
async function writeLoop(writer, requiredBytes) {
  if (!(writer instanceof WritableStreamDefaultWriter)) {
    return 'write failed: writer is not a WritableStreamDefaultWriter';
  }

  let sent = 0;
  for (let step = 1; sent < requiredBytes; ++step) {
    // |step| is the uncapped chunk size for this iteration.
    const size = Math.min(step, requiredBytes - sent);
    const payload = new Uint8Array(size);
    for (let slot = 0; slot < size; ++slot) {
      payload[slot] = sent % 256;
      ++sent;
    }
    await writer.ready;
    await writer.write(payload);
  }
  return 'write succeeded';
}

/**
 * Opens a TCPSocket to |address|:|port| with |options| and pushes
 * |requiredBytes| bytes through its writable stream via writeLoop().
 *
 * @returns {Promise<string>} The writeLoop() result, or
 *     'writeTcp failed: <error>' if anything throws along the way.
 */
async function writeTcp(address, port, options, requiredBytes) {
  try {
    const socket = new TCPSocket(address, port, options);
    const { writable: outgoing } = await socket.opened;
    const streamWriter = outgoing.getWriter();
    const outcome = await writeLoop(streamWriter, requiredBytes);
    return outcome;
  } catch (failure) {
    return 'writeTcp failed: ' + failure;
  }
}

/**
 * Opens a TCPSocket to |address|:|port| and writes one chunk of |size| bytes
 * in a single write() call. Byte i of the chunk holds i modulo 256.
 *
 * @returns {Promise<string>} 'writeLargeTcpPacket succeeded', or
 *     'writeLargeTcpPacket failed: <error>' if anything throws.
 */
async function writeLargeTcpPacket(address, port, size) {
  try {
    const socket = new TCPSocket(address, port);
    const { writable: outgoing } = await socket.opened;
    const streamWriter = outgoing.getWriter();

    const payload = new Uint8Array(size);
    for (let position = 0; position < payload.length; ++position) {
      payload[position] = position % 256;
    }

    await streamWriter.write(payload);
    return 'writeLargeTcpPacket succeeded';
  } catch (failure) {
    return 'writeLargeTcpPacket failed: ' + failure;
  }
}

/**
 * Reads exactly |requiredBytes| bytes from a BYOB |reader| and checks that
 * byte i equals i modulo 256.
 *
 * @param {ReadableStreamBYOBReader} reader Source reader.
 * @param {number} requiredBytes Number of bytes expected from the stream.
 * @returns {Promise<string>} 'read succeeded', or a 'read failed: ...'
 *     message naming the reason (wrong reader type, stream closed early,
 *     empty read, or unexpected byte values).
 */
async function readLoop(reader, requiredBytes) {
  if (!(reader instanceof ReadableStreamBYOBReader)) {
    return 'read failed: reader is not a ReadableStreamBYOBReader';
  }

  let backing = new ArrayBuffer(requiredBytes);
  let received = 0;
  while (received < requiredBytes) {
    // Ask the stream to fill the not-yet-written tail of the buffer.
    const target = new Uint8Array(backing, received);
    const { value: filled, done: finished } = await reader.read(target);
    if (finished) {
      return 'read failed: unexpected stream close';
    }
    if (!filled || filled.length === 0) {
      return 'read failed: no data returned';
    }

    received += filled.byteLength;
    // Continue with the buffer behind the returned view rather than the one
    // that was handed to read().
    backing = filled.buffer;
  }

  const bytes = new Uint8Array(backing);
  const corrupted = bytes
      .subarray(0, received)
      .some((byte, position) => byte !== position % 256);
  if (corrupted) {
    return 'read failed: bad data returned';
  }

  return 'read succeeded';
}

/**
 * Opens a TCPSocket to |address|:|port| with |options|, takes a BYOB reader
 * on its readable stream and delegates to readLoop() for |requiredBytes|.
 *
 * @returns {Promise<string>} The readLoop() result, or
 *     'readTcp failed: <error>' if anything throws along the way.
 */
async function readTcp(address, port, options, requiredBytes) {
  try {
    const socket = new TCPSocket(address, port, options);
    const { readable: incoming } = await socket.opened;
    const byobReader = incoming.getReader({ mode: 'byob' });
    const outcome = await readLoop(byobReader, requiredBytes);
    return outcome;
  } catch (failure) {
    return 'readTcp failed: ' + failure;
  }
}

/**
 * Opens a TCPSocket and runs readLoop() and writeLoop() concurrently on it,
 * each for |requiredBytes| bytes.
 *
 * @returns {Promise<string>} 'readWrite succeeded' when both loops report
 *     success; otherwise the read failure message (checked first) or the
 *     write failure message; or 'readWriteTcp failed: <error>' if anything
 *     throws.
 */
async function readWriteTcp(address, port, options, requiredBytes) {
  try {
    const socket = new TCPSocket(address, port, options);
    const { readable: incoming, writable: outgoing } = await socket.opened;
    const byobReader = incoming.getReader({ mode: 'byob' });
    const streamWriter = outgoing.getWriter();

    const pendingRead = readLoop(byobReader, requiredBytes);
    const pendingWrite = writeLoop(streamWriter, requiredBytes);
    const [readOutcome, writeOutcome] =
        await Promise.all([pendingRead, pendingWrite]);

    const readOk = readOutcome === 'read succeeded';
    const writeOk = writeOutcome === 'write succeeded';
    if (readOk && writeOk) {
      return 'readWrite succeeded';
    }
    // A read failure takes precedence over a write failure.
    return readOk ? writeOutcome : readOutcome;
  } catch (failure) {
    return 'readWriteTcp failed: ' + failure;
  }
}

/**
 * Opens a TCPSocket, briefly locks and unlocks both of its streams, then
 * closes the socket and checks that its |closed| promise fulfills.
 *
 * @returns {Promise<string>} 'closeTcp succeeded' when |closed| fulfills, or
 *     a 'closeTcp failed: ...' message when |closed| rejects or anything
 *     throws. A string is always returned (previously a rejected |closed|
 *     promise made the function return undefined).
 */
async function closeTcp(address, port, options) {
  try {
    const tcpSocket = new TCPSocket(address, port, options);
    const { readable, writable } = await tcpSocket.opened;

    // Acquire and release both locks so close() runs with unlocked streams.
    const reader = readable.getReader();
    const writer = writable.getWriter();
    reader.releaseLock();
    writer.releaseLock();

    // Attach handlers to |closed| before calling close() so its outcome is
    // observed as a boolean no matter when it settles.
    const closed = tcpSocket.closed.then(() => true, () => false);
    await tcpSocket.close();

    if (await closed) {
      return 'closeTcp succeeded';
    }
    return 'closeTcp failed: closed promise was rejected';
  } catch (error) {
    return ('closeTcp failed: ' + error);
  }
}

/**
 * Issues a single read() on |reader| and reports whether it settled
 * successfully.
 *
 * @returns {Promise<boolean>} true if the read promise fulfilled, false if it
 *     rejected. A synchronous throw from read() itself is not converted to
 *     false; it rejects the returned promise.
 */
async function read(reader) {
  // Start the read outside the try block so that only an asynchronous
  // rejection is mapped to false.
  const pending = reader.read();
  try {
    await pending;
    return true;
  } catch (ignored) {
    return false;
  }
}

/**
 * Encodes |value| with a TextEncoder, writes the bytes to |writer| and
 * reports whether the write settled successfully.
 *
 * @returns {Promise<boolean>} true if the write promise fulfilled, false if
 *     it rejected. A synchronous throw from write() itself is not converted
 *     to false; it rejects the returned promise.
 */
async function write(writer, value) {
  const utf8 = new TextEncoder();
  // Start the write outside the try block so that only an asynchronous
  // rejection is mapped to false.
  const pending = writer.write(utf8.encode(value));
  try {
    await pending;
    return true;
  } catch (ignored) {
    return false;
  }
}

/**
 * Performs one read on |socket| and compares the outcome with
 * |expected_read_success|.
 *
 * @returns {Promise<string>} 'readTcpOnError succeeded.' when the outcome of
 *     read() matches the expectation, otherwise
 *     'readTcpOnError failed: <error>'.
 */
async function readTcpOnError(socket, expected_read_success) {
  try {
    const { readable: incoming } = await socket.opened;
    const streamReader = incoming.getReader();

    const read_request_success = await read(streamReader);
    if (read_request_success !== expected_read_success) {
      throw new TypeError(`read_request_success = ${read_request_success}`);
    }
    return 'readTcpOnError succeeded.';
  } catch (failure) {
    return 'readTcpOnError failed: ' + failure;
  }
}

/**
 * Performs one three-character write on |socket| and expects it to fail.
 *
 * @returns {Promise<string>} 'writeTcpOnError succeeded.' when write()
 *     reports failure, otherwise 'writeTcpOnError failed: <error>'.
 */
async function writeTcpOnError(socket) {
  try {
    const { writable: outgoing } = await socket.opened;
    const streamWriter = outgoing.getWriter();

    const write_request_success = await write(streamWriter, '_'.repeat(3));
    if (write_request_success) {
      throw new TypeError(`write_request_success = ${write_request_success}`);
    }
    return 'writeTcpOnError succeeded.';
  } catch (failure) {
    return 'writeTcpOnError failed: ' + failure;
  }
}

/**
 * Performs one read and one three-character write on |socket| concurrently
 * and expects both to fail.
 *
 * @returns {Promise<string>} 'readWriteTcpOnError succeeded.' when both
 *     read() and write() report failure, otherwise
 *     'readWriteTcpOnError failed: <error>'.
 */
async function readWriteTcpOnError(socket) {
  try {
    const { readable: incoming, writable: outgoing } = await socket.opened;
    const streamReader = incoming.getReader();
    const streamWriter = outgoing.getWriter();

    const pendingRead = read(streamReader);
    const pendingWrite = write(streamWriter, '_'.repeat(3));
    const [read_request_success, write_request_success] =
        await Promise.all([pendingRead, pendingWrite]);

    if (read_request_success || write_request_success) {
      throw new TypeError(`read_request_success = ${read_request_success}, write_request_success = ${write_request_success}`);
    }
    return 'readWriteTcpOnError succeeded.';
  } catch (failure) {
    return 'readWriteTcpOnError failed: ' + failure;
  }
}

/**
 * Waits for |socket.closed| to settle and compares the outcome (true for
 * fulfilled, false for rejected) with |expected_closed_result|.
 *
 * Before waiting, a reader and a writer are acquired on the socket's streams.
 * When |cancel_reader| is set the reader is cancelled, and when
 * |close_writer| is set the writer is closed; neither call is awaited.
 *
 * @returns {Promise<string>} 'waitForClosedPromise succeeded.' on a match,
 *     otherwise 'waitForClosedPromise failed: <error>'.
 */
async function waitForClosedPromise(socket, expected_closed_result, cancel_reader = false, close_writer = false) {
  try {
    const { readable: incoming, writable: outgoing } = await socket.opened;
    const streamReader = incoming.getReader();
    const streamWriter = outgoing.getWriter();

    if (cancel_reader) {
      streamReader.cancel();
    }
    if (close_writer) {
      streamWriter.close();
    }

    const onFulfilled = () => true;
    const onRejected = () => false;
    const closed_result = await socket.closed.then(onFulfilled, onRejected);

    if (closed_result !== expected_closed_result) {
      throw new TypeError(`closed_result = ${closed_result}, expected_close_result = ${expected_closed_result}`);
    }
    return 'waitForClosedPromise succeeded.';
  } catch (failure) {
    return 'waitForClosedPromise failed: ' + failure;
  }
}

/**
 * Starts a TCPServerSocket on 127.0.0.1, connects a TCPSocket client to it,
 * sends one text packet from the accepted server-side socket to the client
 * and verifies the client receives exactly that text. All three sockets are
 * closed on the success path.
 *
 * @returns {Promise<string>}
 *     'exchangeSingleTcpPacketBetweenClientAndServer succeeded.' on success,
 *     otherwise 'exchangeSingleTcpPacketBetweenClientAndServer failed: <error>'.
 */
async function exchangeSingleTcpPacketBetweenClientAndServer() {
  const kPacket = "I'm a TCP packet. Meow-meow!";

  try {
    // |localPort| is intentionally omitted so that the OS will pick one itself.
    const serverSocket = new TCPServerSocket('127.0.0.1');
    const { localPort: serverSocketPort } = await serverSocket.opened;

    // Connect a client to the server.
    const clientSocket = new TCPSocket('127.0.0.1', serverSocketPort);

    // Pulls a single accepted connection off the server's readable stream.
    async function acceptOnce() {
      const { readable } = await serverSocket.opened;
      const reader = readable.getReader();
      const { value: acceptedSocket, done } = await reader.read();
      assertEq(done, false);
      reader.releaseLock();
      return acceptedSocket;
    }

    const acceptedSocket = await acceptOnce();
    await clientSocket.opened;

    const encoder = new TextEncoder();
    const decoder = new TextDecoder();

    // Writes |kPacket| from the server-side end of the connection.
    async function acceptedSocketSend() {
      const { writable } = await acceptedSocket.opened;
      const writer = writable.getWriter();

      await writer.ready;
      await writer.write(encoder.encode(kPacket));

      writer.releaseLock();
    }

    // Reads on the client until at least |kPacket.length| characters have
    // arrived, then checks that the text matches.
    async function clientSocketReceive() {
      const { readable } = await clientSocket.opened;
      const reader = readable.getReader();
      let result = "";
      while (result.length < kPacket.length) {
        const { value, done } = await reader.read();
        assertEq(done, false);
        result += decoder.decode(value);
      }
      reader.releaseLock();
      assertEq(result, kPacket);
    }

    // Run both directions concurrently and await both. Leaving the send
    // promise un-awaited would drop a send failure silently and leave the
    // receiver waiting for data that never arrives.
    await Promise.all([acceptedSocketSend(), clientSocketReceive()]);

    await clientSocket.close();
    await acceptedSocket.close();
    await serverSocket.close();

    return "exchangeSingleTcpPacketBetweenClientAndServer succeeded.";
  } catch (error) {
    return "exchangeSingleTcpPacketBetweenClientAndServer failed: " + error;
  }
}

/**
 * Starts a TCPServerSocket on '::' with the given |ipv6Only| option and
 * attempts to connect a TCPSocket to it at |connectionAddress|.
 *
 * @returns {Promise<boolean>} true if the client's |opened| promise
 *     fulfills, false if it rejects. Errors while starting the server are
 *     not caught here.
 */
async function connectToServerWithIPv6Only(ipv6Only, connectionAddress) {
  const listener = new TCPServerSocket('::', { ipv6Only });
  const { localPort: listeningPort } = await listener.opened;

  const client = new TCPSocket(connectionAddress, listeningPort);
  const onConnected = () => true;
  const onRefused = () => false;
  return client.opened.then(onConnected, onRefused);
}