This source file includes the following definitions.
- Parse32WithLimit
- Encode32
- Init
- Read
- ReadVarint32
- ReadVarint32Signed
- ShareSubstream
- ReadSubstream
- Skip
- Write
- WriteVarint32
- WriteVarint32Signed
- WriteSizeVarint32
- Append
- Retire
- Init
- Init
- ReadSet
- Empty
- Init
- CopyHeaderTo
- CopyTo
- WriteSet
#include "courgette/streams.h"
#include <memory.h>
#include "base/basictypes.h"
#include "base/logging.h"
namespace courgette {
// Version tag of the serialized stream-set layout.  SinkStreamSet::CopyHeaderTo
// writes it as the first header field and SourceStreamSet::Init rejects any
// input whose first field differs from it.
static const unsigned int kStreamsSerializationFormatVersion = 20090218;
// Helpers for the variable-length encoding of 32-bit unsigned integers used by
// the streams in this file.  A value is stored in groups of seven bits, least
// significant group first; the top bit of a byte is set when another byte
// follows.
class Varint {
 public:
  // Upper bound on the number of bytes one encoded 32-bit value occupies.
  static const int kMax32 = 5;

  // Writes the encoding of |value| starting at |destination| and returns the
  // address one past the last byte written.  The caller provides room for up
  // to kMax32 bytes.
  static uint8* Encode32(uint8* destination, uint32 value);

  // Decodes one value from the bytes in [source, limit) into |*output|.
  // Returns the address following the decoded bytes, or NULL if the encoding
  // is cut short by |limit| or does not terminate within kMax32 bytes.
  static const uint8* Parse32WithLimit(const uint8* source,
                                       const uint8* limit,
                                       uint32* output);
};
// Decodes a single varint-encoded uint32 from [source, limit).
//
// On success stores the value in |*output| and returns the address of the
// byte after the encoding.  Returns NULL, without touching |*output|, when
// |limit| is reached before the terminating byte or when the fifth byte still
// has its continuation bit set.
const uint8* Varint::Parse32WithLimit(const uint8* source,
                                      const uint8* limit,
                                      uint32* output) {
  uint32 accumulated = 0;
  for (int group = 0; group < kMax32; ++group) {
    if (source >= limit)
      return NULL;  // Ran out of input in the middle of a value.

    const uint32 byte = *(source++);
    // Low seven bits carry payload.  For the fifth group the shift is 28, so
    // payload bits that do not fit in 32 bits simply fall off the top.
    accumulated |= (byte & 127) << (7 * group);

    // A clear top bit marks the final byte of the value.
    if (byte < 128) {
      *output = accumulated;
      return source;
    }
  }
  // Five bytes consumed and the continuation bit is still set.
  return NULL;
}
// Emits |value| at |destination| seven bits at a time, least significant
// group first, and returns the address just past the last byte written.
// Every byte except the last has its top bit set.
inline uint8* Varint::Encode32(uint8* destination, uint32 value) {
  uint8* cursor = destination;
  for (; value >= 128; value >>= 7) {
    // More groups follow: store the low seven bits with the continuation bit.
    *cursor++ = static_cast<uint8>(value | 128);
  }
  // Final group; |value| is below 128 here so the top bit is clear.
  *cursor++ = static_cast<uint8>(value);
  return cursor;
}
// Sets this stream up to read what |sink| currently holds, by forwarding the
// sink's Buffer() and Length() to the two-argument Init.
// NOTE(review): presumably no copy is made, so |sink| must stay alive and
// unmodified while this stream is in use - confirm against the header.
void SourceStream::Init(const SinkStream& sink) {
Init(sink.Buffer(), sink.Length());
}
// Copies the next |count| bytes of the stream into |destination| and advances
// the read position past them.
//
// Returns false, leaving both the read position and |destination| untouched,
// when fewer than |count| bytes are left in the stream.
bool SourceStream::Read(void* destination, size_t count) {
  // Compare sizes instead of evaluating |current_ + count|: that pointer sum
  // is undefined once it leaves the buffer and can wrap for a huge |count|,
  // which would make an out-of-range request look valid.
  const size_t available = static_cast<size_t>(end_ - current_);
  if (count > available)
    return false;
  memcpy(destination, current_, count);
  current_ += count;
  return true;
}
// Decodes one varint-encoded uint32 at the current position into
// |*output_value| and steps over it.  Returns false and keeps the position
// unchanged if Varint::Parse32WithLimit rejects the bytes between the current
// position and the end of the stream.
bool SourceStream::ReadVarint32(uint32* output_value) {
  const uint8* const next =
      Varint::Parse32WithLimit(current_, end_, output_value);
  if (next != NULL) {
    current_ = next;
    return true;
  }
  return false;
}
// Reads a signed value stored as an unsigned varint whose lowest bit is the
// sign flag: with the flag clear the result is the remaining bits, with the
// flag set it is the bitwise complement of the remaining bits.
// Returns false (and does not write |*output_value|) if no varint can be read.
bool SourceStream::ReadVarint32Signed(int32* output_value) {
  uint32 encoded;
  if (!ReadVarint32(&encoded))
    return false;

  // Dropping the flag bit leaves at most 31 significant bits, so the value
  // fits in int32 unchanged.
  const int32 payload = static_cast<int32>(encoded >> 1);
  const bool negative = (encoded & 1) != 0;
  *output_value = negative ? ~payload : payload;
  return true;
}
// Initializes |substream| to cover |length| bytes beginning |offset| bytes
// past this stream's current position.  This stream's own position is not
// moved.  Returns false, without touching |substream|, if the requested range
// does not lie within the bytes that remain.
bool SourceStream::ShareSubstream(size_t offset, size_t length,
                                  SourceStream* substream) {
  // The second test only runs when |offset| is within range, so the
  // subtraction cannot underflow.
  const bool fits =
      offset <= Remaining() && length <= Remaining() - offset;
  if (!fits)
    return false;

  substream->Init(current_ + offset, length);
  return true;
}
// Hands the next |length| bytes of this stream to |substream| and then skips
// over them.  Returns false and leaves the position unchanged if
// ShareSubstream refuses the range.
bool SourceStream::ReadSubstream(size_t length, SourceStream* substream) {
  const bool shared = ShareSubstream(0, length, substream);
  if (shared)
    current_ += length;
  return shared;
}
// Advances the read position by |byte_count| bytes.
//
// Returns false and leaves the position unchanged when fewer than
// |byte_count| bytes are left in the stream.
bool SourceStream::Skip(size_t byte_count) {
  // Compare sizes instead of evaluating |current_ + byte_count|: that pointer
  // sum is undefined once it leaves the buffer and can wrap for a huge
  // |byte_count|, which would move the position outside the stream.
  const size_t available = static_cast<size_t>(end_ - current_);
  if (byte_count > available)
    return false;
  current_ += byte_count;
  return true;
}
// Appends |byte_count| bytes starting at |data| to the stream's buffer and
// returns the result reported by the buffer's append().
CheckBool SinkStream::Write(const void* data, size_t byte_count) {
  const char* const bytes = static_cast<const char*>(data);
  return buffer_.append(bytes, byte_count);
}
// Appends |value| to the stream in varint form.  The value is first encoded
// into a small local array and then written in one call; the result of that
// Write is returned.
CheckBool SinkStream::WriteVarint32(uint32 value) {
  uint8 encoded[Varint::kMax32];
  const uint8* const encoded_end = Varint::Encode32(encoded, value);
  const size_t encoded_size = encoded_end - encoded;
  return Write(encoded, encoded_size);
}
// Appends a signed value as an unsigned varint whose lowest bit is the sign
// flag: a non-negative |value| is written as |value| * 2, a negative one as
// (~|value|) * 2 + 1.  This is the inverse of SourceStream::ReadVarint32Signed.
// Returns the result of the underlying WriteVarint32.
CheckBool SinkStream::WriteVarint32Signed(int32 value) {
  const bool negative = value < 0;
  // Both |value| (when non-negative) and ~|value| (when negative) are in
  // [0, 2^31), so they convert to uint32 unchanged.
  const uint32 payload = static_cast<uint32>(negative ? ~value : value);
  // Do the doubling in unsigned arithmetic: in signed arithmetic it overflows
  // (undefined behaviour) once the payload reaches 2^30, whereas the unsigned
  // shift always fits in 32 bits.
  const uint32 encoded = (payload << 1) | (negative ? 1u : 0u);
  return WriteVarint32(encoded);
}
// Appends a size_t as a 32-bit varint.  The value is narrowed to uint32 and
// LOG_ASSERT checks that nothing was lost by the narrowing; the narrowed value
// is then written with WriteVarint32 and that call's result is returned.
// NOTE(review): what happens after a failed LOG_ASSERT depends on the logging
// configuration, which is not visible here - confirm.
CheckBool SinkStream::WriteSizeVarint32(size_t value) {
uint32 narrowed_value = static_cast<uint32>(value);
// Differs only when |value| does not fit in 32 bits.
LOG_ASSERT(value == narrowed_value);
return WriteVarint32(narrowed_value);
}
// Appends the whole content of |other| to this stream.  When the write
// succeeds |other| is emptied through Retire(); when it fails |other| is left
// as it was.  Returns whether the write succeeded.
CheckBool SinkStream::Append(SinkStream* other) {
  const bool appended =
      Write(other->buffer_.data(), other->buffer_.size());
  if (!appended)
    return false;

  other->Retire();
  return true;
}
// Empties the stream by clearing its buffer.  Append() calls this on the
// stream it has just copied from.
void SinkStream::Retire() {
buffer_.clear();
}
// Starts with the stream count at kMaxStreams; a successful header parse in
// Init(const void*, size_t) replaces it with the count read from the input.
SourceStreamSet::SourceStreamSet()
: count_(kMaxStreams) {
}
// Empty body: no explicit cleanup is performed here.
SourceStreamSet::~SourceStreamSet() {
}
// Splits a serialized stream set into its component streams.
//
// The input consists of |byte_count| bytes at |source| laid out as:
//   varint  format version (must equal kStreamsSerializationFormatVersion)
//   varint  stream count   (at most kMaxStreams)
//   varint  length of each stream, in order
//   the stream bodies, back to back, filling the rest of the input exactly
//
// On success every stream in the set is initialized to its slice of the input
// and true is returned.  Returns false if the header is malformed, the version
// or count is unacceptable, or the lengths do not add up to the bytes that
// follow the header.  Note that |count_| has already been replaced when a
// failure is detected after the stream count was read.
bool SourceStreamSet::Init(const void* source, size_t byte_count) {
  const uint8* start = static_cast<const uint8*>(source);
  const uint8* end = start + byte_count;

  unsigned int version;
  const uint8* finger = Varint::Parse32WithLimit(start, end, &version);
  if (finger == NULL)
    return false;
  if (version != kStreamsSerializationFormatVersion)
    return false;

  unsigned int count;
  finger = Varint::Parse32WithLimit(finger, end, &count);
  if (finger == NULL)
    return false;
  if (count > kMaxStreams)
    return false;
  count_ = count;

  unsigned int lengths[kMaxStreams];
  size_t accumulated_length = 0;
  for (size_t i = 0; i < count_; ++i) {
    finger = Varint::Parse32WithLimit(finger, end, &lengths[i]);
    if (finger == NULL)
      return false;
    // The lengths come from the input and cannot be trusted.  Where size_t is
    // 32 bits wide their sum could wrap around and still match the number of
    // bytes left, producing streams that reach past the buffer.  No valid
    // input has a partial sum above |byte_count|, so reject it right here;
    // |accumulated_length| never exceeds |byte_count|, which keeps the
    // subtraction from underflowing.
    if (lengths[i] > byte_count - accumulated_length)
      return false;
    accumulated_length += lengths[i];
  }

  // The stream bodies must account for every byte after the header.
  if (static_cast<size_t>(end - finger) != accumulated_length)
    return false;

  // Reuse the accumulator as the running offset of each body from |start|.
  accumulated_length = finger - start;
  for (size_t i = 0; i < count_; ++i) {
    stream(i)->Init(start + accumulated_length, lengths[i]);
    accumulated_length += lengths[i];
  }
  return true;
}
// Initializes the set from the bytes described by |source|'s Buffer() and
// Remaining(), returning the result of Init(const void*, size_t).
// NOTE(review): presumably Buffer() points at |source|'s current read
// position, so only the unread part is parsed - confirm against the header.
bool SourceStreamSet::Init(SourceStream* source) {
return Init(source->Buffer(), source->Remaining());
}
// Extracts a nested stream set from this one into |set|.
//
// Stream 0 of this set is the control stream: it supplies a varint stream
// count followed by one varint length per nested stream.  For each index i
// below that count, the next lengths[i] bytes of this set's stream i are
// handed to |set|'s stream i via ReadSubstream.
//
// Returns false if the control data cannot be read, the count exceeds
// kMaxStreams, a required stream is missing, or a stream holds fewer bytes
// than its recorded length.
bool SourceStreamSet::ReadSet(SourceStreamSet* set) {
  uint32 stream_count = 0;
  SourceStream* control_stream = this->stream(0);
  // NOTE(review): stream() is declared elsewhere; the NULL checks in this
  // function assume it may return NULL for an index outside the set.
  if (control_stream == NULL)
    return false;
  if (!control_stream->ReadVarint32(&stream_count))
    return false;

  // The count comes from the input.  Without this check a large value would
  // write past the end of |lengths| below.
  if (stream_count > kMaxStreams)
    return false;

  uint32 lengths[kMaxStreams] = {};  // All zero.
  for (size_t i = 0; i < stream_count; ++i) {
    if (!control_stream->ReadVarint32(&lengths[i]))
      return false;
  }

  for (size_t i = 0; i < stream_count; ++i) {
    SourceStream* from = this->stream(i);
    SourceStream* to = set->stream(i);
    if (from == NULL || to == NULL)
      return false;
    if (!from->ReadSubstream(lengths[i], to))
      return false;
  }
  return true;
}
// Returns true when none of the first |count_| streams has bytes remaining
// (which is trivially the case for a set with no streams).
bool SourceStreamSet::Empty() const {
  size_t index = 0;
  // Walk forward while streams are exhausted; stop at the first one that
  // still has data.
  while (index < count_ && streams_[index].Remaining() == 0)
    ++index;
  return !(index < count_);
}
// Starts with the stream count at kMaxStreams; Init(size_t) can replace it.
SinkStreamSet::SinkStreamSet()
: count_(kMaxStreams) {
}
// Empty body: no explicit cleanup is performed here.
SinkStreamSet::~SinkStreamSet() {
}
// Sets the number of streams in the set; CopyHeaderTo and CopyTo iterate over
// exactly this many streams.
// NOTE(review): |stream_index_limit| is not checked against kMaxStreams here -
// confirm that callers never pass a larger value.
void SinkStreamSet::Init(size_t stream_index_limit) {
count_ = stream_index_limit;
}
// Writes the serialization header for this set to |header|: the format
// version, the stream count, and then the current length of each stream.
// Stops at the first failed write and returns whether every write succeeded.
CheckBool SinkStreamSet::CopyHeaderTo(SinkStream* header) {
  bool ok = header->WriteVarint32(kStreamsSerializationFormatVersion);
  if (!ok)
    return false;

  ok = header->WriteSizeVarint32(count_);
  size_t index = 0;
  while (ok && index < count_) {
    ok = header->WriteSizeVarint32(stream(index)->Length());
    ++index;
  }
  return ok;
}
// Serializes the whole set into |combined_stream|: the header produced by
// CopyHeaderTo followed by each stream's content in index order.  Space for
// the full result is requested up front with Reserve().  Each stream that is
// appended successfully is emptied by SinkStream::Append.  Returns false as
// soon as any step fails.
CheckBool SinkStreamSet::CopyTo(SinkStream *combined_stream) {
  SinkStream header;
  const bool header_built = CopyHeaderTo(&header);
  if (!header_built)
    return false;

  // Total size of header plus all stream bodies.
  size_t total_size = header.Length();
  for (size_t i = 0; i < count_; ++i)
    total_size += stream(i)->Length();

  const bool reserved = combined_stream->Reserve(total_size);
  if (!reserved)
    return false;

  bool ok = combined_stream->Append(&header);
  size_t index = 0;
  while (ok && index < count_) {
    ok = combined_stream->Append(stream(index));
    ++index;
  }
  return ok;
}
// Embeds |set| inside this set.  Stream 0 of this set acts as the control
// stream: it receives the number of streams written followed by each of their
// lengths.  That number is one past the index of the last non-empty stream in
// |set|, so trailing empty streams are not recorded.  The content of |set|'s
// stream i is then appended to this set's stream i.  Returns false as soon as
// a write or append fails.
CheckBool SinkStreamSet::WriteSet(SinkStreamSet* set) {
  // Record every stream's length, narrowed to 32 bits.
  uint32 lengths[kMaxStreams];
  for (size_t i = 0; i < kMaxStreams; ++i)
    lengths[i] = static_cast<uint32>(set->stream(i)->Length());

  // Trim trailing empty streams to find how many need to be written.
  size_t stream_count = kMaxStreams;
  while (stream_count > 0 && lengths[stream_count - 1] == 0)
    --stream_count;

  SinkStream* control_stream = this->stream(0);
  bool ok = control_stream->WriteSizeVarint32(stream_count);
  for (size_t i = 0; ok && i < stream_count; ++i)
    ok = control_stream->WriteSizeVarint32(lengths[i]);

  for (size_t i = 0; ok && i < stream_count; ++i)
    ok = this->stream(i)->Append(set->stream(i));

  return ok;
}
}