BufferDetail.hh

Go to the documentation of this file.
00001 
00019 #ifndef avro_BufferDetail_hh__
00020 #define avro_BufferDetail_hh__
00021 
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/static_assert.hpp>
#include <boost/type_traits.hpp>
#ifdef HAVE_BOOST_ASIO
#include <boost/asio/buffer.hpp>
#endif
#include <algorithm>
#include <cassert>
#include <cstring>
#include <deque>
#include <exception>
#include <iterator>
00032 
00040 namespace avro {
00041 
00042 namespace detail {
00043 
00044 typedef char    data_type;
00045 typedef size_t  size_type;
00046 #ifdef HAVE_BOOST_ASIO
00047 typedef boost::asio::const_buffer   ConstAsioBuffer;
00048 typedef boost::asio::mutable_buffer MutableAsioBuffer;
00049 #endif
00050 
00052 const size_type kMinBlockSize = 4096;
00053 const size_type kMaxBlockSize = 16384;
00054 const size_type kDefaultBlockSize = kMinBlockSize;
00055 
00056 typedef boost::function<void(void)>  free_func;
00057 
00061 class CallOnDestroy {
00062   public:
00063     CallOnDestroy(const free_func &func) : func_(func)
00064     { }
00065     ~CallOnDestroy() {
00066         if (func_) {
00067             func_();
00068         }
00069     }
00070   private:
00071     free_func func_;
00072 };
00073 
00091 class Chunk 
00092 {
00093 
00094   public:
00095 
00096     typedef boost::shared_ptr<Chunk> SharedPtr;
00097 
00099     Chunk(size_type size) :
00100         underlyingBlock_(new data_type[size]), 
00101         readPos_(underlyingBlock_.get()),
00102         writePos_(readPos_),
00103         endPos_(readPos_ + size)
00104     { }
00105 
00108     Chunk(const data_type *data, size_type size, const free_func &func) :
00109         callOnDestroy_(new CallOnDestroy(func)),
00110         readPos_(const_cast<data_type *>(data)),
00111         writePos_(readPos_ + size),
00112         endPos_(writePos_)
00113     { }
00114 
00115   private:
00116     // reference counted object will call a functor when it's destroyed
00117     boost::shared_ptr<CallOnDestroy> callOnDestroy_;
00118 
00119   public:
00120 
00123     void truncateFront(size_type howMuch) {
00124         readPos_ += howMuch;
00125         assert(readPos_ <= writePos_);
00126     }
00127 
00130     void truncateBack(size_type howMuch) {
00131         writePos_ -= howMuch;
00132         assert(readPos_ <= writePos_);
00133     }
00134 
00136     data_type *tellWritePos() const {
00137         return writePos_;
00138     }
00139 
00141     const data_type *tellReadPos() const {
00142         return readPos_;
00143     }
00144 
00146     void incrementCursor(size_type howMuch) {
00147         writePos_  += howMuch; 
00148         assert(writePos_ <= endPos_);
00149     }
00150 
00152     size_type dataSize() const {
00153         return (writePos_ - readPos_);
00154     }
00155 
00157     size_type freeSize() const {
00158         return (endPos_ - writePos_);
00159     }
00160 
00162     size_type capacity() const {
00163         return (endPos_ - readPos_);
00164     }
00165 
00166   private:
00167 
00168     friend bool operator==(const Chunk &lhs, const Chunk &rhs);
00169     friend bool operator!=(const Chunk &lhs, const Chunk &rhs);
00170 
00171     // more than one buffer can share an underlying block, so use SharedPtr
00172     boost::shared_array<data_type> underlyingBlock_;
00173 
00174     data_type *readPos_;  
00175     data_type *writePos_; 
00176     data_type *endPos_;   
00177 };
00178 
00182 inline bool operator==(const Chunk &lhs, const Chunk &rhs) {
00183     return lhs.underlyingBlock_ == rhs.underlyingBlock_;
00184 }
00185 
00189 inline bool operator!=(const Chunk &lhs, const Chunk &rhs) {
00190     return lhs.underlyingBlock_ != rhs.underlyingBlock_;
00191 }
00192 
00193 
00203 class BufferImpl : boost::noncopyable
00204 {
00205 
00208     void allocChunkChecked(size_type size = kDefaultBlockSize) 
00209     {
00210         writeChunks_.push_back(Chunk(size));
00211         freeSpace_ += writeChunks_.back().freeSize();
00212     }
00213 
00217     void allocChunk(size_type size) 
00218     {
00219         if(size < kMinBlockSize) {
00220             size = kMinBlockSize;
00221         }
00222         else if (size > kMaxBlockSize) {
00223             size = kMaxBlockSize;
00224         }
00225         allocChunkChecked(size);
00226     }
00227 
00230     void postWrite(size_type size) 
00231     {
00232 
00233         // precondition to this function is that the writeChunk_.front()
00234         // contains the data that was just written, so make sure writeChunks_
00235         // is not empty:
00236         
00237         assert(size <= freeSpace_ && !writeChunks_.empty());
00238 
00239         // This is probably the one tricky part of BufferImpl.  The data that
00240         // was written now exists in writeChunks_.front().  Now we must make
00241         // sure that same data exists in readChunks_.back().
00242         //
00243         // There are two cases: 
00244         //
00245         // 1. readChunks_.last() and writeChunk_.front() refer to the same
00246         // underlying block, in which case they both just need their cursor
00247         // updated to reflect the new state.
00248         //
00249         // 2. readChunk_.last() is not the same block as writeChunks_.front(),
00250         // in which case it should be, since the writeChunk.front() contains
00251         // the next bit of data that will be appended to readChunks_, and
00252         // therefore needs to be copied there so we can proceed with updating
00253         // their state.
00254         //
00255 
00256         // if readChunks_ is not the same as writeChunks_.front(), make a copy
00257         // of it there
00258         
00259         if(readChunks_.empty() || (readChunks_.back() != writeChunks_.front())) {
00260             const Chunk &curChunk = writeChunks_.front();
00261             readChunks_.push_back(curChunk);
00262 
00263             // Any data that existed in the write chunk previously doesn't
00264             // belong to this buffer (otherwise it would have already been
00265             // added to the readChunk_ list).  Here, adjust the start of the
00266             // readChunk to begin after any data already existing in curChunk
00267             
00268             readChunks_.back().truncateFront( curChunk.dataSize());
00269         }
00270 
00271         assert(readChunks_.back().freeSize() == writeChunks_.front().freeSize());
00272 
00273         // update the states of both readChunks_ and writeChunks_ to indicate that they are
00274         // holding the new data
00275         
00276         readChunks_.back().incrementCursor(size);
00277         writeChunks_.front().incrementCursor(size);
00278         size_ += size;
00279         freeSpace_ -= size;
00280 
00281         // if there is no more free space in writeChunks_, the next write cannot use
00282         // it, so dispose of it now
00283         
00284         if(writeChunks_.front().freeSize() == 0) {
00285             writeChunks_.pop_front();
00286         }
00287     }
00288 
00289   public:
00290 
00291     typedef std::deque<Chunk> ChunkList;
00292     typedef boost::shared_ptr<BufferImpl> SharedPtr;
00293     typedef boost::shared_ptr<const BufferImpl> ConstSharedPtr;
00294 
00296     BufferImpl() :
00297         freeSpace_(0),
00298         size_(0)
00299     { }
00300 
00302     explicit BufferImpl(const BufferImpl &src) :
00303         readChunks_(src.readChunks_), 
00304         freeSpace_(0),
00305         size_(src.size_)
00306     { }
00307 
00309     size_type size() const {
00310         return size_;
00311     }
00312 
00314     size_type freeSpace() const {
00315         return freeSpace_;
00316     }
00317 
00320     void reserveFreeSpace(size_type reserveSize) {
00321         while(freeSpace_ < reserveSize) {
00322             allocChunk(reserveSize - freeSpace_);
00323         }
00324     }
00325 
00327     ChunkList::const_iterator beginRead() const {
00328         return readChunks_.begin();
00329     }
00330     
00332     ChunkList::const_iterator endRead() const {
00333         return readChunks_.end();
00334     }
00335 
00337     ChunkList::const_iterator beginWrite() const {
00338         return writeChunks_.begin();
00339     }
00340     
00342     ChunkList::const_iterator endWrite() const {
00343         return writeChunks_.end();
00344     }
00345 
00347     template<typename T>
00348     void writeTo(T val, const boost::true_type&)
00349     {
00350         if(freeSpace_ && (sizeof(T) <= writeChunks_.front().freeSize())) {
00351             // fast path, there's enough room in the writeable chunk to just
00352             // straight out copy it
00353             *(reinterpret_cast <T*> ( writeChunks_.front().tellWritePos()) ) = val;
00354             postWrite(sizeof(T));
00355         }
00356         else {
00357             // need to fixup chunks first, so use the regular memcpy 
00358             // writeTo method
00359             writeTo(reinterpret_cast<data_type*>(&val), sizeof(T));
00360         }
00361     }
00362 
00365     template<typename T>
00366     void writeTo(T val, const boost::false_type&) 
00367     {
00368         BOOST_STATIC_ASSERT(sizeof(T)==0);
00369     }
00370 
00372     size_type writeTo(const data_type *data, size_type size) 
00373     {
00374         size_type bytesLeft = size; 
00375         while(bytesLeft) {
00376 
00377             if(freeSpace_ == 0) {
00378                 allocChunkChecked();
00379             }
00380 
00381             Chunk &chunk = writeChunks_.front();
00382             size_type toCopy = std::min<size_type>(chunk.freeSize(), bytesLeft);
00383             assert(toCopy);
00384             memcpy(chunk.tellWritePos(), data, toCopy);
00385             postWrite(toCopy);
00386             data      += toCopy; 
00387             bytesLeft -= toCopy; 
00388         }
00389         return size;
00390     }
00391 
00393     size_type wroteTo(size_type size) 
00394     {
00395         assert(size <= freeSpace_);
00396         size_type bytesLeft = size;
00397         while (bytesLeft) {
00398 
00399             Chunk &chunk = writeChunks_.front();
00400             size_type wrote = std::min<size_type>(chunk.freeSize(), bytesLeft);
00401             assert(wrote);
00402             postWrite(wrote);
00403             bytesLeft -= wrote;
00404         }
00405         return size;
00406     }
00407 
00409     void append(const BufferImpl &src) {
00410         std::copy(src.readChunks_.begin(), src.readChunks_.end(), std::back_inserter(readChunks_));
00411         size_ += src.size_;
00412     }
00413 
00415     void discardData() {
00416         readChunks_.clear();
00417         size_ = 0;
00418     }
00419 
00421     void discardData(size_type bytes)
00422     {
00423         assert(bytes && bytes <= size_);
00424 
00425         size_type bytesToDiscard = bytes;
00426         while( bytesToDiscard ) {
00427           
00428             size_t currentSize = readChunks_.front().dataSize();
00429 
00430             // see if entire chunk is discarded
00431             if(currentSize <= bytesToDiscard) {
00432                 readChunks_.pop_front();
00433                 bytesToDiscard -= currentSize;
00434             }
00435             else {
00436                 readChunks_.front().truncateFront(bytesToDiscard);
00437                 bytesToDiscard = 0;
00438             }
00439         }
00440 
00441         size_ -= bytes;
00442     }
00443 
00446     void extractData(BufferImpl &dest, size_type bytes)
00447     {
00448         assert(bytes && bytes <= size_);
00449 
00450         size_type bytesToExtract = bytes;
00451         while( bytesToExtract ) {
00452           
00453             size_t currentSize = readChunks_.front().dataSize();
00454             dest.readChunks_.push_back(readChunks_.front());
00455 
00456             // see if entire chunk was extracted 
00457             if(currentSize <= bytesToExtract) {
00458                 readChunks_.pop_front();
00459                 bytesToExtract -= currentSize;
00460             }
00461             else {
00462                 readChunks_.front().truncateFront(bytesToExtract);
00463                 size_t excess = currentSize - bytesToExtract;
00464                 dest.readChunks_.back().truncateBack(excess);
00465                 bytesToExtract = 0;
00466             }
00467         }
00468 
00469         size_ -= bytes;
00470         dest.size_ += bytes;
00471     }
00472 
00474     void extractData(BufferImpl &dest) 
00475     {
00476         assert(dest.readChunks_.empty());
00477         dest.readChunks_.swap(readChunks_);
00478         dest.size_ = size_;
00479         size_ = 0;
00480     }
00481 
00484     void copyData(BufferImpl &dest, 
00485                   ChunkList::const_iterator iter,
00486                   size_type offset, 
00487                   size_type bytes) const
00488     {
00489         // now we are positioned to start the copying, copy as many
00490         // chunks as we need, the first chunk may have a non-zero offset 
00491         // if the data to copy is not at the start of the chunk 
00492         size_type copied = 0;
00493         while(copied < bytes) {
00494 
00495             dest.readChunks_.push_back(*iter);
00496 
00497             // offset only applies in the first chunk, 
00498             // all subsequent chunks are copied from the start
00499             dest.readChunks_.back().truncateFront(offset);
00500             offset = 0;
00501 
00502             copied += dest.readChunks_.back().dataSize(); 
00503             ++iter;
00504         }
00505 
00506         // if the last chunk copied has more bytes than we need, truncate it
00507         size_type excess = copied - bytes;
00508         dest.readChunks_.back().truncateBack(excess);
00509 
00510         dest.size_ += bytes;
00511     }
00512 
00514     int numDataChunks() const {
00515         return readChunks_.size();
00516     }
00517 
00520     int numFreeChunks() const {
00521         return writeChunks_.size();
00522     }
00523 
00527     void appendForeignData(const data_type *data, size_type size, const free_func &func) {
00528         readChunks_.push_back(Chunk(data, size, func));
00529         size_ += size;
00530     }
00531 
00532   private:
00533 
00535     BufferImpl& operator=(const BufferImpl &src);
00536     /* {
00537         readChunks_.assign(src.readChunks_.begin(), src.readChunks_.end());
00538         size_ = src.size();
00539         return *this;
00540     } */
00541 
00542     ChunkList readChunks_;     
00543     ChunkList writeChunks_;    
00544 
00545     size_type freeSpace_;  
00546     size_type size_;       
00547 
00548 };
00549 
00550 } // detail namespace
00551 
00552 } // namespace
00553 
00554 #endif
Generated on Thu Sep 2 18:40:54 2010 for Avro C++ by  doxygen 1.6.3