// Copyright 2017 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifdef UNSAFE_BUFFERS_BUILD
// TODO(crbug.com/351564777): Remove this and convert code to safer constructs.
#pragma allow_unsafe_buffers
#endif
#include "mojo/core/channel.h"
#include <lib/fdio/fd.h>
#include <lib/fdio/limits.h>
#include <lib/zx/channel.h>
#include <lib/zx/handle.h>
#include <zircon/processargs.h>
#include <zircon/status.h>
#include <zircon/syscalls.h>
#include <algorithm>
#include <memory>
#include <tuple>
#include "base/containers/circular_deque.h"
#include "base/files/scoped_file.h"
#include "base/fuchsia/fuchsia_logging.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/memory/ref_counted.h"
#include "base/message_loop/message_pump_for_io.h"
#include "base/synchronization/lock.h"
#include "base/task/current_thread.h"
#include "base/task/single_thread_task_runner.h"
#include "base/task/task_runner.h"
#include "mojo/core/platform_handle_in_transit.h"
namespace mojo {
namespace core {
namespace {
// Upper bound, in bytes, on how much data is read from the channel in response
// to a single signal: the read loop in OnZxHandleSignalled() stops iterating
// once the bytes read in the current batch reach this value.
constexpr size_t kMaxBatchReadCapacity = 256 * 1024;
// Converts |handle| into a form that can be written to a Zircon channel.
//
// If |handle| does not wrap a file descriptor it is moved into |*out_handle|
// unchanged and |info->is_file_descriptor| is set to false. If it wraps a file
// descriptor, the descriptor is released from |handle| and handed to
// fdio_fd_transfer_or_clone(); the resulting Zircon handle is stored in
// |*out_handle| and |info->is_file_descriptor| is set to true.
//
// Returns false, leaving |*out_handle| and |*info| untouched, if
// fdio_fd_transfer_or_clone() fails.
bool UnwrapFdioHandle(PlatformHandleInTransit handle,
                      PlatformHandleInTransit* out_handle,
                      Channel::Message::HandleInfoEntry* info) {
  DCHECK(handle.handle().is_valid());

  if (handle.handle().is_valid_fd()) {
    // Transfer the FD when possible and clone it otherwise. Non-dup()d FDs are
    // thereby unwrapped cheaply, while dup()d FDs get a fresh handle onto the
    // same underlying resource.
    zx::handle unwrapped;
    const zx_status_t status = fdio_fd_transfer_or_clone(
        handle.TakeHandle().ReleaseFD(), unwrapped.reset_and_get_address());
    if (status != ZX_OK) {
      ZX_DLOG(ERROR, status) << "fdio_fd_transfer_or_clone";
      return false;
    }

    info->is_file_descriptor = true;
    *out_handle =
        PlatformHandleInTransit(PlatformHandle(std::move(unwrapped)));
    return true;
  }

  // Not an FD: nothing to unwrap, pass the handle through as-is.
  info->is_file_descriptor = false;
  *out_handle = std::move(handle);
  return true;
}
// Turns a received Zircon |handle| back into a PlatformHandle.
//
// When |info.is_file_descriptor| is false, the handle is wrapped directly.
// Otherwise the handle is released to fdio_fd_create() and the resulting file
// descriptor is wrapped instead. Returns a default-constructed PlatformHandle
// if fdio_fd_create() fails.
PlatformHandle WrapFdioHandle(zx::handle handle,
                              Channel::Message::HandleInfoEntry info) {
  if (info.is_file_descriptor) {
    base::ScopedFD fd;
    const zx_status_t status =
        fdio_fd_create(handle.release(), base::ScopedFD::Receiver(fd).get());
    if (status == ZX_OK) {
      return PlatformHandle(std::move(fd));
    }
    ZX_DLOG(ERROR, status) << "fdio_fd_create";
    return PlatformHandle();
  }

  return PlatformHandle(std::move(handle));
}
// A view over a Channel::Message object. The write queue uses these since
// large messages may need to be sent in chunks.
class MessageView {
public:
// Owns |message|. |offset| indexes the first unsent byte in the message.
MessageView(Channel::MessagePtr message, size_t offset)
: message_(std::move(message)),
offset_(offset),
handles_(message_->TakeHandles()) {
DCHECK_GT(message_->data_num_bytes(), offset_);
}
MessageView(MessageView&& other) = default;
MessageView& operator=(MessageView&& other) = default;
MessageView(const MessageView&) = delete;
MessageView& operator=(const MessageView&) = delete;
~MessageView() = default;
const void* data() const {
return static_cast<const char*>(message_->data()) + offset_;
}
size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; }
size_t data_offset() const { return offset_; }
void advance_data_offset(size_t num_bytes) {
DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes);
offset_ += num_bytes;
}
std::vector<PlatformHandleInTransit> TakeHandles(bool unwrap_fds) {
if (handles_.empty() || !unwrap_fds)
return std::move(handles_);
// We can only pass Fuchsia handles via IPC, so unwrap any FDIO file-
// descriptors in |handles_| into the underlying handles, with metadata in
// the extra header to note which belong to FDIO.
auto* handles_info = reinterpret_cast<Channel::Message::HandleInfoEntry*>(
message_->mutable_extra_header());
memset(handles_info, 0, message_->extra_header_size());
// Since file descriptors unwrap to a single handle, we can unwrap in-place
// in the |handles_| vector.
for (size_t i = 0; i < handles_.size(); i++) {
if (!UnwrapFdioHandle(std::move(handles_[i]), &handles_[i],
&handles_info[i])) {
return std::vector<PlatformHandleInTransit>();
}
}
return std::move(handles_);
}
private:
Channel::MessagePtr message_;
size_t offset_;
std::vector<PlatformHandleInTransit> handles_;
};
class ChannelFuchsia : public Channel,
public base::CurrentThread::DestructionObserver,
public base::MessagePumpForIO::ZxHandleWatcher {
public:
// Builds a channel around the Zircon channel endpoint extracted from
// |connection_params|; the endpoint must yield a valid handle (CHECKed).
// The new object stores a reference to itself in |self_|, which is dropped in
// ShutDownOnIOThread(). |io_task_runner| is the task runner onto which
// start-up, shutdown and write-error handling are posted.
ChannelFuchsia(Delegate* delegate,
               ConnectionParams connection_params,
               HandlePolicy handle_policy,
               scoped_refptr<base::SingleThreadTaskRunner> io_task_runner)
    : Channel(delegate, handle_policy),
      self_(this),
      handle_(
          connection_params.TakeEndpoint().TakePlatformHandle().TakeHandle()),
      // |io_task_runner| is received by value, so move it into the member
      // rather than copying it (a copy costs an extra atomic ref-count
      // increment/decrement pair).
      io_task_runner_(std::move(io_task_runner)) {
  CHECK(handle_.is_valid());
}

// Not copyable.
ChannelFuchsia(const ChannelFuchsia&) = delete;
ChannelFuchsia& operator=(const ChannelFuchsia&) = delete;
// Begins watching the channel for incoming data. The work happens in
// StartOnIOThread(), which is run immediately when already on the IO task
// runner's sequence and posted to it otherwise.
void Start() override {
  if (!io_task_runner_->RunsTasksInCurrentSequence()) {
    io_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(&ChannelFuchsia::StartOnIOThread, this));
    return;
  }
  StartOnIOThread();
}
// Requests shutdown of the channel. Shutdown requested through the public
// interface is always asynchronous: ShutDownOnIOThread() is posted to the IO
// task runner even if the caller is already running on it.
void ShutDownImpl() override {
  auto shut_down_task =
      base::BindOnce(&ChannelFuchsia::ShutDownOnIOThread, this);
  io_task_runner_->PostTask(FROM_HERE, std::move(shut_down_task));
}
// Writes |message| to the channel while holding |write_lock_|.
//
// Once a write has failed, |reject_writes_| is set and every later call
// returns without doing anything. The failure itself is reported by posting
// OnWriteError(Error::kDisconnected) to the IO task runner.
void Write(MessagePtr message) override {
  {
    base::AutoLock lock(write_lock_);
    if (reject_writes_) {
      return;
    }
    if (WriteNoLock(MessageView(std::move(message), 0))) {
      return;
    }
    reject_writes_ = true;
  }

  // Only reached after a failed write, with |write_lock_| released again.
  // OnWriteError() is deliberately not invoked synchronously: Write() may have
  // been called by the delegate and it must not be re-entered.
  io_task_runner_->PostTask(
      FROM_HERE, base::BindOnce(&ChannelFuchsia::OnWriteError, this,
                                Error::kDisconnected));
}
// Marks the channel handle to be leaked rather than closed: when
// |leak_handle_| is set, ShutDownOnIOThread() releases |handle_| without
// closing it. Must be called on the IO task runner's sequence.
void LeakHandle() override {
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
leak_handle_ = true;
}
bool GetReadPlatformHandles(const void* payload,
size_t payload_size,
size_t num_handles,
const void* extra_header,
size_t extra_header_size,
std::vector<PlatformHandle>* handles,
bool* deferred) override {
DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
if (num_handles > std::numeric_limits<uint16_t>::max())
return false;
// Locate the handle info and verify there is enough of it.
if (!extra_header)
return false;
const auto* handles_info =
reinterpret_cast<const Channel::Message::HandleInfoEntry*>(
extra_header);
size_t handles_info_size = sizeof(handles_info[0]) * num_handles;
if (handles_info_size > extra_header_size)
return false;
// If there are too few handles then we're not ready yet, so return true
// indicating things are OK, but leave |handles| empty.
if (incoming_handles_.size() < num_handles)
return true;
handles->reserve(num_handles);
for (size_t i = 0; i < num_handles; ++i) {
handles->emplace_back(WrapFdioHandle(std::move(incoming_handles_.front()),
handles_info[i]));
DCHECK(handles->back().is_valid());
incoming_handles_.pop_front();
}
return true;
}
bool GetReadPlatformHandlesForIpcz(
size_t num_handles,
std::vector<PlatformHandle>& handles) override {
if (incoming_handles_.size() < num_handles) {
return true;
}
DCHECK(handles.empty());
handles.reserve(num_handles);
for (size_t i = 0; i < num_handles; ++i) {
handles.emplace_back(std::move(incoming_handles_.front()));
incoming_handles_.pop_front();
}
return true;
}
private:
// The read watch must already be gone by the time the object is destroyed;
// ShutDownOnIOThread() and the read-error path both reset |read_watch_|.
~ChannelFuchsia() override { DCHECK(!read_watch_); }
void StartOnIOThread() {
DCHECK(!read_watch_);
base::CurrentThread::Get()->AddDestructionObserver(this);
read_watch_ =
std::make_unique<base::MessagePumpForIO::ZxHandleWatchController>(
FROM_HERE);
base::CurrentIOThread::Get()->WatchZxHandle(
handle_.get(), true /* persistent */,
ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED, read_watch_.get(), this);
}
// Tears the channel down: unregisters the destruction observer, stops
// watching for reads, disposes of |handle_| and finally drops the
// self-reference taken in the constructor.
void ShutDownOnIOThread() {
  base::CurrentThread::Get()->RemoveDestructionObserver(this);
  read_watch_.reset();

  if (leak_handle_) {
    // LeakHandle() was called: give up ownership without closing the handle.
    std::ignore = handle_.release();
  } else {
    handle_.reset();
  }

  // Must be the last statement: dropping the self-reference may destroy
  // |this| if it was the last reference.
  self_ = nullptr;
}
// base::CurrentThread::DestructionObserver:
// Shuts the channel down when the message loop is about to be destroyed,
// unless the self-reference has already been dropped.
void WillDestroyCurrentMessageLoop() override {
  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
  if (!self_) {
    return;
  }
  ShutDownOnIOThread();
}
// base::MessagePumpForIO::ZxHandleWatcher:
// Reads messages from the channel in a batch after |handle| was signalled.
//
// Each successful read appends the received handles to |incoming_handles_|
// and passes the byte count to OnReadComplete(). The batch ends when the
// channel reports ZX_ERR_SHOULD_WAIT, when kMaxBatchReadCapacity bytes have
// been read, when no further read size is requested, or on error. On error
// the read watch is cancelled and OnError() is invoked with
// kReceivedMalformedData if OnReadComplete() rejected the data, and with
// kDisconnected for any other read failure.
void OnZxHandleSignalled(zx_handle_t handle, zx_signals_t signals) override {
  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
  CHECK_EQ(handle, handle_.get());
  DCHECK((ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED) & signals);

  // Reading is attempted even when the peer has closed, because messages it
  // sent before closing may still be waiting unread in the pipe.
  bool failed = false;
  bool malformed = false;
  size_t size_hint = 0;
  size_t batch_bytes = 0;
  do {
    size_t capacity = size_hint;
    char* const read_buffer = GetReadBuffer(&capacity);
    DCHECK_GT(capacity, 0u);

    uint32_t actual_bytes = 0;
    uint32_t actual_handles = 0;
    zx_handle_t raw_handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
    const zx_status_t status =
        handle_.read(0, read_buffer, raw_handles, capacity,
                     std::size(raw_handles), &actual_bytes, &actual_handles);

    if (status == ZX_ERR_SHOULD_WAIT) {
      // Nothing more to read for now.
      break;
    }

    if (status == ZX_ERR_BUFFER_TOO_SMALL) {
      // Retry with a buffer of the size the failed read reported, subject to
      // the loop condition below.
      DCHECK_LE(actual_handles, std::size(raw_handles));
      size_hint = actual_bytes;
      continue;
    }

    if (status != ZX_OK) {
      ZX_DLOG_IF(ERROR, status != ZX_ERR_PEER_CLOSED, status)
          << "zx_channel_read";
      failed = true;
      break;
    }

    // Take ownership of the handles that arrived with this message.
    for (size_t i = 0; i < actual_handles; ++i) {
      incoming_handles_.emplace_back(raw_handles[i]);
    }

    batch_bytes += actual_bytes;
    if (!OnReadComplete(actual_bytes, &size_hint)) {
      failed = true;
      malformed = true;
      break;
    }
  } while (batch_bytes < kMaxBatchReadCapacity && size_hint > 0);

  if (!failed) {
    return;
  }

  // Stop receiving read notifications.
  read_watch_.reset();
  if (malformed) {
    OnError(Error::kReceivedMalformedData);
  } else {
    OnError(Error::kDisconnected);
  }
}
// Writes the message behind |message_view| to the channel synchronously, in
// chunks of at most ZX_CHANNEL_MAX_MSG_BYTES bytes. The message's handles are
// taken on the first iteration and so accompany the first chunk. Returns
// false as soon as a channel write fails, true once every chunk was written.
// Called from Write() with |write_lock_| held.
bool WriteNoLock(MessageView message_view) {
  uint32_t chunk_size = 0;
  do {
    // Skip past the chunk written by the previous iteration (none initially).
    message_view.advance_data_offset(chunk_size);

    // NOTE(review): if unwrapping an FD fails, TakeHandles() returns an empty
    // vector, which cannot be told apart here from a message without handles
    // — confirm that sending the data without its handles is intended.
    std::vector<PlatformHandleInTransit> in_transit =
        message_view.TakeHandles(/*unwrap_fds=*/!is_for_ipcz());
    const size_t num_handles = in_transit.size();

    zx_handle_t raw_handles[ZX_CHANNEL_MAX_MSG_HANDLES] = {};
    DCHECK_LE(num_handles, std::size(raw_handles));
    size_t next_slot = 0;
    for (auto& handle_in_transit : in_transit) {
      DCHECK(handle_in_transit.handle().is_valid());
      raw_handles[next_slot++] =
          handle_in_transit.handle().GetHandle().get();
    }

    chunk_size = std::min(message_view.data_num_bytes(),
                          static_cast<size_t>(ZX_CHANNEL_MAX_MSG_BYTES));
    const zx_status_t status = handle_.write(
        0, message_view.data(), chunk_size, raw_handles, num_handles);

    // zx_channel_write() consumes the handles whether or not it succeeds, so
    // complete the transit of our copies now to avoid a double close.
    for (auto& handle_in_transit : in_transit) {
      handle_in_transit.CompleteTransit();
    }

    if (status != ZX_OK) {
      // TODO(crbug.com/42050611): Handle ZX_ERR_SHOULD_WAIT flow-control
      // errors, once the platform starts generating them.
      ZX_DLOG_IF(ERROR, status != ZX_ERR_PEER_CLOSED, status)
          << "WriteNoLock(zx_channel_write)";
      return false;
    }
  } while (chunk_size < message_view.data_num_bytes());

  return true;
}
// Handles a write failure on the IO task runner's sequence, after Write() has
// already set |reject_writes_|. The error is forwarded to OnError() unless it
// is a disconnection and the read watch is still active.
void OnWriteError(Error error) {
  DCHECK(io_task_runner_->RunsTasksInCurrentSequence());
  DCHECK(reject_writes_);

  // When the pipe is disconnected but still being read, keep reading so that
  // in-flight messages are fetched, and rely on end-of-stream to signal the
  // actual disconnection.
  // TODO(crbug.com/42050611): When we add flow-control for writes, we also
  // need to reset the write-watcher here.
  const bool keep_reading = error == Error::kDisconnected && read_watch_;
  if (keep_reading) {
    return;
  }

  OnError(error);
}
// Keeps the Channel alive at least until explicit shutdown on the IO thread.
// Set to |this| in the constructor and cleared in ShutDownOnIOThread().
scoped_refptr<Channel> self_;
// The Zircon channel endpoint. It is read from in OnZxHandleSignalled() and
// written to in WriteNoLock(); closed or released in ShutDownOnIOThread().
zx::channel handle_;
// Task runner onto which start-up, shutdown and write-error handling are
// posted.
scoped_refptr<base::SingleThreadTaskRunner> io_task_runner_;
// These members are only used on the IO thread.
// |read_watch_| is the active watch on |handle_|; null while not watching.
std::unique_ptr<base::MessagePumpForIO::ZxHandleWatchController> read_watch_;
// Handles received from the channel that have not yet been claimed by
// GetReadPlatformHandles() / GetReadPlatformHandlesForIpcz(), in FIFO order.
base::circular_deque<zx::handle> incoming_handles_;
// Set by LeakHandle(); makes ShutDownOnIOThread() release |handle_| without
// closing it.
bool leak_handle_ = false;
// Held by Write() while it checks and updates |reject_writes_| and calls
// WriteNoLock().
base::Lock write_lock_;
// Set once a write has failed; Write() then ignores all further messages.
bool reject_writes_ = false;
};
} // namespace
// static
// Creates the Fuchsia implementation of Channel, forwarding all arguments to
// the ChannelFuchsia constructor.
scoped_refptr<Channel> Channel::Create(
    Delegate* delegate,
    ConnectionParams connection_params,
    HandlePolicy handle_policy,
    scoped_refptr<base::SingleThreadTaskRunner> io_task_runner) {
  // base::MakeRefCounted is used in place of a bare |new| so that the object
  // is held by a scoped_refptr from the moment it is created.
  return base::MakeRefCounted<ChannelFuchsia>(
      delegate, std::move(connection_params), handle_policy,
      std::move(io_task_runner));
}
} // namespace core
} // namespace mojo