// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/nearby_sharing/instantmessaging/stream_parser.h"
#include <string_view>
#include "base/logging.h"
#include "net/base/io_buffer.h"
#include "third_party/protobuf/src/google/protobuf/io/coded_stream.h"
#include "third_party/protobuf/src/google/protobuf/wire_format_lite.h"
namespace {
using ::google::protobuf::internal::WireFormatLite;
// A buffer spare capacity limits the amount of times we need to resize the
// buffer when copying over data, which involves reallocating memory.
// We chose 512 because it is one of the larger standard sizes for
// a buffer, and we expect a lot of data to be received in the WebRTC
// signaling process.
constexpr int kReadBufferSpareCapacity = 512;
// The minimum number of bytes to parse the messages or the noop field of
// the StreamBody proto is 2 because the size of the tag and wire type is a
// single byte, and the smallest size information would be contained in another
// single byte.
constexpr int kMinimumBytesToParseNextMessagesField = 2;
} // namespace
StreamParser::StreamParser() = default;
StreamParser::~StreamParser() = default;
std::vector<
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
StreamParser::Append(std::string_view data) {
if (!unparsed_data_buffer_) {
unparsed_data_buffer_ = base::MakeRefCounted<net::GrowableIOBuffer>();
unparsed_data_buffer_->SetCapacity(data.size() + kReadBufferSpareCapacity);
} else if (unparsed_data_buffer_->RemainingCapacity() <
static_cast<int>(data.size())) {
unparsed_data_buffer_->SetCapacity(unparsed_data_buffer_->offset() +
data.size() + kReadBufferSpareCapacity);
}
DCHECK_GE(unparsed_data_buffer_->RemainingCapacity(),
static_cast<int>(data.size()));
memcpy(unparsed_data_buffer_->data(), data.data(), data.size());
unparsed_data_buffer_->set_offset(unparsed_data_buffer_->offset() +
data.size());
return ParseStreamIfAvailable();
}
std::vector<
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
StreamParser::ParseStreamIfAvailable() {
DCHECK(unparsed_data_buffer_);
std::vector<
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse>
receive_messages_responses;
base::span<uint8_t> unparsed_bytes_available =
unparsed_data_buffer_->span_before_offset();
if (unparsed_bytes_available.size() < kMinimumBytesToParseNextMessagesField) {
return receive_messages_responses;
}
google::protobuf::io::CodedInputStream input_stream(
unparsed_bytes_available.data(), unparsed_bytes_available.size());
int bytes_consumed = 0;
// We can't use StreamBody::ParseFromString() here, as it can't do partial
// parsing, nor can it tell how many bytes are consumed.
bool continue_parsing = unparsed_bytes_available.size() > 0;
while (continue_parsing) {
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse
parsed_response;
StreamParsingResult result =
ParseNextMessagesFieldFromStream(&input_stream, &parsed_response);
switch (result) {
case StreamParser::StreamParsingResult::kSuccessfullyParsedResponse:
receive_messages_responses.push_back(parsed_response);
[[fallthrough]];
case StreamParser::StreamParsingResult::kNoop:
bytes_consumed = input_stream.CurrentPosition();
continue_parsing = base::checked_cast<size_t>(bytes_consumed) <
unparsed_bytes_available.size();
break;
case StreamParser::StreamParsingResult::kNotEnoughDataYet:
case StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed:
continue_parsing = false;
break;
}
}
if (bytes_consumed == 0)
return receive_messages_responses;
// Shift the unread data back to the beginning of the buffer for the next
// iteration of reading data.
base::span<uint8_t> bytes_not_consumed =
unparsed_bytes_available.subspan(bytes_consumed);
unparsed_bytes_available.copy_prefix_from(bytes_not_consumed);
unparsed_data_buffer_->set_offset(bytes_not_consumed.size());
return receive_messages_responses;
}
StreamParser::StreamParsingResult
StreamParser::ParseNextMessagesFieldFromStream(
google::protobuf::io::CodedInputStream* input_stream,
chrome_browser_nearby_sharing_instantmessaging::ReceiveMessagesResponse*
parsed_response) {
// The WireFormat nature of protos allows for key:value pairs, each which
// contains the value of one proto field. The key (also called tag) for each
// pair is actually two values: the field number and the wire type.
//
// A typical stream looks like:
// [message tag][field data][message tag][field data]...
// where the message tag consists of the field id and the WireType, like so:
// [field id + WireType][field data][field id + WireType][field data]...
//
// In this case, we are only looking at the two fields of the StreamBody:
// "messages", which is field 1, and "noop", which is field 15. The "messages"
// field is the one containing the ReceiveMessageResponse proto, and the
// "noop" field is sent by the Tachyon server to keep the connection alive.
// Both of these fields we expect to be the 'bytes' data type, which the
// wire type says it is a length delimited value. From this, we know that the
// next bytes on should be a length followed by the actual data bytes (which
// will be read by WireFormatLite::ReadBytes). Note: this is only true when
// the wire type is set to WIRETYPE_LENGTH_DELIMITED and we know the field
// type is bytes.
//
// Therefore, for this specific instance, we expect our stream to look like
// this when it contains the ReceiveMessagesResponse with an InboxMessage or
// a FastPathReady message:
// [field id="messages"|WIRETYPE_LENGTH_DELIMITED][bytes size][byte data]
// or it will look like this when we receive a noop message:
// [field id="noop"|WIRETYPE_LENGTH_DELIMITED][bytes size][byte data]
// See https://developers.google.com/protocol-buffers/docs/encoding for
// further explanation.
// A message tag of zero means we don't have a valid tag or we don't have
// enough bytes to read a tag. If we cannot read the tag, we likely need to
// wait for more bytes to be appended to the input stream.
uint32_t messages_tag = input_stream->ReadTag();
if (messages_tag == 0)
return StreamParser::StreamParsingResult::kNotEnoughDataYet;
// If we were able to read the full tag above, and the field id does not
// match the StreamBody messages field body or the noop field body we were
// expecting then we are encountering a field we are not prepared to handle.
// TODO(crbug.com/1217150) Add a way to read through bytes of the unknown
// fields to skip it, in order to be more robost to StreamBody changes.
int field_number = WireFormatLite::GetTagFieldNumber(messages_tag);
if (field_number != chrome_browser_nearby_sharing_instantmessaging::
StreamBody::kMessagesFieldNumber &&
field_number != chrome_browser_nearby_sharing_instantmessaging::
StreamBody::kNoopFieldNumber) {
return StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed;
}
// WireType specifies the format of the data to follow. Here, we are verifying
// the data we are receiving matching the data we are expecting, which is in
// the form of WIRETYPE_LENGTH_DELIMITED. We expect this to be
// WIRETYPE_LENGTH_DELIMITED because the proto defines "messages" and "noop"
// field as the "bytes" type.
if (WireFormatLite::GetTagWireType(messages_tag) !=
WireFormatLite::WireType::WIRETYPE_LENGTH_DELIMITED) {
return StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed;
}
// Read the byte field, not including tags of the StreamBody, which will
// either be "StreamBody.messages" or "StreamBody.noop". If it is not
// successful, we likely need to wait for more bytes to be appended to the
// input stream to form a complete StreamBody. This function makes the
// assumption that we already know the field and read the tag to determine
// what field to read, which is why we need the checks above.
std::string stream_body_field_bytes;
if (!WireFormatLite::ReadBytes(input_stream, &stream_body_field_bytes))
return StreamParser::StreamParsingResult::kNotEnoughDataYet;
// Now that we have a complete "StreamBody.messages" or "StreamBody.noop"
// bytes field, we want to properly handle it. If we have a
// "StreamBody.messages", we want to transform the bytes into a
// ReceiveMessagesResponse and append it to the vector we are returning, then
// we can move along and read the next data from the buffer, if applicable.
// "StreamBody.noop" messages may be generated as a way to keep the connection
// to the server alive, and it is not an error. However, these messages do not
// contain a ReceiveMessagesResponse, but we still want to remove this data
// from the buffer and continue reading the next data, if applicable. We
// update the |is_noop_field_| to true to tell ParseStreamIfAvailable that
// although it receives an std::nullopt, it should still remove the bytes
// from the buffer.
if (field_number == chrome_browser_nearby_sharing_instantmessaging::
StreamBody::kNoopFieldNumber) {
return StreamParser::StreamParsingResult::kNoop;
}
if (!parsed_response->ParseFromString(stream_body_field_bytes)) {
LOG(ERROR) << "Failed to parse ReceiveMessagesResponse from stream body "
"message bytes.";
return StreamParser::StreamParsingResult::kParsingUnexpectedlyFailed;
}
return StreamParser::StreamParsingResult::kSuccessfullyParsedResponse;
}