// Copyright 2018 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef CHROMECAST_NET_SMALL_MESSAGE_SOCKET_H_
#define CHROMECAST_NET_SMALL_MESSAGE_SOCKET_H_
#include <memory>
#include "base/containers/span.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "net/base/io_buffer.h"
namespace base {
class SequencedTaskRunner;
} // namespace base
namespace net {
class Socket;
} // namespace net
namespace chromecast {
class IOBufferPool;
// Sends messages over a Socket. All methods must be called on the same
// sequence. Any of the delegate methods can destroy this object if desired.
class SmallMessageSocket {
public:
class Delegate {
public:
// Called when sending becomes possible again, if a previous attempt to send
// was rejected.
virtual void OnSendUnblocked() {}
// Called when an unrecoverable error occurs while sending or receiving. Is
// only called asynchronously.
virtual void OnError(int error) {}
// Called when the end of stream has been read. No more data will be
// received.
virtual void OnEndOfStream() {}
// Called when a message has been received and there is no buffer pool. The
// |data| buffer contains |size| bytes of data. Return |true| to continue
// reading messages after OnMessage() returns.
virtual bool OnMessage(char* data, size_t size) = 0;
// Called when a message has been received. The |buffer| contains |size|
// bytes of data, which includes the first 2 (or 6) bytes which are the size
// in network byte order. Note that these 2/6 bytes are not included in
// OnMessage()! Return |true| to continue receiving messages.
// Note: if the first 2 bytes are 0xffff, then the following 4 bytes are the
// size in network byte order (in which case, the offset to the message data
// is 6 bytes).
virtual bool OnMessageBuffer(scoped_refptr<net::IOBuffer> buffer,
size_t size);
protected:
virtual ~Delegate() = default;
};
SmallMessageSocket(Delegate* delegate, std::unique_ptr<net::Socket> socket);
SmallMessageSocket(const SmallMessageSocket&) = delete;
SmallMessageSocket& operator=(const SmallMessageSocket&) = delete;
virtual ~SmallMessageSocket();
net::Socket* socket() const { return socket_.get(); }
base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); }
IOBufferPool* buffer_pool() const { return buffer_pool_.get(); }
// Adds a |buffer_pool| used to allocate buffers to receive messages into;
// received messages are passed to OnMessageBuffer(). If a message would be
// too big to fit in a pool-provided buffer, a dynamically allocated IOBuffer
// will be used instead for that message.
void UseBufferPool(scoped_refptr<IOBufferPool> buffer_pool);
// Removes the buffer pool; subsequent received messages will be passed to
// OnMessage().
void RemoveBufferPool();
// Prepares a buffer to send a message of the given |message_size|. Returns
// nullptr if sending is not allowed right now (ie, another send is currently
// in progress). Otherwise, returns a buffer at least large enough to contain
// |message_size| bytes. The caller should fill in the buffer as desired and
// then call Send() to send the finished message.
// If nullptr is returned, then OnSendUnblocked() will be called once sending
// is possible again.
//
// TODO(crbug.com/40284755): This method should return a span of size
// `message_size` instead of a pointer.
void* PrepareSend(size_t message_size);
void Send();
// Sends an already-prepared buffer of data, if possible. The first part of
// the buffer should contain the message size information as written by
// WriteSizeData(). Returns true if the buffer will be sent; returns false if
// sending is not allowed right now (ie, another send is currently in
// progress). If false is returned, then OnSendUnblocked() will be called once
// sending is possible again.
bool SendBuffer(scoped_refptr<net::IOBuffer> data, size_t size);
// Returns the number of bytes used for size information for the given message
// size.
static size_t SizeDataBytes(size_t message_size);
// Writes the necessary |message_size| information into `buf`. This can be
// used to prepare a buffer for SendBuffer(). Note that if `message_size` is
// greater than or equal to 0xffff, the message size data will take up 6
// bytes, and the `buf` will need to be large enough. Returns the number of
// bytes written (= SizeDataBytes(message_size)).
static size_t WriteSizeData(base::span<uint8_t> buf, size_t message_size);
// Reads the message size from a |ptr| which contains |bytes_read| of data.
// Returns |false| if there was not enough data to read a valid size.
static bool ReadSize(char* ptr,
size_t bytes_read,
size_t& data_offset,
size_t& message_size);
// Enables receiving messages from the stream. Messages will be received and
// passed to OnMessage() until either an error occurs, the end of stream is
// reached, or OnMessage() returns false. If OnMessage() returns false, you
// may call ReceiveMessages() to start receiving again. OnMessage() will not
// be called synchronously from within this method (it always posts a task).
void ReceiveMessages();
// Same as ReceiveMessages(), but OnMessage() may be called synchronously.
// This is more efficient because it doesn't post a task to ensure
// asynchronous reads.
void ReceiveMessagesSynchronously();
private:
friend class SmallMessageSocketTest;
// This class wraps the IOBuffer and controls its range to point into
// `buffer_` but allowing it to be a subset of `buffer_` that can shrink as
// bytes are consumed from the front. The base IOBuffer is passed in from
// SetUnderlyingBuffer.
class BufferWrapper : public net::IOBuffer {
public:
BufferWrapper();
// Set the base buffer. `capacity` is the total size of `base`.
void SetUnderlyingBuffer(scoped_refptr<net::IOBuffer> base,
size_t capacity);
scoped_refptr<net::IOBuffer> TakeUnderlyingBuffer();
void ClearUnderlyingBuffer();
// Offset the next available bytes in the buffer.
void DidConsume(size_t bytes);
// A pointer to the very start of the buffer. The `span()` returned will
// move as DidConsume() is called and moves the start of the buffer forward.
// But this will always return the absolute beginning of the buffer.
// TODO(328018028): This should return a span.
char* StartOfBuffer() const;
size_t used() const { return used_; }
size_t capacity() const { return capacity_; }
base::span<const uint8_t> used_span() const;
private:
~BufferWrapper() override;
// The buffer that actually holds the memory.
scoped_refptr<net::IOBuffer> buffer_;
// Size of the used bytes in previous operations.
size_t used_ = 0;
// Total size of `buffer_`.
size_t capacity_ = 0;
};
void OnWriteComplete(int result);
bool HandleWriteResult(int result);
void OnError(int error);
void Read();
void OnReadComplete(int result);
bool HandleReadResult(int result);
bool HandleCompletedMessages();
bool HandleCompletedMessageBuffers();
void ActivateBufferPool(base::span<const uint8_t> current_data);
Delegate* const delegate_;
const std::unique_ptr<net::Socket> socket_;
const scoped_refptr<base::SequencedTaskRunner> task_runner_;
const scoped_refptr<net::GrowableIOBuffer> write_storage_;
const scoped_refptr<BufferWrapper> write_buffer_;
bool send_blocked_ = false;
const scoped_refptr<net::GrowableIOBuffer> read_storage_;
scoped_refptr<IOBufferPool> buffer_pool_;
const scoped_refptr<BufferWrapper> read_buffer_;
bool in_message_ = false;
base::WeakPtrFactory<SmallMessageSocket> weak_factory_;
};
} // namespace chromecast
#endif // CHROMECAST_NET_SMALL_MESSAGE_SOCKET_H_