// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/services/sharing/nearby/platform/input_stream_impl.h"
#include "base/containers/span.h"
#include "base/metrics/histogram_functions.h"
#include "base/task/sequenced_task_runner.h"
#include "mojo/public/c/system/data_pipe.h"
namespace nearby {
namespace chrome {
namespace {
void LogReadResult(connections::mojom::Medium medium, bool success) {
switch (medium) {
case connections::mojom::Medium::kBluetooth:
base::UmaHistogramBoolean(
"Nearby.Connections.Bluetooth.Socket.Read.Result", success);
break;
case connections::mojom::Medium::kWifiLan:
base::UmaHistogramBoolean("Nearby.Connections.WifiLan.Socket.Read.Result",
success);
break;
case connections::mojom::Medium::kUnknown:
case connections::mojom::Medium::kMdns:
case connections::mojom::Medium::kWifiHotspot:
case connections::mojom::Medium::kBle:
case connections::mojom::Medium::kWifiAware:
case connections::mojom::Medium::kNfc:
case connections::mojom::Medium::kWifiDirect:
case connections::mojom::Medium::kWebRtc:
case connections::mojom::Medium::kBleL2Cap:
case connections::mojom::Medium::kUsb:
break;
}
}
} // namespace
InputStreamImpl::InputStreamImpl(
connections::mojom::Medium medium,
scoped_refptr<base::SequencedTaskRunner> task_runner,
mojo::ScopedDataPipeConsumerHandle receive_stream)
: medium_(medium),
task_runner_(std::move(task_runner)),
receive_stream_(std::move(receive_stream)),
receive_stream_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
task_runner_) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
receive_stream_watcher_.Watch(
receive_stream_.get(),
MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&InputStreamImpl::ReceiveMore,
base::Unretained(this)));
}
InputStreamImpl::~InputStreamImpl() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
Close();
}
ExceptionOr<ByteArray> InputStreamImpl::Read(std::int64_t size) {
if (IsClosed())
return {Exception::kIo};
bool invalid_size = size <= 0 || size > std::numeric_limits<uint32_t>::max();
if (invalid_size) {
// Only log it as a read failure when |size| is out of range as in
// reality a null |receive_stream_| is an expected state after Close()
// was called normally.
LogReadResult(medium_, false);
return {Exception::kIo};
}
pending_read_buffer_ = std::make_unique<ByteArray>(size);
pending_read_buffer_pos_ = 0;
// Signal and reset the WaitableEvent in case another thread is already
// waiting on a Read().
read_waitable_event_.Signal();
read_waitable_event_.Reset();
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&mojo::SimpleWatcher::ArmOrNotify,
base::Unretained(&receive_stream_watcher_)));
read_waitable_event_.Wait();
pending_read_buffer_.reset();
pending_read_buffer_pos_ = 0;
// |receive_stream_| might have been reset in Close() while
// |read_waitable_event_| was waiting.
if (IsClosed())
return {Exception::kIo};
LogReadResult(medium_, exception_or_received_byte_array_.ok());
return exception_or_received_byte_array_;
}
Exception InputStreamImpl::Close() {
// NOTE(http://crbug.com/1247876): Close() might be called from multiple
// threads at the same time; sequence calls and check if stream is already
// closed inside task. Also, must cancel |receive_stream_watcher_| on the
// same sequence it was initialized on.
if (task_runner_->RunsTasksInCurrentSequence()) {
// No need to post a task if this is already running on |task_runner_|.
DoClose(/*task_run_waitable_event=*/nullptr);
} else {
base::WaitableEvent task_run_waitable_event;
task_runner_->PostTask(FROM_HERE, base::BindOnce(&InputStreamImpl::DoClose,
base::Unretained(this),
&task_run_waitable_event));
task_run_waitable_event.Wait();
}
return {Exception::kSuccess};
}
void InputStreamImpl::ReceiveMore(MojoResult result,
const mojo::HandleSignalsState& state) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
DCHECK_NE(result, MOJO_RESULT_SHOULD_WAIT);
DCHECK(!IsClosed());
DCHECK(pending_read_buffer_);
DCHECK_LT(pending_read_buffer_pos_, pending_read_buffer_->size());
if (state.peer_closed()) {
exception_or_received_byte_array_ = ExceptionOr<ByteArray>(Exception::kIo);
read_waitable_event_.Signal();
return;
}
if (result == MOJO_RESULT_OK) {
base::span<uint8_t> buffer =
base::as_writable_byte_span(*pending_read_buffer_)
.subspan(pending_read_buffer_pos_);
size_t bytes_read = 0;
result =
receive_stream_->ReadData(MOJO_READ_DATA_FLAG_NONE, buffer, bytes_read);
if (result == MOJO_RESULT_OK) {
pending_read_buffer_pos_ += bytes_read;
}
}
if (result == MOJO_RESULT_SHOULD_WAIT ||
pending_read_buffer_pos_ < pending_read_buffer_->size()) {
receive_stream_watcher_.ArmOrNotify();
return;
}
if (result == MOJO_RESULT_OK) {
exception_or_received_byte_array_ =
ExceptionOr<ByteArray>(std::move(*pending_read_buffer_));
} else {
exception_or_received_byte_array_ = ExceptionOr<ByteArray>(Exception::kIo);
}
read_waitable_event_.Signal();
}
bool InputStreamImpl::IsClosed() const {
return !receive_stream_.is_valid();
}
void InputStreamImpl::DoClose(base::WaitableEvent* task_run_waitable_event) {
// Must cancel |receive_stream_watcher_| on the same sequence it was
// initialized on.
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (!IsClosed()) {
receive_stream_watcher_.Cancel();
receive_stream_.reset();
// It is possible that a Read() call could still be blocking a different
// sequence via |read_waitable_event_| when Close() is called. If we only
// cancel the stream watcher, the Read() call will block forever. We
// trigger the event manually here, which will cause an IO exception to be
// returned from Read().
read_waitable_event_.Signal();
}
if (task_run_waitable_event)
task_run_waitable_event->Signal();
}
} // namespace chrome
} // namespace nearby