chromium/components/cronet/android/java/src/org/chromium/net/impl/CronetUploadDataStream.java

// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package org.chromium.net.impl;

import android.annotation.SuppressLint;

import androidx.annotation.IntDef;
import androidx.annotation.VisibleForTesting;

import org.jni_zero.CalledByNative;
import org.jni_zero.JNINamespace;
import org.jni_zero.NativeClassQualifiedName;
import org.jni_zero.NativeMethods;

import org.chromium.base.Log;
import org.chromium.net.UploadDataProvider;
import org.chromium.net.UploadDataSink;

import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.concurrent.GuardedBy;

/**
 * CronetUploadDataStream handles communication between an upload body encapsulated in the
 * embedder's {@link UploadDataSink} and a C++ UploadDataStreamAdapter, which it owns. It's attached
 * to a {@link CronetUrlRequest}'s during the construction of request's native C++ objects on the
 * network thread, though it's created on one of the embedder's threads. It is called by the
 * UploadDataStreamAdapter on the network thread, but calls into the UploadDataSink and the
 * UploadDataStreamAdapter on the Executor passed into its constructor.
 */
@JNINamespace("cronet")
@VisibleForTesting
public final class CronetUploadDataStream extends UploadDataSink {
    private static final String TAG = CronetUploadDataStream.class.getSimpleName();
    // These are never changed, once a request starts.
    private final Executor mExecutor;
    private final VersionSafeCallbacks.UploadDataProviderWrapper mDataProvider;
    private final CronetUrlRequest mRequest;
    private long mLength;
    private long mRemainingLength;
    private long mByteBufferLimit;

    // This is atomic because the read code, which runs during URL request metrics collection, could
    // run at the same time we are issuing a read, especially if the request fails or is cancelled
    // while uploading.
    private final AtomicInteger mReadCount = new AtomicInteger();

    // Reusable read task, to reduce redundant memory allocation.
    private final Runnable mReadTask =
            new Runnable() {
                @Override
                public void run() {
                    synchronized (mLock) {
                        if (mUploadDataStreamAdapter == 0) {
                            return;
                        }
                        checkState(UserCallback.NOT_IN_CALLBACK);
                        if (mByteBuffer == null) {
                            throw new IllegalStateException(
                                    "Unexpected readData call. Buffer is null");
                        }
                        mInWhichUserCallback = UserCallback.READ;
                    }
                    try {
                        checkCallingThread();
                        assert mByteBuffer.position() == 0;
                        mDataProvider.read(CronetUploadDataStream.this, mByteBuffer);
                        mReadCount.incrementAndGet();
                    } catch (Exception exception) {
                        onError(exception);
                    }
                }
            };

    // ByteBuffer created in the native code and passed to
    // UploadDataProvider for reading. It is only valid from the
    // call to mDataProvider.read until onError or onReadSucceeded.
    private ByteBuffer mByteBuffer;

    // Lock that protects all subsequent variables. The adapter has to be
    // protected to ensure safe shutdown, mReading and mRewinding are protected
    // to robustly detect getting read/rewind results more often than expected.
    private final Object mLock = new Object();

    // Native adapter object, owned by the CronetUploadDataStream. It's only
    // deleted after the native UploadDataStream object is destroyed. All access
    // to the adapter is synchronized, for safe usage and cleanup.
    @GuardedBy("mLock")
    private long mUploadDataStreamAdapter;

    @IntDef({
        UserCallback.READ,
        UserCallback.REWIND,
        UserCallback.GET_LENGTH,
        UserCallback.NOT_IN_CALLBACK
    })
    @Retention(RetentionPolicy.SOURCE)
    private @interface UserCallback {
        int READ = 0;
        int REWIND = 1;
        int GET_LENGTH = 2;
        int NOT_IN_CALLBACK = 3;
    }

    @GuardedBy("mLock")
    private @UserCallback int mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;

    @GuardedBy("mLock")
    private boolean mDestroyAdapterPostponed;

    private Runnable mOnDestroyedCallbackForTesting;

    /**
     * Constructs a CronetUploadDataStream.
     *
     * @param dataProvider the UploadDataProvider to read data from.
     * @param executor the Executor to execute UploadDataProvider tasks.
     */
    public CronetUploadDataStream(
            UploadDataProvider dataProvider, Executor executor, CronetUrlRequest request) {
        mExecutor = executor;
        mDataProvider = new VersionSafeCallbacks.UploadDataProviderWrapper(dataProvider);
        mRequest = request;
    }

    /** Called by native code to make the UploadDataProvider read data into {@code byteBuffer}. */
    @SuppressWarnings("unused")
    @CalledByNative
    void readData(ByteBuffer byteBuffer) {
        mByteBuffer = byteBuffer;
        mByteBufferLimit = byteBuffer.limit();
        postTaskToExecutor(mReadTask);
    }

    // TODO(mmenke): Consider implementing a cancel method.
    // currently wait for any pending read to complete.

    /** Called by native code to make the UploadDataProvider rewind upload data. */
    @SuppressWarnings("unused")
    @CalledByNative
    void rewind() {
        Runnable task =
                new Runnable() {
                    @Override
                    public void run() {
                        synchronized (mLock) {
                            if (mUploadDataStreamAdapter == 0) {
                                return;
                            }
                            checkState(UserCallback.NOT_IN_CALLBACK);
                            mInWhichUserCallback = UserCallback.REWIND;
                        }
                        try {
                            checkCallingThread();
                            mDataProvider.rewind(CronetUploadDataStream.this);
                        } catch (Exception exception) {
                            onError(exception);
                        }
                    }
                };
        postTaskToExecutor(task);
    }

    private void checkCallingThread() {
        mRequest.checkCallingThread();
    }

    @GuardedBy("mLock")
    private void checkState(@UserCallback int mode) {
        if (mInWhichUserCallback != mode) {
            throw new IllegalStateException(
                    "Expected " + mode + ", but was " + mInWhichUserCallback);
        }
    }

    /**
     * Called when the native UploadDataStream is destroyed. At this point, the native adapter needs
     * to be destroyed, but only after any pending read operation completes, as the adapter owns the
     * read buffer.
     */
    @SuppressWarnings("unused")
    @CalledByNative
    void onUploadDataStreamDestroyed() {
        destroyAdapter();
    }

    /**
     * Helper method called when an exception occurred. This method resets states and propagates the
     * error to the request.
     */
    private void onError(Throwable exception) {
        final boolean sendClose;
        synchronized (mLock) {
            if (mInWhichUserCallback == UserCallback.NOT_IN_CALLBACK) {
                throw new IllegalStateException(
                        "There is no read or rewind or length check in progress.", exception);
            }
            sendClose = mInWhichUserCallback == UserCallback.GET_LENGTH;
            mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
            mByteBuffer = null;
            destroyAdapterIfPostponed();
        }
        // Failure before length is obtained means that the request has failed before the
        // adapter has been initialized. Close the UploadDataProvider. This is safe to call
        // here since failure during getLength can only happen on the user's executor.
        if (sendClose) {
            try {
                mDataProvider.close();
            } catch (Exception e) {
                Log.e(TAG, "Failure closing data provider", e);
            }
        }

        // Just fail the request - simpler to fail directly, and
        // UploadDataStream only supports failing during initialization, not
        // while reading. The request is smart enough to handle the case where
        // it was already canceled by the embedder.
        mRequest.onUploadException(exception);
    }

    @Override
    @SuppressLint("DefaultLocale")
    public void onReadSucceeded(boolean lastChunk) {
        synchronized (mLock) {
            checkState(UserCallback.READ);
            if (mByteBufferLimit != mByteBuffer.limit()) {
                throw new IllegalStateException("ByteBuffer limit changed");
            }
            if (lastChunk && mLength >= 0) {
                throw new IllegalArgumentException("Non-chunked upload can't have last chunk");
            }
            int bytesRead = mByteBuffer.position();
            if (bytesRead == 0 && !lastChunk) {
                // Sending an empty buffer does not make any sense, if the user wishes
                // to signal end of data then that is done automatically done by the
                // networking stack as we know the size through |getLength()|. So once
                // the data has all completely transmitted, the networking stack will
                // automatically signal to the receiver. However, for the case for
                // chunked-upload, the optimal scenario is that the last chunk must
                // be sent with |lastChunk = true| with a non-empty buffer, but sending
                // an empty buffer with |lastChunk = true| is also allowed.
                //
                // Currently, H/1 and H/3 requests will hang indefinitely which will
                // means that the user must handle the request timeout manually, while
                // H/2 requests will immediately crash. In order to provide a consistent
                // behavior, we will fail the request immediately and put the request
                // in terminal state of |onError|
                //
                // We explicitly choose not to crash / throw for the sake of maintaining
                // app compatibility unlike the other branches in this method which throws
                // immediately.
                //
                // See b/332860415 for more details.
                onError(
                        new IllegalStateException(
                                "Bytes read can't be zero except for last chunk!"));
                return;
            }
            mRemainingLength -= bytesRead;
            if (mRemainingLength < 0 && mLength >= 0) {
                throw new IllegalArgumentException(
                        String.format(
                                "Read upload data length %d exceeds expected length %d",
                                mLength - mRemainingLength, mLength));
            }
            mByteBuffer.position(0);
            mByteBuffer = null;
            mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;

            destroyAdapterIfPostponed();
            // Request may been canceled already.
            if (mUploadDataStreamAdapter == 0) {
                return;
            }
            CronetUploadDataStreamJni.get()
                    .onReadSucceeded(
                            mUploadDataStreamAdapter,
                            CronetUploadDataStream.this,
                            bytesRead,
                            lastChunk);
        }
    }

    @Override
    public void onReadError(Exception exception) {
        synchronized (mLock) {
            checkState(UserCallback.READ);
            onError(exception);
        }
    }

    @Override
    public void onRewindSucceeded() {
        synchronized (mLock) {
            checkState(UserCallback.REWIND);
            mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
            mRemainingLength = mLength;
            // Request may been canceled already.
            if (mUploadDataStreamAdapter == 0) {
                return;
            }
            CronetUploadDataStreamJni.get()
                    .onRewindSucceeded(mUploadDataStreamAdapter, CronetUploadDataStream.this);
        }
    }

    @Override
    public void onRewindError(Exception exception) {
        synchronized (mLock) {
            checkState(UserCallback.REWIND);
            onError(exception);
        }
    }

    /** Posts task to application Executor. */
    void postTaskToExecutor(Runnable task) {
        try {
            mExecutor.execute(task);
        } catch (Throwable e) {
            // Just fail the request. The request is smart enough to handle the
            // case where it was already canceled by the embedder.
            mRequest.onUploadException(e);
        }
    }

    /**
     * The adapter is owned by the CronetUploadDataStream, so it can be destroyed safely when there
     * is no pending read; however, destruction is initiated by the destruction of the native
     * UploadDataStream.
     */
    private void destroyAdapter() {
        synchronized (mLock) {
            if (mInWhichUserCallback == UserCallback.READ) {
                // Wait for the read to complete before destroy the adapter.
                mDestroyAdapterPostponed = true;
                return;
            }
            if (mUploadDataStreamAdapter == 0) {
                return;
            }
            CronetUploadDataStreamJni.get().destroy(mUploadDataStreamAdapter);
            mUploadDataStreamAdapter = 0;
            if (mOnDestroyedCallbackForTesting != null) {
                mOnDestroyedCallbackForTesting.run();
            }
        }
        postTaskToExecutor(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            checkCallingThread();
                            mDataProvider.close();
                        } catch (Exception e) {
                            Log.e(TAG, "Exception thrown when closing", e);
                        }
                    }
                });
    }

    /**
     * Destroys the native adapter if the destruction is postponed due to a pending read, which has
     * since completed. Caller needs to be on executor thread.
     */
    private void destroyAdapterIfPostponed() {
        synchronized (mLock) {
            if (mInWhichUserCallback == UserCallback.READ) {
                throw new IllegalStateException(
                        "Method should not be called when read has not completed.");
            }
            if (mDestroyAdapterPostponed) {
                destroyAdapter();
            }
        }
    }

    /**
     * Initializes upload length by getting it from data provider. Submits to the user's executor
     * thread to allow getLength() to block and/or report errors. If data provider throws an
     * exception, then it is reported to the request. No native calls to urlRequest are allowed as
     * this is done before request start, so native object may not exist.
     */
    void initializeWithRequest() {
        synchronized (mLock) {
            mInWhichUserCallback = UserCallback.GET_LENGTH;
        }
        try {
            mRequest.checkCallingThread();
            mLength = mDataProvider.getLength();
            mRemainingLength = mLength;
        } catch (Throwable t) {
            onError(t);
        }
        synchronized (mLock) {
            mInWhichUserCallback = UserCallback.NOT_IN_CALLBACK;
        }
    }

    /**
     * Creates native objects and attaches them to the underlying request adapter object. Always
     * called on executor thread.
     */
    void attachNativeAdapterToRequest(final long requestAdapter) {
        synchronized (mLock) {
            mUploadDataStreamAdapter =
                    CronetUploadDataStreamJni.get()
                            .attachUploadDataToRequest(
                                    CronetUploadDataStream.this, requestAdapter, mLength);
        }
    }

    /**
     * Creates a native CronetUploadDataStreamAdapter and CronetUploadDataStream for testing.
     *
     * @return the address of the native CronetUploadDataStream object.
     */
    public long createUploadDataStreamForTesting() throws IOException {
        synchronized (mLock) {
            mUploadDataStreamAdapter =
                    CronetUploadDataStreamJni.get()
                            .createAdapterForTesting(CronetUploadDataStream.this);
            mLength = mDataProvider.getLength();
            mRemainingLength = mLength;
            return CronetUploadDataStreamJni.get()
                    .createUploadDataStreamForTesting(
                            CronetUploadDataStream.this, mLength, mUploadDataStreamAdapter);
        }
    }

    void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) {
        mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
    }

    int getReadCount() {
        return mReadCount.get();
    }

    // Native methods are implemented in upload_data_stream_adapter.cc.
    @NativeMethods
    interface Natives {
        long attachUploadDataToRequest(
                CronetUploadDataStream caller, long urlRequestAdapter, long length);

        long createAdapterForTesting(CronetUploadDataStream caller);

        long createUploadDataStreamForTesting(
                CronetUploadDataStream caller, long length, long adapter);

        @NativeClassQualifiedName("CronetUploadDataStreamAdapter")
        void onReadSucceeded(
                long nativePtr, CronetUploadDataStream caller, int bytesRead, boolean finalChunk);

        @NativeClassQualifiedName("CronetUploadDataStreamAdapter")
        void onRewindSucceeded(long nativePtr, CronetUploadDataStream caller);

        @NativeClassQualifiedName("CronetUploadDataStreamAdapter")
        void destroy(long nativePtr);
    }
}