chromium/chromecast/base/java/src/org/chromium/chromecast/base/Observable.java

// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.chromecast.base;

import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

import java.util.function.Supplier;

/**
 * A source of activations that observers can track.
 *
 * Every activation carries a piece of data. That data is handed to each subscribed observer when
 * the Observable activates.
 *
 * @param <T> The type of the activation data.
 */
@FunctionalInterface
public interface Observable<T> {
    /**
     * Registers the given observer with this Observable.
     *
     * Whenever this Observable is activated, the observer is opened with the activation data and
     * produces a scope. Whenever this Observable is deactivated, that scope is closed.
     *
     * The returned Scope is the "subscription". Closing it closes the observer's scopes and stops
     * the observer from being notified of further updates.
     */
    Scope subscribe(Observer<? super T> observer);

    /**
     * Creates an Observable that opens observers' scopes only while both `this` and `other` are
     * activated, and closes those scopes as soon as either `this` or `other` is deactivated.
     *
     * Use this to build an event handler that should only run once two events have both occurred,
     * regardless of the order in which they occur. The activation data is the pair of both values.
     */
    default <U> Observable<Both<T, U>> and(Observable<U> other) {
        return flatMap(left -> other.flatMap(right -> just(Both.both(left, right))));
    }

    /**
     * Creates an Observable made of the activations of both `this` and `other`. Subscribing an
     * observer to the result subscribes it to `this` first and to `other` second.
     *
     * Observables are combined the way concatenation combines lists. The operation forms a monoid
     * over Observables whose identity element is empty(), i.e. for every Observable `x`:
     *
     *   x.or(empty()) == x
     *   empty().or(x) == x
     */
    default Observable<T> or(Observable<T> other) {
        return observer -> {
            Scope ownSubscription = subscribe(observer);
            Scope otherSubscription = other.subscribe(observer);
            return ownSubscription.and(otherSubscription);
        };
    }

    /**
     * Returns an Observable that only reports activations added after the moment of subscription.
     *
     * Some Observables synchronously notify a new observer of their current state while the
     * subscribe() call is still running. Those notifications are swallowed here (they produce
     * Scope.NO_OP); only activations that arrive once subscribe() has returned reach the observer.
     */
    default Observable<T> after() {
        return observer -> {
            // Flipped to true only once the upstream subscribe() call has returned.
            Box<Boolean> subscribed = new Box<>(false);
            Scope subscription = subscribe(data -> {
                if (!subscribed.value) {
                    return Scope.NO_OP;
                }
                return observer.open(data);
            });
            subscribed.value = true;
            return subscription;
        };
    }

    /**
     * Returns an Observable that is activated when `this` and `other` are activated in that order.
     *
     * This behaves like `and()`, except that nothing is activated for activations of `other` that
     * were already present before `this` was activated.
     */
    default <U> Observable<Both<T, U>> andThen(Observable<U> other) {
        Observable<U> otherAfterwards = other.after();
        return and(otherAfterwards);
    }

    /**
     * Returns an Observable whose activation data is the result of applying the given Function to
     * this Observable's activation data.
     *
     * Because the result is wrapped with just(), a Function result of null produces no activation.
     */
    default <R> Observable<R> map(Function<? super T, ? extends R> f) {
        return flatMap(input -> {
            R output = f.apply(input);
            return just(output);
        });
    }

    /**
     * Returns an Observable that feeds this Observable's activation data to the given Function and
     * notifies observers with the activation data of the Observable which that Function returns.
     *
     * When you have a function that itself returns an Observable, this avoids ending up with an
     * Observable of Observables, which is what map() would give you.
     *
     * This is a very powerful operation: Observables are a monad, and flatMap() is its "bind",
     * combining an Observable with an Observable-returning function into a new Observable.
     *
     * For instance, Observables can play the role of "promises" and flatMap() can chain "async
     * functions" that return Observables:
     *
     *   Observable<Foo> getFooAsync();
     *   Observable<Bar> getBarAsync(Foo foo);
     *   Scope useBar(Bar bar);
     *
     *   getFooAsync().flatMap(foo -> getBarAsync(foo)).subscribe(bar -> useBar(bar));
     */
    default <R> Observable<R> flatMap(
            Function<? super T, ? extends Observable<? extends R>> f) {
        return observer -> subscribe(outer -> {
            // The inner subscription is the scope of the outer activation, so it is closed when
            // the outer activation is revoked.
            Observable<? extends R> inner = f.apply(outer);
            return inner.subscribe(value -> observer.open(value));
        });
    }

    /**
     * Returns an Observable that is only activated by those activations of `this` whose data makes
     * the given `predicate` return true.
     */
    default Observable<T> filter(Predicate<? super T> predicate) {
        return flatMap(candidate -> {
            if (predicate.test(candidate)) {
                return just(candidate);
            }
            return empty();
        });
    }

    /**
     * Returns an Observable that mirrors this one but replaces the activation data with Unit.
     */
    default Observable<Unit> opaque() {
        return map(ignored -> Unit.unit());
    }

    /**
     * Returns an Observable that accumulates this Observable's data into another Observable.
     * |acc| is invoked whenever data are added to this, and the Scope it returns is closed
     * whenever those data are removed from this.
     *
     * The |factory| is called once per subscription and must create an object that extends
     * Observable. Typically this is a mutable object such as Cell (which fold() is built on). The
     * mutable Observable is passed to |acc| together with each datum from this Observable, and
     * |acc| may mutate it using whatever transformation of the datum it likes. The Scope returned
     * by |acc| can defer a second mutation until the datum is removed from this Observable.
     *
     * The source is subscribed to before the observer is subscribed to the accumulator.
     *
     * Use cases include fold(), which uses a Cell to aggregate all simultaneous activations of
     * the source into a single value (handy for sums and counts, and in turn used by not()).
     *
     * Another use case is keeping only the most recent activation of the source while
     * deduplicating equal activations:
     *
     *   Observable<Foo> source = ...;
     *   // |deduped| will have only one activation at a time, will not drop those activations if
     *   // the |source| revokes them, and will not deactivate and reactivate if |source| produces
     *   // data that .equals() the most recent data that preceded it.
     *   Observable<Foo> deduped = source.accumulate(() -> new Controller<Foo>(), (a, foo) -> {
     *       a.set(foo);
     *       return Scope.NO_OP;
     *   });
     */
    default <U, A extends Observable<U>> Observable<U> accumulate(
            Supplier<A> factory, BiFunction<A, T, ? extends Scope> acc) {
        return observer -> {
            A accumulator = factory.get();
            Scope upstream = subscribe(data -> acc.apply(accumulator, data));
            Scope downstream = accumulator.subscribe(observer);
            return upstream.and(downstream);
        };
    }

    /**
     * Returns an Observable that emits the most recent activation of |this|. Data revoked from
     * |this| are not revoked from the distinctUntilChanged() Observable; they are only replaced
     * once |this| produces a new *distinct* activation. A new activation that is equivalent to the
     * previous one is ignored.
     *
     * This is a convenient way to "break seams" in streams of data. For example, if an Observable
     * emits `true`, `true`, `false`, `true`, in that order, then the distinctUntilChanged()
     * Observable emits `true`, `false`, `true` (the redundant `true` is dropped).
     */
    default Observable<T> distinctUntilChanged() {
        return accumulate(Controller::new, (latest, data) -> {
            latest.set(data);
            // Nothing to undo on revocation: the last value is deliberately kept.
            return Scope.NO_OP;
        });
    }

    /**
     * Returns an Observable that merges the state of all of this Observable's data into a single
     * activation of type A. The state is updated by applying |acc| whenever this Observable adds
     * data and |dim| whenever this Observable removes data.
     *
     * While |this| is empty, the result Observable is activated with |start|. When |this| is
     * activated with data, that T-typed data and the result's current A-typed data are passed to
     * |acc| to compute the result's new A-typed data. When data of |this| is deactivated, that
     * data and the result's current A-typed data are passed to |dim| to compute the new A-typed
     * data.
     *
     * The result Observable always has exactly one activation.
     *
     * This is a generic tool to combine the state of several activations, to "remember" earlier
     * activations, or to track ordering in ways that pure monadic operations cannot express.
     */
    default <A> Observable<A> fold(A start, BiFunction<A, T, A> acc, BiFunction<A, T, A> dim) {
        return accumulate(() -> new Cell<A>(start), (cell, data) -> {
            cell.mutate(state -> acc.apply(state, data));
            Scope onRemoved = () -> cell.mutate(state -> dim.apply(state, data));
            return onRemoved;
        });
    }

    /**
     * Returns an Observable holding the number of activations currently in |this|; the number is
     * updated dynamically as |this| changes.
     */
    default Observable<Integer> count() {
        return fold(0, (total, added) -> total + 1, (total, removed) -> total - 1);
    }

    /**
     * Returns an Observable that is activated only while the given Observable has no activations.
     */
    static Observable<?> not(Observable<?> observable) {
        Observable<Integer> activationCount = observable.count();
        return activationCount.filter(n -> n == 0);
    }

    /**
     * A degenerate Observable that never has any data. Subscribing to it never opens the observer
     * and yields Scope.NO_OP.
     */
    static <T> Observable<T> empty() {
        return ignored -> Scope.NO_OP;
    }

    /**
     * Creates an Observable that notifies every subscriber with one immutable value.
     *
     * A null value is treated as "no value": the result is then the same as empty().
     *
     * This is the "return" operation of the Observable monad.
     */
    static <T> Observable<T> just(T value) {
        if (value != null) {
            return observer -> observer.open(value);
        }
        return empty();
    }

    /**
     * Reports debug information about subscriptions and state transitions to a logger.
     *
     * The logger is a consumer of strings. It receives "subscribe" and "unsubscribe" for the
     * subscription itself, and "open " / "close " followed by the data for each activation.
     * Typically, the consumer should be a lambda that prints the input using
     * org.chromium.base.Log along with any extra info you want to include. See
     * chromium/base/reactive_java.md for an example.
     *
     * Taking a Consumer rather than calling logger methods directly lets you 1) choose the logging
     * level or use alternative loggers, and 2) get the right file name and line number in the logs
     * when using chromium's logger.
     *
     * NOTE(review): whether the "close ..." and "unsubscribe" lines are emitted before or after
     * the wrapped scopes are actually closed depends on the closing order of Scope.and(), which is
     * not defined in this file - confirm if the relative log order matters.
     */
    default Observable<T> debug(Consumer<String> logger) {
        return observer -> {
            logger.accept("subscribe");
            Scope subscription = subscribe(data -> {
                logger.accept("open " + data);
                Scope opened = observer.open(data);
                Scope logClose = () -> logger.accept("close " + data);
                return opened.and(logClose);
            });
            Scope logUnsubscribe = () -> logger.accept("unsubscribe");
            return subscription.and(logUnsubscribe);
        };
    }

    /**
     * Abstraction over posting a delayed task. Implementations should run the posted Runnable once
     * the given amount of time has elapsed, preferably on the same thread that posted it.
     */
    interface Scheduler {
        void postDelayed(Runnable runnable, long delay);
    }

    /**
     * Returns an Observable that activates a given amount of time (in milliseconds) after it is
     * subscribed to.
     *
     * The countdown starts at subscription time, not when the Observable is constructed. Each
     * subscription posts its own delayed task, so every observer gets its own timer.
     *
     * For example, when an alarm is the argument of an `.and()` call, a separate timer is started
     * for every activation of the left-hand side of that `.and()` call.
     *
     * The Scheduler is responsible for executing a Runnable after the given delay has elapsed,
     * preferably on the same thread that invoked its postDelayed() method, unless the observers of
     * this Observable are thread-safe.
     */
    static Observable<?> alarm(Scheduler scheduler, long ms) {
        return observer -> {
            Controller<Unit> trigger = new Controller<>();
            Runnable fire = () -> trigger.set(Unit.unit());
            scheduler.postDelayed(fire, ms);
            return trigger.subscribe(observer);
        };
    }

    /**
     * Returns an Observable that forwards the activations of `this` after the given delay.
     *
     * An activation that is revoked before the delay has elapsed is effectively "canceled" as far
     * as observers are concerned.
     *
     * The Scheduler is responsible for executing a Runnable after the given delay has elapsed,
     * preferably on the same thread that invoked its postDelayed() method, unless the observers of
     * this Observable are thread-safe.
     */
    default Observable<T> delay(Scheduler scheduler, long ms) {
        return flatMap(data -> {
            // One alarm per upstream activation; its tick is replaced by the original data.
            Observable<?> timer = alarm(scheduler, ms);
            return timer.map(elapsed -> data);
        });
    }
}