This source file includes following definitions.
- SaturateAdd
- GetFrameSize
- InValidStateToReceive
- buffered_amount_after_close_
- AsPPB_WebSocket_API
- Connect
- Close
- ReceiveMessage
- SendMessage
- GetBufferedAmount
- GetCloseCode
- GetCloseReason
- GetCloseWasClean
- GetExtensions
- GetProtocol
- GetReadyState
- GetURL
- OnReplyReceived
- OnPluginMsgConnectReply
- OnPluginMsgCloseReply
- OnPluginMsgReceiveTextReply
- OnPluginMsgReceiveBinaryReply
- OnPluginMsgErrorReply
- OnPluginMsgBufferedAmountReply
- OnPluginMsgStateReply
- OnPluginMsgClosedReply
- DoReceive
#include "ppapi/proxy/websocket_resource.h"
#include <set>
#include <string>
#include <vector>
#include "base/bind.h"
#include "ppapi/c/pp_errors.h"
#include "ppapi/proxy/dispatch_reply_message.h"
#include "ppapi/proxy/ppapi_messages.h"
#include "ppapi/shared_impl/ppapi_globals.h"
#include "ppapi/shared_impl/var.h"
#include "ppapi/shared_impl/var_tracker.h"
namespace {
const uint32_t kMaxReasonSizeInBytes = 123;
const size_t kBaseFramingOverhead = 2;
const size_t kMaskingKeyLength = 4;
const size_t kMinimumPayloadSizeWithTwoByteExtendedPayloadLength = 126;
const size_t kMinimumPayloadSizeWithEightByteExtendedPayloadLength = 0x10000;
uint64_t SaturateAdd(uint64_t a, uint64_t b) {
if (kuint64max - a < b)
return kuint64max;
return a + b;
}
uint64_t GetFrameSize(uint64_t payload_size) {
uint64_t overhead = kBaseFramingOverhead + kMaskingKeyLength;
if (payload_size > kMinimumPayloadSizeWithEightByteExtendedPayloadLength)
overhead += 8;
else if (payload_size > kMinimumPayloadSizeWithTwoByteExtendedPayloadLength)
overhead += 2;
return SaturateAdd(payload_size, overhead);
}
bool InValidStateToReceive(PP_WebSocketReadyState state) {
return state == PP_WEBSOCKETREADYSTATE_OPEN ||
state == PP_WEBSOCKETREADYSTATE_CLOSING;
}
}
namespace ppapi {
namespace proxy {
WebSocketResource::WebSocketResource(Connection connection,
PP_Instance instance)
: PluginResource(connection, instance),
state_(PP_WEBSOCKETREADYSTATE_INVALID),
error_was_received_(false),
receive_callback_var_(NULL),
empty_string_(new StringVar(std::string())),
close_code_(0),
close_reason_(NULL),
close_was_clean_(PP_FALSE),
extensions_(NULL),
protocol_(NULL),
url_(NULL),
buffered_amount_(0),
buffered_amount_after_close_(0) {
}
WebSocketResource::~WebSocketResource() {
}
thunk::PPB_WebSocket_API* WebSocketResource::AsPPB_WebSocket_API() {
return this;
}
int32_t WebSocketResource::Connect(
const PP_Var& url,
const PP_Var protocols[],
uint32_t protocol_count,
scoped_refptr<TrackedCallback> callback) {
if (TrackedCallback::IsPending(connect_callback_))
return PP_ERROR_INPROGRESS;
if (state_ != PP_WEBSOCKETREADYSTATE_INVALID)
return PP_ERROR_INPROGRESS;
state_ = PP_WEBSOCKETREADYSTATE_CLOSED;
url_ = StringVar::FromPPVar(url);
if (!url_.get())
return PP_ERROR_BADARGUMENT;
std::set<std::string> protocol_set;
std::vector<std::string> protocol_strings;
protocol_strings.reserve(protocol_count);
for (uint32_t i = 0; i < protocol_count; ++i) {
scoped_refptr<StringVar> protocol(StringVar::FromPPVar(protocols[i]));
if (!protocol.get() || !protocol->value().length())
return PP_ERROR_BADARGUMENT;
if (protocol_set.find(protocol->value()) != protocol_set.end())
return PP_ERROR_BADARGUMENT;
protocol_set.insert(protocol->value());
protocol_strings.push_back(protocol->value());
}
connect_callback_ = callback;
state_ = PP_WEBSOCKETREADYSTATE_CONNECTING;
SendCreate(RENDERER, PpapiHostMsg_WebSocket_Create());
PpapiHostMsg_WebSocket_Connect msg(url_->value(), protocol_strings);
Call<PpapiPluginMsg_WebSocket_ConnectReply>(RENDERER, msg,
base::Bind(&WebSocketResource::OnPluginMsgConnectReply, this));
return PP_OK_COMPLETIONPENDING;
}
int32_t WebSocketResource::Close(uint16_t code,
const PP_Var& reason,
scoped_refptr<TrackedCallback> callback) {
if (TrackedCallback::IsPending(close_callback_))
return PP_ERROR_INPROGRESS;
if (state_ == PP_WEBSOCKETREADYSTATE_INVALID)
return PP_ERROR_FAILED;
scoped_refptr<StringVar> reason_string_var;
std::string reason_string;
if (code != PP_WEBSOCKETSTATUSCODE_NOT_SPECIFIED) {
if (code != PP_WEBSOCKETSTATUSCODE_NORMAL_CLOSURE &&
(code < PP_WEBSOCKETSTATUSCODE_USER_REGISTERED_MIN ||
code > PP_WEBSOCKETSTATUSCODE_USER_PRIVATE_MAX))
return PP_ERROR_NOACCESS;
if (reason.type != PP_VARTYPE_UNDEFINED) {
reason_string_var = StringVar::FromPPVar(reason);
if (!reason_string_var.get() ||
reason_string_var->value().size() > kMaxReasonSizeInBytes)
return PP_ERROR_BADARGUMENT;
reason_string = reason_string_var->value();
}
}
if (state_ == PP_WEBSOCKETREADYSTATE_CLOSING)
return PP_ERROR_INPROGRESS;
if (state_ == PP_WEBSOCKETREADYSTATE_CLOSED)
return PP_OK;
close_callback_ = callback;
if (TrackedCallback::IsPending(connect_callback_)) {
state_ = PP_WEBSOCKETREADYSTATE_CLOSING;
connect_callback_->PostAbort();
connect_callback_ = NULL;
Post(RENDERER, PpapiHostMsg_WebSocket_Fail(
"WebSocket was closed before the connection was established."));
return PP_OK_COMPLETIONPENDING;
}
if (TrackedCallback::IsPending(receive_callback_)) {
receive_callback_var_ = NULL;
receive_callback_->PostAbort();
receive_callback_ = NULL;
}
state_ = PP_WEBSOCKETREADYSTATE_CLOSING;
PpapiHostMsg_WebSocket_Close msg(static_cast<int32_t>(code),
reason_string);
Call<PpapiPluginMsg_WebSocket_CloseReply>(RENDERER, msg,
base::Bind(&WebSocketResource::OnPluginMsgCloseReply, this));
return PP_OK_COMPLETIONPENDING;
}
int32_t WebSocketResource::ReceiveMessage(
PP_Var* message,
scoped_refptr<TrackedCallback> callback) {
if (TrackedCallback::IsPending(receive_callback_))
return PP_ERROR_INPROGRESS;
if (state_ == PP_WEBSOCKETREADYSTATE_INVALID ||
state_ == PP_WEBSOCKETREADYSTATE_CONNECTING)
return PP_ERROR_BADARGUMENT;
if (!received_messages_.empty()) {
receive_callback_var_ = message;
return DoReceive();
}
if (state_ == PP_WEBSOCKETREADYSTATE_CLOSED)
return PP_ERROR_BADARGUMENT;
if (error_was_received_)
return PP_ERROR_FAILED;
receive_callback_var_ = message;
receive_callback_ = callback;
return PP_OK_COMPLETIONPENDING;
}
int32_t WebSocketResource::SendMessage(const PP_Var& message) {
if (state_ == PP_WEBSOCKETREADYSTATE_INVALID ||
state_ == PP_WEBSOCKETREADYSTATE_CONNECTING)
return PP_ERROR_BADARGUMENT;
if (state_ == PP_WEBSOCKETREADYSTATE_CLOSING ||
state_ == PP_WEBSOCKETREADYSTATE_CLOSED) {
uint64_t payload_size = 0;
if (message.type == PP_VARTYPE_STRING) {
scoped_refptr<StringVar> message_string = StringVar::FromPPVar(message);
if (message_string.get())
payload_size += message_string->value().length();
} else if (message.type == PP_VARTYPE_ARRAY_BUFFER) {
scoped_refptr<ArrayBufferVar> message_array_buffer =
ArrayBufferVar::FromPPVar(message);
if (message_array_buffer.get())
payload_size += message_array_buffer->ByteLength();
} else {
return PP_ERROR_NOTSUPPORTED;
}
buffered_amount_after_close_ =
SaturateAdd(buffered_amount_after_close_, GetFrameSize(payload_size));
return PP_ERROR_FAILED;
}
if (message.type == PP_VARTYPE_STRING) {
scoped_refptr<StringVar> message_string = StringVar::FromPPVar(message);
if (!message_string.get())
return PP_ERROR_BADARGUMENT;
Post(RENDERER, PpapiHostMsg_WebSocket_SendText(message_string->value()));
} else if (message.type == PP_VARTYPE_ARRAY_BUFFER) {
scoped_refptr<ArrayBufferVar> message_arraybuffer =
ArrayBufferVar::FromPPVar(message);
if (!message_arraybuffer.get())
return PP_ERROR_BADARGUMENT;
uint8_t* message_data = static_cast<uint8_t*>(message_arraybuffer->Map());
uint32 message_length = message_arraybuffer->ByteLength();
std::vector<uint8_t> message_vector(message_data,
message_data + message_length);
Post(RENDERER, PpapiHostMsg_WebSocket_SendBinary(message_vector));
} else {
return PP_ERROR_NOTSUPPORTED;
}
return PP_OK;
}
uint64_t WebSocketResource::GetBufferedAmount() {
return SaturateAdd(buffered_amount_, buffered_amount_after_close_);
}
uint16_t WebSocketResource::GetCloseCode() {
return close_code_;
}
PP_Var WebSocketResource::GetCloseReason() {
if (!close_reason_.get())
return empty_string_->GetPPVar();
return close_reason_->GetPPVar();
}
PP_Bool WebSocketResource::GetCloseWasClean() {
return close_was_clean_;
}
PP_Var WebSocketResource::GetExtensions() {
return StringVar::StringToPPVar(std::string());
}
PP_Var WebSocketResource::GetProtocol() {
if (!protocol_.get())
return empty_string_->GetPPVar();
return protocol_->GetPPVar();
}
PP_WebSocketReadyState WebSocketResource::GetReadyState() {
return state_;
}
PP_Var WebSocketResource::GetURL() {
if (!url_.get())
return empty_string_->GetPPVar();
return url_->GetPPVar();
}
void WebSocketResource::OnReplyReceived(
const ResourceMessageReplyParams& params,
const IPC::Message& msg) {
if (params.sequence()) {
PluginResource::OnReplyReceived(params, msg);
return;
}
IPC_BEGIN_MESSAGE_MAP(WebSocketResource, msg)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(
PpapiPluginMsg_WebSocket_ReceiveTextReply,
OnPluginMsgReceiveTextReply)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(
PpapiPluginMsg_WebSocket_ReceiveBinaryReply,
OnPluginMsgReceiveBinaryReply)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL_0(
PpapiPluginMsg_WebSocket_ErrorReply,
OnPluginMsgErrorReply)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(
PpapiPluginMsg_WebSocket_BufferedAmountReply,
OnPluginMsgBufferedAmountReply)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(
PpapiPluginMsg_WebSocket_StateReply,
OnPluginMsgStateReply)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL(
PpapiPluginMsg_WebSocket_ClosedReply,
OnPluginMsgClosedReply)
PPAPI_DISPATCH_PLUGIN_RESOURCE_CALL_UNHANDLED(NOTREACHED())
IPC_END_MESSAGE_MAP()
}
void WebSocketResource::OnPluginMsgConnectReply(
const ResourceMessageReplyParams& params,
const std::string& url,
const std::string& protocol) {
if (!TrackedCallback::IsPending(connect_callback_) ||
TrackedCallback::IsScheduledToRun(connect_callback_)) {
return;
}
int32_t result = params.result();
if (result == PP_OK) {
state_ = PP_WEBSOCKETREADYSTATE_OPEN;
protocol_ = new StringVar(protocol);
url_ = new StringVar(url);
}
connect_callback_->Run(params.result());
}
void WebSocketResource::OnPluginMsgCloseReply(
const ResourceMessageReplyParams& params,
unsigned long buffered_amount,
bool was_clean,
unsigned short code,
const std::string& reason) {
state_ = PP_WEBSOCKETREADYSTATE_CLOSED;
buffered_amount_ = buffered_amount;
close_was_clean_ = PP_FromBool(was_clean);
close_code_ = code;
close_reason_ = new StringVar(reason);
if (TrackedCallback::IsPending(receive_callback_)) {
receive_callback_var_ = NULL;
if (!TrackedCallback::IsScheduledToRun(receive_callback_))
receive_callback_->PostRun(PP_ERROR_FAILED);
receive_callback_ = NULL;
}
if (TrackedCallback::IsPending(close_callback_)) {
if (!TrackedCallback::IsScheduledToRun(close_callback_))
close_callback_->PostRun(params.result());
close_callback_ = NULL;
}
}
void WebSocketResource::OnPluginMsgReceiveTextReply(
const ResourceMessageReplyParams& params,
const std::string& message) {
if (error_was_received_ || !InValidStateToReceive(state_))
return;
received_messages_.push(scoped_refptr<Var>(new StringVar(message)));
if (!TrackedCallback::IsPending(receive_callback_) ||
TrackedCallback::IsScheduledToRun(receive_callback_)) {
return;
}
receive_callback_->Run(DoReceive());
}
void WebSocketResource::OnPluginMsgReceiveBinaryReply(
const ResourceMessageReplyParams& params,
const std::vector<uint8_t>& message) {
if (error_was_received_ || !InValidStateToReceive(state_))
return;
scoped_refptr<Var> message_var(
PpapiGlobals::Get()->GetVarTracker()->MakeArrayBufferVar(
message.size(),
&message.front()));
received_messages_.push(message_var);
if (!TrackedCallback::IsPending(receive_callback_) ||
TrackedCallback::IsScheduledToRun(receive_callback_)) {
return;
}
receive_callback_->Run(DoReceive());
}
void WebSocketResource::OnPluginMsgErrorReply(
const ResourceMessageReplyParams& params) {
error_was_received_ = true;
if (!TrackedCallback::IsPending(receive_callback_) ||
TrackedCallback::IsScheduledToRun(receive_callback_)) {
return;
}
receive_callback_var_ = NULL;
receive_callback_->Run(PP_ERROR_FAILED);
}
void WebSocketResource::OnPluginMsgBufferedAmountReply(
const ResourceMessageReplyParams& params,
unsigned long buffered_amount) {
buffered_amount_ = buffered_amount;
}
void WebSocketResource::OnPluginMsgStateReply(
const ResourceMessageReplyParams& params,
int32_t state) {
state_ = static_cast<PP_WebSocketReadyState>(state);
}
void WebSocketResource::OnPluginMsgClosedReply(
const ResourceMessageReplyParams& params,
unsigned long buffered_amount,
bool was_clean,
unsigned short code,
const std::string& reason) {
OnPluginMsgCloseReply(params, buffered_amount, was_clean, code, reason);
}
int32_t WebSocketResource::DoReceive() {
if (!receive_callback_var_)
return PP_OK;
*receive_callback_var_ = received_messages_.front()->GetPPVar();
received_messages_.pop();
receive_callback_var_ = NULL;
return PP_OK;
}
}
}