chromium/ash/ambient/model/ambient_topic_queue.cc

// Copyright 2022 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ash/ambient/model/ambient_topic_queue.h"

#include <algorithm>
#include <limits>
#include <random>
#include <utility>
#include <vector>

#include "ash/ambient/ambient_constants.h"
#include "ash/public/cpp/ambient/proto/photo_cache_entry.pb.h"
#include "ash/public/cpp/shell_window_ids.h"
#include "ash/shell.h"
#include "base/barrier_closure.h"
#include "base/check.h"
#include "base/containers/flat_map.h"
#include "base/functional/bind.h"
#include "base/location.h"
#include "base/rand_util.h"
#include "base/task/sequenced_task_runner.h"
#include "ui/display/display.h"
#include "ui/display/screen.h"
#include "ui/gfx/geometry/size.h"

namespace ash {
namespace {

// Backoff policy handed to |fetch_topic_retry_backoff_|. It determines the
// delay before the next fetch attempt when a topic fetch yields no topics.
constexpr net::BackoffEntry::Policy kFetchTopicRetryBackoffPolicy = {
    10,             // Number of initial errors to ignore.
    500,            // Initial delay in ms.
    2.0,            // Factor by which the waiting time will be multiplied.
    0.2,            // Fuzzing percentage.
    2 * 60 * 1000,  // Maximum delay in ms.
    -1,             // Never discard the entry.
    true,           // Use initial delay.
};

// Converts |topic_type| to its underlying integer value so it can be used as a
// map key. The value is expected to be non-negative.
int TypeToIndex(::ambient::TopicType topic_type) {
  const auto index = static_cast<int>(topic_type);
  DCHECK_GE(index, 0);
  return index;
}

// Inverse of TypeToIndex(): converts an integer |index| back to the topic type
// with that underlying value. No range validation is performed.
::ambient::TopicType IndexToType(int index) {
  return static_cast<::ambient::TopicType>(index);
}

std::vector<AmbientModeTopic> CreatePairedTopics(
    const std::vector<AmbientModeTopic>& topics) {
  // We pair two topics if:
  // 1. They are in the landscape orientation.
  // 2. They are in the same category.
  // 3. They are not Geo photos.
  base::flat_map<int, std::vector<int>> topics_by_type;
  std::vector<AmbientModeTopic> paired_topics;
  int topic_idx = -1;
  for (const auto& topic : topics) {
    topic_idx++;

    // Do not pair Geo photos, which will be rotate to fill the screen.
    // If a photo is portrait, it is from Google Photos and should have a paired
    // photo already.
    if (topic.topic_type == ::ambient::TopicType::kGeo || topic.is_portrait) {
      paired_topics.emplace_back(topic);
      continue;
    }

    int type_index = TypeToIndex(topic.topic_type);
    auto it = topics_by_type.find(type_index);
    if (it == topics_by_type.end()) {
      topics_by_type.insert({type_index, {topic_idx}});
    } else {
      it->second.emplace_back(topic_idx);
    }
  }

  // We merge two unpaired topics to create a new topic with related images.
  for (auto it = topics_by_type.begin(); it < topics_by_type.end(); ++it) {
    size_t idx = 0;
    while (idx < it->second.size() - 1) {
      AmbientModeTopic paired_topic;
      const auto& topic_1 = topics[it->second[idx]];
      const auto& topic_2 = topics[it->second[idx + 1]];
      paired_topic.url = topic_1.url;
      paired_topic.related_image_url = topic_2.url;

      paired_topic.details = topic_1.details;
      paired_topic.related_details = topic_2.details;
      paired_topic.topic_type = IndexToType(it->first);
      paired_topic.is_portrait = topic_1.is_portrait;
      paired_topics.emplace_back(paired_topic);

      idx += 2;
    }
  }

  std::shuffle(paired_topics.begin(), paired_topics.end(),
               std::default_random_engine());
  return paired_topics;
}

std::pair<AmbientModeTopic, AmbientModeTopic> SplitTopic(
    const AmbientModeTopic& paired_topic) {
  const auto clear_related_fields_from_topic = [](AmbientModeTopic& topic) {
    topic.related_image_url.clear();
    topic.related_details.clear();
  };

  AmbientModeTopic topic_with_primary(paired_topic);
  clear_related_fields_from_topic(topic_with_primary);

  AmbientModeTopic topic_with_related(paired_topic);
  topic_with_related.url = std::move(topic_with_related.related_image_url);
  topic_with_related.details = std::move(topic_with_related.related_details);
  clear_related_fields_from_topic(topic_with_related);
  return std::make_pair(topic_with_primary, topic_with_related);
}

}  // namespace

// Creates the queue and immediately starts the first topic fetch.
//
// |topic_fetch_limit|: once this many topics have been pushed to the queue, no
//   further topics are pushed or fetched.
// |topic_fetch_size|: number of topics to request per fetch; it is divided
//   among the topic sizes reported by the delegate. Must be positive.
// |topic_fetch_interval|: delay before the next fetch after a fetch that
//   returned topics.
// |should_split_topics|: if true, fetched topics that carry a related image are
//   split into two topics; otherwise fetched topics are paired up.
// |delegate|: supplies the topic sizes to request. Must not be null.
// |backend_controller|: performs the topic requests. Must not be null.
AmbientTopicQueue::AmbientTopicQueue(
    int topic_fetch_limit,
    int topic_fetch_size,
    base::TimeDelta topic_fetch_interval,
    bool should_split_topics,
    Delegate* delegate,
    AmbientBackendController* backend_controller)
    : topic_fetch_limit_(topic_fetch_limit),
      topic_fetch_size_(topic_fetch_size),
      topic_fetch_interval_(topic_fetch_interval),
      should_split_topics_(should_split_topics),
      delegate_(delegate),
      backend_controller_(backend_controller),
      fetch_topic_retry_backoff_(&kFetchTopicRetryBackoffPolicy) {
  DCHECK_GT(topic_fetch_size_, 0);
  // FetchTopics() dereferences both collaborators, so neither may be null.
  DCHECK(delegate_);
  DCHECK(backend_controller_);
  FetchTopics();
}

// Defaulted destructor: no explicit teardown steps are written in this file.
AmbientTopicQueue::~AmbientTopicQueue() = default;

// Reports the state of the queue to |wait_cb|. The callback is run
// synchronously unless the queue is empty and a fetch is currently in
// progress, in which case it is stored and run once that fetch completes.
void AmbientTopicQueue::WaitForTopicsAvailable(WaitCallback wait_cb) {
  DCHECK(wait_cb);

  if (!IsEmpty()) {
    std::move(wait_cb).Run(WaitResult::kTopicsAvailable);
    return;
  }

  if (HasReachedTopicFetchLimit()) {
    std::move(wait_cb).Run(WaitResult::kTopicFetchLimitReached);
    return;
  }

  if (topic_fetch_in_progress_) {
    pending_wait_cbs_.push_back(std::move(wait_cb));
    return;
  }

  // The queue is empty, the limit has not been hit, and nothing is in flight:
  // the only remaining possibility is that we're backing off because of a
  // previously failed topic fetch.
  std::move(wait_cb).Run(WaitResult::kTopicFetchBackingOff);
}

// Removes and returns the topic at the front of the queue. The queue must not
// be empty. If this drains the queue, a new topic fetch is requested right
// away.
AmbientModeTopic AmbientTopicQueue::Pop() {
  DCHECK(!IsEmpty());
  AmbientModeTopic front_topic = std::move(available_topics_.front());
  available_topics_.pop();
  if (IsEmpty()) {
    FetchTopics();
  }
  return front_topic;
}

// Returns true when there are no topics currently available to Pop().
bool AmbientTopicQueue::IsEmpty() const {
  return available_topics_.empty();
}

// Returns true once the running count of topics added to the queue
// (|total_topics_fetched_|) has reached |topic_fetch_limit_|.
bool AmbientTopicQueue::HasReachedTopicFetchLimit() const {
  return total_topics_fetched_ >= topic_fetch_limit_;
}

void AmbientTopicQueue::FetchTopics() {
  if (topic_fetch_in_progress_ || HasReachedTopicFetchLimit())
    return;

  topic_fetch_in_progress_ = true;
  fetch_topic_timer_.Stop();
  pending_topic_batches_.clear();

  // Make one topic network request per topic size and wait for them all to
  // complete.
  std::vector<gfx::Size> topic_sizes = delegate_->GetTopicSizes();
  int num_topic_sizes_requested = topic_sizes.size();
  DCHECK_GT(num_topic_sizes_requested, 0);
  // Distribute the |topic_fetch_size_| equally among the different topic
  // sizes requested. For example, if |topic_fetch_size_| is 100 and there is a
  // portrait and landscape size requested, request 50 of each. Impose a floor
  // of 1 though in case there are a bunch of topic sizes requested.
  int num_topics_per_request =
      std::max(topic_fetch_size_ / num_topic_sizes_requested, 1);
  auto barrier_closure = base::BarrierClosure(
      /*num_closures=*/num_topic_sizes_requested,
      base::BindOnce(&AmbientTopicQueue::OnAllScreenUpdateInfoFetched,
                     weak_factory_.GetWeakPtr()));
  for (const gfx::Size& requested_topic_size : topic_sizes) {
    backend_controller_->FetchScreenUpdateInfo(
        num_topics_per_request,
        // If |should_split_topics_| is true, it does not make sense to ever
        // request paired personal portrait topics since they will ultimately
        // just be unpaired on the client anyways.
        /*show_pair_personal_portraits=*/!should_split_topics_,
        requested_topic_size,
        base::BindOnce(&AmbientTopicQueue::OnScreenUpdateInfoFetched,
                       weak_factory_.GetWeakPtr(), barrier_closure,
                       requested_topic_size));
  }
}

// Handles the response to the request made for |requested_topic_size|. The
// topics in |screen_update| are either split or paired (depending on
// |should_split_topics_|), stored in |pending_topic_batches_| keyed by the
// requested width/height, and then |barrier_closure| is run to signal that
// this request is complete.
void AmbientTopicQueue::OnScreenUpdateInfoFetched(
    const base::RepeatingClosure& barrier_closure,
    const gfx::Size& requested_topic_size,
    const ash::ScreenUpdate& screen_update) {
  DCHECK(barrier_closure);
  const auto& fetched_topics = screen_update.next_topics;

  std::vector<AmbientModeTopic> processed_topics;
  if (!should_split_topics_) {
    processed_topics = CreatePairedTopics(fetched_topics);
  } else {
    for (const AmbientModeTopic& topic : fetched_topics) {
      // Topics without a related image are already standalone.
      if (topic.related_image_url.empty()) {
        processed_topics.push_back(topic);
        continue;
      }
      auto [primary_topic, related_topic] = SplitTopic(topic);
      processed_topics.push_back(std::move(primary_topic));
      processed_topics.push_back(std::move(related_topic));
    }
  }

  // |screen_update| may be an empty instance if fatal errors happened during
  // the fetch, and CreatePairedTopics() may yield no paired topics. Log which
  // of the two left this batch empty.
  if (processed_topics.empty()) {
    const char* const reason =
        fetched_topics.empty()
            ? "IMAX server returned screen update with no topics for size "
            : "CreatePairedTopics() yielded no topics for size ";
    LOG(WARNING) << reason << requested_topic_size.ToString();
  }

  auto size_key = std::make_pair(requested_topic_size.width(),
                                 requested_topic_size.height());
  bool inserted =
      pending_topic_batches_
          .emplace(std::move(size_key), std::move(processed_topics))
          .second;
  DCHECK(inserted) << "Duplicate topic size found: "
                   << requested_topic_size.ToString();
  barrier_closure.Run();
}

// Runs once every request of the current fetch round has completed. Moves the
// fetched batches into the queue, schedules the next fetch (with backoff if
// nothing at all was fetched), and notifies any pending wait callbacks.
void AmbientTopicQueue::OnAllScreenUpdateInfoFetched() {
  DCHECK(topic_fetch_in_progress_);
  topic_fetch_in_progress_ = false;

  size_t total_fetched_topics = 0;
  size_t smallest_topic_batch = std::numeric_limits<size_t>::max();
  for (const auto& [topic_size, topic_batch] : pending_topic_batches_) {
    total_fetched_topics += topic_batch.size();
    smallest_topic_batch =
        std::min<size_t>(smallest_topic_batch, topic_batch.size());
  }

  if (total_fetched_topics == 0) {
    LOG(ERROR) << "Topic fetch returned no topics. Retrying with backoff";
    fetch_topic_retry_backoff_.InformOfRequest(/*succeeded=*/false);
    ScheduleFetchTopics(/*backoff=*/true);
    RunPendingWaitCallbacks(WaitResult::kTopicFetchBackingOff);
    return;
  }

  if (smallest_topic_batch == 0) {
    // Worst case: no topics were fetched for at least one topic size. Add just
    // one topic from each of the other topic sizes. The resulting distribution
    // is not completely uniform, but this is damage control and prevents
    // scenarios where we don't fill the queue with anything because of one
    // problematic request.
    for (auto& [topic_size, topic_batch] : pending_topic_batches_) {
      if (topic_batch.empty())
        continue;
      Push(std::move(topic_batch.front()));
    }
  } else {
    // Interleave the topics of each requested size so that the caller sees a
    // uniform distribution of sizes when popping from the queue. Only the
    // first |smallest_topic_batch| topics of each batch are used.
    for (size_t topic_idx = 0; topic_idx < smallest_topic_batch; ++topic_idx) {
      for (auto& [topic_size, topic_batch] : pending_topic_batches_) {
        DCHECK_LT(topic_idx, topic_batch.size());
        Push(std::move(topic_batch[topic_idx]));
      }
    }
  }

  fetch_topic_retry_backoff_.InformOfRequest(/*succeeded=*/true);
  ScheduleFetchTopics(/*backoff=*/false);
  RunPendingWaitCallbacks(WaitResult::kTopicsAvailable);
}

void AmbientTopicQueue::ScheduleFetchTopics(bool backoff) {
  // If retry, use the backoff delay, otherwise the default delay.
  const base::TimeDelta delay =
      backoff ? fetch_topic_retry_backoff_.GetTimeUntilRelease()
              : topic_fetch_interval_;
  fetch_topic_timer_.Start(FROM_HERE, delay,
                           base::BindOnce(&AmbientTopicQueue::FetchTopics,
                                          weak_factory_.GetWeakPtr()));
}

// Appends |topic| to the queue and counts it towards the fetch limit. Once the
// limit has been reached, |topic| is discarded instead.
void AmbientTopicQueue::Push(AmbientModeTopic topic) {
  if (!HasReachedTopicFetchLimit()) {
    available_topics_.push(std::move(topic));
    ++total_topics_fetched_;
  }
}

// Posts every stored wait callback with |wait_result| to the current sequence
// and then empties the list of pending callbacks.
void AmbientTopicQueue::RunPendingWaitCallbacks(WaitResult wait_result) {
  for (WaitCallback& wait_cb : pending_wait_cbs_) {
    // Posted rather than run inline, in case the callback's implementation
    // invokes WaitForTopicsAvailable() again.
    base::OnceClosure notify_waiter =
        base::BindOnce(std::move(wait_cb), wait_result);
    base::SequencedTaskRunner::GetCurrentDefault()->PostTask(
        FROM_HERE, std::move(notify_waiter));
  }
  pending_wait_cbs_.clear();
}

}  // namespace ash