chromium/chrome/services/sharing/nearby/platform/input_stream_impl.h

// Copyright 2021 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef CHROME_SERVICES_SHARING_NEARBY_PLATFORM_INPUT_STREAM_IMPL_H_
#define CHROME_SERVICES_SHARING_NEARBY_PLATFORM_INPUT_STREAM_IMPL_H_

#include <stdint.h>

#include <memory>
#include <optional>

#include "base/memory/scoped_refptr.h"
#include "base/synchronization/waitable_event.h"
#include "chromeos/ash/services/nearby/public/mojom/nearby_connections_types.mojom.h"
#include "mojo/public/cpp/system/data_pipe.h"
#include "mojo/public/cpp/system/simple_watcher.h"
#include "third_party/nearby/src/internal/platform/input_stream.h"

// Forward declaration instead of an #include: within this header
// base::SequencedTaskRunner is only named as the pointee type of a
// scoped_refptr (constructor parameter and |task_runner_| member).
namespace base {
class SequencedTaskRunner;
}  // namespace base

namespace nearby {
namespace chrome {

// An implementation of a Nearby Connections InputStream that reads from the
// Mojo DataPipe, |receive_stream|, passed into the constructor by the specified
// |medium| implementation.
//
// Because the InputStream interface is synchronous but the DataPipe interface
// is asynchronous, we block Read() on the calling thread while waiting for the
// DataPipe to become readable. We also block Close() on the calling thread
// while shutting down the DataPipe. While the calling thread is blocked,
// |task_runner| handles the actual read and cancel operations.
//
// This class must be created and destroyed on the |task_runner| because the
// mojo::SimpleWatcher expects to be created on the same sequence it is run on.
class InputStreamImpl : public InputStream {
 public:
  // Constructs a stream that reads from the Mojo DataPipe consumer handle
  // |receive_stream|, which is supplied by the |medium| implementation.
  // |task_runner| is the sequence on which the actual read and cancel
  // operations are performed; the object must also be constructed on that
  // sequence (see the mojo::SimpleWatcher member below).
  InputStreamImpl(connections::mojom::Medium medium,
                  scoped_refptr<base::SequencedTaskRunner> task_runner,
                  mojo::ScopedDataPipeConsumerHandle receive_stream);
  // Must be invoked on the same |task_runner| sequence used at construction.
  ~InputStreamImpl() override;
  // Not copyable: the class owns a data pipe handle and a watcher.
  InputStreamImpl(const InputStreamImpl&) = delete;
  InputStreamImpl& operator=(const InputStreamImpl&) = delete;

  // InputStream:
  // Synchronous read. Blocks the calling thread while waiting for the
  // DataPipe to become readable; the read itself is handled on
  // |task_runner_|.
  // NOTE(review): whether the returned ByteArray always contains exactly
  // |size| bytes, and which Exception values are produced on failure, is
  // determined by the .cc implementation and the InputStream contract --
  // confirm there.
  ExceptionOr<ByteArray> Read(std::int64_t size) override;
  // Synchronous close. Blocks the calling thread while the DataPipe is shut
  // down on |task_runner_|.
  Exception Close() override;

 private:
  // Must be run on the |task_runner_|.
  // NOTE(review): the (MojoResult, HandleSignalsState) signature suggests
  // this is the callback armed on |receive_stream_watcher_| -- confirm in the
  // .cc file.
  void ReceiveMore(MojoResult result, const mojo::HandleSignalsState& state);

  // Returns true if |receive_stream_| is already closed.
  bool IsClosed() const;

  // Signals |task_run_waitable_event|, if not null, after the stream is closed.
  // Must be run on the |task_runner_|.
  void DoClose(base::WaitableEvent* task_run_waitable_event);

  // Medium whose implementation supplied |receive_stream_|.
  // NOTE(review): how this value is used (e.g. logging/metrics) is not
  // visible in this header.
  connections::mojom::Medium medium_;
  // Sequence that handles the read and cancel operations while the caller of
  // Read()/Close() is blocked.
  scoped_refptr<base::SequencedTaskRunner> task_runner_;
  // Consumer end of the DataPipe this stream reads from.
  mojo::ScopedDataPipeConsumerHandle receive_stream_;
  // Watcher for |receive_stream_|. It expects to be created on the same
  // sequence it runs on, which is why this class must be created and
  // destroyed on |task_runner_|. Declared after |receive_stream_|, so it is
  // destroyed before the handle.
  mojo::SimpleWatcher receive_stream_watcher_;

  // NOTE(review): presumably the destination buffer for the Read() currently
  // in progress and the offset of the next byte to fill in it -- confirm in
  // the .cc file.
  std::unique_ptr<ByteArray> pending_read_buffer_;
  size_t pending_read_buffer_pos_ = 0;
  // NOTE(review): presumably the result (data or exception) produced on
  // |task_runner_| and handed back to the blocked Read() caller -- confirm.
  ExceptionOr<ByteArray> exception_or_received_byte_array_;
  // NOTE(review): presumably the event the calling thread of Read() waits on
  // until the read completes or the stream is closed -- confirm.
  base::WaitableEvent read_waitable_event_;
};

}  // namespace chrome
}  // namespace nearby

#endif  // CHROME_SERVICES_SHARING_NEARBY_PLATFORM_INPUT_STREAM_IMPL_H_