// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "ppapi/proxy/udp_socket_filter.h"
#include <algorithm>
#include <cstring>
#include <memory>
#include <utility>
#include "base/check_op.h"
#include "base/functional/bind.h"
#include "base/notreached.h"
#include "ppapi/c/pp_errors.h"
#include "ppapi/proxy/error_conversion.h"
#include "ppapi/proxy/plugin_globals.h"
#include "ppapi/proxy/ppapi_messages.h"
#include "ppapi/proxy/udp_socket_resource_constants.h"
#include "ppapi/thunk/enter.h"
#include "ppapi/thunk/resource_creation_api.h"
namespace ppapi {
namespace proxy {
namespace {
int32_t SetRecvFromOutput(PP_Instance pp_instance,
const std::unique_ptr<std::string>& data,
const PP_NetAddress_Private& addr,
char* output_buffer,
int32_t num_bytes,
PP_Resource* output_addr,
int32_t browser_result) {
ProxyLock::AssertAcquired();
DCHECK_GE(num_bytes, static_cast<int32_t>(data->size()));
int32_t result = browser_result;
if (result == PP_OK && output_addr) {
thunk::EnterResourceCreationNoLock enter(pp_instance);
if (enter.succeeded()) {
*output_addr = enter.functions()->CreateNetAddressFromNetAddressPrivate(
pp_instance, addr);
} else {
result = PP_ERROR_FAILED;
}
}
if (result == PP_OK && !data->empty())
memcpy(output_buffer, data->c_str(), data->size());
return result == PP_OK ? static_cast<int32_t>(data->size()) : result;
}
} // namespace
UDPSocketFilter::UDPSocketFilter() {
}
UDPSocketFilter::~UDPSocketFilter() {
}
void UDPSocketFilter::AddUDPResource(
PP_Instance instance,
PP_Resource resource,
bool private_api,
base::RepeatingClosure slot_available_callback) {
ProxyLock::AssertAcquired();
base::AutoLock acquire(lock_);
DCHECK(queues_.find(resource) == queues_.end());
queues_[resource] = std::make_unique<RecvQueue>(
instance, private_api, std::move(slot_available_callback));
}
void UDPSocketFilter::RemoveUDPResource(PP_Resource resource) {
ProxyLock::AssertAcquired();
base::AutoLock acquire(lock_);
auto erase_count = queues_.erase(resource);
DCHECK_GT(erase_count, 0u);
}
int32_t UDPSocketFilter::RequestData(
PP_Resource resource,
int32_t num_bytes,
char* buffer,
PP_Resource* addr,
const scoped_refptr<TrackedCallback>& callback) {
ProxyLock::AssertAcquired();
base::AutoLock acquire(lock_);
auto it = queues_.find(resource);
if (it == queues_.end()) {
NOTREACHED();
}
return it->second->RequestData(num_bytes, buffer, addr, callback);
}
bool UDPSocketFilter::OnResourceReplyReceived(
const ResourceMessageReplyParams& params,
const IPC::Message& nested_msg) {
bool handled = true;
PPAPI_BEGIN_MESSAGE_MAP(UDPSocketFilter, nested_msg)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(PpapiPluginMsg_UDPSocket_PushRecvResult,
OnPluginMsgPushRecvResult)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL_UNHANDLED(handled = false)
PPAPI_END_MESSAGE_MAP()
return handled;
}
PP_NetAddress_Private UDPSocketFilter::GetLastAddrPrivate(
PP_Resource resource) const {
base::AutoLock acquire(lock_);
auto it = queues_.find(resource);
return it->second->GetLastAddrPrivate();
}
void UDPSocketFilter::OnPluginMsgPushRecvResult(
const ResourceMessageReplyParams& params,
int32_t result,
const std::string& data,
const PP_NetAddress_Private& addr) {
DCHECK(PluginGlobals::Get()->ipc_task_runner()->RunsTasksInCurrentSequence());
base::AutoLock acquire(lock_);
auto it = queues_.find(params.pp_resource());
// The RecvQueue might be gone if there were messages in-flight for a
// resource that has been destroyed.
if (it != queues_.end()) {
// TODO(yzshen): Support passing in a non-const string ref, so that we can
// eliminate one copy when storing the data in the buffer.
it->second->DataReceivedOnIOThread(result, data, addr);
}
}
UDPSocketFilter::RecvQueue::RecvQueue(
PP_Instance pp_instance,
bool private_api,
base::RepeatingClosure slot_available_callback)
: pp_instance_(pp_instance),
read_buffer_(nullptr),
bytes_to_read_(0),
recvfrom_addr_resource_(nullptr),
last_recvfrom_addr_(),
private_api_(private_api),
slot_available_callback_(std::move(slot_available_callback)) {}
UDPSocketFilter::RecvQueue::~RecvQueue() {
if (TrackedCallback::IsPending(recvfrom_callback_))
recvfrom_callback_->PostAbort();
}
void UDPSocketFilter::RecvQueue::DataReceivedOnIOThread(
int32_t result,
const std::string& data,
const PP_NetAddress_Private& addr) {
DCHECK(PluginGlobals::Get()->ipc_task_runner()->RunsTasksInCurrentSequence());
DCHECK_LT(recv_buffers_.size(),
static_cast<size_t>(
UDPSocketResourceConstants::kPluginReceiveBufferSlots));
if (!TrackedCallback::IsPending(recvfrom_callback_) || !read_buffer_) {
recv_buffers_.push(RecvBuffer());
RecvBuffer& back = recv_buffers_.back();
back.result = result;
back.data = data;
back.addr = addr;
return;
}
DCHECK_EQ(recv_buffers_.size(), 0u);
if (bytes_to_read_ < static_cast<int32_t>(data.size())) {
recv_buffers_.push(RecvBuffer());
RecvBuffer& back = recv_buffers_.back();
back.result = result;
back.data = data;
back.addr = addr;
result = PP_ERROR_MESSAGE_TOO_BIG;
} else {
// Instead of calling SetRecvFromOutput directly, post it as a completion
// task, so that:
// 1) It can run with the ProxyLock (we can't lock it on the IO thread.)
// 2) So that we only write to the output params in the case of success.
// (Since the callback will complete on another thread, it's possible
// that the resource will be deleted and abort the callback before it
// is actually run.)
std::unique_ptr<std::string> data_to_pass(new std::string(data));
recvfrom_callback_->set_completion_task(base::BindOnce(
&SetRecvFromOutput, pp_instance_, std::move(data_to_pass), addr,
base::Unretained(read_buffer_), bytes_to_read_,
base::Unretained(recvfrom_addr_resource_)));
last_recvfrom_addr_ = addr;
PpapiGlobals::Get()->GetMainThreadMessageLoop()->PostTask(
FROM_HERE, RunWhileLocked(base::BindOnce(slot_available_callback_)));
}
read_buffer_ = NULL;
bytes_to_read_ = -1;
recvfrom_addr_resource_ = NULL;
recvfrom_callback_->Run(
ConvertNetworkAPIErrorForCompatibility(result, private_api_));
}
int32_t UDPSocketFilter::RecvQueue::RequestData(
int32_t num_bytes,
char* buffer_out,
PP_Resource* addr_out,
const scoped_refptr<TrackedCallback>& callback) {
ProxyLock::AssertAcquired();
if (!buffer_out || num_bytes <= 0)
return PP_ERROR_BADARGUMENT;
if (TrackedCallback::IsPending(recvfrom_callback_))
return PP_ERROR_INPROGRESS;
if (recv_buffers_.empty()) {
read_buffer_ = buffer_out;
bytes_to_read_ = std::min(
num_bytes,
static_cast<int32_t>(UDPSocketResourceConstants::kMaxReadSize));
recvfrom_addr_resource_ = addr_out;
recvfrom_callback_ = callback;
return PP_OK_COMPLETIONPENDING;
} else {
RecvBuffer& front = recv_buffers_.front();
if (static_cast<size_t>(num_bytes) < front.data.size())
return PP_ERROR_MESSAGE_TOO_BIG;
std::unique_ptr<std::string> data_to_pass(new std::string);
data_to_pass->swap(front.data);
int32_t result =
SetRecvFromOutput(pp_instance_, std::move(data_to_pass), front.addr,
buffer_out, num_bytes, addr_out, front.result);
last_recvfrom_addr_ = front.addr;
recv_buffers_.pop();
slot_available_callback_.Run();
return result;
}
}
PP_NetAddress_Private UDPSocketFilter::RecvQueue::GetLastAddrPrivate() const {
CHECK(private_api_);
return last_recvfrom_addr_;
}
} // namespace proxy
} // namespace ppapi