BufferDetail.hh
Go to the documentation of this file.00001
00019 #ifndef avro_BufferDetail_hh__
00020 #define avro_BufferDetail_hh__
00021
00022 #include <boost/shared_ptr.hpp>
00023 #include <boost/shared_array.hpp>
00024 #include <boost/static_assert.hpp>
00025 #include <boost/function.hpp>
00026 #ifdef HAVE_BOOST_ASIO
00027 #include <boost/asio/buffer.hpp>
00028 #endif
00029 #include <exception>
00030 #include <cassert>
00031 #include <deque>
00032
00040 namespace avro {
00041
00042 namespace detail {
00043
00044 typedef char data_type;
00045 typedef size_t size_type;
00046 #ifdef HAVE_BOOST_ASIO
00047 typedef boost::asio::const_buffer ConstAsioBuffer;
00048 typedef boost::asio::mutable_buffer MutableAsioBuffer;
00049 #endif
00050
00052 const size_type kMinBlockSize = 4096;
00053 const size_type kMaxBlockSize = 16384;
00054 const size_type kDefaultBlockSize = kMinBlockSize;
00055
00056 typedef boost::function<void(void)> free_func;
00057
00061 class CallOnDestroy {
00062 public:
00063 CallOnDestroy(const free_func &func) : func_(func)
00064 { }
00065 ~CallOnDestroy() {
00066 if (func_) {
00067 func_();
00068 }
00069 }
00070 private:
00071 free_func func_;
00072 };
00073
00091 class Chunk
00092 {
00093
00094 public:
00095
00096 typedef boost::shared_ptr<Chunk> SharedPtr;
00097
00099 Chunk(size_type size) :
00100 underlyingBlock_(new data_type[size]),
00101 readPos_(underlyingBlock_.get()),
00102 writePos_(readPos_),
00103 endPos_(readPos_ + size)
00104 { }
00105
00108 Chunk(const data_type *data, size_type size, const free_func &func) :
00109 callOnDestroy_(new CallOnDestroy(func)),
00110 readPos_(const_cast<data_type *>(data)),
00111 writePos_(readPos_ + size),
00112 endPos_(writePos_)
00113 { }
00114
00115 private:
00116
00117 boost::shared_ptr<CallOnDestroy> callOnDestroy_;
00118
00119 public:
00120
00123 void truncateFront(size_type howMuch) {
00124 readPos_ += howMuch;
00125 assert(readPos_ <= writePos_);
00126 }
00127
00130 void truncateBack(size_type howMuch) {
00131 writePos_ -= howMuch;
00132 assert(readPos_ <= writePos_);
00133 }
00134
00136 data_type *tellWritePos() const {
00137 return writePos_;
00138 }
00139
00141 const data_type *tellReadPos() const {
00142 return readPos_;
00143 }
00144
00146 void incrementCursor(size_type howMuch) {
00147 writePos_ += howMuch;
00148 assert(writePos_ <= endPos_);
00149 }
00150
00152 size_type dataSize() const {
00153 return (writePos_ - readPos_);
00154 }
00155
00157 size_type freeSize() const {
00158 return (endPos_ - writePos_);
00159 }
00160
00162 size_type capacity() const {
00163 return (endPos_ - readPos_);
00164 }
00165
00166 private:
00167
00168 friend bool operator==(const Chunk &lhs, const Chunk &rhs);
00169 friend bool operator!=(const Chunk &lhs, const Chunk &rhs);
00170
00171
00172 boost::shared_array<data_type> underlyingBlock_;
00173
00174 data_type *readPos_;
00175 data_type *writePos_;
00176 data_type *endPos_;
00177 };
00178
00182 inline bool operator==(const Chunk &lhs, const Chunk &rhs) {
00183 return lhs.underlyingBlock_ == rhs.underlyingBlock_;
00184 }
00185
00189 inline bool operator!=(const Chunk &lhs, const Chunk &rhs) {
00190 return lhs.underlyingBlock_ != rhs.underlyingBlock_;
00191 }
00192
00193
00203 class BufferImpl : boost::noncopyable
00204 {
00205
00208 void allocChunkChecked(size_type size = kDefaultBlockSize)
00209 {
00210 writeChunks_.push_back(Chunk(size));
00211 freeSpace_ += writeChunks_.back().freeSize();
00212 }
00213
00217 void allocChunk(size_type size)
00218 {
00219 if(size < kMinBlockSize) {
00220 size = kMinBlockSize;
00221 }
00222 else if (size > kMaxBlockSize) {
00223 size = kMaxBlockSize;
00224 }
00225 allocChunkChecked(size);
00226 }
00227
00230 void postWrite(size_type size)
00231 {
00232
00233
00234
00235
00236
00237 assert(size <= freeSpace_ && !writeChunks_.empty());
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253
00254
00255
00256
00257
00258
00259 if(readChunks_.empty() || (readChunks_.back() != writeChunks_.front())) {
00260 const Chunk &curChunk = writeChunks_.front();
00261 readChunks_.push_back(curChunk);
00262
00263
00264
00265
00266
00267
00268 readChunks_.back().truncateFront( curChunk.dataSize());
00269 }
00270
00271 assert(readChunks_.back().freeSize() == writeChunks_.front().freeSize());
00272
00273
00274
00275
00276 readChunks_.back().incrementCursor(size);
00277 writeChunks_.front().incrementCursor(size);
00278 size_ += size;
00279 freeSpace_ -= size;
00280
00281
00282
00283
00284 if(writeChunks_.front().freeSize() == 0) {
00285 writeChunks_.pop_front();
00286 }
00287 }
00288
00289 public:
00290
00291 typedef std::deque<Chunk> ChunkList;
00292 typedef boost::shared_ptr<BufferImpl> SharedPtr;
00293 typedef boost::shared_ptr<const BufferImpl> ConstSharedPtr;
00294
00296 BufferImpl() :
00297 freeSpace_(0),
00298 size_(0)
00299 { }
00300
00302 explicit BufferImpl(const BufferImpl &src) :
00303 readChunks_(src.readChunks_),
00304 freeSpace_(0),
00305 size_(src.size_)
00306 { }
00307
00309 size_type size() const {
00310 return size_;
00311 }
00312
00314 size_type freeSpace() const {
00315 return freeSpace_;
00316 }
00317
00320 void reserveFreeSpace(size_type reserveSize) {
00321 while(freeSpace_ < reserveSize) {
00322 allocChunk(reserveSize - freeSpace_);
00323 }
00324 }
00325
00327 ChunkList::const_iterator beginRead() const {
00328 return readChunks_.begin();
00329 }
00330
00332 ChunkList::const_iterator endRead() const {
00333 return readChunks_.end();
00334 }
00335
00337 ChunkList::const_iterator beginWrite() const {
00338 return writeChunks_.begin();
00339 }
00340
00342 ChunkList::const_iterator endWrite() const {
00343 return writeChunks_.end();
00344 }
00345
00347 template<typename T>
00348 void writeTo(T val, const boost::true_type&)
00349 {
00350 if(freeSpace_ && (sizeof(T) <= writeChunks_.front().freeSize())) {
00351
00352
00353 *(reinterpret_cast <T*> ( writeChunks_.front().tellWritePos()) ) = val;
00354 postWrite(sizeof(T));
00355 }
00356 else {
00357
00358
00359 writeTo(reinterpret_cast<data_type*>(&val), sizeof(T));
00360 }
00361 }
00362
00365 template<typename T>
00366 void writeTo(T val, const boost::false_type&)
00367 {
00368 BOOST_STATIC_ASSERT(sizeof(T)==0);
00369 }
00370
00372 size_type writeTo(const data_type *data, size_type size)
00373 {
00374 size_type bytesLeft = size;
00375 while(bytesLeft) {
00376
00377 if(freeSpace_ == 0) {
00378 allocChunkChecked();
00379 }
00380
00381 Chunk &chunk = writeChunks_.front();
00382 size_type toCopy = std::min<size_type>(chunk.freeSize(), bytesLeft);
00383 assert(toCopy);
00384 memcpy(chunk.tellWritePos(), data, toCopy);
00385 postWrite(toCopy);
00386 data += toCopy;
00387 bytesLeft -= toCopy;
00388 }
00389 return size;
00390 }
00391
00393 size_type wroteTo(size_type size)
00394 {
00395 assert(size <= freeSpace_);
00396 size_type bytesLeft = size;
00397 while (bytesLeft) {
00398
00399 Chunk &chunk = writeChunks_.front();
00400 size_type wrote = std::min<size_type>(chunk.freeSize(), bytesLeft);
00401 assert(wrote);
00402 postWrite(wrote);
00403 bytesLeft -= wrote;
00404 }
00405 return size;
00406 }
00407
00409 void append(const BufferImpl &src) {
00410 std::copy(src.readChunks_.begin(), src.readChunks_.end(), std::back_inserter(readChunks_));
00411 size_ += src.size_;
00412 }
00413
00415 void discardData() {
00416 readChunks_.clear();
00417 size_ = 0;
00418 }
00419
00421 void discardData(size_type bytes)
00422 {
00423 assert(bytes && bytes <= size_);
00424
00425 size_type bytesToDiscard = bytes;
00426 while( bytesToDiscard ) {
00427
00428 size_t currentSize = readChunks_.front().dataSize();
00429
00430
00431 if(currentSize <= bytesToDiscard) {
00432 readChunks_.pop_front();
00433 bytesToDiscard -= currentSize;
00434 }
00435 else {
00436 readChunks_.front().truncateFront(bytesToDiscard);
00437 bytesToDiscard = 0;
00438 }
00439 }
00440
00441 size_ -= bytes;
00442 }
00443
00446 void extractData(BufferImpl &dest, size_type bytes)
00447 {
00448 assert(bytes && bytes <= size_);
00449
00450 size_type bytesToExtract = bytes;
00451 while( bytesToExtract ) {
00452
00453 size_t currentSize = readChunks_.front().dataSize();
00454 dest.readChunks_.push_back(readChunks_.front());
00455
00456
00457 if(currentSize <= bytesToExtract) {
00458 readChunks_.pop_front();
00459 bytesToExtract -= currentSize;
00460 }
00461 else {
00462 readChunks_.front().truncateFront(bytesToExtract);
00463 size_t excess = currentSize - bytesToExtract;
00464 dest.readChunks_.back().truncateBack(excess);
00465 bytesToExtract = 0;
00466 }
00467 }
00468
00469 size_ -= bytes;
00470 dest.size_ += bytes;
00471 }
00472
00474 void extractData(BufferImpl &dest)
00475 {
00476 assert(dest.readChunks_.empty());
00477 dest.readChunks_.swap(readChunks_);
00478 dest.size_ = size_;
00479 size_ = 0;
00480 }
00481
00484 void copyData(BufferImpl &dest,
00485 ChunkList::const_iterator iter,
00486 size_type offset,
00487 size_type bytes) const
00488 {
00489
00490
00491
00492 size_type copied = 0;
00493 while(copied < bytes) {
00494
00495 dest.readChunks_.push_back(*iter);
00496
00497
00498
00499 dest.readChunks_.back().truncateFront(offset);
00500 offset = 0;
00501
00502 copied += dest.readChunks_.back().dataSize();
00503 ++iter;
00504 }
00505
00506
00507 size_type excess = copied - bytes;
00508 dest.readChunks_.back().truncateBack(excess);
00509
00510 dest.size_ += bytes;
00511 }
00512
00514 int numDataChunks() const {
00515 return readChunks_.size();
00516 }
00517
00520 int numFreeChunks() const {
00521 return writeChunks_.size();
00522 }
00523
00527 void appendForeignData(const data_type *data, size_type size, const free_func &func) {
00528 readChunks_.push_back(Chunk(data, size, func));
00529 size_ += size;
00530 }
00531
00532 private:
00533
00535 BufferImpl& operator=(const BufferImpl &src);
00536
00537
00538
00539
00540
00541
00542 ChunkList readChunks_;
00543 ChunkList writeChunks_;
00544
00545 size_type freeSpace_;
00546 size_type size_;
00547
00548 };
00549
00550 }
00551
00552 }
00553
00554 #endif