chromium/chrome/browser/ash/file_system_provider/fileapi/file_stream_reader.cc

// Copyright 2014 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/ash/file_system_provider/fileapi/file_stream_reader.h"

#include <memory>
#include <utility>

#include "base/files/file.h"
#include "base/functional/bind.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/trace_event/trace_event.h"
#include "chrome/browser/ash/file_system_provider/abort_callback.h"
#include "chrome/browser/ash/file_system_provider/fileapi/provider_async_file_util.h"
#include "chrome/browser/ash/file_system_provider/mount_path_util.h"
#include "chrome/browser/ash/file_system_provider/provided_file_system_interface.h"
#include "chrome/browser/ash/file_system_provider/scoped_file_opener.h"
#include "content/public/browser/browser_task_traits.h"
#include "content/public/browser/browser_thread.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"

using content::BrowserThread;

namespace ash::file_system_provider {

// Converts net::CompletionOnceCallback to net::Int64CompletionOnceCallback.
void Int64ToIntCompletionOnceCallback(net::CompletionOnceCallback callback,
                                      int64_t result) {
  std::move(callback).Run(static_cast<int>(result));
}

class FileStreamReader::OperationRunner
    : public base::RefCountedThreadSafe<
          FileStreamReader::OperationRunner,
          content::BrowserThread::DeleteOnUIThread> {
 public:
  OperationRunner() : file_handle_(0) {}

  OperationRunner(const OperationRunner&) = delete;
  OperationRunner& operator=(const OperationRunner&) = delete;

  // Opens a file for reading and calls the completion callback. Must be called
  // on UI thread.
  void OpenFileOnUIThread(const storage::FileSystemURL& url,
                          storage::AsyncFileUtil::StatusCallback callback) {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);
    DCHECK(abort_callback_.is_null());

    util::FileSystemURLParser parser(url);
    if (!parser.Parse()) {
      content::GetIOThreadTaskRunner({})->PostTask(
          FROM_HERE,
          base::BindOnce(std::move(callback), base::File::FILE_ERROR_SECURITY));
      return;
    }

    file_system_ = parser.file_system()->GetWeakPtr();
    file_path_ = parser.file_path();
    file_opener_ = std::make_unique<ScopedFileOpener>(
        parser.file_system(), parser.file_path(), OPEN_FILE_MODE_READ,
        base::BindOnce(&OperationRunner::OnOpenFileCompletedOnUIThread, this,
                       std::move(callback)));
  }

  // Requests reading contents of a file. |callback| will always run eventually.
  // It can be called many times, until |has_more| is set to false. This
  // function guarantees that it will succeed only if the file has not been
  // changed while reading. Must be called on UI thread.
  void ReadFileOnUIThread(
      scoped_refptr<net::IOBuffer> buffer,
      int64_t offset,
      int length,
      ProvidedFileSystemInterface::ReadChunkReceivedCallback callback) {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);
    DCHECK(abort_callback_.is_null());

    // If the file system got unmounted, then abort the reading operation.
    if (!file_system_.get()) {
      content::GetIOThreadTaskRunner({})->PostTask(
          FROM_HERE, base::BindOnce(callback, 0, /*has_more=*/false,
                                    base::File::FILE_ERROR_ABORT));
      return;
    }

    abort_callback_ = file_system_->ReadFile(
        file_handle_, buffer.get(), offset, length,
        base::BindRepeating(&OperationRunner::OnReadFileCompletedOnUIThread,
                            this, callback));
  }

  // Requests metadata of a file. |callback| will always run eventually.
  // Must be called on UI thread.
  void GetMetadataOnUIThread(
      ProvidedFileSystemInterface::GetMetadataCallback callback) {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);
    DCHECK(abort_callback_.is_null());

    // If the file system got unmounted, then abort the get length operation.
    if (!file_system_.get()) {
      content::GetIOThreadTaskRunner({})->PostTask(
          FROM_HERE, base::BindOnce(std::move(callback),
                                    base::WrapUnique<EntryMetadata>(nullptr),
                                    base::File::FILE_ERROR_ABORT));
      return;
    }

    abort_callback_ = file_system_->GetMetadata(
        file_path_,
        ProvidedFileSystemInterface::METADATA_FIELD_SIZE |
            ProvidedFileSystemInterface::METADATA_FIELD_MODIFICATION_TIME,
        base::BindOnce(&OperationRunner::OnGetMetadataCompletedOnUIThread, this,
                       std::move(callback)));
  }

  // Aborts the most recent operation (if exists) and closes a file if opened.
  // The runner must not be used anymore after calling this method.
  void CloseRunnerOnUIThread() {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);

    if (!abort_callback_.is_null())
      std::move(abort_callback_).Run();

    // Close the file (if opened).
    file_opener_.reset();
  }

 private:
  friend struct content::BrowserThread::DeleteOnThread<
      content::BrowserThread::UI>;
  friend class base::DeleteHelper<OperationRunner>;

  virtual ~OperationRunner() = default;

  // Remembers a file handle for further operations and forwards the result to
  // the IO thread.
  void OnOpenFileCompletedOnUIThread(
      storage::AsyncFileUtil::StatusCallback callback,
      int file_handle,
      base::File::Error result,
      std::unique_ptr<EntryMetadata> metadata) {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);
    abort_callback_.Reset();

    if (result == base::File::FILE_OK)
      file_handle_ = file_handle;

    content::GetIOThreadTaskRunner({})->PostTask(
        FROM_HERE, base::BindOnce(std::move(callback), result));
  }

  // Forwards a metadata to the IO thread.
  void OnGetMetadataCompletedOnUIThread(
      ProvidedFileSystemInterface::GetMetadataCallback callback,
      std::unique_ptr<EntryMetadata> metadata,
      base::File::Error result) {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);
    abort_callback_.Reset();

    content::GetIOThreadTaskRunner({})->PostTask(
        FROM_HERE,
        base::BindOnce(std::move(callback), std::move(metadata), result));
  }

  // Forwards a response of reading from a file to the IO thread.
  void OnReadFileCompletedOnUIThread(
      ProvidedFileSystemInterface::ReadChunkReceivedCallback
          chunk_received_callback,
      int chunk_length,
      bool has_more,
      base::File::Error result) {
    DCHECK_CURRENTLY_ON(BrowserThread::UI);
    if (!has_more)
      abort_callback_.Reset();

    content::GetIOThreadTaskRunner({})->PostTask(
        FROM_HERE, base::BindOnce(chunk_received_callback, chunk_length,
                                  has_more, result));
  }

  AbortCallback abort_callback_;
  base::WeakPtr<ProvidedFileSystemInterface> file_system_;
  base::FilePath file_path_;
  std::unique_ptr<ScopedFileOpener> file_opener_;
  int file_handle_;
};

FileStreamReader::FileStreamReader(storage::FileSystemContext* context,
                                   const storage::FileSystemURL& url,
                                   int64_t initial_offset,
                                   const base::Time& expected_modification_time)
    : url_(url),
      current_offset_(initial_offset),
      current_length_(0),
      expected_modification_time_(expected_modification_time),
      runner_(new OperationRunner),
      state_(NOT_INITIALIZED) {}

FileStreamReader::~FileStreamReader() {
  // FileStreamReader doesn't have a Cancel() method like in FileStreamWriter.
  // Therefore, aborting and/or closing an opened file is done from the
  // destructor.
  content::GetUIThreadTaskRunner({})->PostTask(
      FROM_HERE,
      base::BindOnce(&OperationRunner::CloseRunnerOnUIThread, runner_));

  // If a read is in progress, mark it as completed.
  TRACE_EVENT_NESTABLE_ASYNC_END0("file_system_provider",
                                  "FileStreamReader::Read", this);
}

void FileStreamReader::Initialize(
    base::OnceClosure pending_closure,
    net::Int64CompletionOnceCallback error_callback) {
  DCHECK_EQ(NOT_INITIALIZED, state_);
  state_ = INITIALIZING;

  content::GetUIThreadTaskRunner({})->PostTask(
      FROM_HERE,
      base::BindOnce(&OperationRunner::OpenFileOnUIThread, runner_, url_,
                     base::BindOnce(&FileStreamReader::OnOpenFileCompleted,
                                    weak_ptr_factory_.GetWeakPtr(),
                                    std::move(pending_closure),
                                    std::move(error_callback))));
}

void FileStreamReader::OnOpenFileCompleted(
    base::OnceClosure pending_closure,
    net::Int64CompletionOnceCallback error_callback,
    base::File::Error result) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  DCHECK_EQ(INITIALIZING, state_);

  // In case of an error, return immediately using the |error_callback| of the
  // Read() or GetLength() pending request.
  if (result != base::File::FILE_OK) {
    state_ = FAILED;
    std::move(error_callback).Run(net::FileErrorToNetError(result));
    return;
  }

  DCHECK_EQ(base::File::FILE_OK, result);

  // Verify the last modification time.
  content::GetUIThreadTaskRunner({})->PostTask(
      FROM_HERE,
      base::BindOnce(&OperationRunner::GetMetadataOnUIThread, runner_,
                     base::BindOnce(&FileStreamReader::OnInitializeCompleted,
                                    weak_ptr_factory_.GetWeakPtr(),
                                    std::move(pending_closure),
                                    std::move(error_callback))));
}

void FileStreamReader::OnInitializeCompleted(
    base::OnceClosure pending_closure,
    net::Int64CompletionOnceCallback error_callback,
    std::unique_ptr<EntryMetadata> metadata,
    base::File::Error result) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  DCHECK_EQ(INITIALIZING, state_);

  // In case of an error, abort.
  if (result != base::File::FILE_OK) {
    state_ = FAILED;
    std::move(error_callback).Run(net::FileErrorToNetError(result));
    return;
  }

  // If the file modification time has changed, then abort. Note, that the file
  // may be changed without affecting the modification time.
  DCHECK(metadata.get());
  if (!expected_modification_time_.is_null() &&
      *metadata->modification_time != expected_modification_time_) {
    state_ = FAILED;
    std::move(error_callback).Run(net::ERR_UPLOAD_FILE_CHANGED);
    return;
  }

  DCHECK_EQ(base::File::FILE_OK, result);
  state_ = INITIALIZED;

  // Run the task waiting for the initialization to be completed.
  std::move(pending_closure).Run();
}

int FileStreamReader::Read(net::IOBuffer* buffer,
                           int buffer_length,
                           net::CompletionOnceCallback callback) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  TRACE_EVENT_NESTABLE_ASYNC_BEGIN1("file_system_provider",
                                    "FileStreamReader::Read", this,
                                    "buffer_length", buffer_length);

  read_callback_ = std::move(callback);
  switch (state_) {
    case NOT_INITIALIZED:
      // Lazily initialize with the first call to Read().
      Initialize(
          base::BindOnce(&FileStreamReader::ReadAfterInitialized,
                         weak_ptr_factory_.GetWeakPtr(),
                         base::WrapRefCounted(buffer), buffer_length,
                         base::BindRepeating(&FileStreamReader::OnReadCompleted,
                                             weak_ptr_factory_.GetWeakPtr())),
          base::BindOnce(&Int64ToIntCompletionOnceCallback,
                         base::BindOnce(&FileStreamReader::OnReadCompleted,
                                        weak_ptr_factory_.GetWeakPtr())));
      break;

    case INITIALIZING:
      NOTREACHED_IN_MIGRATION();
      break;

    case INITIALIZED:
      ReadAfterInitialized(
          buffer, buffer_length,
          base::BindRepeating(&FileStreamReader::OnReadCompleted,
                              weak_ptr_factory_.GetWeakPtr()));
      break;

    case FAILED:
      NOTREACHED_IN_MIGRATION();
      break;
  }

  return net::ERR_IO_PENDING;
}

void FileStreamReader::OnReadCompleted(int result) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  std::move(read_callback_).Run(static_cast<int>(result));
  TRACE_EVENT_NESTABLE_ASYNC_END0("file_system_provider",
                                  "FileStreamReader::Read", this);
}

int64_t FileStreamReader::GetLength(net::Int64CompletionOnceCallback callback) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);

  get_length_callback_ = std::move(callback);
  switch (state_) {
    case NOT_INITIALIZED:
      // Lazily initialize with the first call to GetLength().
      Initialize(base::BindOnce(&FileStreamReader::GetLengthAfterInitialized,
                                weak_ptr_factory_.GetWeakPtr()),
                 base::BindOnce(&FileStreamReader::OnGetLengthCompleted,
                                weak_ptr_factory_.GetWeakPtr()));
      break;

    case INITIALIZING:
      NOTREACHED_IN_MIGRATION();
      break;

    case INITIALIZED:
      GetLengthAfterInitialized();
      break;

    case FAILED:
      NOTREACHED_IN_MIGRATION();
      break;
  }

  return net::ERR_IO_PENDING;
}

void FileStreamReader::OnGetLengthCompleted(int64_t result) {
  std::move(get_length_callback_).Run(result);
}

void FileStreamReader::ReadAfterInitialized(
    scoped_refptr<net::IOBuffer> buffer,
    int buffer_length,
    const net::CompletionRepeatingCallback& callback) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  DCHECK_EQ(INITIALIZED, state_);

  current_length_ = 0;
  content::GetUIThreadTaskRunner({})->PostTask(
      FROM_HERE, base::BindOnce(&OperationRunner::ReadFileOnUIThread, runner_,
                                buffer, current_offset_, buffer_length,
                                base::BindRepeating(
                                    &FileStreamReader::OnReadChunkReceived,
                                    weak_ptr_factory_.GetWeakPtr(), callback)));
}

void FileStreamReader::GetLengthAfterInitialized() {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  DCHECK_EQ(INITIALIZED, state_);

  content::GetUIThreadTaskRunner({})->PostTask(
      FROM_HERE,
      base::BindOnce(
          &OperationRunner::GetMetadataOnUIThread, runner_,
          base::BindOnce(&FileStreamReader::OnGetMetadataForGetLengthReceived,
                         weak_ptr_factory_.GetWeakPtr())));
}

void FileStreamReader::OnReadChunkReceived(
    const net::CompletionRepeatingCallback& callback,
    int chunk_length,
    bool has_more,
    base::File::Error result) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  DCHECK_EQ(INITIALIZED, state_);

  current_length_ += chunk_length;

  // If this is the last chunk with a success, then finalize.
  if (!has_more && result == base::File::FILE_OK) {
    current_offset_ += current_length_;
    callback.Run(current_length_);
    return;
  }

  // In case of an error, abort.
  if (result != base::File::FILE_OK) {
    DCHECK(!has_more);
    state_ = FAILED;
    callback.Run(net::FileErrorToNetError(result));
    return;
  }

  // More data is about to come, so do not call the callback yet.
  DCHECK(has_more);
}

void FileStreamReader::OnGetMetadataForGetLengthReceived(
    std::unique_ptr<EntryMetadata> metadata,
    base::File::Error result) {
  DCHECK_CURRENTLY_ON(BrowserThread::IO);
  DCHECK_EQ(INITIALIZED, state_);

  // In case of an error, abort.
  if (result != base::File::FILE_OK) {
    state_ = FAILED;
    std::move(get_length_callback_).Run(net::FileErrorToNetError(result));
    return;
  }

  // If the file modification time has changed, then abort. Note, that the file
  // may be changed without affecting the modification time.
  DCHECK(metadata.get());
  if (!expected_modification_time_.is_null() &&
      *metadata->modification_time != expected_modification_time_) {
    std::move(get_length_callback_).Run(net::ERR_UPLOAD_FILE_CHANGED);
    return;
  }

  DCHECK_EQ(base::File::FILE_OK, result);
  std::move(get_length_callback_).Run(*metadata->size);
}

}  // namespace ash::file_system_provider