This source file includes following definitions.
- IsEmpty
- IsEqual
- weak_factory_
- StartReceiving
- ReceiveNextPacket
- SendPacket
- OnSent
#include "media/cast/transport/transport/udp_transport.h"
#include <algorithm>
#include <string>
#include "base/bind.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/rand_util.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/rand_callback.h"
namespace media {
namespace cast {
namespace transport {
namespace {
const int kMaxPacketSize = 1500;
bool IsEmpty(const net::IPEndPoint& addr) {
net::IPAddressNumber empty_addr(addr.address().size());
return std::equal(
empty_addr.begin(), empty_addr.end(), addr.address().begin()) &&
!addr.port();
}
bool IsEqual(const net::IPEndPoint& addr1, const net::IPEndPoint& addr2) {
return addr1.port() == addr2.port() && std::equal(addr1.address().begin(),
addr1.address().end(),
addr2.address().begin());
}
}
UdpTransport::UdpTransport(
net::NetLog* net_log,
const scoped_refptr<base::SingleThreadTaskRunner>& io_thread_proxy,
const net::IPEndPoint& local_end_point,
const net::IPEndPoint& remote_end_point,
const CastTransportStatusCallback& status_callback)
: io_thread_proxy_(io_thread_proxy),
local_addr_(local_end_point),
remote_addr_(remote_end_point),
udp_socket_(new net::UDPSocket(net::DatagramSocket::DEFAULT_BIND,
net::RandIntCallback(),
net_log,
net::NetLog::Source())),
send_pending_(false),
client_connected_(false),
status_callback_(status_callback),
weak_factory_(this) {
DCHECK(!IsEmpty(local_end_point) || !IsEmpty(remote_end_point));
}
UdpTransport::~UdpTransport() {}
void UdpTransport::StartReceiving(
const PacketReceiverCallback& packet_receiver) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
packet_receiver_ = packet_receiver;
udp_socket_->AllowAddressReuse();
udp_socket_->SetMulticastLoopbackMode(true);
if (!IsEmpty(local_addr_)) {
if (udp_socket_->Bind(local_addr_) < 0) {
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
LOG(ERROR) << "Failed to bind local address.";
return;
}
} else if (!IsEmpty(remote_addr_)) {
if (udp_socket_->Connect(remote_addr_) < 0) {
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
LOG(ERROR) << "Failed to connect to remote address.";
return;
}
client_connected_ = true;
} else {
NOTREACHED() << "Either local or remote address has to be defined.";
}
ReceiveNextPacket(net::ERR_IO_PENDING);
}
void UdpTransport::ReceiveNextPacket(int length_or_status) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
while (true) {
if (length_or_status == net::ERR_IO_PENDING) {
next_packet_.reset(new Packet(kMaxPacketSize));
recv_buf_ = new net::WrappedIOBuffer(
reinterpret_cast<char*>(&next_packet_->front()));
length_or_status = udp_socket_->RecvFrom(
recv_buf_,
kMaxPacketSize,
&recv_addr_,
base::Bind(&UdpTransport::ReceiveNextPacket,
weak_factory_.GetWeakPtr()));
if (length_or_status == net::ERR_IO_PENDING)
return;
}
if (length_or_status < 0) {
VLOG(1) << "Failed to receive packet: Status code is "
<< length_or_status << ". Stop receiving packets.";
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
return;
}
if (IsEmpty(remote_addr_)) {
remote_addr_ = recv_addr_;
VLOG(1) << "Setting remote address from first received packet: "
<< remote_addr_.ToString();
} else if (!IsEqual(remote_addr_, recv_addr_)) {
VLOG(1) << "Ignoring packet received from an unrecognized address: "
<< recv_addr_.ToString() << ".";
length_or_status = net::ERR_IO_PENDING;
continue;
}
next_packet_->resize(length_or_status);
packet_receiver_.Run(next_packet_.Pass());
length_or_status = net::ERR_IO_PENDING;
}
}
bool UdpTransport::SendPacket(const Packet& packet) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
if (send_pending_) {
VLOG(1) << "Cannot send because of pending IO.";
return false;
}
scoped_refptr<net::IOBuffer> buf =
new net::IOBuffer(static_cast<int>(packet.size()));
memcpy(buf->data(), &packet[0], packet.size());
int ret;
if (client_connected_) {
ret = udp_socket_->Write(
buf,
static_cast<int>(packet.size()),
base::Bind(&UdpTransport::OnSent, weak_factory_.GetWeakPtr(), buf));
} else if (!IsEmpty(remote_addr_)) {
ret = udp_socket_->SendTo(
buf,
static_cast<int>(packet.size()),
remote_addr_,
base::Bind(&UdpTransport::OnSent, weak_factory_.GetWeakPtr(), buf));
} else {
return false;
}
if (ret == net::ERR_IO_PENDING)
send_pending_ = true;
return ret >= net::OK;
}
void UdpTransport::OnSent(const scoped_refptr<net::IOBuffer>& buf, int result) {
DCHECK(io_thread_proxy_->RunsTasksOnCurrentThread());
send_pending_ = false;
if (result < 0) {
LOG(ERROR) << "Failed to send packet: " << result << ".";
status_callback_.Run(TRANSPORT_SOCKET_ERROR);
}
}
}
}
}