This source file includes the following definitions.
- IsTcpClientSocket
- JingleSocketOptionToP2PSocketOption
- error_
- TraceSendThrottlingState
- Init
- InitAcceptedTcp
- GetLocalAddress
- GetRemoteAddress
- Send
- SendTo
- Close
- GetState
- GetOption
- SetOption
- DoSetOption
- GetError
- SetError
- OnOpen
- OnIncomingTcpConnection
- OnSendComplete
- OnError
- OnDataReceived
- Start
- GetResolvedAddress
- GetError
- Destroy
- OnAddressResolved
- CreateUdpSocket
- CreateServerTcpSocket
- CreateClientTcpSocket
- CreateAsyncResolver
#include "content/renderer/p2p/ipc_socket_factory.h"
#include <algorithm>
#include <deque>
#include "base/compiler_specific.h"
#include "base/debug/trace_event.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/threading/non_thread_safe.h"
#include "content/renderer/p2p/host_address_request.h"
#include "content/renderer/p2p/socket_client_delegate.h"
#include "content/renderer/p2p/socket_client_impl.h"
#include "content/renderer/p2p/socket_dispatcher.h"
#include "jingle/glue/utils.h"
#include "third_party/libjingle/source/talk/base/asyncpacketsocket.h"
namespace content {
namespace {
// Sentinel stored in IpcPacketSocket::options_ for an option that has not been
// given a value; OnOpen() skips entries that still hold it.
const int kDefaultNonSetOptionValue = -1;
// Returns true when |type| is one of the six *_CLIENT socket types listed
// below, and false for every other type.
bool IsTcpClientSocket(P2PSocketType type) {
  switch (type) {
    case P2P_SOCKET_STUN_TCP_CLIENT:
    case P2P_SOCKET_TCP_CLIENT:
    case P2P_SOCKET_STUN_SSLTCP_CLIENT:
    case P2P_SOCKET_SSLTCP_CLIENT:
    case P2P_SOCKET_TLS_CLIENT:
    case P2P_SOCKET_STUN_TLS_CLIENT:
      return true;
    default:
      return false;
  }
}
// Translates a libjingle socket option into its P2PSocketOption counterpart.
//
// On success writes the translated value to |ipc_option| and returns true.
// Returns false, leaving |ipc_option| untouched, for options that have no
// P2PSocketOption equivalent here. An option value outside the handled set
// hits NOTREACHED() and also yields false.
bool JingleSocketOptionToP2PSocketOption(talk_base::Socket::Option option,
                                         P2PSocketOption* ipc_option) {
  switch (option) {
    case talk_base::Socket::OPT_RCVBUF:
      *ipc_option = P2P_SOCKET_OPT_RCVBUF;
      return true;

    case talk_base::Socket::OPT_SNDBUF:
      *ipc_option = P2P_SOCKET_OPT_SNDBUF;
      return true;

    case talk_base::Socket::OPT_DSCP:
      *ipc_option = P2P_SOCKET_OPT_DSCP;
      return true;

    // Known options that are deliberately not forwarded.
    case talk_base::Socket::OPT_DONTFRAGMENT:
    case talk_base::Socket::OPT_NODELAY:
    case talk_base::Socket::OPT_IPV6_V6ONLY:
    case talk_base::Socket::OPT_RTP_SENDTIME_EXTN_ID:
      return false;

    default:
      NOTREACHED();
      return false;
  }
}
// Initial send budget of an IpcPacketSocket, in bytes. SendTo() subtracts each
// accepted packet from the budget and OnSendComplete() adds it back, so this is
// the upper bound on bytes handed to the client but not yet reported complete.
const size_t kMaximumInFlightBytes = 64 * 1024;
// Implements talk_base::AsyncPacketSocket on top of a P2PSocketClient, and
// receives that client's events through the P2PSocketClientDelegate interface.
// Methods DCHECK that they run on the message loop that was current when the
// object was constructed.
class IpcPacketSocket : public talk_base::AsyncPacketSocket,
public P2PSocketClientDelegate {
public:
IpcPacketSocket();
virtual ~IpcPacketSocket();
// Starts opening a socket of |type| through |client|. Returns false when an
// address cannot be converted to a net::IPEndPoint.
bool Init(P2PSocketType type, P2PSocketClientImpl* client,
const talk_base::SocketAddress& local_address,
const talk_base::SocketAddress& remote_address);
// talk_base::AsyncPacketSocket interface.
virtual talk_base::SocketAddress GetLocalAddress() const OVERRIDE;
virtual talk_base::SocketAddress GetRemoteAddress() const OVERRIDE;
virtual int Send(const void *pv, size_t cb,
const talk_base::PacketOptions& options) OVERRIDE;
virtual int SendTo(const void *pv, size_t cb,
const talk_base::SocketAddress& addr,
const talk_base::PacketOptions& options) OVERRIDE;
virtual int Close() OVERRIDE;
virtual State GetState() const OVERRIDE;
virtual int GetOption(talk_base::Socket::Option option, int* value) OVERRIDE;
virtual int SetOption(talk_base::Socket::Option option, int value) OVERRIDE;
virtual int GetError() const OVERRIDE;
virtual void SetError(int error) OVERRIDE;
// P2PSocketClientDelegate interface.
virtual void OnOpen(const net::IPEndPoint& address) OVERRIDE;
virtual void OnIncomingTcpConnection(
const net::IPEndPoint& address,
P2PSocketClient* client) OVERRIDE;
virtual void OnSendComplete() OVERRIDE;
virtual void OnError() OVERRIDE;
virtual void OnDataReceived(const net::IPEndPoint& address,
const std::vector<char>& data,
const base::TimeTicks& timestamp) OVERRIDE;
private:
// Lifecycle of the socket as tracked by this class.
enum InternalState {
IS_UNINITIALIZED,
IS_OPENING,
IS_OPEN,
IS_CLOSED,
IS_ERROR,
};
// Emits trace counters describing the current send budget.
void TraceSendThrottlingState() const;
// Initializes a socket created for an accepted incoming TCP connection; the
// socket is put straight into the IS_OPEN state.
void InitAcceptedTcp(P2PSocketClient* client,
const talk_base::SocketAddress& local_address,
const talk_base::SocketAddress& remote_address);
// Forwards an option to |client_|; requires the IS_OPEN state.
int DoSetOption(P2PSocketOption option, int value);
P2PSocketType type_;
// Message loop that was current at construction time.
base::MessageLoop* message_loop_;
// Client that performs the actual socket operations.
scoped_refptr<P2PSocketClient> client_;
// Address passed to Init()/InitAcceptedTcp(); overwritten by OnOpen() with the
// address reported by the client.
talk_base::SocketAddress local_address_;
// Destination used by Send().
talk_base::SocketAddress remote_address_;
InternalState state_;
// Remaining send budget in bytes; starts at kMaximumInFlightBytes.
size_t send_bytes_available_;
// Sizes of the packets handed to |client_| for which OnSendComplete() has not
// been received yet, oldest first.
std::deque<size_t> in_flight_packet_sizes_;
// Set when SendTo() rejected a packet because the budget was exhausted;
// OnSendComplete() emits SignalReadyToSend while it is set.
bool writable_signal_expected_;
// Value returned by GetError().
int error_;
// Last value passed to SetOption() for each P2PSocketOption, or
// kDefaultNonSetOptionValue when the option was never set.
int options_[P2P_SOCKET_OPT_MAX];
DISALLOW_COPY_AND_ASSIGN(IpcPacketSocket);
};
// talk_base::AsyncResolverInterface implementation that delegates the lookup
// to a P2PAsyncAddressResolver. Methods DCHECK CalledOnValidThread().
class AsyncAddressResolverImpl : public base::NonThreadSafe,
                                 public talk_base::AsyncResolverInterface {
 public:
  // |explicit| prevents accidental implicit conversion from a dispatcher
  // pointer to a resolver object.
  explicit AsyncAddressResolverImpl(P2PSocketDispatcher* dispatcher);
  virtual ~AsyncAddressResolverImpl();

  // talk_base::AsyncResolverInterface interface.
  virtual void Start(const talk_base::SocketAddress& addr) OVERRIDE;
  virtual bool GetResolvedAddress(
      int family, talk_base::SocketAddress* addr) const OVERRIDE;
  virtual int GetError() const OVERRIDE;
  virtual void Destroy(bool wait) OVERRIDE;

 private:
  // Completion callback handed to |resolver_| by Start().
  virtual void OnAddressResolved(const net::IPAddressList& addresses);

  scoped_refptr<P2PAsyncAddressResolver> resolver_;
  // Port of the address passed to Start(); copied into resolved addresses.
  int port_;
  // Addresses collected by OnAddressResolved().
  std::vector<talk_base::IPAddress> addresses_;
};
// Creates an uninitialized UDP-typed socket bound to the current message loop,
// with the full send budget available and every option marked as not set.
IpcPacketSocket::IpcPacketSocket()
    : type_(P2P_SOCKET_UDP),
      message_loop_(base::MessageLoop::current()),
      state_(IS_UNINITIALIZED),
      send_bytes_available_(kMaximumInFlightBytes),
      writable_signal_expected_(false),
      error_(0) {
  // A zero budget would make SendTo() reject every non-empty packet.
  COMPILE_ASSERT(kMaximumInFlightBytes > 0, would_send_at_zero_rate);

  int* const options_begin = options_;
  int* const options_end = options_ + static_cast<int>(P2P_SOCKET_OPT_MAX);
  std::fill(options_begin, options_end, kDefaultNonSetOptionValue);
}
// Closes the socket on destruction unless it was never initialized or has
// already been closed.
IpcPacketSocket::~IpcPacketSocket() {
  switch (state_) {
    case IS_OPENING:
    case IS_OPEN:
    case IS_ERROR:
      Close();
      break;
    case IS_UNINITIALIZED:
    case IS_CLOSED:
      // Nothing to release.
      break;
  }
}
// Emits two "p2p" trace counters, both identified by the local port: the
// remaining send budget in bytes and the number of packets currently in
// flight.
void IpcPacketSocket::TraceSendThrottlingState() const {
TRACE_COUNTER_ID1("p2p", "P2PSendBytesAvailable", local_address_.port(),
send_bytes_available_);
TRACE_COUNTER_ID1("p2p", "P2PSendPacketsInFlight", local_address_.port(),
in_flight_packet_sizes_.size());
}
// Records the socket parameters, moves to IS_OPENING and asks |client| to open
// the socket with this object as its delegate.
//
// Returns false, without calling client->Init(), when |local_address| or a
// non-nil |remote_address| cannot be converted to a net::IPEndPoint. Note that
// |client_| and |state_| have already been updated at that point.
bool IpcPacketSocket::Init(P2PSocketType type,
                           P2PSocketClientImpl* client,
                           const talk_base::SocketAddress& local_address,
                           const talk_base::SocketAddress& remote_address) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK_EQ(state_, IS_UNINITIALIZED);

  // Store everything first so that |client| is referenced by |client_| even
  // when address validation below fails.
  type_ = type;
  client_ = client;
  local_address_ = local_address;
  remote_address_ = remote_address;
  state_ = IS_OPENING;

  net::IPEndPoint local_ep;
  const bool local_ok =
      jingle_glue::SocketAddressToIPEndPoint(local_address, &local_ep);
  if (!local_ok)
    return false;

  // A nil remote address is allowed and leaves |remote_ep| default-constructed.
  net::IPEndPoint remote_ep;
  if (!remote_address.IsNil()) {
    if (!jingle_glue::SocketAddressToIPEndPoint(remote_address, &remote_ep))
      return false;
  }

  P2PHostAndIPEndPoint remote_info(remote_address.hostname(), remote_ep);
  client->Init(type, local_ep, remote_info, this);
  return true;
}
// Sets up a socket that wraps an already-accepted connection: stores |client|
// and both addresses, marks the socket IS_OPEN and registers this object as
// the client's delegate.
// NOTE(review): |type_| is not updated here, so it keeps the value assigned by
// the constructor - confirm that this is intended.
void IpcPacketSocket::InitAcceptedTcp(
    P2PSocketClient* client,
    const talk_base::SocketAddress& local_address,
    const talk_base::SocketAddress& remote_address) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK_EQ(state_, IS_UNINITIALIZED);

  state_ = IS_OPEN;
  remote_address_ = remote_address;
  local_address_ = local_address;
  client_ = client;

  TraceSendThrottlingState();
  client_->SetDelegate(this);
}
// Returns a copy of the stored local address.
talk_base::SocketAddress IpcPacketSocket::GetLocalAddress() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  return local_address_;
}
// Returns a copy of the stored remote address.
talk_base::SocketAddress IpcPacketSocket::GetRemoteAddress() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  return remote_address_;
}
// Sends |data_size| bytes from |data| to the stored remote address by
// delegating to SendTo(); the result of SendTo() is returned unchanged.
int IpcPacketSocket::Send(const void *data, size_t data_size,
                          const talk_base::PacketOptions& options) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  const int result = SendTo(data, data_size, remote_address_, options);
  return result;
}
// Hands |data_size| bytes from |data| to the client for delivery to |address|.
//
// Returns the number of bytes accepted on success. On failure returns -1 and
// stores the reason in |error_| (readable through GetError()):
//   - EWOULDBLOCK: the socket is not open yet, or the packet does not fit in
//     the remaining send budget (SignalReadyToSend will follow later);
//   - ENOTCONN: the socket has been closed;
//   - EINVAL: |address| cannot be converted to a net::IPEndPoint;
//   - in the IS_ERROR state |error_| is left as it was.
// A zero-length packet is a caller error (NOTREACHED) and returns 0.
int IpcPacketSocket::SendTo(const void *data, size_t data_size,
                            const talk_base::SocketAddress& address,
                            const talk_base::PacketOptions& options) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  // Failures are always reported as -1 with the code in |error_|. Returning
  // the errno value itself would be read by callers as a positive byte count.
  switch (state_) {
    case IS_UNINITIALIZED:
      NOTREACHED();
      error_ = EWOULDBLOCK;
      return -1;
    case IS_OPENING:
      error_ = EWOULDBLOCK;
      return -1;
    case IS_CLOSED:
      error_ = ENOTCONN;
      return -1;
    case IS_ERROR:
      // |error_| already describes the failure.
      return -1;
    case IS_OPEN:
      // Continue sending the packet.
      break;
  }

  if (data_size == 0) {
    NOTREACHED();
    return 0;
  }

  // Throttle: refuse the packet if it would exceed the in-flight budget and
  // remember that the caller must be told when sending becomes possible again.
  if (data_size > send_bytes_available_) {
    TRACE_EVENT_INSTANT1("p2p", "MaxPendingBytesWouldBlock",
                         TRACE_EVENT_SCOPE_THREAD,
                         "id",
                         client_->GetSocketID());
    writable_signal_expected_ = true;
    error_ = EWOULDBLOCK;
    return -1;
  }

  net::IPEndPoint address_chrome;
  if (!jingle_glue::SocketAddressToIPEndPoint(address, &address_chrome)) {
    NOTREACHED();
    error_ = EINVAL;
    return -1;
  }

  // Account for the packet until OnSendComplete() reports it as sent.
  send_bytes_available_ -= data_size;
  in_flight_packet_sizes_.push_back(data_size);
  TraceSendThrottlingState();

  const char* data_char = reinterpret_cast<const char*>(data);
  std::vector<char> data_vector(data_char, data_char + data_size);
  client_->SendWithDscp(address_chrome, data_vector, options);

  // |data_size| is bounded by kMaximumInFlightBytes here, so it fits in int.
  return static_cast<int>(data_size);
}
// Closes the underlying client and marks the socket IS_CLOSED. Always
// returns 0.
int IpcPacketSocket::Close() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  client_->Close();
  state_ = IS_CLOSED;

  return 0;
}
// Maps the internal state onto the AsyncPacketSocket::State values:
// opening -> STATE_BINDING; open -> STATE_CONNECTED for TCP client types and
// STATE_BOUND otherwise; closed or error -> STATE_CLOSED. Querying an
// uninitialized socket is a programming error.
talk_base::AsyncPacketSocket::State IpcPacketSocket::GetState() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  switch (state_) {
    case IS_UNINITIALIZED:
      NOTREACHED();
      return STATE_CLOSED;

    case IS_OPENING:
      return STATE_BINDING;

    case IS_OPEN:
      return IsTcpClientSocket(type_) ? STATE_CONNECTED : STATE_BOUND;

    case IS_CLOSED:
    case IS_ERROR:
      return STATE_CLOSED;
  }

  // Only reachable if |state_| holds a value outside InternalState.
  NOTREACHED();
  return STATE_CLOSED;
}
// Reads back the locally cached value of |option| into |value|.
// Returns 0 on success, or -1 (leaving |value| untouched) when the option has
// no P2PSocketOption equivalent.
int IpcPacketSocket::GetOption(talk_base::Socket::Option option, int* value) {
  P2PSocketOption ipc_option = P2P_SOCKET_OPT_MAX;
  const bool supported =
      JingleSocketOptionToP2PSocketOption(option, &ipc_option);
  if (supported) {
    *value = options_[ipc_option];
    return 0;
  }

  // Unsupported option.
  return -1;
}
// Caches |value| for |option| and, if the socket is already open, forwards it
// to the client right away. Values cached before the socket opens are applied
// by OnOpen(). Returns -1 for options without a P2PSocketOption equivalent.
int IpcPacketSocket::SetOption(talk_base::Socket::Option option, int value) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  P2PSocketOption ipc_option = P2P_SOCKET_OPT_MAX;
  const bool supported =
      JingleSocketOptionToP2PSocketOption(option, &ipc_option);
  if (!supported) {
    // Option is not supported.
    return -1;
  }

  options_[ipc_option] = value;

  if (state_ != IS_OPEN)
    return 0;

  return DoSetOption(ipc_option, value);
}
// Passes |option| and |value| to the client. Must only be called while the
// socket is IS_OPEN. Always returns 0.
int IpcPacketSocket::DoSetOption(P2PSocketOption option, int value) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);
  DCHECK_EQ(state_, IS_OPEN);

  client_->SetOption(option, value);

  return 0;
}
// Returns the most recently stored error code.
int IpcPacketSocket::GetError() const {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  return error_;
}
// Overwrites the stored error code with |error|.
void IpcPacketSocket::SetError(int error) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  error_ = error;
}
// Delegate callback: the client reports that the socket is open at |address|.
// Stores the address, switches to IS_OPEN, applies every option that was set
// before the socket opened, then emits SignalAddressReady and, for TCP client
// sockets, SignalConnect. An unconvertible |address| is treated as an error.
void IpcPacketSocket::OnOpen(const net::IPEndPoint& address) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  const bool converted =
      jingle_glue::IPEndPointToSocketAddress(address, &local_address_);
  if (!converted) {
    // Always expect correct IPv4 address to be allocated.
    NOTREACHED();
    OnError();
    return;
  }

  state_ = IS_OPEN;
  TraceSendThrottlingState();

  // Flush options that were cached by SetOption() while the socket was not
  // open yet.
  for (int opt = 0; opt < P2P_SOCKET_OPT_MAX; ++opt) {
    const int pending_value = options_[opt];
    if (pending_value == kDefaultNonSetOptionValue)
      continue;
    DoSetOption(static_cast<P2PSocketOption>(opt), pending_value);
  }

  SignalAddressReady(this, local_address_);
  if (IsTcpClientSocket(type_)) {
    SignalConnect(this);
  }
}
// Delegate callback: a new connection from |address| was accepted and is
// served by |client|. Wraps it in a new IpcPacketSocket that shares this
// socket's local address and hands the new socket to the SignalNewConnection
// listeners.
void IpcPacketSocket::OnIncomingTcpConnection(
    const net::IPEndPoint& address,
    P2PSocketClient* client) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  scoped_ptr<IpcPacketSocket> accepted(new IpcPacketSocket());

  talk_base::SocketAddress peer_address;
  const bool converted =
      jingle_glue::IPEndPointToSocketAddress(address, &peer_address);
  if (!converted) {
    // Conversion is expected to succeed; in builds where NOTREACHED() does not
    // stop execution the socket is still created with whatever |peer_address|
    // holds.
    NOTREACHED();
  }

  accepted->InitAcceptedTcp(client, local_address_, peer_address);
  SignalNewConnection(this, accepted.release());
}
// Delegate callback: the oldest in-flight packet has been sent. Returns its
// size to the send budget and, if a previous SendTo() was refused for lack of
// budget, emits SignalReadyToSend.
void IpcPacketSocket::OnSendComplete() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  CHECK(!in_flight_packet_sizes_.empty());
  send_bytes_available_ += in_flight_packet_sizes_.front();
  DCHECK_LE(send_bytes_available_, kMaximumInFlightBytes);
  in_flight_packet_sizes_.pop_front();
  TraceSendThrottlingState();

  if (writable_signal_expected_ && send_bytes_available_ > 0) {
    // Clear the flag BEFORE signalling. A listener may call SendTo() from
    // inside the signal and be throttled again, which sets the flag; clearing
    // it afterwards would drop that request and the listener would never be
    // notified. It also avoids touching members after a callback that might
    // destroy this object.
    writable_signal_expected_ = false;
    SignalReadyToSend(this);
  }
}
// Delegate callback: the client reported a failure. Records ECONNABORTED and
// moves the socket to the IS_ERROR state.
void IpcPacketSocket::OnError() {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  error_ = ECONNABORTED;
  state_ = IS_ERROR;
}
// Delegate callback: |data| was received from |address| at |timestamp|.
// Converts the sender address and forwards the packet through
// SignalReadPacket. The packet is dropped if the address cannot be converted.
void IpcPacketSocket::OnDataReceived(const net::IPEndPoint& address,
                                     const std::vector<char>& data,
                                     const base::TimeTicks& timestamp) {
  DCHECK_EQ(base::MessageLoop::current(), message_loop_);

  talk_base::SocketAddress address_lj;
  if (!jingle_glue::IPEndPointToSocketAddress(address, &address_lj)) {
    // We should always be able to convert address here because we
    // don't expect IPv6 address on IPv4 connections.
    NOTREACHED();
    return;
  }

  talk_base::PacketTime packet_time(timestamp.ToInternalValue(), 0);

  // Indexing element 0 of an empty vector is undefined behaviour, so an empty
  // payload is forwarded as a NULL pointer with size 0.
  const char* payload = data.empty() ? NULL : &data[0];
  SignalReadPacket(this, payload, data.size(), address_lj, packet_time);
}
// Creates the underlying P2PAsyncAddressResolver for |dispatcher|. |port_| is
// zero-initialized so it never holds an indeterminate value before Start()
// assigns the requested port.
AsyncAddressResolverImpl::AsyncAddressResolverImpl(
    P2PSocketDispatcher* dispatcher)
    : resolver_(new P2PAsyncAddressResolver(dispatcher)),
      port_(0) {
}
// Nothing to do explicitly; members release their resources themselves.
AsyncAddressResolverImpl::~AsyncAddressResolverImpl() {}
// Remembers the port of |addr| and starts the asynchronous lookup;
// OnAddressResolved() is invoked with the result.
void AsyncAddressResolverImpl::Start(const talk_base::SocketAddress& addr) {
  DCHECK(CalledOnValidThread());

  // Copy port number from |addr|. |port_| must be copied
  // when resolved address is returned in GetResolvedAddress.
  port_ = addr.port();

  // The callback holds an unretained pointer to this object; Destroy()
  // cancels the resolver before deleting it.
  resolver_->Start(
      addr,
      base::Bind(&AsyncAddressResolverImpl::OnAddressResolved,
                 base::Unretained(this)));
}
bool AsyncAddressResolverImpl::GetResolvedAddress(
int family, talk_base::SocketAddress* addr) const {
DCHECK(CalledOnValidThread());
if (addresses_.empty())
return false;
for (size_t i = 0; i < addresses_.size(); ++i) {
if (family == addresses_[i].family()) {
addr->SetResolvedIP(addresses_[i]);
addr->SetPort(port_);
return true;
}
}
return false;
}
// Returns 0 when at least one address has been collected, -1 otherwise.
int AsyncAddressResolverImpl::GetError() const {
  DCHECK(CalledOnValidThread());

  if (addresses_.empty())
    return -1;
  return 0;
}
// Cancels the pending lookup and deletes this object. |wait| is not used.
// The object must not be touched after this call returns.
void AsyncAddressResolverImpl::Destroy(bool wait) {
  DCHECK(CalledOnValidThread());

  resolver_->Cancel();

  // Libjingle doesn't need this object any more and it's not going to delete
  // it explicitly.
  delete this;
}
// Completion callback of the lookup started by Start(). Converts every entry
// of |addresses| to a talk_base::IPAddress, appends the successfully converted
// ones to |addresses_| and then emits SignalDone.
void AsyncAddressResolverImpl::OnAddressResolved(
    const net::IPAddressList& addresses) {
  DCHECK(CalledOnValidThread());

  for (size_t i = 0; i < addresses.size(); ++i) {
    talk_base::SocketAddress socket_address;
    if (!jingle_glue::IPEndPointToSocketAddress(
            net::IPEndPoint(addresses[i], 0), &socket_address)) {
      NOTREACHED();
      // Do not record an address that failed to convert: |socket_address| is
      // still default-constructed, and storing it would make GetError() report
      // success without a usable result.
      continue;
    }
    addresses_.push_back(socket_address.ipaddr());
  }

  SignalDone(this);
}
}
// Stores |socket_dispatcher|, which is passed to every socket client and
// resolver this factory creates.
IpcPacketSocketFactory::IpcPacketSocketFactory(
    P2PSocketDispatcher* socket_dispatcher)
    : socket_dispatcher_(socket_dispatcher) {}
// No explicit cleanup is performed here.
IpcPacketSocketFactory::~IpcPacketSocketFactory() {}
// Creates a UDP socket for |local_address|. |min_port| and |max_port| are not
// used by this implementation. Returns NULL if the socket cannot be
// initialized; otherwise the caller receives ownership of the returned socket.
talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateUdpSocket(
    const talk_base::SocketAddress& local_address, int min_port, int max_port) {
  P2PSocketClientImpl* socket_client =
      new P2PSocketClientImpl(socket_dispatcher_);
  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());

  // A default-constructed remote address is passed because a UDP socket has no
  // fixed peer.
  if (!socket->Init(P2P_SOCKET_UDP, socket_client,
                    local_address, talk_base::SocketAddress())) {
    return NULL;
  }
  return socket.release();
}
// Creates a listening TCP socket for |local_address|. OPT_STUN in |opts|
// selects the STUN server socket type; OPT_SSLTCP is rejected with NULL.
// |min_port| and |max_port| are not used by this implementation. Returns NULL
// when initialization fails; otherwise the caller owns the returned socket.
talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateServerTcpSocket(
    const talk_base::SocketAddress& local_address, int min_port, int max_port,
    int opts) {
  // TODO(sergeyu): Implement SSL support.
  const bool wants_ssltcp =
      (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) != 0;
  if (wants_ssltcp)
    return NULL;

  P2PSocketType type = P2P_SOCKET_TCP_SERVER;
  if (opts & talk_base::PacketSocketFactory::OPT_STUN)
    type = P2P_SOCKET_STUN_TCP_SERVER;

  P2PSocketClientImpl* socket_client =
      new P2PSocketClientImpl(socket_dispatcher_);
  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());

  const bool initialized = socket->Init(type, socket_client, local_address,
                                        talk_base::SocketAddress());
  if (!initialized)
    return NULL;

  return socket.release();
}
// Creates a client TCP socket from |local_address| to |remote_address|.
// The socket type is chosen from |opts|: OPT_SSLTCP takes precedence over
// OPT_TLS, and OPT_STUN selects the STUN variant of whichever transport was
// picked. |proxy_info| and |user_agent| are not used by this implementation.
// Returns NULL when initialization fails; otherwise the caller owns the
// returned socket.
talk_base::AsyncPacketSocket* IpcPacketSocketFactory::CreateClientTcpSocket(
    const talk_base::SocketAddress& local_address,
    const talk_base::SocketAddress& remote_address,
    const talk_base::ProxyInfo& proxy_info,
    const std::string& user_agent, int opts) {
  const bool use_stun = (opts & talk_base::PacketSocketFactory::OPT_STUN) != 0;
  const bool use_ssltcp =
      (opts & talk_base::PacketSocketFactory::OPT_SSLTCP) != 0;
  const bool use_tls = (opts & talk_base::PacketSocketFactory::OPT_TLS) != 0;

  P2PSocketType type;
  if (use_ssltcp) {
    type = use_stun ? P2P_SOCKET_STUN_SSLTCP_CLIENT : P2P_SOCKET_SSLTCP_CLIENT;
  } else if (use_tls) {
    type = use_stun ? P2P_SOCKET_STUN_TLS_CLIENT : P2P_SOCKET_TLS_CLIENT;
  } else {
    type = use_stun ? P2P_SOCKET_STUN_TCP_CLIENT : P2P_SOCKET_TCP_CLIENT;
  }

  P2PSocketClientImpl* socket_client =
      new P2PSocketClientImpl(socket_dispatcher_);
  scoped_ptr<IpcPacketSocket> socket(new IpcPacketSocket());

  const bool initialized =
      socket->Init(type, socket_client, local_address, remote_address);
  if (!initialized) {
    return NULL;
  }
  return socket.release();
}
// Creates a resolver bound to this factory's dispatcher. The returned object
// is heap-allocated and handed to the caller as a raw pointer.
talk_base::AsyncResolverInterface*
IpcPacketSocketFactory::CreateAsyncResolver() {
  AsyncAddressResolverImpl* const resolver =
      new AsyncAddressResolverImpl(socket_dispatcher_);
  return resolver;
}
}