// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/services/sharing/nearby/platform/output_stream_impl.h"
#include "base/metrics/histogram_functions.h"
#include "base/task/sequenced_task_runner.h"
namespace nearby {
namespace chrome {
namespace {
void LogWriteResult(connections::mojom::Medium medium, bool success) {
switch (medium) {
case connections::mojom::Medium::kBluetooth:
base::UmaHistogramBoolean(
"Nearby.Connections.Bluetooth.Socket.Write.Result", success);
break;
case connections::mojom::Medium::kWifiLan:
base::UmaHistogramBoolean(
"Nearby.Connections.WifiLan.Socket.Write.Result", success);
break;
case connections::mojom::Medium::kUnknown:
case connections::mojom::Medium::kMdns:
case connections::mojom::Medium::kWifiHotspot:
case connections::mojom::Medium::kBle:
case connections::mojom::Medium::kWifiAware:
case connections::mojom::Medium::kNfc:
case connections::mojom::Medium::kWifiDirect:
case connections::mojom::Medium::kWebRtc:
case connections::mojom::Medium::kBleL2Cap:
case connections::mojom::Medium::kUsb:
break;
}
}
} // namespace
OutputStreamImpl::OutputStreamImpl(
connections::mojom::Medium medium,
scoped_refptr<base::SequencedTaskRunner> task_runner,
mojo::ScopedDataPipeProducerHandle send_stream)
: medium_(medium),
task_runner_(std::move(task_runner)),
send_stream_(std::move(send_stream)),
send_stream_watcher_(FROM_HERE,
mojo::SimpleWatcher::ArmingPolicy::MANUAL,
task_runner_) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
send_stream_watcher_.Watch(
send_stream_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
base::BindRepeating(&OutputStreamImpl::SendMore, base::Unretained(this)));
}
OutputStreamImpl::~OutputStreamImpl() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
Close();
}
Exception OutputStreamImpl::Write(const ByteArray& data) {
if (IsClosed()) {
return {Exception::kIo};
}
DCHECK(!write_success_);
pending_write_buffer_ = std::make_unique<ByteArray>(data);
pending_write_buffer_pos_ = 0;
// Signal and reset the WaitableEvent in case another thread is already
// waiting on a Write().
write_waitable_event_.Signal();
write_waitable_event_.Reset();
task_runner_->PostTask(
FROM_HERE, base::BindOnce(&mojo::SimpleWatcher::ArmOrNotify,
base::Unretained(&send_stream_watcher_)));
write_waitable_event_.Wait();
Exception result = {write_success_ ? Exception::kSuccess : Exception::kIo};
write_success_ = false;
pending_write_buffer_.reset();
pending_write_buffer_pos_ = 0;
// |send_stream_| might have been reset in Close() while
// |write_waitable_event_| was waiting.
if (IsClosed()) {
return {Exception::kIo};
}
// Ignore a null |send_stream_| when logging since it might be an expected
// state as mentioned above.
LogWriteResult(medium_, result.Ok());
return result;
}
Exception OutputStreamImpl::Flush() {
// TODO(hansberry): Unsure if anything can reasonably be done here. Need to
// ask a reviewer from the Nearby team.
return {Exception::kSuccess};
}
Exception OutputStreamImpl::Close() {
// NOTE(http://crbug.com/1247876): Close() might be called from multiple
// threads at the same time; sequence calls and check if stream is already
// closed inside task. Also, must cancel |send_stream_watcher_| on the same
// sequence it was initialized on.
if (task_runner_->RunsTasksInCurrentSequence()) {
// No need to post a task if this is already running on |task_runner_|.
DoClose(/*task_run_waitable_event=*/nullptr);
} else {
base::WaitableEvent task_run_waitable_event;
task_runner_->PostTask(FROM_HERE, base::BindOnce(&OutputStreamImpl::DoClose,
base::Unretained(this),
&task_run_waitable_event));
task_run_waitable_event.Wait();
}
return {Exception::kSuccess};
}
void OutputStreamImpl::SendMore(MojoResult result,
const mojo::HandleSignalsState& state) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
DCHECK_NE(result, MOJO_RESULT_SHOULD_WAIT);
DCHECK(!IsClosed());
DCHECK(pending_write_buffer_);
DCHECK_LT(pending_write_buffer_pos_, pending_write_buffer_->size());
if (state.peer_closed()) {
write_success_ = false;
write_waitable_event_.Signal();
return;
}
if (result == MOJO_RESULT_OK) {
base::span<const uint8_t> buffer =
base::as_byte_span(*pending_write_buffer_)
.subspan(pending_write_buffer_pos_);
size_t bytes_written = 0;
result = send_stream_->WriteData(buffer, MOJO_WRITE_DATA_FLAG_NONE,
bytes_written);
if (result == MOJO_RESULT_OK) {
pending_write_buffer_pos_ += bytes_written;
}
}
if (result == MOJO_RESULT_SHOULD_WAIT ||
pending_write_buffer_pos_ < pending_write_buffer_->size()) {
send_stream_watcher_.ArmOrNotify();
return;
}
write_success_ = result == MOJO_RESULT_OK;
write_waitable_event_.Signal();
}
bool OutputStreamImpl::IsClosed() const {
return !send_stream_.is_valid();
}
void OutputStreamImpl::DoClose(base::WaitableEvent* task_run_waitable_event) {
// Must cancel |send_stream_watcher_| on the same sequence it was
// initialized on.
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (!IsClosed()) {
send_stream_watcher_.Cancel();
send_stream_.reset();
// It is possible that a Write() call could still be blocking a different
// sequence via |write_waitable_event_| when Close() is called. If we only
// cancel the stream watcher, the Write() call will block forever. We
// trigger the event manually here, which will cause an IO exception to be
// returned from Write().
write_waitable_event_.Signal();
}
if (task_run_waitable_event) {
task_run_waitable_event->Signal();
}
}
} // namespace chrome
} // namespace nearby