This source file includes following definitions.
- packet_
- UDPInterface
- node_
- Start
- Run
- Start
- Run
- emitter_
- Destroy
- GetEventEmitter
- Init
- QueueInput
- QueueOutput
- Bind
- Connect
- Recv_Locked
- Send_Locked
#include "nacl_io/socket/udp_node.h"
#include <errno.h>
#include <string.h>
#include <algorithm>
#include "nacl_io/pepper_interface.h"
#include "nacl_io/socket/packet.h"
#include "nacl_io/socket/udp_event_emitter.h"
#include "nacl_io/stream/stream_fs.h"
namespace {
const size_t kMaxPacketSize = 65536;
const size_t kDefaultFifoSize = kMaxPacketSize * 8;
}
namespace nacl_io {
class UdpWork : public StreamFs::Work {
public:
explicit UdpWork(const ScopedUdpEventEmitter& emitter)
: StreamFs::Work(emitter->stream()->stream()),
emitter_(emitter),
packet_(NULL) {}
~UdpWork() { delete packet_; }
UDPSocketInterface* UDPInterface() {
return filesystem()->ppapi()->GetUDPSocketInterface();
}
protected:
ScopedUdpEventEmitter emitter_;
Packet* packet_;
};
class UdpSendWork : public UdpWork {
public:
explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
const ScopedSocketNode& node)
: UdpWork(emitter), node_(node) {}
virtual bool Start(int32_t val) {
AUTO_LOCK(emitter_->GetLock());
if (!node_->TestStreamFlags(SSF_CAN_SEND))
return false;
packet_ = emitter_->ReadTXPacket_Locked();
if (NULL == packet_)
return false;
int err = UDPInterface()->SendTo(node_->socket_resource(),
packet_->buffer(),
packet_->len(),
packet_->addr(),
filesystem()->GetRunCompletion(this));
if (err != PP_OK_COMPLETIONPENDING) {
node_->SetError_Locked(err);
return false;
}
node_->SetStreamFlags(SSF_SENDING);
return true;
}
virtual void Run(int32_t length_error) {
AUTO_LOCK(emitter_->GetLock());
if (length_error < 0) {
node_->SetError_Locked(length_error);
return;
}
node_->ClearStreamFlags(SSF_SENDING);
node_->QueueOutput();
}
private:
ScopedSocketNode node_;
};
class UdpRecvWork : public UdpWork {
public:
explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
: UdpWork(emitter) {
data_ = new char[kMaxPacketSize];
}
~UdpRecvWork() { delete[] data_; }
virtual bool Start(int32_t val) {
AUTO_LOCK(emitter_->GetLock());
UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
return false;
if (stream->TestStreamFlags(SSF_RECVING))
return false;
stream->SetStreamFlags(SSF_RECVING);
int err = UDPInterface()->RecvFrom(stream->socket_resource(),
data_,
kMaxPacketSize,
&addr_,
filesystem()->GetRunCompletion(this));
if (err != PP_OK_COMPLETIONPENDING) {
stream->SetError_Locked(err);
return false;
}
return true;
}
virtual void Run(int32_t length_error) {
AUTO_LOCK(emitter_->GetLock());
UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
if (NULL == stream)
return;
if (length_error > 0) {
Packet* packet = new Packet(filesystem()->ppapi());
packet->Copy(data_, length_error, addr_);
emitter_->WriteRXPacket_Locked(packet);
stream->ClearStreamFlags(SSF_RECVING);
stream->QueueInput();
} else {
stream->SetError_Locked(length_error);
}
}
private:
char* data_;
PP_Resource addr_;
};
UdpNode::UdpNode(Filesystem* filesystem)
: SocketNode(filesystem),
emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
emitter_->AttachStream(this);
}
void UdpNode::Destroy() {
emitter_->DetachStream();
SocketNode::Destroy();
}
UdpEventEmitter* UdpNode::GetEventEmitter() { return emitter_.get(); }
Error UdpNode::Init(int open_flags) {
Error err = SocketNode::Init(open_flags);
if (err != 0)
return err;
if (UDPInterface() == NULL)
return EACCES;
socket_resource_ =
UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
if (0 == socket_resource_)
return EACCES;
return 0;
}
void UdpNode::QueueInput() {
UdpRecvWork* work = new UdpRecvWork(emitter_);
stream()->EnqueueWork(work);
}
void UdpNode::QueueOutput() {
if (!TestStreamFlags(SSF_CAN_SEND))
return;
if (TestStreamFlags(SSF_SENDING))
return;
UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
stream()->EnqueueWork(work);
}
Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
if (0 == socket_resource_)
return EBADF;
if (IsBound())
return EINVAL;
PP_Resource out_addr = SockAddrToResource(addr, len);
if (0 == out_addr)
return EINVAL;
int err =
UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
filesystem_->ppapi()->ReleaseResource(out_addr);
if (err != 0)
return PPErrorToErrno(err);
out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
if (out_addr == 0)
return EINVAL;
SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
QueueInput();
local_addr_ = out_addr;
return 0;
}
Error UdpNode::Connect(const HandleAttr& attr,
const struct sockaddr* addr,
socklen_t len) {
if (0 == socket_resource_)
return EBADF;
if (remote_addr_ != 0) {
filesystem_->ppapi()->ReleaseResource(remote_addr_);
remote_addr_ = 0;
}
remote_addr_ = SockAddrToResource(addr, len);
if (0 == remote_addr_)
return EINVAL;
return 0;
}
Error UdpNode::Recv_Locked(void* buf,
size_t len,
PP_Resource* out_addr,
int* out_len) {
Packet* packet = emitter_->ReadRXPacket_Locked();
*out_len = 0;
*out_addr = 0;
if (packet) {
int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
memcpy(buf, packet->buffer(), capped_len);
if (packet->addr() != 0) {
filesystem_->ppapi()->AddRefResource(packet->addr());
*out_addr = packet->addr();
}
*out_len = capped_len;
delete packet;
return 0;
}
return EBADF;
}
Error UdpNode::Send_Locked(const void* buf,
size_t len,
PP_Resource addr,
int* out_len) {
if (!IsBound()) {
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = 0;
memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
Error err =
Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
if (err != 0)
return err;
}
*out_len = 0;
int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
Packet* packet = new Packet(filesystem_->ppapi());
packet->Copy(buf, capped_len, addr);
emitter_->WriteTXPacket_Locked(packet);
*out_len = capped_len;
return 0;
}
}