chromium/mojo/core/ports/ports_unittest.cc

// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifdef UNSAFE_BUFFERS_BUILD
// TODO(crbug.com/351564777): Remove this and convert code to safer constructs.
#pragma allow_unsafe_buffers
#endif

#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <map>
#include <sstream>
#include <string_view>
#include <utility>

#include "base/containers/contains.h"
#include "base/containers/heap_array.h"
#include "base/containers/queue.h"
#include "base/functional/bind.h"
#include "base/functional/callback.h"
#include "base/logging.h"
#include "base/memory/raw_ptr.h"
#include "base/memory/ref_counted.h"
#include "base/strings/stringprintf.h"
#include "base/synchronization/lock.h"
#include "base/synchronization/waitable_event.h"
#include "base/test/task_environment.h"
#include "base/threading/thread.h"
#include "mojo/core/ports/event.h"
#include "mojo/core/ports/node.h"
#include "mojo/core/ports/node_delegate.h"
#include "mojo/core/ports/port_locker.h"
#include "mojo/core/ports/user_message.h"
#include "testing/gtest/include/gtest/gtest.h"

namespace mojo {
namespace core {
namespace ports {
namespace test {

namespace {

// TODO(rockot): Remove this unnecessary alias.
using ScopedMessage = std::unique_ptr<UserMessageEvent>;

class TestMessage : public UserMessage {
 public:
  static const TypeInfo kUserMessageTypeInfo;

  TestMessage(const std::string_view& payload)
      : UserMessage(&kUserMessageTypeInfo), payload_(payload) {}
  ~TestMessage() override = default;

  const std::string& payload() const { return payload_; }

 private:
  std::string payload_;
};

const UserMessage::TypeInfo TestMessage::kUserMessageTypeInfo = {};

ScopedMessage NewUserMessageEvent(const std::string_view& payload,
                                  size_t num_ports) {
  auto event = std::make_unique<UserMessageEvent>(num_ports);
  event->AttachMessage(std::make_unique<TestMessage>(payload));
  return event;
}

bool MessageEquals(const ScopedMessage& message, const std::string_view& s) {
  return message->GetMessage<TestMessage>()->payload() == s;
}

class TestNode;

class MessageRouter {
 public:
  virtual ~MessageRouter() = default;

  virtual void ForwardEvent(TestNode* from_node,
                            const NodeName& node_name,
                            ScopedEvent event) = 0;
  virtual void BroadcastEvent(TestNode* from_node, ScopedEvent event) = 0;
};

class TestNode : public NodeDelegate {
 public:
  explicit TestNode(uint64_t id)
      : node_name_(id, 1),
        node_(node_name_, this),
        node_thread_(base::StringPrintf("Node %" PRIu64 " thread", id)),
        events_available_event_(
            base::WaitableEvent::ResetPolicy::AUTOMATIC,
            base::WaitableEvent::InitialState::NOT_SIGNALED),
        idle_event_(base::WaitableEvent::ResetPolicy::MANUAL,
                    base::WaitableEvent::InitialState::SIGNALED) {}

  ~TestNode() override {
    StopWhenIdle();
    node_thread_.Stop();
  }

  const NodeName& name() const { return node_name_; }

  // NOTE: Node is thread-safe.
  Node& node() { return node_; }

  base::WaitableEvent& idle_event() { return idle_event_; }

  bool IsIdle() {
    base::AutoLock lock(lock_);
    return started_ && !dispatching_ &&
           (incoming_events_.empty() || (block_on_event_ && blocked_));
  }

  void BlockOnEvent(Event::Type type) {
    base::AutoLock lock(lock_);
    blocked_event_type_ = type;
    block_on_event_ = true;
  }

  void Unblock() {
    base::AutoLock lock(lock_);
    block_on_event_ = false;
    events_available_event_.Signal();
  }

  void Start(MessageRouter* router) {
    router_ = router;
    node_thread_.Start();
    node_thread_.task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&TestNode::ProcessEvents, base::Unretained(this)));
  }

  void StopWhenIdle() {
    base::AutoLock lock(lock_);
    should_quit_ = true;
    events_available_event_.Signal();
  }

  void WakeUp() { events_available_event_.Signal(); }

  int SendStringMessage(const PortRef& port, const std::string& s) {
    return node_.SendUserMessage(port, NewUserMessageEvent(s, 0));
  }

  int SendMultipleMessages(const PortRef& port, size_t num_messages) {
    for (size_t i = 0; i < num_messages; ++i) {
      int result = SendStringMessage(port, "");
      if (result != OK)
        return result;
    }
    return OK;
  }

  int SendStringMessageWithPort(const PortRef& port,
                                const std::string& s,
                                const PortName& sent_port_name) {
    auto event = NewUserMessageEvent(s, 1);
    event->ports()[0] = sent_port_name;
    return node_.SendUserMessage(port, std::move(event));
  }

  int SendStringMessageWithPort(const PortRef& port,
                                const std::string& s,
                                const PortRef& sent_port) {
    return SendStringMessageWithPort(port, s, sent_port.name());
  }

  void set_drop_messages(bool value) {
    base::AutoLock lock(lock_);
    drop_messages_ = value;
  }

  void set_save_messages(bool value) {
    base::AutoLock lock(lock_);
    save_messages_ = value;
  }

  bool ReadMessage(const PortRef& port, ScopedMessage* message) {
    return node_.GetMessage(port, message, nullptr) == OK && *message;
  }

  bool ReadMultipleMessages(const PortRef& port, size_t num_messages) {
    for (size_t i = 0; i < num_messages; ++i) {
      ScopedMessage message;
      if (!ReadMessage(port, &message))
        return false;
    }
    return true;
  }

  bool GetSavedMessage(ScopedMessage* message) {
    base::AutoLock lock(lock_);
    if (saved_messages_.empty()) {
      message->reset();
      return false;
    }
    std::swap(*message, saved_messages_.front());
    saved_messages_.pop();
    return true;
  }

  void EnqueueEvent(const NodeName& from_node, ScopedEvent event) {
    idle_event_.Reset();

    // NOTE: This may be called from ForwardMessage and thus must not reenter
    // |node_|.
    base::AutoLock lock(lock_);
    incoming_events_.push({from_node, std::move(event)});
    events_available_event_.Signal();
  }

  void ForwardEvent(const NodeName& node_name, ScopedEvent event) override {
    {
      base::AutoLock lock(lock_);
      if (drop_messages_) {
        DVLOG(1) << "Dropping ForwardMessage from node " << node_name_ << " to "
                 << node_name;

        base::AutoUnlock unlock(lock_);
        ClosePortsInEvent(event.get());
        return;
      }
    }

    DCHECK(router_);
    DVLOG(1) << "ForwardEvent from node " << node_name_ << " to " << node_name;
    router_->ForwardEvent(this, node_name, std::move(event));
  }

  void BroadcastEvent(ScopedEvent event) override {
    router_->BroadcastEvent(this, std::move(event));
  }

  void PortStatusChanged(const PortRef& port) override {
    // The port may be closed, in which case we ignore the notification.
    base::AutoLock lock(lock_);
    if (!save_messages_)
      return;

    for (;;) {
      ScopedMessage message;
      {
        base::AutoUnlock unlock(lock_);
        if (!ReadMessage(port, &message))
          break;
      }

      saved_messages_.emplace(std::move(message));
    }
  }

  void ClosePortsInEvent(Event* event) {
    if (event->type() != Event::Type::kUserMessage)
      return;

    UserMessageEvent* message_event = static_cast<UserMessageEvent*>(event);
    for (size_t i = 0; i < message_event->num_ports(); ++i) {
      PortRef port;
      ASSERT_EQ(OK, node_.GetPort(message_event->ports()[i], &port));
      EXPECT_EQ(OK, node_.ClosePort(port));
    }
  }

  uint64_t GetUnacknowledgedMessageCount(const PortRef& port_ref) {
    PortStatus status;
    if (node_.GetStatus(port_ref, &status) != OK)
      return 0;

    return status.unacknowledged_message_count;
  }

  void AllowPortMerge(const PortRef& port_ref) {
    SinglePortLocker locker(&port_ref);
    locker.port()->pending_merge_peer = true;
  }

 private:
  void ProcessEvents() {
    for (;;) {
      events_available_event_.Wait();
      base::AutoLock lock(lock_);

      if (should_quit_)
        return;

      dispatching_ = true;
      while (!incoming_events_.empty()) {
        if (block_on_event_ &&
            incoming_events_.front().second->type() == blocked_event_type_) {
          blocked_ = true;
          // Go idle if we hit a blocked event type.
          break;
        } else {
          blocked_ = false;
        }
        auto node_event_pair = std::move(incoming_events_.front());
        incoming_events_.pop();

        // NOTE: AcceptMessage() can re-enter this object to call any of the
        // NodeDelegate interface methods.
        base::AutoUnlock unlock(lock_);
        node_.AcceptEvent(node_event_pair.first,
                          std::move(node_event_pair.second));
      }

      dispatching_ = false;
      started_ = true;
      idle_event_.Signal();
    };
  }

  const NodeName node_name_;
  Node node_;
  raw_ptr<MessageRouter> router_ = nullptr;

  base::Thread node_thread_;
  base::WaitableEvent events_available_event_;
  base::WaitableEvent idle_event_;

  // Guards fields below.
  base::Lock lock_;
  bool started_ = false;
  bool dispatching_ = false;
  bool should_quit_ = false;
  bool drop_messages_ = false;
  bool save_messages_ = false;
  bool blocked_ = false;
  bool block_on_event_ = false;
  Event::Type blocked_event_type_;
  base::queue<std::pair<NodeName, ScopedEvent>> incoming_events_;
  base::queue<ScopedMessage> saved_messages_;
};

class PortsTest : public testing::Test, public MessageRouter {
 public:
  void AddNode(TestNode* node) {
    {
      base::AutoLock lock(lock_);
      nodes_[node->name()] = node;
    }
    node->Start(this);
  }

  void RemoveNode(TestNode* node) {
    {
      base::AutoLock lock(lock_);
      nodes_.erase(node->name());
    }

    for (const auto& entry : nodes_)
      entry.second->node().LostConnectionToNode(node->name());
  }

  // Waits until all known Nodes are idle. Message forwarding and processing
  // is handled in such a way that idleness is a stable state: once all nodes in
  // the system are idle, they will remain idle until the test explicitly
  // initiates some further event (e.g. sending a message, closing a port, or
  // removing a Node).
  void WaitForIdle() {
    for (;;) {
      base::AutoLock global_lock(global_lock_);
      bool all_nodes_idle = true;
      for (const auto& entry : nodes_) {
        if (!entry.second->IsIdle())
          all_nodes_idle = false;
        entry.second->WakeUp();
      }
      if (all_nodes_idle)
        return;

      // Wait for any Node to signal that it's idle.
      base::AutoUnlock global_unlock(global_lock_);
      std::vector<base::WaitableEvent*> events;
      for (const auto& entry : nodes_)
        events.push_back(&entry.second->idle_event());
      base::WaitableEvent::WaitMany(events.data(), events.size());
    }
  }

  void CreatePortPair(TestNode* node0,
                      PortRef* port0,
                      TestNode* node1,
                      PortRef* port1) {
    if (node0 == node1) {
      EXPECT_EQ(OK, node0->node().CreatePortPair(port0, port1));
    } else {
      EXPECT_EQ(OK, node0->node().CreateUninitializedPort(port0));
      EXPECT_EQ(OK, node1->node().CreateUninitializedPort(port1));
      EXPECT_EQ(
          OK, node0->node().InitializePort(*port0, node1->name(), port1->name(),
                                           node1->name(), port1->name()));
      EXPECT_EQ(
          OK, node1->node().InitializePort(*port1, node0->name(), port0->name(),
                                           node0->name(), port0->name()));
    }
  }

 private:
  // MessageRouter:
  void ForwardEvent(TestNode* from_node,
                    const NodeName& node_name,
                    ScopedEvent event) override {
    base::AutoLock global_lock(global_lock_);
    base::AutoLock lock(lock_);
    // Drop messages from nodes that have been removed.
    if (!base::Contains(nodes_, from_node->name())) {
      from_node->ClosePortsInEvent(event.get());
      return;
    }

    auto it = nodes_.find(node_name);
    if (it == nodes_.end()) {
      DVLOG(1) << "Node not found: " << node_name;
      return;
    }

    // Serialize and de-serialize all forwarded events.
    size_t buf_size = event->GetSerializedSize();
    auto buf = base::HeapArray<char>::Uninit(buf_size);
    event->Serialize(buf.data());
    ScopedEvent copy = Event::Deserialize(buf.data(), buf.size());
    // This should always succeed unless serialization or deserialization
    // is broken. In that case, the loss of events should cause a test failure.
    ASSERT_TRUE(copy);

    // Also copy the payload for user messages.
    if (event->type() == Event::Type::kUserMessage) {
      UserMessageEvent* message_event =
          static_cast<UserMessageEvent*>(event.get());
      UserMessageEvent* message_copy =
          static_cast<UserMessageEvent*>(copy.get());

      message_copy->AttachMessage(std::make_unique<TestMessage>(
          message_event->GetMessage<TestMessage>()->payload()));
    }

    it->second->EnqueueEvent(from_node->name(), std::move(event));
  }

  void BroadcastEvent(TestNode* from_node, ScopedEvent event) override {
    base::AutoLock global_lock(global_lock_);
    base::AutoLock lock(lock_);

    // Drop messages from nodes that have been removed.
    if (!base::Contains(nodes_, from_node->name())) {
      return;
    }

    for (const auto& entry : nodes_) {
      TestNode* node = entry.second;
      // Broadcast doesn't deliver to the local node.
      if (node == from_node)
        continue;
      node->EnqueueEvent(from_node->name(), event->CloneForBroadcast());
    }
  }

  base::test::TaskEnvironment task_environment_;

  // Acquired before any operation which makes a Node busy, and before testing
  // if all nodes are idle.
  base::Lock global_lock_;

  base::Lock lock_;
  std::map<NodeName, TestNode*> nodes_;
};

}  // namespace

TEST_F(PortsTest, Basic1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Basic2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "hello again"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Basic3) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "hello", a1));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello again"));

  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a0));

  PortRef b0, b1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&b0, &b1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "bar", b1));
  EXPECT_EQ(OK, node0.SendStringMessage(b0, "baz"));

  EXPECT_EQ(OK, node0.node().ClosePort(b0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNode1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);
  node1.set_drop_messages(true);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // Transfer a port to node1 and simulate a lost connection to node1.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));

  WaitForIdle();

  RemoveNode(&node1);

  WaitForIdle();

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNode2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "take a1", a1));

  WaitForIdle();

  node1.set_drop_messages(true);

  RemoveNode(&node1);

  WaitForIdle();

  // a0 should have eventually detected peer closure after node loss.
  ScopedMessage message;
  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node0.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node0.node().ClosePort(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));

  EXPECT_EQ(OK, node1.node().GetMessage(x1, &message, nullptr));
  EXPECT_TRUE(message);
  node1.ClosePortsInEvent(message.get());

  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeWithSecondaryProxy) {
  // Tests that a proxy gets cleaned up when its indirect peer lives on a lost
  // node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1. Node 1
  // will trivially become aware of the loss, and this test verifies that the
  // port A on node 0 will eventually also become aware of it.

  // Make sure node2 stops processing events when it encounters an ObserveProxy.
  node2.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node2.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  // Port E should have detected peer closure despite the fact that there is
  // no longer a continuous route from F to E over which the event could travel.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(E, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeAfterSendingObserveProxy) {
  // Tests that a proxy gets cleaned up after a node disconnect if the
  // previous port already received the ObserveProxy event.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  TestNode node2(2);
  AddNode(&node2);

  // Create A-B spanning nodes 0 and 1 and C-D spanning 1 and 2.
  PortRef A, B, C, D;
  CreatePortPair(&node0, &A, &node1, &B);
  CreatePortPair(&node1, &C, &node2, &D);

  // Create E-F and send F over A to node 1.
  PortRef E, F;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", F));

  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());

  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &F));

  // Send F over C to node 2 and then simulate node 2 loss from node 1 after
  // node 0 received the ObserveProxy event. Node 1 needs to clean up the
  // closed proxy while the node 0 to node 2 connection is still intact.
  node0.BlockOnEvent(Event::Type::kObserveProxy);

  EXPECT_EQ(OK, node1.SendStringMessageWithPort(C, ".", F));
  WaitForIdle();

  // Simulate node 1 and 2 disconnecting.
  EXPECT_EQ(OK, node1.node().LostConnectionToNode(node2.name()));

  // Let node2 continue processing events and wait for everyone to go idle.
  node0.Unblock();
  WaitForIdle();

  // Port F should be gone.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(F.name(), &F));

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node1.node().ClosePort(C));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, LostConnectionToNodeWithLocalProxy) {
  // Tests that a proxy gets cleaned up when its direct peer lives on a lost
  // node and it's predecessor lives on the same node.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef A, B;
  CreatePortPair(&node0, &A, &node1, &B);

  PortRef C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

  // Send D but block node0 on an ObserveProxy event.
  node0.BlockOnEvent(Event::Type::kObserveProxy);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, ".", D));

  // node0 won't collapse the proxy but node1 will receive the message before
  // going idle.
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  EXPECT_EQ(OK, node1.node().GetPort(message->ports()[0], &E));

  RemoveNode(&node1);

  node0.Unblock();
  WaitForIdle();

  // Port C should have detected peer closure.
  PortStatus status;
  EXPECT_EQ(OK, node0.node().GetStatus(C, &status));
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(B));
  EXPECT_EQ(OK, node0.node().ClosePort(C));
  EXPECT_EQ(OK, node1.node().ClosePort(E));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, GetMessage1) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a1));

  WaitForIdle();

  EXPECT_EQ(ERROR_PORT_PEER_CLOSED,
            node.node().GetMessage(a0, &message, nullptr));
  EXPECT_FALSE(message);

  EXPECT_EQ(OK, node.node().ClosePort(a0));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, GetMessage2) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  EXPECT_EQ(OK, node.SendStringMessage(a1, "1"));

  ScopedMessage message;
  EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));

  ASSERT_TRUE(message);
  EXPECT_TRUE(MessageEquals(message, "1"));

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, GetMessage3) {
  TestNode node(0);
  AddNode(&node);

  PortRef a0, a1;
  EXPECT_EQ(OK, node.node().CreatePortPair(&a0, &a1));

  const char* kStrings[] = {"1", "2", "3"};

  for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i)
    EXPECT_EQ(OK, node.SendStringMessage(a1, kStrings[i]));

  ScopedMessage message;
  for (size_t i = 0; i < sizeof(kStrings) / sizeof(kStrings[0]); ++i) {
    EXPECT_EQ(OK, node.node().GetMessage(a0, &message, nullptr));
    ASSERT_TRUE(message);
    EXPECT_TRUE(MessageEquals(message, kStrings[i]));
  }

  EXPECT_EQ(OK, node.node().ClosePort(a0));
  EXPECT_EQ(OK, node.node().ClosePort(a1));

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Delegation1) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  // In this test, we send a message to a port that has been moved.

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "a1", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a1"));

  // This is "a1" from the point of view of node1.
  PortName a2_name = message->ports()[0];
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x1, "a2", a2_name));
  EXPECT_EQ(OK, node0.SendStringMessage(a0, "hello"));

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x0, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "a2"));

  // This is "a2" from the point of view of node1.
  PortName a3_name = message->ports()[0];

  PortRef a3;
  EXPECT_EQ(OK, node0.node().GetPort(a3_name, &a3));

  ASSERT_TRUE(node0.ReadMessage(a3, &message));
  EXPECT_EQ(0u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "hello"));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(a3));

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, Delegation2) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  for (int i = 0; i < 100; ++i) {
    // Setup pipe a<->b between node0 and node1.
    PortRef A, B;
    CreatePortPair(&node0, &A, &node1, &B);

    PortRef C, D;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&C, &D));

    PortRef E, F;
    EXPECT_EQ(OK, node0.node().CreatePortPair(&E, &F));

    node1.set_save_messages(true);

    // Pass D over A to B.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(A, "1", D));

    // Pass F over C to D.
    EXPECT_EQ(OK, node0.SendStringMessageWithPort(C, "1", F));

    // This message should find its way to node1.
    EXPECT_EQ(OK, node0.SendStringMessage(E, "hello"));

    WaitForIdle();

    EXPECT_EQ(OK, node0.node().ClosePort(C));
    EXPECT_EQ(OK, node0.node().ClosePort(E));

    EXPECT_EQ(OK, node0.node().ClosePort(A));
    EXPECT_EQ(OK, node1.node().ClosePort(B));

    bool got_hello = false;
    ScopedMessage message;
    while (node1.GetSavedMessage(&message)) {
      node1.ClosePortsInEvent(message.get());
      if (MessageEquals(message, "hello")) {
        got_hello = true;
        break;
      }
    }

    EXPECT_TRUE(got_hello);

    WaitForIdle();  // Because closing ports may have generated tasks.
  }

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendUninitialized) {
  TestNode node(0);
  AddNode(&node);

  PortRef x0;
  EXPECT_EQ(OK, node.node().CreateUninitializedPort(&x0));
  EXPECT_EQ(ERROR_PORT_STATE_UNEXPECTED, node.SendStringMessage(x0, "oops"));
  EXPECT_EQ(OK, node.node().ClosePort(x0));
  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendFailure) {
  TestNode node(0);
  AddNode(&node);

  node.set_save_messages(true);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  // Try to send A over itself.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_SELF,
            node.SendStringMessageWithPort(A, "oops", A));

  // Try to send B over A.

  EXPECT_EQ(ERROR_PORT_CANNOT_SEND_PEER,
            node.SendStringMessageWithPort(A, "nope", B));

  // B should be closed immediately.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node.node().GetPort(B.name(), &B));

  WaitForIdle();

  // There should have been no messages accepted.
  ScopedMessage message;
  EXPECT_FALSE(node.GetSavedMessage(&message));

  EXPECT_EQ(OK, node.node().ClosePort(A));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, DontLeakUnreceivedPorts) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));

  EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));

  EXPECT_EQ(OK, node.node().ClosePort(C));
  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(B));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, AllowShutdownWithLocalPortsOpen) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B, C, D;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.node().CreatePortPair(&C, &D));

  EXPECT_EQ(OK, node.SendStringMessageWithPort(A, "foo", D));

  ScopedMessage message;
  EXPECT_TRUE(node.ReadMessage(B, &message));
  ASSERT_EQ(1u, message->num_ports());
  EXPECT_TRUE(MessageEquals(message, "foo"));
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_TRUE(
      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  WaitForIdle();

  EXPECT_TRUE(
      node.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_FALSE(node.node().CanShutdownCleanly());

  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(B));
  EXPECT_EQ(OK, node.node().ClosePort(C));
  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, ProxyCollapse1) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and receive it as C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  // Send C and receive it as D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));

  // Send D and receive it as E.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", D));
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  EXPECT_EQ(OK, node.node().ClosePort(A));
  EXPECT_EQ(OK, node.node().ClosePort(E));

  // The node should not idle until all proxies are collapsed.
  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, ProxyCollapse2) {
  TestNode node(0);
  AddNode(&node);

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Send B and A to create proxies in each direction.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  // At this point we have a scenario with:
  //
  // D -> [B] -> C -> [A]
  //
  // Ensure that the proxies can collapse. The sent ports will be closed
  // eventually as a result of Y's closure.

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendWithClosedPeer) {
  // This tests that if a port is sent when its peer is already known to be
  // closed, the newly created port will be aware of that peer closure, and the
  // proxy will eventually collapse.

  TestNode node(0);
  AddNode(&node);

  // Send a message from A to B, then close A.
  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node.SendStringMessage(A, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(A));

  // Now send B over X-Y as new port C.
  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  ScopedMessage message;
  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  EXPECT_EQ(OK, node.node().ClosePort(X));
  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // C should have received the message originally sent to B, and it should also
  // be aware of A's closure.

  ASSERT_TRUE(node.ReadMessage(C, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(C, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  node.node().ClosePort(C);

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, SendWithClosedPeerSent) {
  // This tests that if a port is closed while some number of proxies are still
  // routing messages (directly or indirectly) to it, that the peer port is
  // eventually notified of the closure, and the dead-end proxies will
  // eventually be removed.

  TestNode node(0);
  AddNode(&node);

  PortRef X, Y;
  EXPECT_EQ(OK, node.node().CreatePortPair(&X, &Y));

  PortRef A, B;
  EXPECT_EQ(OK, node.node().CreatePortPair(&A, &B));

  ScopedMessage message;

  // Send A as new port C.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", A));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef C;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &C));

  // Send C as new port D.
  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", C));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef D;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &D));

  // Send a message to B through D, then close D.
  EXPECT_EQ(OK, node.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node.node().ClosePort(D));

  // Now send B as new port E.

  EXPECT_EQ(OK, node.SendStringMessageWithPort(X, "foo", B));
  EXPECT_EQ(OK, node.node().ClosePort(X));

  ASSERT_TRUE(node.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node.node().ClosePort(Y));

  WaitForIdle();

  // E should receive the message originally sent to B, and it should also be
  // aware of D's closure.

  ASSERT_TRUE(node.ReadMessage(E, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  PortStatus status;
  EXPECT_EQ(OK, node.node().GetStatus(E, &status));
  EXPECT_FALSE(status.receiving_messages);
  EXPECT_FALSE(status.has_messages);
  EXPECT_TRUE(status.peer_closed);

  EXPECT_EQ(OK, node.node().ClosePort(E));

  WaitForIdle();

  EXPECT_TRUE(node.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePorts) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on A.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortWithClosedPeer1) {
  // This tests that the right thing happens when initiating a merge on a port
  // whose peer has already been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on A.
  EXPECT_EQ(OK, node0.SendStringMessage(A, "hey"));

  // Close A.
  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node0 should have no ports since
  // A was explicitly closed.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(
      node1.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));

  // Expect D to have received the message sent on A.
  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortWithClosedPeer2) {
  // This tests that the right thing happens when merging into a port whose peer
  // has already been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Write a message on D and close it.
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hey"));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect all proxies to be gone once idle. node1 should have no ports since
  // D was explicitly closed.
  EXPECT_TRUE(
      node0.node().CanShutdownCleanly(Node::ShutdownPolicy::ALLOW_LOCAL_PORTS));
  EXPECT_TRUE(node1.node().CanShutdownCleanly());

  // Expect A to have received the message sent on D.
  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(A, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  EXPECT_EQ(OK, node0.node().ClosePort(A));

  // No more ports should be open.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsWithClosedPeers) {
  // This tests that no residual ports are left behind if two ports are merged
  // when both of their peers have been closed.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Close A and D.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsWithMovedPeers) {
  // This tests that ports can be merged successfully even if their peers are
  // moved around.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  // Set up another pair X-Y for moving ports on node0.
  PortRef X, Y;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&X, &Y));

  ScopedMessage message;

  // Move A to new port E.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(X, "foo", A));
  ASSERT_TRUE(node0.ReadMessage(Y, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node0.node().ClosePort(Y));

  // Write messages on E and D.
  EXPECT_EQ(OK, node0.SendStringMessage(E, "hey"));
  EXPECT_EQ(OK, node1.SendStringMessage(D, "hi"));

  // Initiate a merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  WaitForIdle();

  // Expect to receive D's message on E and E's message on D.
  ASSERT_TRUE(node0.ReadMessage(E, &message));
  EXPECT_TRUE(MessageEquals(message, "hi"));
  ASSERT_TRUE(node1.ReadMessage(D, &message));
  EXPECT_TRUE(MessageEquals(message, "hey"));

  // Close E and D.
  EXPECT_EQ(OK, node0.node().ClosePort(E));
  EXPECT_EQ(OK, node1.node().ClosePort(D));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, MergePortsFailsGracefully) {
  // This tests that the system remains in a well-defined state if something
  // goes wrong during port merge.

  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Setup two independent port pairs, A-B on node0 and C-D on node1.
  PortRef A, B, C, D;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&A, &B));
  EXPECT_EQ(OK, node1.node().CreatePortPair(&C, &D));

  ScopedMessage message;
  PortRef X, Y;
  EXPECT_EQ(OK, node0.node().CreateUninitializedPort(&X));
  EXPECT_EQ(OK, node1.node().CreateUninitializedPort(&Y));
  EXPECT_EQ(OK, node0.node().InitializePort(X, node1.name(), Y.name(),
                                            node1.name(), Y.name()));
  EXPECT_EQ(OK, node1.node().InitializePort(Y, node0.name(), X.name(),
                                            node0.name(), X.name()));

  // Block the merge from proceeding until we can do something stupid with port
  // C. This avoids the test logic racing with async merge logic.
  node1.BlockOnEvent(Event::Type::kMergePort);

  // Initiate the merge between B and C.
  node1.AllowPortMerge(C);
  EXPECT_EQ(OK, node0.node().MergePorts(B, node1.name(), C.name()));

  // Move C to a new port E. This is not a sane use of Node's public API but
  // is still hypothetically possible. It allows us to force a merge failure
  // because C will be in an invalid state by the time the merge is processed.
  // As a result, B should be closed.
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(Y, "foo", C));

  node1.Unblock();

  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(X, &message));
  ASSERT_EQ(1u, message->num_ports());
  PortRef E;
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &E));

  EXPECT_EQ(OK, node0.node().ClosePort(X));
  EXPECT_EQ(OK, node1.node().ClosePort(Y));

  WaitForIdle();

  // C goes away as a result of normal proxy removal. B should have been closed
  // cleanly by the failed MergePorts.
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node1.node().GetPort(C.name(), &C));
  EXPECT_EQ(ERROR_PORT_UNKNOWN, node0.node().GetPort(B.name(), &B));

  // Close A, D, and E.
  EXPECT_EQ(OK, node0.node().ClosePort(A));
  EXPECT_EQ(OK, node1.node().ClosePort(D));
  EXPECT_EQ(OK, node0.node().ClosePort(E));

  WaitForIdle();

  // Expect everything to have gone away.
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RemotePeerStatus) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Create a local port pair. Neither port should appear to have a remote peer.
  PortRef a, b;
  PortStatus status;
  node0.node().CreatePortPair(&a, &b);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);

  // Create a port pair spanning the two nodes. Both spanning ports should
  // immediately appear to have a remote peer.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);

  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
  EXPECT_TRUE(status.peer_remote);

  PortRef x2, x3;
  CreatePortPair(&node0, &x2, &node1, &x3);

  // Transfer |b| to |node1| and |x1| to |node0|. i.e., make the local peers
  // remote and the remote peers local.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", b));
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", x1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node0.ReadMessage(x2, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &x1));

  ASSERT_TRUE(node1.ReadMessage(x3, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &b));

  // Now x0-x1 should be local to node0 and a-b should span the nodes.
  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(x1, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(b, &status));
  EXPECT_TRUE(status.peer_remote);

  // And swap them back one more time.
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x2, "foo", x1));
  EXPECT_EQ(OK, node1.SendStringMessageWithPort(x3, "bar", b));
  WaitForIdle();

  ASSERT_TRUE(node0.ReadMessage(x2, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node0.node().GetPort(message->ports()[0], &b));

  ASSERT_TRUE(node1.ReadMessage(x3, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &x1));

  ASSERT_EQ(OK, node0.node().GetStatus(x0, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(x1, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));
  EXPECT_EQ(OK, node0.node().ClosePort(x2));
  EXPECT_EQ(OK, node1.node().ClosePort(x3));
  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));

  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RemotePeerStatusAfterLocalPortMerge) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d spanning node0-node1.
  PortRef a, b, c, d;
  node0.node().CreatePortPair(&a, &b);
  CreatePortPair(&node0, &c, &node1, &d);

  PortStatus status;
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(c, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().MergeLocalPorts(b, c));
  WaitForIdle();

  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RemotePeerStatusAfterRemotePortMerge) {
  TestNode node0(0);
  AddNode(&node0);

  TestNode node1(1);
  AddNode(&node1);

  // Set up a-b on node0 and c-d on node1.
  PortRef a, b, c, d;
  node0.node().CreatePortPair(&a, &b);
  node1.node().CreatePortPair(&c, &d);

  PortStatus status;
  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node0.node().GetStatus(b, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(c, &status));
  EXPECT_FALSE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_FALSE(status.peer_remote);

  node1.AllowPortMerge(c);
  EXPECT_EQ(OK, node0.node().MergePorts(b, node1.name(), c.name()));
  WaitForIdle();

  ASSERT_EQ(OK, node0.node().GetStatus(a, &status));
  EXPECT_TRUE(status.peer_remote);
  ASSERT_EQ(OK, node1.node().GetStatus(d, &status));
  EXPECT_TRUE(status.peer_remote);

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node1.node().ClosePort(d));
  EXPECT_TRUE(node0.node().CanShutdownCleanly());
  EXPECT_TRUE(node1.node().CanShutdownCleanly());
}

TEST_F(PortsTest, RetransmitUserMessageEvents) {
  // Ensures that user message events can be retransmitted properly.
  TestNode node0(0);
  AddNode(&node0);

  PortRef a, b;
  node0.node().CreatePortPair(&a, &b);

  // Ping.
  const char* kMessage = "hey";
  ScopedMessage message;
  EXPECT_EQ(OK, node0.SendStringMessage(a, kMessage));
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong.
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Ping again.
  EXPECT_EQ(OK, node0.node().SendUserMessage(a, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(b, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  // Pong again!
  EXPECT_EQ(OK, node0.node().SendUserMessage(b, std::move(message)));
  EXPECT_FALSE(message);
  ASSERT_TRUE(node0.ReadMessage(a, &message));
  EXPECT_TRUE(MessageEquals(message, kMessage));

  EXPECT_EQ(OK, node0.node().ClosePort(a));
  EXPECT_EQ(OK, node0.node().ClosePort(b));
}

TEST_F(PortsTest, SetAcknowledgeRequestInterval) {
  TestNode node0(0);
  AddNode(&node0);

  PortRef a0, a1;
  EXPECT_EQ(OK, node0.node().CreatePortPair(&a0, &a1));
  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));

  // Send a batch of messages.
  EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 15));
  EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));
  EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
  WaitForIdle();
  EXPECT_EQ(15u, node0.GetUnacknowledgedMessageCount(a0));

  // Set to acknowledge every read message, and validate that already-read
  // messages are acknowledged.
  EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 1));
  WaitForIdle();
  EXPECT_EQ(10u, node0.GetUnacknowledgedMessageCount(a0));

  // Read a third of the messages from the other end.
  EXPECT_TRUE(node0.ReadMultipleMessages(a1, 5));
  WaitForIdle();

  EXPECT_EQ(5u, node0.GetUnacknowledgedMessageCount(a0));

  TestNode node1(1);
  AddNode(&node1);

  // Transfer a1 across to node1.
  PortRef x0, x1;
  CreatePortPair(&node0, &x0, &node1, &x1);
  EXPECT_EQ(OK, node0.SendStringMessageWithPort(x0, "foo", a1));
  WaitForIdle();

  ScopedMessage message;
  ASSERT_TRUE(node1.ReadMessage(x1, &message));
  ASSERT_EQ(1u, message->num_ports());
  ASSERT_EQ(OK, node1.node().GetPort(message->ports()[0], &a1));

  // Read the last third of the messages from the transferred node, and
  // validate that the unacknowledge message count updates correctly.
  EXPECT_TRUE(node1.ReadMultipleMessages(a1, 5));
  WaitForIdle();
  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));

  // Turn the acknowledges down and validate that they don't go on indefinitely.
  EXPECT_EQ(OK, node0.node().SetAcknowledgeRequestInterval(a0, 0));
  EXPECT_EQ(OK, node0.SendMultipleMessages(a0, 10));
  WaitForIdle();
  EXPECT_TRUE(node1.ReadMultipleMessages(a1, 10));
  WaitForIdle();
  EXPECT_NE(0u, node0.GetUnacknowledgedMessageCount(a0));

  // Close the far port and validate that the closure updates the unacknowledged
  // count.
  EXPECT_EQ(OK, node1.node().ClosePort(a1));
  WaitForIdle();
  EXPECT_EQ(0u, node0.GetUnacknowledgedMessageCount(a0));

  EXPECT_EQ(OK, node0.node().ClosePort(a0));
  EXPECT_EQ(OK, node0.node().ClosePort(x0));
  EXPECT_EQ(OK, node1.node().ClosePort(x1));
}

}  // namespace test
}  // namespace ports
}  // namespace core
}  // namespace mojo