root/native_client_sdk/src/libraries/nacl_io/socket/udp_node.cc

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. packet_
  2. UDPInterface
  3. node_
  4. Start
  5. Run
  6. Start
  7. Run
  8. emitter_
  9. Destroy
  10. GetEventEmitter
  11. Init
  12. QueueInput
  13. QueueOutput
  14. Bind
  15. Connect
  16. Recv_Locked
  17. Send_Locked

// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "nacl_io/socket/udp_node.h"

#include <errno.h>
#include <string.h>

#include <algorithm>

#include "nacl_io/pepper_interface.h"
#include "nacl_io/socket/packet.h"
#include "nacl_io/socket/udp_event_emitter.h"
#include "nacl_io/stream/stream_fs.h"

namespace {
const size_t kMaxPacketSize = 65536;
const size_t kDefaultFifoSize = kMaxPacketSize * 8;
}

namespace nacl_io {

class UdpWork : public StreamFs::Work {
 public:
  explicit UdpWork(const ScopedUdpEventEmitter& emitter)
      : StreamFs::Work(emitter->stream()->stream()),
        emitter_(emitter),
        packet_(NULL) {}

  ~UdpWork() { delete packet_; }

  UDPSocketInterface* UDPInterface() {
    return filesystem()->ppapi()->GetUDPSocketInterface();
  }

 protected:
  ScopedUdpEventEmitter emitter_;
  Packet* packet_;
};

class UdpSendWork : public UdpWork {
 public:
  explicit UdpSendWork(const ScopedUdpEventEmitter& emitter,
                       const ScopedSocketNode& node)
      : UdpWork(emitter), node_(node) {}

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());

    // Does the stream exist, and can it send?
    if (!node_->TestStreamFlags(SSF_CAN_SEND))
      return false;

    packet_ = emitter_->ReadTXPacket_Locked();
    if (NULL == packet_)
      return false;

    int err = UDPInterface()->SendTo(node_->socket_resource(),
                                     packet_->buffer(),
                                     packet_->len(),
                                     packet_->addr(),
                                     filesystem()->GetRunCompletion(this));
    if (err != PP_OK_COMPLETIONPENDING) {
      // Anything else, we should assume the socket has gone bad.
      node_->SetError_Locked(err);
      return false;
    }

    node_->SetStreamFlags(SSF_SENDING);
    return true;
  }

  virtual void Run(int32_t length_error) {
    AUTO_LOCK(emitter_->GetLock());

    if (length_error < 0) {
      node_->SetError_Locked(length_error);
      return;
    }

    // If we did send, then Q more work.
    node_->ClearStreamFlags(SSF_SENDING);
    node_->QueueOutput();
  }

 private:
  // We assume that transmits will always complete.  If the upstream
  // actually back pressures, enough to prevent the Send callback
  // from triggering, this resource may never go away.
  ScopedSocketNode node_;
};

class UdpRecvWork : public UdpWork {
 public:
  explicit UdpRecvWork(const ScopedUdpEventEmitter& emitter)
      : UdpWork(emitter) {
    data_ = new char[kMaxPacketSize];
  }

  ~UdpRecvWork() { delete[] data_; }

  virtual bool Start(int32_t val) {
    AUTO_LOCK(emitter_->GetLock());
    UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());

    // Does the stream exist, and can it recv?
    if (NULL == stream || !stream->TestStreamFlags(SSF_CAN_RECV))
      return false;

    // Check if we are already receiving.
    if (stream->TestStreamFlags(SSF_RECVING))
      return false;

    stream->SetStreamFlags(SSF_RECVING);
    int err = UDPInterface()->RecvFrom(stream->socket_resource(),
                                       data_,
                                       kMaxPacketSize,
                                       &addr_,
                                       filesystem()->GetRunCompletion(this));
    if (err != PP_OK_COMPLETIONPENDING) {
      stream->SetError_Locked(err);
      return false;
    }

    return true;
  }

  virtual void Run(int32_t length_error) {
    AUTO_LOCK(emitter_->GetLock());
    UdpNode* stream = static_cast<UdpNode*>(emitter_->stream());
    if (NULL == stream)
      return;

    // On successful receive we queue more input
    if (length_error > 0) {
      Packet* packet = new Packet(filesystem()->ppapi());
      packet->Copy(data_, length_error, addr_);
      emitter_->WriteRXPacket_Locked(packet);
      stream->ClearStreamFlags(SSF_RECVING);
      stream->QueueInput();
    } else {
      stream->SetError_Locked(length_error);
    }
  }

 private:
  char* data_;
  PP_Resource addr_;
};

UdpNode::UdpNode(Filesystem* filesystem)
    : SocketNode(filesystem),
      emitter_(new UdpEventEmitter(kDefaultFifoSize, kDefaultFifoSize)) {
  emitter_->AttachStream(this);
}

void UdpNode::Destroy() {
  emitter_->DetachStream();
  SocketNode::Destroy();
}

UdpEventEmitter* UdpNode::GetEventEmitter() { return emitter_.get(); }

Error UdpNode::Init(int open_flags) {
  Error err = SocketNode::Init(open_flags);
  if (err != 0)
    return err;

  if (UDPInterface() == NULL)
    return EACCES;

  socket_resource_ =
      UDPInterface()->Create(filesystem_->ppapi()->GetInstance());
  if (0 == socket_resource_)
    return EACCES;

  return 0;
}

void UdpNode::QueueInput() {
  UdpRecvWork* work = new UdpRecvWork(emitter_);
  stream()->EnqueueWork(work);
}

void UdpNode::QueueOutput() {
  if (!TestStreamFlags(SSF_CAN_SEND))
    return;

  if (TestStreamFlags(SSF_SENDING))
    return;

  UdpSendWork* work = new UdpSendWork(emitter_, ScopedSocketNode(this));
  stream()->EnqueueWork(work);
}

Error UdpNode::Bind(const struct sockaddr* addr, socklen_t len) {
  if (0 == socket_resource_)
    return EBADF;

  /* Only bind once. */
  if (IsBound())
    return EINVAL;

  PP_Resource out_addr = SockAddrToResource(addr, len);
  if (0 == out_addr)
    return EINVAL;

  int err =
      UDPInterface()->Bind(socket_resource_, out_addr, PP_BlockUntilComplete());
  filesystem_->ppapi()->ReleaseResource(out_addr);
  if (err != 0)
    return PPErrorToErrno(err);

  // Get the address that was actually bound (in case addr was 0.0.0.0:0).
  out_addr = UDPInterface()->GetBoundAddress(socket_resource_);
  if (out_addr == 0)
    return EINVAL;

  // Now that we are bound, we can start sending and receiving.
  SetStreamFlags(SSF_CAN_SEND | SSF_CAN_RECV);
  QueueInput();

  local_addr_ = out_addr;
  return 0;
}

Error UdpNode::Connect(const HandleAttr& attr,
                       const struct sockaddr* addr,
                       socklen_t len) {
  if (0 == socket_resource_)
    return EBADF;

  /* Connect for UDP is the default dest, it's legal to change it. */
  if (remote_addr_ != 0) {
    filesystem_->ppapi()->ReleaseResource(remote_addr_);
    remote_addr_ = 0;
  }

  remote_addr_ = SockAddrToResource(addr, len);
  if (0 == remote_addr_)
    return EINVAL;

  return 0;
}

Error UdpNode::Recv_Locked(void* buf,
                           size_t len,
                           PP_Resource* out_addr,
                           int* out_len) {
  Packet* packet = emitter_->ReadRXPacket_Locked();
  *out_len = 0;
  *out_addr = 0;

  if (packet) {
    int capped_len = static_cast<int32_t>(std::min<int>(len, packet->len()));
    memcpy(buf, packet->buffer(), capped_len);

    if (packet->addr() != 0) {
      filesystem_->ppapi()->AddRefResource(packet->addr());
      *out_addr = packet->addr();
    }

    *out_len = capped_len;
    delete packet;
    return 0;
  }

  // Should never happen, Recv_Locked should not be called
  // unless already in a POLLIN state.
  return EBADF;
}

Error UdpNode::Send_Locked(const void* buf,
                           size_t len,
                           PP_Resource addr,
                           int* out_len) {
  if (!IsBound()) {
    // Pepper requires a socket to be bound before it can send.
    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = 0;
    memset(&addr.sin_addr, 0, sizeof(addr.sin_addr));
    Error err =
        Bind(reinterpret_cast<const struct sockaddr*>(&addr), sizeof(addr));
    if (err != 0)
      return err;
  }

  *out_len = 0;
  int capped_len = static_cast<int32_t>(std::min<int>(len, kMaxPacketSize));
  Packet* packet = new Packet(filesystem_->ppapi());
  packet->Copy(buf, capped_len, addr);

  emitter_->WriteTXPacket_Locked(packet);
  *out_len = capped_len;
  return 0;
}

}  // namespace nacl_io

/* [<][>][^][v][top][bottom][index][help] */