chromium/native_client_sdk/src/libraries/nacl_io/socket/udp_node.cc

// Copyright 2013 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "nacl_io/socket/udp_node.h"

#include <errno.h>
#include <string.h>

#include <algorithm>

#include "nacl_io/log.h"
#include "nacl_io/pepper_interface.h"
#include "nacl_io/socket/packet.h"
#include "nacl_io/socket/udp_event_emitter.h"
#include "nacl_io/stream/stream_fs.h"

namespace {
const size_t kMaxPacketSize = 65536;
const size_t kDefaultFifoSize = kMaxPacketSize * 8;
}

namespace nacl_io {

class UdpWork : public StreamFs::Work {
 public:
  explicit UdpWork(const ScopedUdpEventEmitter& emitter)
      : StreamFs::Work(emitter->stream()->stream()),
        emitter_(emitter),
        packet_(NULL) {}

  ~UdpWork() { delete packet_; }

  UDPSocketInterface* UDPInterface() {
    return filesystem()->ppapi()->GetUDPSocketInterface();
  }

 protected:
  ScopedUdpEventEmitter emitter_;
  Packet* packet_;
};

class UdpSendWork : public UdpWork {
 public:
  explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
                       const ScopedSocketNode& node)
      : UdpWork(emitter), node_(node) {}

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());

    // Does the stream exist, and can it send?
    if (!node_->TestStreamFlags(SSF_CAN_SEND))
      return false;

    // Check if we are already sending.
    if (node_->TestStreamFlags(SSF_SENDING))
      return false;

    // If this is a retry packet, packet_ will be already set
    // and we don't need to dequeue from emitter_.
    if (NULL == packet_) {
      packet_ = emitter_->ReadTXPacket_Locked();
      if (NULL == packet_)
        return false;
    }

    int err = UDPInterface()->SendTo(node_->socket_resource(),
                                     packet_->buffer(),
                                     packet_->len(),
                                     packet_->addr(),
                                     filesystem()->GetRunCompletion(this));
    if (err != PP_OK_COMPLETIONPENDING) {
      // Anything else, we should assume the socket has gone bad.
      node_->SetError_Locked(err);
      return false;
    }

    node_->SetStreamFlags(SSF_SENDING);
    return true;
  }

  virtual void Run(int32_t length_error) {
    AUTO_LOCK(emitter_->GetLock());

    if (length_error < 0) {
      if (length_error == PP_ERROR_INPROGRESS) {
        // We need to retry this packet later.
        node_->ClearStreamFlags(SSF_SENDING);
        node_->stream()->EnqueueWork(this);
        return;
      }
      node_->SetError_Locked(length_error);
      return;
    }

    // If we did send, then Q more work.
    node_->ClearStreamFlags(SSF_SENDING);
    node_->QueueOutput();
  }

 private:
  // We assume that transmits will always complete.  If the upstream
  // actually back pressures, enough to prevent the Send callback
  // from triggering, this resource may never go away.
  ScopedSocketNode node_;
};

class UdpRecvWork : public UdpWork {
 public:
  explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
      : UdpWork(emitter) {
  }

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());
    UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());

    // Does the stream exist, and can it recv?
    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
      return false;

    // Check if we are already receiving.
    if (stream->TestStreamFlags(SSF_RECVING))
      return false;

    stream->SetStreamFlags(SSF_RECVING);
    int err = UDPInterface()->RecvFrom(stream->socket_resource(),
                                       data_,
                                       kMaxPacketSize,
                                       &addr_,
                                       filesystem()->GetRunCompletion(this));
    if (err != PP_OK_COMPLETIONPENDING) {
      stream->SetError_Locked(err);
      return false;
    }

    return true;
  }

  virtual void Run(int32_t length_error) {
    AUTO_LOCK(emitter_->GetLock());
    UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
    if (NULL == stream)
      return;

    // On successful receive we queue more input
    if (length_error > 0) {
      Packet* packet = new Packet(filesystem()->ppapi());
      packet->Copy(data_, length_error, addr_);
      filesystem()->ppapi()->ReleaseResource(addr_);
      emitter_->WriteRXPacket_Locked(packet);
      stream->ClearStreamFlags(SSF_RECVING);
      stream->QueueInput();
    } else {
      stream->SetError_Locked(length_error);
    }
  }

 private:
  char data_[kMaxPacketSize];
  PP_Resource addr_;
};

UdpNode::UdpNode(Filesystem* filesystem)
    : SocketNode(SOCK_DGRAM, filesystem),
      emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
  emitter_->AttachStream(this);
}

void UdpNode::Destroy() {
  emitter_->DetachStream();
  SocketNode::Destroy();
}

UdpEventEmitter* UdpNode::GetEventEmitter() {
  return emitter_.get();
}

Error UdpNode::Init(int open_flags) {
  Error err = SocketNode::Init(open_flags);
  if (err != 0)
    return err;

  if (UDPInterface() == NULL) {
    LOG_ERROR("Got NULL interface: UDP");
    return EACCES;
  }

  socket_resource_ =
      UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
  if (0 == socket_resource_) {
    LOG_ERROR("Unable to create UDP resource.");
    return EACCES;
  }

  return 0;
}

void UdpNode::QueueInput() {
  UdpRecvWork* work = new UdpRecvWork(emitter_);
  stream()->EnqueueWork(work);
}

void UdpNode::QueueOutput() {
  if (!TestStreamFlags(SSF_CAN_SEND))
    return;

  if (TestStreamFlags(SSF_SENDING))
    return;

  UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
  stream()->EnqueueWork(work);
}

Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
  if (0 == socket_resource_)
    return EBADF;

  /* Only bind once. */
  if (IsBound())
    return EINVAL;

  PP_Resource out_addr = SockAddrToResource(addr, len);
  if (0 == out_addr)
    return EINVAL;

  int err =
      UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
  filesystem_->ppapi()->ReleaseResource(out_addr);
  if (err != 0)
    return PPERROR_TO_ERRNO(err);

  // Get the address that was actually bound (in case addr was 0.0.0.0:0).
  out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
  if (out_addr == 0)
    return EINVAL;

  // Now that we are bound, we can start sending and receiving.
  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
  QueueInput();

  local_addr_ = out_addr;
  return 0;
}

Error UdpNode::Connect(const HandleAttr& attr,
                       const struct sockaddr* addr,
                       socklen_t len) {
  if (0 == socket_resource_)
    return EBADF;

  /* Connect for UDP is the default dest, it's legal to change it. */
  if (remote_addr_ != 0) {
    filesystem_->ppapi()->ReleaseResource(remote_addr_);
    remote_addr_ = 0;
  }

  remote_addr_ = SockAddrToResource(addr, len);
  if (0 == remote_addr_)
    return EINVAL;

  return 0;
}

Error UdpNode::Recv_Locked(void* buf,
                           size_t len,
                           PP_Resource* out_addr,
                           int* out_len) {
  Packet* packet = emitter_->ReadRXPacket_Locked();
  *out_len = 0;
  *out_addr = 0;

  if (packet) {
    int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
    memcpy(buf, packet->buffer(), capped_len);

    if (packet->addr() != 0) {
      filesystem_->ppapi()->AddRefResource(packet->addr());
      *out_addr = packet->addr();
    }

    *out_len = capped_len;
    delete packet;
    return 0;
  }

  // Should never happen, Recv_Locked should not be called
  // unless already in a POLLIN state.
  return EBADF;
}

Error UdpNode::Send_Locked(const void* buf,
                           size_t len,
                           PP_Resource addr,
                           int* out_len) {
  if (!IsBound()) {
    // Pepper requires a socket to be bound before it can send.
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = 0;
    memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
    Error err =
        Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
    if (err != 0)
      return err;
  }

  *out_len = 0;
  int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
  Packet* packet = new Packet(filesystem_->ppapi());
  packet->Copy(buf, capped_len, addr);

  emitter_->WriteTXPacket_Locked(packet);
  *out_len = capped_len;
  return 0;
}

Error UdpNode::SetSockOptSocket(int optname,
                                const void* optval,
                                socklen_t len) {
  if (static_cast<size_t>(len) < sizeof(int))
    return EINVAL;

  switch (optname) {
    case SO_RCVBUF: {
      int bufsize = *static_cast<const int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_RECV_BUFFER_SIZE,
          PP_MakeInt32(bufsize), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case SO_SNDBUF: {
      int bufsize = *static_cast<const int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_SEND_BUFFER_SIZE,
          PP_MakeInt32(bufsize), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case SO_BROADCAST: {
      int broadcast = *static_cast<const int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_BROADCAST,
          PP_MakeBool(broadcast ? PP_TRUE : PP_FALSE), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    default: { break; }
  }

  return SocketNode::SetSockOptSocket(optname, optval, len);
}

Error UdpNode::SetSockOptIP(int optname, const void* optval, socklen_t len) {
  if (static_cast<size_t>(len) < sizeof(int))
    return EINVAL;

  switch (optname) {
    case IP_MULTICAST_LOOP: {
      int loop = *static_cast<const int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_MULTICAST_LOOP,
          PP_MakeBool(loop ? PP_TRUE : PP_FALSE), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case IP_MULTICAST_TTL: {
      unsigned int ttl = *static_cast<const unsigned int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_MULTICAST_TTL,
          PP_MakeInt32(ttl), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case IP_MULTICAST_IF: {
      // PPAPI does not expose this option, but we pretend to support it.
      // TODO(etrunko): store value to be returned by GetSockOpt() when it is
      // implemented.
      return 0;
    }
    case IP_ADD_MEMBERSHIP: {
      const struct ip_mreq* mreq = static_cast<const struct ip_mreq*>(optval);
      struct sockaddr_in sin = {0};
      sin.sin_family = AF_INET;
      memcpy(&sin.sin_addr, &mreq->imr_multiaddr, sizeof(struct in_addr));

      PP_Resource net_addr = SockAddrInToResource(&sin, sizeof(sin));
      int32_t error = UDPInterface()->JoinGroup(socket_resource_, net_addr,
                                                PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case IP_DROP_MEMBERSHIP: {
      const struct ip_mreq* mreq = static_cast<const struct ip_mreq*>(optval);
      struct sockaddr_in sin = {0};
      sin.sin_family = AF_INET;
      memcpy(&sin.sin_addr, &mreq->imr_multiaddr, sizeof(struct in_addr));

      PP_Resource net_addr = SockAddrInToResource(&sin, sizeof(sin));
      int32_t error = UDPInterface()->LeaveGroup(socket_resource_, net_addr,
                                                 PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    default: { break; }
  }

  return SocketNode::SetSockOptIP(optname, optval, len);
}

Error UdpNode::SetSockOptIPV6(int optname, const void* optval, socklen_t len) {
  if (static_cast<size_t>(len) < sizeof(int))
    return EINVAL;

  switch (optname) {
    case IPV6_MULTICAST_LOOP: {
      int loop = *static_cast<const int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_MULTICAST_LOOP,
          PP_MakeBool(loop ? PP_TRUE : PP_FALSE), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case IPV6_MULTICAST_HOPS: {
      unsigned int ttl = *static_cast<const unsigned int*>(optval);
      int32_t error = UDPInterface()->SetOption(
          socket_resource_, PP_UDPSOCKET_OPTION_MULTICAST_TTL,
          PP_MakeInt32(ttl), PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case IPV6_MULTICAST_IF: {
      // PPAPI does not expose this option, but we pretend to support it.
      // TODO(etrunko): store value to be returned by GetSockOpt() when it is
      // implemented.
      return 0;
    }
    case IPV6_JOIN_GROUP: {
      const struct ipv6_mreq* mreq =
          static_cast<const struct ipv6_mreq*>(optval);
      struct sockaddr_in6 sin = {0};
      sin.sin6_family = AF_INET6;
      memcpy(&sin.sin6_addr, &mreq->ipv6mr_multiaddr, sizeof(struct in6_addr));

      PP_Resource net_addr = SockAddrIn6ToResource(&sin, sizeof(sin));
      int32_t error = UDPInterface()->JoinGroup(socket_resource_, net_addr,
                                                PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    case IPV6_LEAVE_GROUP: {
      const struct ipv6_mreq* mreq =
          static_cast<const struct ipv6_mreq*>(optval);
      struct sockaddr_in6 sin = {0};
      sin.sin6_family = AF_INET6;
      memcpy(&sin.sin6_addr, &mreq->ipv6mr_multiaddr, sizeof(struct in6_addr));

      PP_Resource net_addr = SockAddrIn6ToResource(&sin, sizeof(sin));
      int32_t error = UDPInterface()->LeaveGroup(socket_resource_, net_addr,
                                                 PP_BlockUntilComplete());
      return PPERROR_TO_ERRNO(error);
    }
    default: { break; }
  }

  return SocketNode::SetSockOptIPV6(optname, optval, len);
}

}  // namespace nacl_io