This source file includes following definitions.
- direct_
- InitializeStream
- GetResponseInfo
- GetUploadProgress
- ReadResponseHeaders
- ReadResponseBody
- Close
- RenewStreamForAuth
- IsResponseBodyComplete
- CanFindEndOfResponse
- IsConnectionReused
- SetConnectionReused
- IsConnectionReusable
- GetTotalReceivedBytes
- GetLoadTimingInfo
- SendRequest
- Cancel
- OnRequestHeadersSent
- OnResponseHeadersUpdated
- OnDataReceived
- OnDataSent
- OnClose
- HasUploadData
- OnStreamCreated
- ReadAndSendRequestBodyData
- OnRequestBodyReadCompleted
- ScheduleBufferedReadCallback
- ShouldWaitForMoreBufferedData
- DoBufferedReadCallback
- DoCallback
- GetSSLInfo
- GetSSLCertRequestInfo
- IsSpdyHttpStream
- Drain
- SetPriority
#include "net/spdy/spdy_http_stream.h"
#include <algorithm>
#include <list>
#include "base/bind.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/strings/stringprintf.h"
#include "net/base/host_port_pair.h"
#include "net/base/net_log.h"
#include "net/base/net_util.h"
#include "net/base/upload_data_stream.h"
#include "net/http/http_request_headers.h"
#include "net/http/http_request_info.h"
#include "net/http/http_response_info.h"
#include "net/spdy/spdy_header_block.h"
#include "net/spdy/spdy_http_utils.h"
#include "net/spdy/spdy_protocol.h"
#include "net/spdy/spdy_session.h"
namespace net {
SpdyHttpStream::SpdyHttpStream(const base::WeakPtr<SpdySession>& spdy_session,
bool direct)
: weak_factory_(this),
spdy_session_(spdy_session),
is_reused_(spdy_session_->IsReused()),
stream_closed_(false),
closed_stream_status_(ERR_FAILED),
closed_stream_id_(0),
closed_stream_received_bytes_(0),
request_info_(NULL),
response_info_(NULL),
response_headers_status_(RESPONSE_HEADERS_ARE_INCOMPLETE),
user_buffer_len_(0),
request_body_buf_size_(0),
buffered_read_callback_pending_(false),
more_read_data_pending_(false),
direct_(direct) {
DCHECK(spdy_session_.get());
}
SpdyHttpStream::~SpdyHttpStream() {
if (stream_.get()) {
stream_->DetachDelegate();
DCHECK(!stream_.get());
}
}
int SpdyHttpStream::InitializeStream(const HttpRequestInfo* request_info,
RequestPriority priority,
const BoundNetLog& stream_net_log,
const CompletionCallback& callback) {
DCHECK(!stream_);
if (!spdy_session_)
return ERR_CONNECTION_CLOSED;
request_info_ = request_info;
if (request_info_->method == "GET") {
int error = spdy_session_->GetPushStream(request_info_->url, &stream_,
stream_net_log);
if (error != OK)
return error;
if (stream_.get()) {
DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
stream_->SetDelegate(this);
return OK;
}
}
int rv = stream_request_.StartRequest(
SPDY_REQUEST_RESPONSE_STREAM, spdy_session_, request_info_->url,
priority, stream_net_log,
base::Bind(&SpdyHttpStream::OnStreamCreated,
weak_factory_.GetWeakPtr(), callback));
if (rv == OK) {
stream_ = stream_request_.ReleaseStream();
stream_->SetDelegate(this);
}
return rv;
}
const HttpResponseInfo* SpdyHttpStream::GetResponseInfo() const {
return response_info_;
}
UploadProgress SpdyHttpStream::GetUploadProgress() const {
if (!request_info_ || !HasUploadData())
return UploadProgress();
return UploadProgress(request_info_->upload_data_stream->position(),
request_info_->upload_data_stream->size());
}
int SpdyHttpStream::ReadResponseHeaders(const CompletionCallback& callback) {
CHECK(!callback.is_null());
if (stream_closed_)
return closed_stream_status_;
CHECK(stream_.get());
if (response_headers_status_ == RESPONSE_HEADERS_ARE_COMPLETE) {
CHECK(!stream_->IsIdle());
return OK;
}
CHECK(callback_.is_null());
callback_ = callback;
return ERR_IO_PENDING;
}
int SpdyHttpStream::ReadResponseBody(
IOBuffer* buf, int buf_len, const CompletionCallback& callback) {
if (stream_.get())
CHECK(!stream_->IsIdle());
CHECK(buf);
CHECK(buf_len);
CHECK(!callback.is_null());
if (!response_body_queue_.IsEmpty()) {
return response_body_queue_.Dequeue(buf->data(), buf_len);
} else if (stream_closed_) {
return closed_stream_status_;
}
CHECK(callback_.is_null());
CHECK(!user_buffer_.get());
CHECK_EQ(0, user_buffer_len_);
callback_ = callback;
user_buffer_ = buf;
user_buffer_len_ = buf_len;
return ERR_IO_PENDING;
}
void SpdyHttpStream::Close(bool not_reusable) {
Cancel();
DCHECK(!stream_.get());
}
HttpStream* SpdyHttpStream::RenewStreamForAuth() {
return NULL;
}
bool SpdyHttpStream::IsResponseBodyComplete() const {
return stream_closed_;
}
bool SpdyHttpStream::CanFindEndOfResponse() const {
return true;
}
bool SpdyHttpStream::IsConnectionReused() const {
return is_reused_;
}
void SpdyHttpStream::SetConnectionReused() {
}
bool SpdyHttpStream::IsConnectionReusable() const {
return false;
}
int64 SpdyHttpStream::GetTotalReceivedBytes() const {
if (stream_closed_)
return closed_stream_received_bytes_;
if (!stream_)
return 0;
return stream_->raw_received_bytes();
}
bool SpdyHttpStream::GetLoadTimingInfo(LoadTimingInfo* load_timing_info) const {
if (stream_closed_) {
if (!closed_stream_has_load_timing_info_)
return false;
*load_timing_info = closed_stream_load_timing_info_;
return true;
}
if (!stream_ || stream_->stream_id() == 0)
return false;
return stream_->GetLoadTimingInfo(load_timing_info);
}
int SpdyHttpStream::SendRequest(const HttpRequestHeaders& request_headers,
HttpResponseInfo* response,
const CompletionCallback& callback) {
if (stream_closed_) {
return closed_stream_status_;
}
base::Time request_time = base::Time::Now();
CHECK(stream_.get());
stream_->SetRequestTime(request_time);
if (response_info_)
response_info_->request_time = request_time;
CHECK(!request_body_buf_.get());
if (HasUploadData()) {
request_body_buf_ = new IOBufferWithSize(kMaxSpdyFrameChunkSize);
request_body_buf_size_ = 0;
}
CHECK(!callback.is_null());
CHECK(response);
if (push_response_info_.get()) {
*response = *(push_response_info_.get());
push_response_info_.reset();
} else {
DCHECK_EQ(static_cast<HttpResponseInfo*>(NULL), response_info_);
}
response_info_ = response;
IPEndPoint address;
int result = stream_->GetPeerAddress(&address);
if (result != OK)
return result;
response_info_->socket_address = HostPortPair::FromIPEndPoint(address);
if (stream_->type() == SPDY_PUSH_STREAM) {
result = ERR_IO_PENDING;
} else {
scoped_ptr<SpdyHeaderBlock> headers(new SpdyHeaderBlock);
CreateSpdyHeadersFromHttpRequest(
*request_info_, request_headers,
headers.get(), stream_->GetProtocolVersion(),
direct_);
stream_->net_log().AddEvent(
NetLog::TYPE_HTTP_TRANSACTION_SPDY_SEND_REQUEST_HEADERS,
base::Bind(&SpdyHeaderBlockNetLogCallback, headers.get()));
result =
stream_->SendRequestHeaders(
headers.Pass(),
HasUploadData() ? MORE_DATA_TO_SEND : NO_MORE_DATA_TO_SEND);
}
if (result == ERR_IO_PENDING) {
CHECK(callback_.is_null());
callback_ = callback;
}
return result;
}
void SpdyHttpStream::Cancel() {
callback_.Reset();
if (stream_.get()) {
stream_->Cancel();
DCHECK(!stream_.get());
}
}
void SpdyHttpStream::OnRequestHeadersSent() {
if (!callback_.is_null())
DoCallback(OK);
if (HasUploadData())
ReadAndSendRequestBodyData();
}
SpdyResponseHeadersStatus SpdyHttpStream::OnResponseHeadersUpdated(
const SpdyHeaderBlock& response_headers) {
CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_INCOMPLETE);
if (!response_info_) {
DCHECK_EQ(stream_->type(), SPDY_PUSH_STREAM);
push_response_info_.reset(new HttpResponseInfo);
response_info_ = push_response_info_.get();
}
if (!SpdyHeadersToHttpResponse(
response_headers, stream_->GetProtocolVersion(), response_info_)) {
return RESPONSE_HEADERS_ARE_INCOMPLETE;
}
response_info_->response_time = stream_->response_time();
response_headers_status_ = RESPONSE_HEADERS_ARE_COMPLETE;
SSLInfo ssl_info;
NextProto protocol_negotiated = kProtoUnknown;
stream_->GetSSLInfo(&ssl_info,
&response_info_->was_npn_negotiated,
&protocol_negotiated);
response_info_->npn_negotiated_protocol =
SSLClientSocket::NextProtoToString(protocol_negotiated);
response_info_->request_time = stream_->GetRequestTime();
response_info_->connection_info =
HttpResponseInfo::ConnectionInfoFromNextProto(stream_->GetProtocol());
response_info_->vary_data
.Init(*request_info_, *response_info_->headers.get());
if (!callback_.is_null())
DoCallback(OK);
return RESPONSE_HEADERS_ARE_COMPLETE;
}
void SpdyHttpStream::OnDataReceived(scoped_ptr<SpdyBuffer> buffer) {
CHECK_EQ(response_headers_status_, RESPONSE_HEADERS_ARE_COMPLETE);
DCHECK(stream_.get());
DCHECK(!stream_->IsClosed() || stream_->type() == SPDY_PUSH_STREAM);
if (buffer) {
response_body_queue_.Enqueue(buffer.Pass());
if (user_buffer_.get()) {
ScheduleBufferedReadCallback();
}
}
}
void SpdyHttpStream::OnDataSent() {
request_body_buf_size_ = 0;
ReadAndSendRequestBodyData();
}
void SpdyHttpStream::OnClose(int status) {
if (stream_.get()) {
stream_closed_ = true;
closed_stream_status_ = status;
closed_stream_id_ = stream_->stream_id();
closed_stream_has_load_timing_info_ =
stream_->GetLoadTimingInfo(&closed_stream_load_timing_info_);
closed_stream_received_bytes_ = stream_->raw_received_bytes();
}
stream_.reset();
bool invoked_callback = false;
if (status == net::OK) {
invoked_callback = DoBufferedReadCallback();
}
if (!invoked_callback && !callback_.is_null())
DoCallback(status);
}
bool SpdyHttpStream::HasUploadData() const {
CHECK(request_info_);
return
request_info_->upload_data_stream &&
((request_info_->upload_data_stream->size() > 0) ||
request_info_->upload_data_stream->is_chunked());
}
void SpdyHttpStream::OnStreamCreated(
const CompletionCallback& callback,
int rv) {
if (rv == OK) {
stream_ = stream_request_.ReleaseStream();
stream_->SetDelegate(this);
}
callback.Run(rv);
}
void SpdyHttpStream::ReadAndSendRequestBodyData() {
CHECK(HasUploadData());
CHECK_EQ(request_body_buf_size_, 0);
if (request_info_->upload_data_stream->IsEOF())
return;
const int rv = request_info_->upload_data_stream
->Read(request_body_buf_.get(),
request_body_buf_->size(),
base::Bind(&SpdyHttpStream::OnRequestBodyReadCompleted,
weak_factory_.GetWeakPtr()));
if (rv != ERR_IO_PENDING) {
CHECK_GE(rv, 0);
OnRequestBodyReadCompleted(rv);
}
}
void SpdyHttpStream::OnRequestBodyReadCompleted(int status) {
CHECK_GE(status, 0);
request_body_buf_size_ = status;
const bool eof = request_info_->upload_data_stream->IsEOF();
if (eof) {
CHECK_GE(request_body_buf_size_, 0);
} else {
CHECK_GT(request_body_buf_size_, 0);
}
stream_->SendData(request_body_buf_.get(),
request_body_buf_size_,
eof ? NO_MORE_DATA_TO_SEND : MORE_DATA_TO_SEND);
}
void SpdyHttpStream::ScheduleBufferedReadCallback() {
if (buffered_read_callback_pending_) {
more_read_data_pending_ = true;
return;
}
more_read_data_pending_ = false;
buffered_read_callback_pending_ = true;
const base::TimeDelta kBufferTime = base::TimeDelta::FromMilliseconds(1);
base::MessageLoop::current()->PostDelayedTask(
FROM_HERE,
base::Bind(base::IgnoreResult(&SpdyHttpStream::DoBufferedReadCallback),
weak_factory_.GetWeakPtr()),
kBufferTime);
}
bool SpdyHttpStream::ShouldWaitForMoreBufferedData() const {
if (stream_closed_)
return false;
DCHECK_GT(user_buffer_len_, 0);
return response_body_queue_.GetTotalSize() <
static_cast<size_t>(user_buffer_len_);
}
bool SpdyHttpStream::DoBufferedReadCallback() {
buffered_read_callback_pending_ = false;
if (!stream_.get() && !stream_closed_)
return false;
int stream_status =
stream_closed_ ? closed_stream_status_ : stream_->response_status();
if (stream_status != OK)
return false;
if (more_read_data_pending_ && ShouldWaitForMoreBufferedData()) {
ScheduleBufferedReadCallback();
return false;
}
int rv = 0;
if (user_buffer_.get()) {
rv = ReadResponseBody(user_buffer_.get(), user_buffer_len_, callback_);
CHECK_NE(rv, ERR_IO_PENDING);
user_buffer_ = NULL;
user_buffer_len_ = 0;
DoCallback(rv);
return true;
}
return false;
}
void SpdyHttpStream::DoCallback(int rv) {
CHECK_NE(rv, ERR_IO_PENDING);
CHECK(!callback_.is_null());
CompletionCallback c = callback_;
callback_.Reset();
c.Run(rv);
}
void SpdyHttpStream::GetSSLInfo(SSLInfo* ssl_info) {
DCHECK(stream_.get());
bool using_npn;
NextProto protocol_negotiated = kProtoUnknown;
stream_->GetSSLInfo(ssl_info, &using_npn, &protocol_negotiated);
}
void SpdyHttpStream::GetSSLCertRequestInfo(
SSLCertRequestInfo* cert_request_info) {
DCHECK(stream_.get());
stream_->GetSSLCertRequestInfo(cert_request_info);
}
bool SpdyHttpStream::IsSpdyHttpStream() const {
return true;
}
void SpdyHttpStream::Drain(HttpNetworkSession* session) {
Close(false);
delete this;
}
void SpdyHttpStream::SetPriority(RequestPriority priority) {
}
}