This source file includes following definitions.
- IsStrictlyValidCloseStatusCode
- AllowUnused
- GetFrameTypeForOpcode
- SendBuffer
- frames
- AddFrame
- ConnectDelegate
- OnSuccess
- OnFailure
- OnStartOpeningHandshake
- OnFinishOpeningHandshake
- handshake_request_info
- set_handshake_request_info
- handshake_response_info
- set_handshake_response_info
- SendImmediately
- size_
- ResetOpcode
- DidConsume
- initial_frame_forwarded_
- SendAddChannelRequest
- InClosingState
- SendFrame
- SendFlowControl
- StartClosingHandshake
- SendAddChannelRequestForTesting
- SetClosingHandshakeTimeoutForTesting
- SendAddChannelRequestWithSuppliedCreator
- OnConnectSuccess
- OnConnectFailure
- OnStartOpeningHandshake
- OnFinishOpeningHandshake
- ScheduleOpeningHandshakeNotification
- WriteFrames
- OnWriteDone
- ReadFrames
- OnReadDone
- HandleFrame
- HandleFrameByState
- HandleDataFrame
- SendFrameFromIOBuffer
- FailChannel
- SendClose
- ParseClose
- DoDropChannel
- CloseTimeout
#include "net/websockets/websocket_channel.h"
#include <limits.h>
#include <algorithm>
#include <deque>
#include "base/basictypes.h"
#include "base/big_endian.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/memory/ref_counted.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/numerics/safe_conversions.h"
#include "base/stl_util.h"
#include "base/strings/stringprintf.h"
#include "base/time/time.h"
#include "net/base/io_buffer.h"
#include "net/base/net_log.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_response_headers.h"
#include "net/http/http_util.h"
#include "net/websockets/websocket_errors.h"
#include "net/websockets/websocket_event_interface.h"
#include "net/websockets/websocket_frame.h"
#include "net/websockets/websocket_handshake_request_info.h"
#include "net/websockets/websocket_handshake_response_info.h"
#include "net/websockets/websocket_mux.h"
#include "net/websockets/websocket_stream.h"
#include "url/origin.h"
namespace net {
namespace {
using base::StreamingUtf8Validator;
const int kDefaultSendQuotaLowWaterMark = 1 << 16;
const int kDefaultSendQuotaHighWaterMark = 1 << 17;
const size_t kWebSocketCloseCodeLength = 2;
const int kClosingHandshakeTimeoutSeconds = 2 * 2 * 60;
typedef WebSocketEventInterface::ChannelState ChannelState;
const ChannelState CHANNEL_ALIVE = WebSocketEventInterface::CHANNEL_ALIVE;
const ChannelState CHANNEL_DELETED = WebSocketEventInterface::CHANNEL_DELETED;
const size_t kMaximumCloseReasonLength = 125 - kWebSocketCloseCodeLength;
bool IsStrictlyValidCloseStatusCode(int code) {
static const int kInvalidRanges[] = {
0, 1000,
1006, 1007,
1014, 3000,
5000, 65536,
};
const int* const kInvalidRangesEnd =
kInvalidRanges + arraysize(kInvalidRanges);
DCHECK_GE(code, 0);
DCHECK_LT(code, 65536);
const int* upper = std::upper_bound(kInvalidRanges, kInvalidRangesEnd, code);
DCHECK_NE(kInvalidRangesEnd, upper);
DCHECK_GT(upper, kInvalidRanges);
DCHECK_GT(*upper, code);
DCHECK_LE(*(upper - 1), code);
return ((upper - kInvalidRanges) % 2) == 0;
}
void AllowUnused(ChannelState ALLOW_UNUSED unused) {}
void GetFrameTypeForOpcode(WebSocketFrameHeader::OpCode opcode,
std::string* name) {
switch (opcode) {
case WebSocketFrameHeader::kOpCodeText:
case WebSocketFrameHeader::kOpCodeBinary:
case WebSocketFrameHeader::kOpCodeContinuation:
*name = "Data frame";
break;
case WebSocketFrameHeader::kOpCodePing:
*name = "Ping";
break;
case WebSocketFrameHeader::kOpCodePong:
*name = "Pong";
break;
case WebSocketFrameHeader::kOpCodeClose:
*name = "Close";
break;
default:
*name = "Unknown frame type";
break;
}
return;
}
}
class WebSocketChannel::SendBuffer {
public:
SendBuffer() : total_bytes_(0) {}
void AddFrame(scoped_ptr<WebSocketFrame> chunk);
ScopedVector<WebSocketFrame>* frames() { return &frames_; }
private:
ScopedVector<WebSocketFrame> frames_;
size_t total_bytes_;
};
void WebSocketChannel::SendBuffer::AddFrame(scoped_ptr<WebSocketFrame> frame) {
total_bytes_ += frame->header.payload_length;
frames_.push_back(frame.release());
}
class WebSocketChannel::ConnectDelegate
: public WebSocketStream::ConnectDelegate {
public:
explicit ConnectDelegate(WebSocketChannel* creator) : creator_(creator) {}
virtual void OnSuccess(scoped_ptr<WebSocketStream> stream) OVERRIDE {
creator_->OnConnectSuccess(stream.Pass());
}
virtual void OnFailure(const std::string& message) OVERRIDE {
creator_->OnConnectFailure(message);
}
virtual void OnStartOpeningHandshake(
scoped_ptr<WebSocketHandshakeRequestInfo> request) OVERRIDE {
creator_->OnStartOpeningHandshake(request.Pass());
}
virtual void OnFinishOpeningHandshake(
scoped_ptr<WebSocketHandshakeResponseInfo> response) OVERRIDE {
creator_->OnFinishOpeningHandshake(response.Pass());
}
private:
WebSocketChannel* const creator_;
DISALLOW_COPY_AND_ASSIGN(ConnectDelegate);
};
class WebSocketChannel::HandshakeNotificationSender
: public base::SupportsWeakPtr<HandshakeNotificationSender> {
public:
explicit HandshakeNotificationSender(WebSocketChannel* channel);
~HandshakeNotificationSender();
static void Send(base::WeakPtr<HandshakeNotificationSender> sender);
ChannelState SendImmediately(WebSocketEventInterface* event_interface);
const WebSocketHandshakeRequestInfo* handshake_request_info() const {
return handshake_request_info_.get();
}
void set_handshake_request_info(
scoped_ptr<WebSocketHandshakeRequestInfo> request_info) {
handshake_request_info_ = request_info.Pass();
}
const WebSocketHandshakeResponseInfo* handshake_response_info() const {
return handshake_response_info_.get();
}
void set_handshake_response_info(
scoped_ptr<WebSocketHandshakeResponseInfo> response_info) {
handshake_response_info_ = response_info.Pass();
}
private:
WebSocketChannel* owner_;
scoped_ptr<WebSocketHandshakeRequestInfo> handshake_request_info_;
scoped_ptr<WebSocketHandshakeResponseInfo> handshake_response_info_;
};
WebSocketChannel::HandshakeNotificationSender::HandshakeNotificationSender(
WebSocketChannel* channel)
: owner_(channel) {}
WebSocketChannel::HandshakeNotificationSender::~HandshakeNotificationSender() {}
void WebSocketChannel::HandshakeNotificationSender::Send(
base::WeakPtr<HandshakeNotificationSender> sender) {
if (sender) {
WebSocketChannel* channel = sender->owner_;
AllowUnused(sender->SendImmediately(channel->event_interface_.get()));
}
}
ChannelState WebSocketChannel::HandshakeNotificationSender::SendImmediately(
WebSocketEventInterface* event_interface) {
if (handshake_request_info_.get()) {
if (CHANNEL_DELETED == event_interface->OnStartOpeningHandshake(
handshake_request_info_.Pass()))
return CHANNEL_DELETED;
}
if (handshake_response_info_.get()) {
if (CHANNEL_DELETED == event_interface->OnFinishOpeningHandshake(
handshake_response_info_.Pass()))
return CHANNEL_DELETED;
}
return CHANNEL_ALIVE;
}
WebSocketChannel::PendingReceivedFrame::PendingReceivedFrame(
bool final,
WebSocketFrameHeader::OpCode opcode,
const scoped_refptr<IOBuffer>& data,
size_t offset,
size_t size)
: final_(final),
opcode_(opcode),
data_(data),
offset_(offset),
size_(size) {}
WebSocketChannel::PendingReceivedFrame::~PendingReceivedFrame() {}
void WebSocketChannel::PendingReceivedFrame::ResetOpcode() {
DCHECK(WebSocketFrameHeader::IsKnownDataOpCode(opcode_));
opcode_ = WebSocketFrameHeader::kOpCodeContinuation;
}
void WebSocketChannel::PendingReceivedFrame::DidConsume(size_t bytes) {
DCHECK_LE(offset_, size_);
DCHECK_LE(bytes, size_ - offset_);
offset_ += bytes;
}
WebSocketChannel::WebSocketChannel(
scoped_ptr<WebSocketEventInterface> event_interface,
URLRequestContext* url_request_context)
: event_interface_(event_interface.Pass()),
url_request_context_(url_request_context),
send_quota_low_water_mark_(kDefaultSendQuotaLowWaterMark),
send_quota_high_water_mark_(kDefaultSendQuotaHighWaterMark),
current_send_quota_(0),
current_receive_quota_(0),
timeout_(base::TimeDelta::FromSeconds(kClosingHandshakeTimeoutSeconds)),
received_close_code_(0),
state_(FRESHLY_CONSTRUCTED),
notification_sender_(new HandshakeNotificationSender(this)),
sending_text_message_(false),
receiving_text_message_(false),
expecting_to_handle_continuation_(false),
initial_frame_forwarded_(false) {}
WebSocketChannel::~WebSocketChannel() {
stream_.reset();
timer_.Stop();
}
void WebSocketChannel::SendAddChannelRequest(
const GURL& socket_url,
const std::vector<std::string>& requested_subprotocols,
const url::Origin& origin) {
SendAddChannelRequestWithSuppliedCreator(
socket_url,
requested_subprotocols,
origin,
base::Bind(&WebSocketStream::CreateAndConnectStream));
}
bool WebSocketChannel::InClosingState() const {
DCHECK_NE(RECV_CLOSED, state_)
<< "InClosingState called with state_ == RECV_CLOSED";
return state_ == SEND_CLOSED || state_ == CLOSE_WAIT || state_ == CLOSED;
}
void WebSocketChannel::SendFrame(bool fin,
WebSocketFrameHeader::OpCode op_code,
const std::vector<char>& data) {
if (data.size() > INT_MAX) {
NOTREACHED() << "Frame size sanity check failed";
return;
}
if (stream_ == NULL) {
LOG(DFATAL) << "Got SendFrame without a connection established; "
<< "misbehaving renderer? fin=" << fin << " op_code=" << op_code
<< " data.size()=" << data.size();
return;
}
if (InClosingState()) {
VLOG(1) << "SendFrame called in state " << state_
<< ". This may be a bug, or a harmless race.";
return;
}
if (state_ != CONNECTED) {
NOTREACHED() << "SendFrame() called in state " << state_;
return;
}
if (data.size() > base::checked_cast<size_t>(current_send_quota_)) {
AllowUnused(
FailChannel("Send quota exceeded", kWebSocketErrorGoingAway, ""));
return;
}
if (!WebSocketFrameHeader::IsKnownDataOpCode(op_code)) {
LOG(DFATAL) << "Got SendFrame with bogus op_code " << op_code
<< "; misbehaving renderer? fin=" << fin
<< " data.size()=" << data.size();
return;
}
if (op_code == WebSocketFrameHeader::kOpCodeText ||
(op_code == WebSocketFrameHeader::kOpCodeContinuation &&
sending_text_message_)) {
StreamingUtf8Validator::State state =
outgoing_utf8_validator_.AddBytes(vector_as_array(&data), data.size());
if (state == StreamingUtf8Validator::INVALID ||
(state == StreamingUtf8Validator::VALID_MIDPOINT && fin)) {
AllowUnused(
FailChannel("Browser sent a text frame containing invalid UTF-8",
kWebSocketErrorGoingAway,
""));
return;
}
sending_text_message_ = !fin;
DCHECK(!fin || state == StreamingUtf8Validator::VALID_ENDPOINT);
}
current_send_quota_ -= data.size();
scoped_refptr<IOBuffer> buffer(new IOBuffer(data.size()));
std::copy(data.begin(), data.end(), buffer->data());
AllowUnused(SendFrameFromIOBuffer(fin, op_code, buffer, data.size()));
}
void WebSocketChannel::SendFlowControl(int64 quota) {
DCHECK(state_ == CONNECTING || state_ == CONNECTED || state_ == SEND_CLOSED ||
state_ == CLOSE_WAIT);
DCHECK_GE(quota, 0);
DCHECK_LE(quota, INT_MAX);
if (!pending_received_frames_.empty()) {
DCHECK_EQ(0, current_receive_quota_);
}
while (!pending_received_frames_.empty() && quota > 0) {
PendingReceivedFrame& front = pending_received_frames_.front();
const size_t data_size = front.size() - front.offset();
const size_t bytes_to_send =
std::min(base::checked_cast<size_t>(quota), data_size);
const bool final = front.final() && data_size == bytes_to_send;
const char* data = front.data()->data() + front.offset();
const std::vector<char> data_vector(data, data + bytes_to_send);
DVLOG(3) << "Sending frame previously split due to quota to the "
<< "renderer: quota=" << quota << " data_size=" << data_size
<< " bytes_to_send=" << bytes_to_send;
if (event_interface_->OnDataFrame(final, front.opcode(), data_vector) ==
CHANNEL_DELETED)
return;
if (bytes_to_send < data_size) {
front.DidConsume(bytes_to_send);
front.ResetOpcode();
return;
}
const int64 signed_bytes_to_send = base::checked_cast<int64>(bytes_to_send);
DCHECK_GE(quota, signed_bytes_to_send);
quota -= signed_bytes_to_send;
pending_received_frames_.pop();
}
const bool start_read =
current_receive_quota_ == 0 && quota > 0 &&
(state_ == CONNECTED || state_ == SEND_CLOSED || state_ == CLOSE_WAIT);
current_receive_quota_ += base::checked_cast<int>(quota);
if (start_read)
AllowUnused(ReadFrames());
}
void WebSocketChannel::StartClosingHandshake(uint16 code,
const std::string& reason) {
if (InClosingState()) {
VLOG(1) << "StartClosingHandshake called in state " << state_
<< ". This may be a bug, or a harmless race.";
return;
}
if (state_ == CONNECTING) {
stream_request_.reset();
state_ = CLOSED;
AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
return;
}
if (state_ != CONNECTED) {
NOTREACHED() << "StartClosingHandshake() called in state " << state_;
return;
}
if (!IsStrictlyValidCloseStatusCode(code) ||
reason.size() > kMaximumCloseReasonLength) {
if (SendClose(kWebSocketErrorInternalServerError, "") != CHANNEL_DELETED)
state_ = SEND_CLOSED;
return;
}
if (SendClose(
code,
StreamingUtf8Validator::Validate(reason) ? reason : std::string()) ==
CHANNEL_DELETED)
return;
state_ = SEND_CLOSED;
}
void WebSocketChannel::SendAddChannelRequestForTesting(
const GURL& socket_url,
const std::vector<std::string>& requested_subprotocols,
const url::Origin& origin,
const WebSocketStreamCreator& creator) {
SendAddChannelRequestWithSuppliedCreator(
socket_url, requested_subprotocols, origin, creator);
}
void WebSocketChannel::SetClosingHandshakeTimeoutForTesting(
base::TimeDelta delay) {
timeout_ = delay;
}
void WebSocketChannel::SendAddChannelRequestWithSuppliedCreator(
const GURL& socket_url,
const std::vector<std::string>& requested_subprotocols,
const url::Origin& origin,
const WebSocketStreamCreator& creator) {
DCHECK_EQ(FRESHLY_CONSTRUCTED, state_);
if (!socket_url.SchemeIsWSOrWSS()) {
AllowUnused(event_interface_->OnAddChannelResponse(true, "", ""));
return;
}
socket_url_ = socket_url;
scoped_ptr<WebSocketStream::ConnectDelegate> connect_delegate(
new ConnectDelegate(this));
stream_request_ = creator.Run(socket_url_,
requested_subprotocols,
origin,
url_request_context_,
BoundNetLog(),
connect_delegate.Pass());
state_ = CONNECTING;
}
void WebSocketChannel::OnConnectSuccess(scoped_ptr<WebSocketStream> stream) {
DCHECK(stream);
DCHECK_EQ(CONNECTING, state_);
stream_ = stream.Pass();
state_ = CONNECTED;
if (event_interface_->OnAddChannelResponse(
false, stream_->GetSubProtocol(), stream_->GetExtensions()) ==
CHANNEL_DELETED)
return;
current_send_quota_ = send_quota_high_water_mark_;
if (event_interface_->OnFlowControl(send_quota_high_water_mark_) ==
CHANNEL_DELETED)
return;
stream_request_.reset();
AllowUnused(ReadFrames());
}
void WebSocketChannel::OnConnectFailure(const std::string& message) {
DCHECK_EQ(CONNECTING, state_);
state_ = CLOSED;
stream_request_.reset();
if (CHANNEL_DELETED ==
notification_sender_->SendImmediately(event_interface_.get())) {
return;
}
AllowUnused(event_interface_->OnFailChannel(message));
}
void WebSocketChannel::OnStartOpeningHandshake(
scoped_ptr<WebSocketHandshakeRequestInfo> request) {
DCHECK(!notification_sender_->handshake_request_info());
notification_sender_->set_handshake_request_info(request.Pass());
ScheduleOpeningHandshakeNotification();
}
void WebSocketChannel::OnFinishOpeningHandshake(
scoped_ptr<WebSocketHandshakeResponseInfo> response) {
DCHECK(!notification_sender_->handshake_response_info());
notification_sender_->set_handshake_response_info(response.Pass());
ScheduleOpeningHandshakeNotification();
}
void WebSocketChannel::ScheduleOpeningHandshakeNotification() {
base::MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(HandshakeNotificationSender::Send,
notification_sender_->AsWeakPtr()));
}
ChannelState WebSocketChannel::WriteFrames() {
int result = OK;
do {
result = stream_->WriteFrames(
data_being_sent_->frames(),
base::Bind(base::IgnoreResult(&WebSocketChannel::OnWriteDone),
base::Unretained(this),
false));
if (result != ERR_IO_PENDING) {
if (OnWriteDone(true, result) == CHANNEL_DELETED)
return CHANNEL_DELETED;
}
} while (result == OK && data_being_sent_);
return CHANNEL_ALIVE;
}
ChannelState WebSocketChannel::OnWriteDone(bool synchronous, int result) {
DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
DCHECK_NE(CONNECTING, state_);
DCHECK_NE(ERR_IO_PENDING, result);
DCHECK(data_being_sent_);
switch (result) {
case OK:
if (data_to_send_next_) {
data_being_sent_ = data_to_send_next_.Pass();
if (!synchronous)
return WriteFrames();
} else {
data_being_sent_.reset();
if (current_send_quota_ < send_quota_low_water_mark_) {
DCHECK_LE(send_quota_low_water_mark_, send_quota_high_water_mark_);
int fresh_quota = send_quota_high_water_mark_ - current_send_quota_;
current_send_quota_ += fresh_quota;
return event_interface_->OnFlowControl(fresh_quota);
}
}
return CHANNEL_ALIVE;
default:
DCHECK_LT(result, 0)
<< "WriteFrames() should only return OK or ERR_ codes";
stream_->Close();
DCHECK_NE(CLOSED, state_);
state_ = CLOSED;
return DoDropChannel(false, kWebSocketErrorAbnormalClosure, "");
}
}
ChannelState WebSocketChannel::ReadFrames() {
int result = OK;
while (result == OK && current_receive_quota_ > 0) {
result = stream_->ReadFrames(
&read_frames_,
base::Bind(base::IgnoreResult(&WebSocketChannel::OnReadDone),
base::Unretained(this),
false));
if (result != ERR_IO_PENDING) {
if (OnReadDone(true, result) == CHANNEL_DELETED)
return CHANNEL_DELETED;
}
DCHECK_NE(CLOSED, state_);
}
return CHANNEL_ALIVE;
}
ChannelState WebSocketChannel::OnReadDone(bool synchronous, int result) {
DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
DCHECK_NE(CONNECTING, state_);
DCHECK_NE(ERR_IO_PENDING, result);
switch (result) {
case OK:
DCHECK(!read_frames_.empty())
<< "ReadFrames() returned OK, but nothing was read.";
for (size_t i = 0; i < read_frames_.size(); ++i) {
scoped_ptr<WebSocketFrame> frame(read_frames_[i]);
read_frames_[i] = NULL;
if (HandleFrame(frame.Pass()) == CHANNEL_DELETED)
return CHANNEL_DELETED;
}
read_frames_.clear();
DCHECK_NE(CLOSED, state_);
if (!synchronous)
return ReadFrames();
return CHANNEL_ALIVE;
case ERR_WS_PROTOCOL_ERROR:
return FailChannel("Invalid frame header",
kWebSocketErrorProtocolError,
"WebSocket Protocol Error");
default:
DCHECK_LT(result, 0)
<< "ReadFrames() should only return OK or ERR_ codes";
stream_->Close();
DCHECK_NE(CLOSED, state_);
state_ = CLOSED;
uint16 code = kWebSocketErrorAbnormalClosure;
std::string reason = "";
bool was_clean = false;
if (received_close_code_ != 0) {
code = received_close_code_;
reason = received_close_reason_;
was_clean = (result == ERR_CONNECTION_CLOSED);
}
return DoDropChannel(was_clean, code, reason);
}
}
ChannelState WebSocketChannel::HandleFrame(scoped_ptr<WebSocketFrame> frame) {
if (frame->header.masked) {
return FailChannel(
"A server must not mask any frames that it sends to the "
"client.",
kWebSocketErrorProtocolError,
"Masked frame from server");
}
const WebSocketFrameHeader::OpCode opcode = frame->header.opcode;
DCHECK(!WebSocketFrameHeader::IsKnownControlOpCode(opcode) ||
frame->header.final);
if (frame->header.reserved1 || frame->header.reserved2 ||
frame->header.reserved3) {
return FailChannel(base::StringPrintf(
"One or more reserved bits are on: reserved1 = %d, "
"reserved2 = %d, reserved3 = %d",
static_cast<int>(frame->header.reserved1),
static_cast<int>(frame->header.reserved2),
static_cast<int>(frame->header.reserved3)),
kWebSocketErrorProtocolError,
"Invalid reserved bit");
}
return HandleFrameByState(
opcode, frame->header.final, frame->data, frame->header.payload_length);
}
ChannelState WebSocketChannel::HandleFrameByState(
const WebSocketFrameHeader::OpCode opcode,
bool final,
const scoped_refptr<IOBuffer>& data_buffer,
size_t size) {
DCHECK_NE(RECV_CLOSED, state_)
<< "HandleFrame() does not support being called re-entrantly from within "
"SendClose()";
DCHECK_NE(CLOSED, state_);
if (state_ == CLOSE_WAIT) {
std::string frame_name;
GetFrameTypeForOpcode(opcode, &frame_name);
return FailChannel(
frame_name + " received after close", kWebSocketErrorProtocolError, "");
}
switch (opcode) {
case WebSocketFrameHeader::kOpCodeText:
case WebSocketFrameHeader::kOpCodeBinary:
case WebSocketFrameHeader::kOpCodeContinuation:
return HandleDataFrame(opcode, final, data_buffer, size);
case WebSocketFrameHeader::kOpCodePing:
VLOG(1) << "Got Ping of size " << size;
if (state_ == CONNECTED)
return SendFrameFromIOBuffer(
true, WebSocketFrameHeader::kOpCodePong, data_buffer, size);
VLOG(3) << "Ignored ping in state " << state_;
return CHANNEL_ALIVE;
case WebSocketFrameHeader::kOpCodePong:
VLOG(1) << "Got Pong of size " << size;
return CHANNEL_ALIVE;
case WebSocketFrameHeader::kOpCodeClose: {
uint16 code = kWebSocketNormalClosure;
std::string reason;
std::string message;
if (!ParseClose(data_buffer, size, &code, &reason, &message)) {
return FailChannel(message, code, reason);
}
VLOG(1) << "Got Close with code " << code;
switch (state_) {
case CONNECTED:
state_ = RECV_CLOSED;
if (SendClose(code, reason) == CHANNEL_DELETED)
return CHANNEL_DELETED;
state_ = CLOSE_WAIT;
if (event_interface_->OnClosingHandshake() == CHANNEL_DELETED)
return CHANNEL_DELETED;
received_close_code_ = code;
received_close_reason_ = reason;
break;
case SEND_CLOSED:
state_ = CLOSE_WAIT;
received_close_code_ = code;
received_close_reason_ = reason;
break;
default:
LOG(DFATAL) << "Got Close in unexpected state " << state_;
break;
}
return CHANNEL_ALIVE;
}
default:
return FailChannel(
base::StringPrintf("Unrecognized frame opcode: %d", opcode),
kWebSocketErrorProtocolError,
"Unknown opcode");
}
}
ChannelState WebSocketChannel::HandleDataFrame(
WebSocketFrameHeader::OpCode opcode,
bool final,
const scoped_refptr<IOBuffer>& data_buffer,
size_t size) {
if (state_ != CONNECTED) {
DVLOG(3) << "Ignored data packet received in state " << state_;
return CHANNEL_ALIVE;
}
DCHECK(opcode == WebSocketFrameHeader::kOpCodeContinuation ||
opcode == WebSocketFrameHeader::kOpCodeText ||
opcode == WebSocketFrameHeader::kOpCodeBinary);
const bool got_continuation =
(opcode == WebSocketFrameHeader::kOpCodeContinuation);
if (got_continuation != expecting_to_handle_continuation_) {
const std::string console_log = got_continuation
? "Received unexpected continuation frame."
: "Received start of new message but previous message is unfinished.";
const std::string reason = got_continuation
? "Unexpected continuation"
: "Previous data frame unfinished";
return FailChannel(console_log, kWebSocketErrorProtocolError, reason);
}
expecting_to_handle_continuation_ = !final;
WebSocketFrameHeader::OpCode opcode_to_send = opcode;
if (!initial_frame_forwarded_ &&
opcode == WebSocketFrameHeader::kOpCodeContinuation) {
opcode_to_send = receiving_text_message_
? WebSocketFrameHeader::kOpCodeText
: WebSocketFrameHeader::kOpCodeBinary;
}
if (opcode == WebSocketFrameHeader::kOpCodeText ||
(opcode == WebSocketFrameHeader::kOpCodeContinuation &&
receiving_text_message_)) {
StreamingUtf8Validator::State state = incoming_utf8_validator_.AddBytes(
size ? data_buffer->data() : NULL, size);
if (state == StreamingUtf8Validator::INVALID ||
(state == StreamingUtf8Validator::VALID_MIDPOINT && final)) {
return FailChannel("Could not decode a text frame as UTF-8.",
kWebSocketErrorProtocolError,
"Invalid UTF-8 in text frame");
}
receiving_text_message_ = !final;
DCHECK(!final || state == StreamingUtf8Validator::VALID_ENDPOINT);
}
if (size == 0U && !final)
return CHANNEL_ALIVE;
initial_frame_forwarded_ = !final;
if (size > base::checked_cast<size_t>(current_receive_quota_) ||
!pending_received_frames_.empty()) {
const bool no_quota = (current_receive_quota_ == 0);
DCHECK(no_quota || pending_received_frames_.empty());
DVLOG(3) << "Queueing frame to renderer due to quota. quota="
<< current_receive_quota_ << " size=" << size;
WebSocketFrameHeader::OpCode opcode_to_queue =
no_quota ? opcode_to_send : WebSocketFrameHeader::kOpCodeContinuation;
pending_received_frames_.push(PendingReceivedFrame(
final, opcode_to_queue, data_buffer, current_receive_quota_, size));
if (no_quota)
return CHANNEL_ALIVE;
size = current_receive_quota_;
final = false;
}
const char* const data_begin = size ? data_buffer->data() : NULL;
const char* const data_end = data_begin + size;
const std::vector<char> data(data_begin, data_end);
current_receive_quota_ -= size;
DCHECK_GE(current_receive_quota_, 0);
return event_interface_->OnDataFrame(final, opcode_to_send, data);
}
ChannelState WebSocketChannel::SendFrameFromIOBuffer(
bool fin,
WebSocketFrameHeader::OpCode op_code,
const scoped_refptr<IOBuffer>& buffer,
size_t size) {
DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
DCHECK(stream_);
scoped_ptr<WebSocketFrame> frame(new WebSocketFrame(op_code));
WebSocketFrameHeader& header = frame->header;
header.final = fin;
header.masked = true;
header.payload_length = size;
frame->data = buffer;
if (data_being_sent_) {
if (!data_to_send_next_)
data_to_send_next_.reset(new SendBuffer);
data_to_send_next_->AddFrame(frame.Pass());
return CHANNEL_ALIVE;
}
data_being_sent_.reset(new SendBuffer);
data_being_sent_->AddFrame(frame.Pass());
return WriteFrames();
}
ChannelState WebSocketChannel::FailChannel(const std::string& message,
uint16 code,
const std::string& reason) {
DCHECK_NE(FRESHLY_CONSTRUCTED, state_);
DCHECK_NE(CONNECTING, state_);
DCHECK_NE(CLOSED, state_);
if (state_ == CONNECTED) {
if (SendClose(code, reason) == CHANNEL_DELETED)
return CHANNEL_DELETED;
}
stream_->Close();
state_ = CLOSED;
return event_interface_->OnFailChannel(message);
}
ChannelState WebSocketChannel::SendClose(uint16 code,
const std::string& reason) {
DCHECK(state_ == CONNECTED || state_ == RECV_CLOSED);
DCHECK_LE(reason.size(), kMaximumCloseReasonLength);
scoped_refptr<IOBuffer> body;
size_t size = 0;
if (code == kWebSocketErrorNoStatusReceived) {
DCHECK(reason.empty());
body = new IOBuffer(0);
} else {
const size_t payload_length = kWebSocketCloseCodeLength + reason.length();
body = new IOBuffer(payload_length);
size = payload_length;
base::WriteBigEndian(body->data(), code);
COMPILE_ASSERT(sizeof(code) == kWebSocketCloseCodeLength,
they_should_both_be_two);
std::copy(
reason.begin(), reason.end(), body->data() + kWebSocketCloseCodeLength);
}
timer_.Start(
FROM_HERE,
timeout_,
base::Bind(&WebSocketChannel::CloseTimeout, base::Unretained(this)));
if (SendFrameFromIOBuffer(
true, WebSocketFrameHeader::kOpCodeClose, body, size) ==
CHANNEL_DELETED)
return CHANNEL_DELETED;
return CHANNEL_ALIVE;
}
bool WebSocketChannel::ParseClose(const scoped_refptr<IOBuffer>& buffer,
size_t size,
uint16* code,
std::string* reason,
std::string* message) {
reason->clear();
if (size < kWebSocketCloseCodeLength) {
if (size == 0U) {
*code = kWebSocketErrorNoStatusReceived;
return true;
}
DVLOG(1) << "Close frame with payload size " << size << " received "
<< "(the first byte is " << std::hex
<< static_cast<int>(buffer->data()[0]) << ")";
*code = kWebSocketErrorProtocolError;
*message =
"Received a broken close frame containing an invalid size body.";
return false;
}
const char* data = buffer->data();
uint16 unchecked_code = 0;
base::ReadBigEndian(data, &unchecked_code);
COMPILE_ASSERT(sizeof(unchecked_code) == kWebSocketCloseCodeLength,
they_should_both_be_two_bytes);
switch (unchecked_code) {
case kWebSocketErrorNoStatusReceived:
case kWebSocketErrorAbnormalClosure:
case kWebSocketErrorTlsHandshake:
*code = kWebSocketErrorProtocolError;
*message =
"Received a broken close frame containing a reserved status code.";
return false;
default:
*code = unchecked_code;
break;
}
std::string text(data + kWebSocketCloseCodeLength, data + size);
if (StreamingUtf8Validator::Validate(text)) {
reason->swap(text);
return true;
}
*code = kWebSocketErrorProtocolError;
*reason = "Invalid UTF-8 in Close frame";
*message = "Received a broken close frame containing invalid UTF-8.";
return false;
}
ChannelState WebSocketChannel::DoDropChannel(bool was_clean,
uint16 code,
const std::string& reason) {
if (CHANNEL_DELETED ==
notification_sender_->SendImmediately(event_interface_.get()))
return CHANNEL_DELETED;
ChannelState result =
event_interface_->OnDropChannel(was_clean, code, reason);
DCHECK_EQ(CHANNEL_DELETED, result);
return result;
}
void WebSocketChannel::CloseTimeout() {
stream_->Close();
DCHECK_NE(CLOSED, state_);
state_ = CLOSED;
AllowUnused(DoDropChannel(false, kWebSocketErrorAbnormalClosure, ""));
}
}