root/net/base/upload_data_stream.cc

/* [<][>][^][v][top][bottom][index][help] */

DEFINITIONS

This source file includes the following definitions.
  1. weak_ptr_factory_
  2. weak_ptr_factory_
  3. CreateWithReader
  4. Init
  5. Read
  6. IsEOF
  7. IsInMemory
  8. AppendChunk
  9. Reset
  10. InitInternal
  11. ResumePendingInit
  12. ReadInternal
  13. ResumePendingRead
  14. ProcessReadResult

// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/base/upload_data_stream.h"

#include "base/logging.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/upload_bytes_element_reader.h"
#include "net/base/upload_element_reader.h"

namespace net {

// Constructs a non-chunked stream (|is_chunked_| is false). The contents of
// |element_readers| are moved into |element_readers_| via Pass(), and
// |identifier| is stored unchanged in |identifier_|.
//
// All progress counters start at zero and the stream starts out uninitialized:
// |initialized_successfully_| only becomes true once InitInternal() finishes
// with OK.
UploadDataStream::UploadDataStream(
    ScopedVector<UploadElementReader> element_readers,
    int64 identifier)
    : element_readers_(element_readers.Pass()),
      element_index_(0),
      total_size_(0),  // Computed from the readers in InitInternal().
      current_position_(0),
      identifier_(identifier),
      is_chunked_(false),
      last_chunk_appended_(false),
      read_failed_(false),
      initialized_successfully_(false),
      weak_ptr_factory_(this) {
}

// Constructs a chunked stream (|is_chunked_| is true). No element readers are
// supplied here; they are added one at a time by AppendChunk(). The |Chunked|
// argument is unnamed and unused in the body -- presumably it exists only to
// select this overload.
//
// As with the non-chunked constructor, the stream starts out uninitialized and
// with all progress counters at zero.
UploadDataStream::UploadDataStream(Chunked /*chunked*/, int64 identifier)
    : element_index_(0),
      total_size_(0),  // Left at zero for chunked uploads (see AppendChunk()).
      current_position_(0),
      identifier_(identifier),
      is_chunked_(true),
      last_chunk_appended_(false),
      read_failed_(false),
      initialized_successfully_(false),
      weak_ptr_factory_(this) {
}

// No explicit teardown is written here; cleanup is left to the destructors of
// the individual members.
UploadDataStream::~UploadDataStream() {
}

// Convenience factory for the single-reader case: places |reader| in a
// one-element reader list and builds a non-chunked UploadDataStream from it.
// The stream is allocated with |new| and handed back as a raw pointer.
UploadDataStream* UploadDataStream::CreateWithReader(
    scoped_ptr<UploadElementReader> reader,
    int64 identifier) {
  ScopedVector<UploadElementReader> single_reader_list;
  // release() gives up the scoped_ptr's hold so the vector can take the
  // pointer instead.
  single_reader_list.push_back(reader.release());

  UploadDataStream* const stream =
      new UploadDataStream(single_reader_list.Pass(), identifier);
  return stream;
}

// (Re)initializes the stream. Any state left over from an earlier
// Init()/Read() cycle is discarded first, then every element reader is
// initialized starting from the first one.
//
// Returns whatever InitInternal() returns: OK on synchronous success,
// ERR_IO_PENDING when |callback| will be run later, or an error code.
int UploadDataStream::Init(const CompletionCallback& callback) {
  Reset();

  const int first_reader_index = 0;
  return InitInternal(first_reader_index, callback);
}

// Reads up to |buf_len| bytes of upload data into |buf|.
//
// The stream must have been initialized successfully and |buf_len| must be
// positive (both are DCHECKed). |buf| is wrapped in a DrainableIOBuffer so
// that progress across several element readers can be tracked.
//
// Returns the result of ReadInternal(): a byte count, or ERR_IO_PENDING when
// |callback| will be run with the result later.
int UploadDataStream::Read(IOBuffer* buf,
                           int buf_len,
                           const CompletionCallback& callback) {
  DCHECK(initialized_successfully_);
  DCHECK_GT(buf_len, 0);

  scoped_refptr<DrainableIOBuffer> drainable_buf(
      new DrainableIOBuffer(buf, buf_len));
  return ReadInternal(drainable_buf, callback);
}

// Reports whether all upload data has been read. Only valid after a
// successful initialization (DCHECKed).
bool UploadDataStream::IsEOF() const {
  DCHECK(initialized_successfully_);

  if (is_chunked_) {
    // A chunked upload ends only when every appended element has been consumed
    // and the final chunk has already been appended.
    const bool all_elements_consumed =
        element_index_ == element_readers_.size();
    return all_elements_consumed && last_chunk_appended_;
  }

  // Non-chunked: the total size is known, so compare the read position to it.
  return current_position_ == total_size_;
}

bool UploadDataStream::IsInMemory() const {
  // Chunks are in memory, but UploadData does not have all the chunks at
  // once. Chunks are provided progressively with AppendChunk() as chunks
  // are ready. Check is_chunked_ here, rather than relying on the loop
  // below, as there is a case that is_chunked_ is set to true, but the
  // first chunk is not yet delivered.
  if (is_chunked_)
    return false;

  for (size_t i = 0; i < element_readers_.size(); ++i) {
    if (!element_readers_[i]->IsInMemory())
      return false;
  }
  return true;
}

void UploadDataStream::AppendChunk(const char* bytes,
                                   int bytes_len,
                                   bool is_last_chunk) {
  DCHECK(is_chunked_);
  DCHECK(!last_chunk_appended_);
  last_chunk_appended_ = is_last_chunk;

  // Initialize a reader for the newly appended chunk. We leave |total_size_| at
  // zero, since for chunked uploads, we may not know the total size.
  std::vector<char> data(bytes, bytes + bytes_len);
  UploadElementReader* reader = new UploadOwnedBytesElementReader(&data);
  const int rv = reader->Init(net::CompletionCallback());
  DCHECK_EQ(OK, rv);
  element_readers_.push_back(reader);

  // Resume pending read.
  if (!pending_chunked_read_callback_.is_null()) {
    base::Closure callback = pending_chunked_read_callback_;
    pending_chunked_read_callback_.Reset();
    callback.Run();
  }
}

// Returns the stream to its pre-Init() state. The element readers themselves
// are left in place.
void UploadDataStream::Reset() {
  // Invalidate the weak pointers handed to earlier Init()/Read() callbacks and
  // drop any parked chunked read.
  weak_ptr_factory_.InvalidateWeakPtrs();
  pending_chunked_read_callback_.Reset();

  // Rewind progress tracking.
  element_index_ = 0;
  current_position_ = 0;
  total_size_ = 0;

  // Clear status flags.
  read_failed_ = false;
  initialized_successfully_ = false;
}

int UploadDataStream::InitInternal(int start_index,
                                   const CompletionCallback& callback) {
  DCHECK(!initialized_successfully_);

  // Call Init() for all elements.
  for (size_t i = start_index; i < element_readers_.size(); ++i) {
    UploadElementReader* reader = element_readers_[i];
    // When new_result is ERR_IO_PENDING, InitInternal() will be called
    // with start_index == i + 1 when reader->Init() finishes.
    const int result = reader->Init(
        base::Bind(&UploadDataStream::ResumePendingInit,
                   weak_ptr_factory_.GetWeakPtr(),
                   i + 1,
                   callback));
    if (result != OK) {
      DCHECK(result != ERR_IO_PENDING || !callback.is_null());
      return result;
    }
  }

  // Finalize initialization.
  if (!is_chunked_) {
    uint64 total_size = 0;
    for (size_t i = 0; i < element_readers_.size(); ++i) {
      UploadElementReader* reader = element_readers_[i];
      total_size += reader->GetContentLength();
    }
    total_size_ = total_size;
  }
  initialized_successfully_ = true;
  return OK;
}

// Completion handler for a reader whose Init() finished asynchronously.
// |previous_result| is that reader's result and |start_index| is the next
// reader to initialize.
//
// A failure is reported to |callback| immediately. Otherwise initialization
// continues, and |callback| is run with the outcome unless another reader went
// asynchronous (in which case this handler will be entered again later).
void UploadDataStream::ResumePendingInit(int start_index,
                                         const CompletionCallback& callback,
                                         int previous_result) {
  DCHECK(!initialized_successfully_);
  DCHECK(!callback.is_null());
  DCHECK_NE(ERR_IO_PENDING, previous_result);

  if (previous_result == OK) {
    // The previous reader succeeded; carry on with the remaining ones.
    const int init_result = InitInternal(start_index, callback);
    if (init_result == ERR_IO_PENDING)
      return;
    callback.Run(init_result);
    return;
  }

  // The previous reader failed; pass its error straight through.
  callback.Run(previous_result);
}

// Fills |buf| from the element readers, starting at |element_index_|, until
// the buffer is full, the readers are exhausted, a read fails, or a reader
// reports ERR_IO_PENDING.
//
// Returns:
//   - ERR_IO_PENDING if a reader is completing asynchronously, or if this is a
//     chunked upload that produced no bytes and is not yet at EOF. In both
//     cases the read continues in ResumePendingRead(), which runs |callback|
//     with the final result.
//   - Otherwise the number of bytes consumed in |buf|; zero is only expected
//     at EOF (DCHECKed).
//
// On a read failure of a non-chunked upload, the rest of the expected data is
// replaced with zero bytes rather than reporting an error to the caller.
int UploadDataStream::ReadInternal(scoped_refptr<DrainableIOBuffer> buf,
                                   const CompletionCallback& callback) {
  DCHECK(initialized_successfully_);

  while (!read_failed_ && element_index_ < element_readers_.size()) {
    UploadElementReader* reader = element_readers_[element_index_];

    // This reader has nothing left; move on to the next one.
    if (reader->BytesRemaining() == 0) {
      ++element_index_;
      continue;
    }

    // The destination buffer is full.
    if (buf->BytesRemaining() == 0)
      break;

    // If the reader completes asynchronously, its result is delivered to
    // ResumePendingRead(), which accounts for it and re-enters this function.
    // The binding goes through a weak pointer, which Reset() invalidates.
    int result = reader->Read(
        buf.get(),
        buf->BytesRemaining(),
        base::Bind(base::IgnoreResult(&UploadDataStream::ResumePendingRead),
                   weak_ptr_factory_.GetWeakPtr(),
                   buf,
                   callback));
    if (result == ERR_IO_PENDING) {
      DCHECK(!callback.is_null());
      return ERR_IO_PENDING;
    }
    // Synchronous completion: advance |buf| or record the failure.
    ProcessReadResult(buf, result);
  }

  if (read_failed_) {
    // Chunked transfers may only contain byte readers, so cannot have read
    // failures.
    DCHECK(!is_chunked_);

    // If an error occurred during read operation, then pad with zero.
    // Otherwise the server will hang waiting for the rest of the data.
    // The padding is limited both by the space left in |buf| and by the number
    // of bytes of the upload not yet produced (including what this call has
    // already placed in |buf|).
    const int num_bytes_to_fill =
        std::min(static_cast<uint64>(buf->BytesRemaining()),
                 size() - position() - buf->BytesConsumed());
    DCHECK_LE(0, num_bytes_to_fill);
    memset(buf->data(), 0, num_bytes_to_fill);
    buf->DidConsume(num_bytes_to_fill);
  }

  // Everything consumed in |buf| so far counts toward the stream position.
  const int bytes_copied = buf->BytesConsumed();
  current_position_ += bytes_copied;
  DCHECK(is_chunked_ || total_size_ >= current_position_);

  // Chunked upload with no data available yet, but more chunks are expected:
  // park the continuation in |pending_chunked_read_callback_|, which
  // AppendChunk() runs once the next chunk has been appended.
  if (is_chunked_ && !IsEOF() && bytes_copied == 0) {
    DCHECK(!callback.is_null());
    DCHECK(pending_chunked_read_callback_.is_null());
    pending_chunked_read_callback_ =
        base::Bind(&UploadDataStream::ResumePendingRead,
                   weak_ptr_factory_.GetWeakPtr(),
                   buf,
                   callback,
                   OK);
    return ERR_IO_PENDING;
  }

  // Returning 0 is allowed only when IsEOF() == true.
  DCHECK(bytes_copied != 0 || IsEOF());
  return bytes_copied;
}

// Continuation for a read that could not finish synchronously.
// |previous_result| is the outcome of the reader operation that was pending
// (or OK when resumed by AppendChunk()). It is accounted for first, then
// reading continues into the same |buf|. |callback| is run with the final
// result unless the read goes asynchronous again.
void UploadDataStream::ResumePendingRead(scoped_refptr<DrainableIOBuffer> buf,
                                         const CompletionCallback& callback,
                                         int previous_result) {
  DCHECK(!callback.is_null());

  ProcessReadResult(buf, previous_result);

  const int read_result = ReadInternal(buf, callback);
  if (read_result == ERR_IO_PENDING)
    return;  // This function will be invoked again on completion.

  callback.Run(read_result);
}

// Applies the outcome of one reader operation to the stream state: a negative
// |result| marks the read as failed, while a non-negative |result| is treated
// as a byte count and consumed from |buf|.
void UploadDataStream::ProcessReadResult(scoped_refptr<DrainableIOBuffer> buf,
                                         int result) {
  DCHECK_NE(ERR_IO_PENDING, result);
  DCHECK(!read_failed_);

  if (result < 0) {
    read_failed_ = true;
    return;
  }
  buf->DidConsume(result);
}

}  // namespace net

/* [<][>][^][v][top][bottom][index][help] */