chromium/native_client_sdk/src/libraries/nacl_io/socket/tcp_node.cc

// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "nacl_io/ossocket.h"
#ifdef PROVIDES_SOCKET_API

#include <assert.h>
#include <errno.h>
#include <string.h>
#include <algorithm>

#include "nacl_io/kernel_handle.h"
#include "nacl_io/log.h"
#include "nacl_io/pepper_interface.h"
#include "nacl_io/socket/tcp_node.h"
#include "nacl_io/stream/stream_fs.h"

namespace {
const size_t kMaxPacketSize = 65536;
const size_t kDefaultFifoSize = kMaxPacketSize * 8;
}

namespace nacl_io {

class TcpWork : public StreamFs::Work {
 public:
  explicit TcpWork(const ScopedTcpEventEmitter& emitter)
      : StreamFs::Work(emitter->stream()->stream()),
        emitter_(emitter),
        data_(NULL) {}

  ~TcpWork() {
    free(data_);
  }

  TCPSocketInterface* TCPInterface() {
    return filesystem()->ppapi()->GetTCPSocketInterface();
  }

 protected:
  ScopedTcpEventEmitter emitter_;
  char* data_;
};

class TcpSendWork : public TcpWork {
 public:
  explicit TcpSendWork(const ScopedTcpEventEmitter& emitter,
                       const ScopedSocketNode& stream)
      : TcpWork(emitter), node_(stream) {}

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());

    // Does the stream exist, and can it send?
    if (!node_->TestStreamFlags(SSF_CAN_SEND))
      return false;

    // Check if we are already sending.
    if (node_->TestStreamFlags(SSF_SENDING))
      return false;

    size_t tx_data_avail = emitter_->BytesInOutputFIFO();
    int capped_len = std::min(tx_data_avail, kMaxPacketSize);
    if (capped_len == 0)
      return false;

    data_ = (char*)malloc(capped_len);
    assert(data_);
    if (data_ == NULL)
      return false;
    emitter_->ReadOut_Locked(data_, capped_len);

    int err = TCPInterface()->Write(node_->socket_resource(),
                                    data_,
                                    capped_len,
                                    filesystem()->GetRunCompletion(this));

    if (err != PP_OK_COMPLETIONPENDING) {
      // Anything else, we should assume the socket has gone bad.
      node_->SetError_Locked(err);
      return false;
    }

    node_->SetStreamFlags(SSF_SENDING);
    return true;
  }

  virtual void Run(int32_t length_error) {
    AUTO_LOCK(emitter_->GetLock());

    if (length_error < 0) {
      // Send failed, mark the socket as bad
      node_->SetError_Locked(length_error);
      return;
    }

    // If we did send, then Q more work.
    node_->ClearStreamFlags(SSF_SENDING);
    node_->QueueOutput();
  }

 private:
  // We assume that transmits will always complete.  If the upstream
  // actually back pressures, enough to prevent the Send callback
  // from triggering, this resource may never go away.
  ScopedSocketNode node_;
};

class TcpRecvWork : public TcpWork {
 public:
  explicit TcpRecvWork(const ScopedTcpEventEmitter& emitter)
      : TcpWork(emitter) {}

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());
    TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());

    // Does the stream exist, and can it recv?
    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
      return false;

    // If we are not currently receiving
    if (stream->TestStreamFlags(SSF_RECVING))
      return false;

    size_t rx_space_avail = emitter_->SpaceInInputFIFO();
    int capped_len =
        static_cast<int32_t>(std::min(rx_space_avail, kMaxPacketSize));

    if (capped_len == 0)
      return false;

    data_ = (char*)malloc(capped_len);
    assert(data_);
    if (data_ == NULL)
      return false;
    int err = TCPInterface()->Read(stream->socket_resource(),
                                   data_,
                                   capped_len,
                                   filesystem()->GetRunCompletion(this));
    if (err != PP_OK_COMPLETIONPENDING) {
      // Anything else, we should assume the socket has gone bad.
      stream->SetError_Locked(err);
      return false;
    }

    stream->SetStreamFlags(SSF_RECVING);
    return true;
  }

  virtual void Run(int32_t length_error) {
    AUTO_LOCK(emitter_->GetLock());
    TcpNode* stream = static_cast<TcpNode*>(emitter_->stream());

    if (!stream)
      return;

    if (length_error < 0) {
      stream->SetError_Locked(length_error);
      return;
    } else if (length_error == 0) {
      stream->SetStreamFlags(SSF_RECV_ENDOFSTREAM);
      emitter_->SetRecvEndOfStream_Locked();
    }

    // If we successfully received, queue more input
    emitter_->WriteIn_Locked(data_, length_error);
    stream->ClearStreamFlags(SSF_RECVING);
    stream->QueueInput();
  }
};

class TCPAcceptWork : public StreamFs::Work {
 public:
  explicit TCPAcceptWork(StreamFs* stream, const ScopedTcpEventEmitter& emitter)
      : StreamFs::Work(stream), emitter_(emitter) {}

  TCPSocketInterface* TCPInterface() {
    return filesystem()->ppapi()->GetTCPSocketInterface();
  }

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());
    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());

    // Does the stream exist, and can it accept?
    if (NULL == node)
      return false;

    // If we are not currently accepting
    if (!node->TestStreamFlags(SSF_LISTENING))
      return false;

    int err = TCPInterface()->Accept(node->socket_resource(),
                                     &new_socket_,
                                     filesystem()->GetRunCompletion(this));

    if (err != PP_OK_COMPLETIONPENDING) {
      // Anything else, we should assume the socket has gone bad.
      node->SetError_Locked(err);
      return false;
    }

    return true;
  }

  virtual void Run(int32_t error) {
    AUTO_LOCK(emitter_->GetLock());
    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());

    if (node == NULL)
      return;

    if (error != PP_OK) {
      node->SetError_Locked(error);
      return;
    }

    emitter_->SetAcceptedSocket_Locked(new_socket_);
  }

 protected:
  PP_Resource new_socket_;
  ScopedTcpEventEmitter emitter_;
};

class TCPConnectWork : public StreamFs::Work {
 public:
  explicit TCPConnectWork(StreamFs* stream,
                          const ScopedTcpEventEmitter& emitter)
      : StreamFs::Work(stream), emitter_(emitter) {}

  TCPSocketInterface* TCPInterface() {
    return filesystem()->ppapi()->GetTCPSocketInterface();
  }

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());
    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());

    // Does the stream exist, and can it connect?
    if (NULL == node)
      return false;

    int err = TCPInterface()->Connect(node->socket_resource(),
                                      node->remote_addr(),
                                      filesystem()->GetRunCompletion(this));
    if (err != PP_OK_COMPLETIONPENDING) {
      // Anything else, we should assume the socket has gone bad.
      node->SetError_Locked(err);
      return false;
    }

    return true;
  }

  virtual void Run(int32_t error) {
    AUTO_LOCK(emitter_->GetLock());
    TcpNode* node = static_cast<TcpNode*>(emitter_->stream());

    if (node == NULL)
      return;

    if (error != PP_OK) {
      node->ConnectFailed_Locked();
      node->SetError_Locked(error);
      return;
    }

    node->ConnectDone_Locked();
  }

 protected:
  ScopedTcpEventEmitter emitter_;
};

TcpNode::TcpNode(Filesystem* filesystem)
    : SocketNode(SOCK_STREAM, filesystem),
      emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
      tcp_nodelay_(false) {
  emitter_->AttachStream(this);
}

TcpNode::TcpNode(Filesystem* filesystem, PP_Resource socket)
    : SocketNode(SOCK_STREAM, filesystem, socket),
      emitter_(new TcpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)),
      tcp_nodelay_(false) {
  emitter_->AttachStream(this);
}

void TcpNode::Destroy() {
  emitter_->DetachStream();
  SocketNode::Destroy();
}

Error TcpNode::Init(int open_flags) {
  Error err = SocketNode::Init(open_flags);
  if (err != 0)
    return err;

  if (TCPInterface() == NULL) {
    LOG_ERROR("Got NULL interface: TCP");
    return EACCES;
  }

  if (socket_resource_ != 0) {
    // TCP sockets that are contructed with an existing socket_resource_
    // are those that generated from calls to Accept() and therefore are
    // already connected.
    remote_addr_ = TCPInterface()->GetRemoteAddress(socket_resource_);
    ConnectDone_Locked();
  } else {
    socket_resource_ =
        TCPInterface()->Create(filesystem_->ppapi()->GetInstance());
    if (0 == socket_resource_) {
      LOG_ERROR("Unable to create TCP resource.");
      return EACCES;
    }
    SetStreamFlags(SSF_CAN_CONNECT);
  }

  return 0;
}

EventEmitter* TcpNode::GetEventEmitter() {
  return emitter_.get();
}

void TcpNode::SetError_Locked(int pp_error_num) {
  SocketNode::SetError_Locked(pp_error_num);
  emitter_->SetError_Locked();
}

Error TcpNode::GetSockOpt(int lvl, int optname, void* optval, socklen_t* len) {
  if (lvl == IPPROTO_TCP && optname == TCP_NODELAY) {
    AUTO_LOCK(node_lock_);
    int value = tcp_nodelay_;
    socklen_t value_len = static_cast<socklen_t>(sizeof(value));
    int copy_bytes = std::min(value_len, *len);
    memcpy(optval, &value, copy_bytes);
    *len = value_len;
    return 0;
  }

  return SocketNode::GetSockOpt(lvl, optname, optval, len);
}

Error TcpNode::SetNoDelay_Locked() {
  if (!IsConnected())
    return 0;

  int32_t error =
      TCPInterface()->SetOption(socket_resource_,
                                PP_TCPSOCKET_OPTION_NO_DELAY,
                                PP_MakeBool(tcp_nodelay_ ? PP_TRUE : PP_FALSE),
                                PP_BlockUntilComplete());
  return PPERROR_TO_ERRNO(error);
}

Error TcpNode::SetSockOptSocket(int optname,
                                const void* optval,
                                socklen_t len) {
  if (static_cast<size_t>(len) < sizeof(int))
    return EINVAL;
  int bufsize = *static_cast<const int*>(optval);

  if (optname == SO_RCVBUF) {
    int32_t error =
        TCPInterface()->SetOption(socket_resource_,
                                  PP_TCPSOCKET_OPTION_RECV_BUFFER_SIZE,
                                  PP_MakeInt32(bufsize),
                                  PP_BlockUntilComplete());
    return PPERROR_TO_ERRNO(error);
  } else if (optname == SO_SNDBUF) {
    int32_t error = TCPInterface()->SetOption(
        socket_resource_, PP_TCPSOCKET_OPTION_SEND_BUFFER_SIZE,
        PP_MakeInt32(bufsize), PP_BlockUntilComplete());
    return PPERROR_TO_ERRNO(error);
  }

  return SocketNode::SetSockOptSocket(optname, optval, len);
}

Error TcpNode::SetSockOptTCP(int optname, const void* optval, socklen_t len) {
  if (optname == TCP_NODELAY) {
    if (static_cast<size_t>(len) < sizeof(int))
      return EINVAL;
    tcp_nodelay_ = *static_cast<const int*>(optval) != 0;
    return SetNoDelay_Locked();
  }

  return SocketNode::SetSockOptTCP(optname, optval, len);
}

void TcpNode::QueueAccept() {
  StreamFs::Work* work = new TCPAcceptWork(stream(), emitter_);
  stream()->EnqueueWork(work);
}

void TcpNode::QueueConnect() {
  StreamFs::Work* work = new TCPConnectWork(stream(), emitter_);
  stream()->EnqueueWork(work);
}

void TcpNode::QueueInput() {
  if (TestStreamFlags(SSF_RECV_ENDOFSTREAM))
    return;

  StreamFs::Work* work = new TcpRecvWork(emitter_);
  stream()->EnqueueWork(work);
}

void TcpNode::QueueOutput() {
  if (TestStreamFlags(SSF_SENDING))
    return;

  if (!TestStreamFlags(SSF_CAN_SEND))
    return;

  if (0 == emitter_->BytesInOutputFIFO())
    return;

  StreamFs::Work* work = new TcpSendWork(emitter_, ScopedSocketNode(this));
  stream()->EnqueueWork(work);
}

Error TcpNode::Accept(const HandleAttr& attr,
                      PP_Resource* out_sock,
                      struct sockaddr* addr,
                      socklen_t* len) {
  EventListenerLock wait(GetEventEmitter());

  if (!TestStreamFlags(SSF_LISTENING))
    return EINVAL;

  // Either block forever or not at all
  int ms = attr.IsBlocking() ? -1 : 0;

  Error err = wait.WaitOnEvent(POLLIN, ms);
  if (ETIMEDOUT == err)
    return EWOULDBLOCK;

  int s = emitter_->GetAcceptedSocket_Locked();
  // Non-blocking case.
  if (s == 0)
    return EAGAIN;

  // Consume the new socket and start listening for the next one
  *out_sock = s;
  emitter_->ClearEvents_Locked(POLLIN);

  // Set the out paramaters
  if (addr && len) {
    PP_Resource remote_addr = TCPInterface()->GetRemoteAddress(*out_sock);
    *len = ResourceToSockAddr(remote_addr, *len, addr);
    filesystem_->ppapi()->ReleaseResource(remote_addr);
  }

  QueueAccept();
  return 0;
}

// We can not bind a client socket with PPAPI.  For now we ignore the
// bind but report the correct address later, just in case someone is
// binding without really caring what the address is (for example to
// select a more optimized interface/route.)
Error TcpNode::Bind(const struct sockaddr* addr, socklen_t len) {
  AUTO_LOCK(node_lock_);

  /* Only bind once. */
  if (IsBound())
    return EINVAL;

  local_addr_ = SockAddrToResource(addr, len);
  int err = TCPInterface()->Bind(
      socket_resource_, local_addr_, PP_BlockUntilComplete());

  // If we fail, release the local addr resource
  if (err != PP_OK) {
    filesystem_->ppapi()->ReleaseResource(local_addr_);
    local_addr_ = 0;
    return PPERROR_TO_ERRNO(err);
  }

  local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);
  return 0;
}

Error TcpNode::Connect(const HandleAttr& attr,
                       const struct sockaddr* addr,
                       socklen_t len) {
  EventListenerLock wait(GetEventEmitter());

  if (TestStreamFlags(SSF_CONNECTING))
    return EALREADY;

  if (IsConnected())
    return EISCONN;

  remote_addr_ = SockAddrToResource(addr, len);
  if (0 == remote_addr_)
    return EINVAL;

  int ms = attr.IsBlocking() ? -1 : 0;

  SetStreamFlags(SSF_CONNECTING);
  QueueConnect();

  Error err = wait.WaitOnEvent(POLLOUT, ms);
  if (ETIMEDOUT == err)
    return EINPROGRESS;

  // If we fail, release the dest addr resource
  if (err != 0) {
    ConnectFailed_Locked();
    return err;
  }

  // Make sure the connection succeeded.
  if (last_errno_ != 0) {
    ConnectFailed_Locked();
    return last_errno_;
  }

  ConnectDone_Locked();
  return 0;
}

Error TcpNode::Shutdown(int how) {
  AUTO_LOCK(node_lock_);
  if (!IsConnected())
    return ENOTCONN;

  {
    AUTO_LOCK(emitter_->GetLock());
    emitter_->SetError_Locked();
  }
  return 0;
}

void TcpNode::ConnectDone_Locked() {
  local_addr_ = TCPInterface()->GetLocalAddress(socket_resource_);

  // Now that we are connected, we can start sending and receiving.
  ClearStreamFlags(SSF_CONNECTING | SSF_CAN_CONNECT);
  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);

  emitter_->ConnectDone_Locked();

  // The NODELAY option cannot be set in PPAPI before the socket
  // is connected, but setsockopt() might have already set it.
  SetNoDelay_Locked();

  // Begin the input pump
  QueueInput();
}

void TcpNode::ConnectFailed_Locked() {
  filesystem_->ppapi()->ReleaseResource(remote_addr_);
  remote_addr_ = 0;
}

Error TcpNode::Listen(int backlog) {
  AUTO_LOCK(node_lock_);
  if (!IsBound())
    return EINVAL;

  int err = TCPInterface()->Listen(
      socket_resource_, backlog, PP_BlockUntilComplete());
  if (err != PP_OK)
    return PPERROR_TO_ERRNO(err);

  ClearStreamFlags(SSF_CAN_CONNECT);
  SetStreamFlags(SSF_LISTENING);
  emitter_->SetListening_Locked();
  QueueAccept();
  return 0;
}

Error TcpNode::Recv_Locked(void* buf,
                           size_t len,
                           PP_Resource* out_addr,
                           int* out_len) {
  assert(emitter_.get());
  *out_len = emitter_->ReadIn_Locked((char*)buf, len);
  *out_addr = remote_addr_;

  // Ref the address copy we pass back.
  filesystem_->ppapi()->AddRefResource(remote_addr_);
  return 0;
}

// TCP ignores dst addr passed to send_to, and always uses bound address
Error TcpNode::Send_Locked(const void* buf,
                           size_t len,
                           PP_Resource,
                           int* out_len) {
  assert(emitter_.get());
  if (emitter_->GetError_Locked())
    return EPIPE;
  *out_len = emitter_->WriteOut_Locked((char*)buf, len);
  return 0;
}

}  // namespace nacl_io

#endif  // PROVIDES_SOCKET_API