root/net/tools/quic/quic_dispatcher.cc

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. OnAlarm
  2. QuicFramerVisitor
  3. OnPacket
  4. OnUnauthenticatedPublicHeader
  5. OnUnauthenticatedHeader
  6. OnError
  7. OnProtocolVersionMismatch
  8. OnPublicResetPacket
  9. OnVersionNegotiationPacket
  10. OnPacketComplete
  11. OnPacketHeader
  12. OnRevivedPacket
  13. OnFecProtectedPayload
  14. OnStreamFrame
  15. OnAckFrame
  16. OnCongestionFeedbackFrame
  17. OnStopWaitingFrame
  18. OnRstStreamFrame
  19. OnConnectionCloseFrame
  20. OnGoAwayFrame
  21. OnWindowUpdateFrame
  22. OnBlockedFrame
  23. OnFecData
  24. initial_flow_control_window_bytes_
  25. Initialize
  26. ProcessPacket
  27. OnUnauthenticatedPublicHeader
  28. OnUnauthenticatedHeader
  29. CleanUpSession
  30. DeleteSessions
  31. OnCanWrite
  32. HasPendingWrites
  33. Shutdown
  34. OnConnectionClosed
  35. OnWriteBlocked
  36. CreateWriter
  37. CreateWriterWrapper
  38. CreateQuicSession
  39. CreateQuicConnection
  40. set_writer
  41. HandlePacketForTimeWait

// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/tools/quic/quic_dispatcher.h"

#include <errno.h>

#include "base/debug/stack_trace.h"
#include "base/logging.h"
#include "base/stl_util.h"
#include "net/quic/quic_blocked_writer_interface.h"
#include "net/quic/quic_flags.h"
#include "net/quic/quic_utils.h"
#include "net/tools/quic/quic_default_packet_writer.h"
#include "net/tools/quic/quic_epoll_connection_helper.h"
#include "net/tools/quic/quic_packet_writer_wrapper.h"
#include "net/tools/quic/quic_socket_utils.h"

namespace net {

namespace tools {

using base::StringPiece;
using std::make_pair;

class DeleteSessionsAlarm : public EpollAlarm {
 public:
  explicit DeleteSessionsAlarm(QuicDispatcher* dispatcher)
      : dispatcher_(dispatcher) {
  }

  virtual int64 OnAlarm() OVERRIDE {
    EpollAlarm::OnAlarm();
    dispatcher_->DeleteSessions();
    return 0;
  }

 private:
  QuicDispatcher* dispatcher_;
};

class QuicDispatcher::QuicFramerVisitor : public QuicFramerVisitorInterface {
 public:
  explicit QuicFramerVisitor(QuicDispatcher* dispatcher)
      : dispatcher_(dispatcher) {}

  // QuicFramerVisitorInterface implementation
  virtual void OnPacket() OVERRIDE {}
  virtual bool OnUnauthenticatedPublicHeader(
      const QuicPacketPublicHeader& header) OVERRIDE {
    return dispatcher_->OnUnauthenticatedPublicHeader(header);
  }
  virtual bool OnUnauthenticatedHeader(
      const QuicPacketHeader& header) OVERRIDE {
    dispatcher_->OnUnauthenticatedHeader(header);
    return false;
  }
  virtual void OnError(QuicFramer* framer) OVERRIDE {
    DVLOG(1) << QuicUtils::ErrorToString(framer->error());
  }

  // The following methods should never get called because we always return
  // false from OnUnauthenticatedHeader().  As a result, we never process the
  // payload of the packet.
  virtual bool OnProtocolVersionMismatch(
      QuicVersion /*received_version*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual void OnPublicResetPacket(
      const QuicPublicResetPacket& /*packet*/) OVERRIDE {
    DCHECK(false);
  }
  virtual void OnVersionNegotiationPacket(
      const QuicVersionNegotiationPacket& /*packet*/) OVERRIDE {
    DCHECK(false);
  }
  virtual void OnPacketComplete() OVERRIDE {
    DCHECK(false);
  }
  virtual bool OnPacketHeader(const QuicPacketHeader& /*header*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual void OnRevivedPacket() OVERRIDE {
    DCHECK(false);
  }
  virtual void OnFecProtectedPayload(StringPiece /*payload*/) OVERRIDE {
    DCHECK(false);
  }
  virtual bool OnStreamFrame(const QuicStreamFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnAckFrame(const QuicAckFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnCongestionFeedbackFrame(
      const QuicCongestionFeedbackFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnStopWaitingFrame(
      const QuicStopWaitingFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnRstStreamFrame(const QuicRstStreamFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnConnectionCloseFrame(
      const QuicConnectionCloseFrame & /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnGoAwayFrame(const QuicGoAwayFrame& /*frame*/) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnWindowUpdateFrame(const QuicWindowUpdateFrame& /*frame*/)
      OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual bool OnBlockedFrame(const QuicBlockedFrame& frame) OVERRIDE {
    DCHECK(false);
    return false;
  }
  virtual void OnFecData(const QuicFecData& /*fec*/) OVERRIDE {
    DCHECK(false);
  }

 private:
  QuicDispatcher* dispatcher_;
};

QuicDispatcher::QuicDispatcher(const QuicConfig& config,
                               const QuicCryptoServerConfig& crypto_config,
                               const QuicVersionVector& supported_versions,
                               EpollServer* epoll_server,
                               uint32 initial_flow_control_window_bytes)
    : config_(config),
      crypto_config_(crypto_config),
      delete_sessions_alarm_(new DeleteSessionsAlarm(this)),
      epoll_server_(epoll_server),
      helper_(new QuicEpollConnectionHelper(epoll_server_)),
      supported_versions_(supported_versions),
      supported_versions_no_flow_control_(supported_versions),
      current_packet_(NULL),
      framer_(supported_versions, /*unused*/ QuicTime::Zero(), true),
      framer_visitor_(new QuicFramerVisitor(this)),
      initial_flow_control_window_bytes_(initial_flow_control_window_bytes) {
  framer_.set_visitor(framer_visitor_.get());
}

QuicDispatcher::~QuicDispatcher() {
  STLDeleteValues(&session_map_);
  STLDeleteElements(&closed_session_list_);
}

void QuicDispatcher::Initialize(int fd) {
  DCHECK(writer_ == NULL);
  writer_.reset(CreateWriterWrapper(CreateWriter(fd)));
  time_wait_list_manager_.reset(
      new QuicTimeWaitListManager(writer_.get(), this,
                                  epoll_server(), supported_versions()));

  // Remove all versions > QUIC_VERSION_16 from the
  // supported_versions_no_flow_control_ vector.
  QuicVersionVector::iterator it =
      find(supported_versions_no_flow_control_.begin(),
           supported_versions_no_flow_control_.end(), QUIC_VERSION_17);
  if (it != supported_versions_no_flow_control_.end()) {
    supported_versions_no_flow_control_.erase(
        supported_versions_no_flow_control_.begin(), it + 1);
  }
  CHECK(!supported_versions_no_flow_control_.empty());
}

void QuicDispatcher::ProcessPacket(const IPEndPoint& server_address,
                                   const IPEndPoint& client_address,
                                   const QuicEncryptedPacket& packet) {
  current_server_address_ = server_address;
  current_client_address_ = client_address;
  current_packet_ = &packet;
  // ProcessPacket will cause the packet to be dispatched in
  // OnUnauthenticatedPublicHeader, or sent to the time wait list manager
  // in OnAuthenticatedHeader.
  framer_.ProcessPacket(packet);
  // TODO(rjshade): Return a status describing if/why a packet was dropped,
  //                and log somehow.  Maybe expose as a varz.
}

bool QuicDispatcher::OnUnauthenticatedPublicHeader(
    const QuicPacketPublicHeader& header) {
  QuicSession* session = NULL;

  QuicConnectionId connection_id = header.connection_id;
  SessionMap::iterator it = session_map_.find(connection_id);
  if (it == session_map_.end()) {
    if (header.reset_flag) {
      return false;
    }
    if (time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id)) {
      return HandlePacketForTimeWait(header);
    }

    // Ensure the packet has a version negotiation bit set before creating a new
    // session for it.  All initial packets for a new connection are required to
    // have the flag set.  Otherwise it may be a stray packet.
    if (header.version_flag) {
      session = CreateQuicSession(connection_id, current_server_address_,
                                  current_client_address_);
    }

    if (session == NULL) {
      DVLOG(1) << "Failed to create session for " << connection_id;
      // Add this connection_id fo the time-wait state, to safely reject future
      // packets.

      if (header.version_flag &&
          !framer_.IsSupportedVersion(header.versions.front())) {
        // TODO(ianswett): Produce a no-version version negotiation packet.
        return false;
      }

      // Use the version in the packet if possible, otherwise assume the latest.
      QuicVersion version = header.version_flag ? header.versions.front() :
          supported_versions_.front();
      time_wait_list_manager_->AddConnectionIdToTimeWait(
          connection_id, version, NULL);
      DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(connection_id));
      return HandlePacketForTimeWait(header);
    }
    DVLOG(1) << "Created new session for " << connection_id;
    session_map_.insert(make_pair(connection_id, session));
  } else {
    session = it->second;
  }

  session->connection()->ProcessUdpPacket(
      current_server_address_, current_client_address_, *current_packet_);

  // Do not parse the packet further.  The session will process it completely.
  return false;
}

void QuicDispatcher::OnUnauthenticatedHeader(const QuicPacketHeader& header) {
  DCHECK(time_wait_list_manager_->IsConnectionIdInTimeWait(
      header.public_header.connection_id));
  time_wait_list_manager_->ProcessPacket(current_server_address_,
                                         current_client_address_,
                                         header.public_header.connection_id,
                                         header.packet_sequence_number);
}

void QuicDispatcher::CleanUpSession(SessionMap::iterator it) {
  QuicConnection* connection = it->second->connection();
  QuicEncryptedPacket* connection_close_packet =
          connection->ReleaseConnectionClosePacket();
  write_blocked_list_.erase(connection);
  time_wait_list_manager_->AddConnectionIdToTimeWait(it->first,
                                                     connection->version(),
                                                     connection_close_packet);
  session_map_.erase(it);
}

void QuicDispatcher::DeleteSessions() {
  STLDeleteElements(&closed_session_list_);
}

void QuicDispatcher::OnCanWrite() {
  // We got an EPOLLOUT: the socket should not be blocked.
  writer_->SetWritable();

  // Give each writer one attempt to write.
  int num_writers = write_blocked_list_.size();
  for (int i = 0; i < num_writers; ++i) {
    if (write_blocked_list_.empty()) {
      return;
    }
    QuicBlockedWriterInterface* blocked_writer =
        write_blocked_list_.begin()->first;
    write_blocked_list_.erase(write_blocked_list_.begin());
    blocked_writer->OnCanWrite();
    if (writer_->IsWriteBlocked()) {
      // We were unable to write.  Wait for the next EPOLLOUT. The writer is
      // responsible for adding itself to the blocked list via OnWriteBlocked().
      return;
    }
  }
}

bool QuicDispatcher::HasPendingWrites() const {
  return !write_blocked_list_.empty();
}

void QuicDispatcher::Shutdown() {
  while (!session_map_.empty()) {
    QuicSession* session = session_map_.begin()->second;
    session->connection()->SendConnectionClose(QUIC_PEER_GOING_AWAY);
    // Validate that the session removes itself from the session map on close.
    DCHECK(session_map_.empty() || session_map_.begin()->second != session);
  }
  DeleteSessions();
}

void QuicDispatcher::OnConnectionClosed(QuicConnectionId connection_id,
                                        QuicErrorCode error) {
  SessionMap::iterator it = session_map_.find(connection_id);
  if (it == session_map_.end()) {
    LOG(DFATAL) << "ConnectionId " << connection_id
                << " does not exist in the session map.  "
                << "Error: " << QuicUtils::ErrorToString(error);
    LOG(DFATAL) << base::debug::StackTrace().ToString();
    return;
  }

  DLOG_IF(INFO, error != QUIC_NO_ERROR) << "Closing connection ("
                                        << connection_id
                                        << ") due to error: "
                                        << QuicUtils::ErrorToString(error);

  if (closed_session_list_.empty()) {
    epoll_server_->RegisterAlarmApproximateDelta(
        0, delete_sessions_alarm_.get());
  }
  closed_session_list_.push_back(it->second);
  CleanUpSession(it);
}

void QuicDispatcher::OnWriteBlocked(QuicBlockedWriterInterface* writer) {
  DCHECK(writer_->IsWriteBlocked());
  write_blocked_list_.insert(make_pair(writer, true));
}

QuicPacketWriter* QuicDispatcher::CreateWriter(int fd) {
  return new QuicDefaultPacketWriter(fd);
}

QuicPacketWriterWrapper* QuicDispatcher::CreateWriterWrapper(
    QuicPacketWriter* writer) {
  return new QuicPacketWriterWrapper(writer);
}

QuicSession* QuicDispatcher::CreateQuicSession(
    QuicConnectionId connection_id,
    const IPEndPoint& server_address,
    const IPEndPoint& client_address) {
  QuicServerSession* session = new QuicServerSession(
      config_,
      CreateQuicConnection(connection_id,
                           server_address,
                           client_address,
                           initial_flow_control_window_bytes_),
      this);
  session->InitializeSession(crypto_config_);
  return session;
}

QuicConnection* QuicDispatcher::CreateQuicConnection(
    QuicConnectionId connection_id,
    const IPEndPoint& server_address,
    const IPEndPoint& client_address,
    uint32 initial_flow_control_window) {
  // If we have disabled per-stream flow control, then don't allow new
  // connections to talk QUIC_VERSION_17 or higher.
  if (FLAGS_enable_quic_stream_flow_control) {
    return new QuicConnection(connection_id, client_address, helper_.get(),
                              writer_.get(), true, supported_versions_,
                              initial_flow_control_window_bytes_);
  } else {
    DVLOG(1)
        << "Flow control disabled, creating QuicDispatcher WITHOUT version 17";
    return new QuicConnection(connection_id, client_address, helper_.get(),
                              writer_.get(), true,
                              supported_versions_no_flow_control_,
                              initial_flow_control_window_bytes_);
  }
}

void QuicDispatcher::set_writer(QuicPacketWriter* writer) {
  writer_->set_writer(writer);
}

bool QuicDispatcher::HandlePacketForTimeWait(
    const QuicPacketPublicHeader& header) {
  if (header.reset_flag) {
    // Public reset packets do not have sequence numbers, so ignore the packet.
    return false;
  }

  // Switch the framer to the correct version, so that the sequence number can
  // be parsed correctly.
  framer_.set_version(time_wait_list_manager_->GetQuicVersionFromConnectionId(
      header.connection_id));

  // Continue parsing the packet to extract the sequence number.  Then
  // send it to the time wait manager in OnUnathenticatedHeader.
  return true;
}

}  // namespace tools
}  // namespace net

/* [<][>][^][v][top][bottom][index][help] */