Stream.hh

00001 
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021 
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 #include "boost/utility.hpp"
00026 #include "Exception.hh"
00027 
00028 namespace avro {
00029 class InputStream : boost::noncopyable {
00030 public:
00031     InputStream() { }
00032     virtual ~InputStream() { }
00033 
00040     virtual bool next(const uint8_t** data, size_t* len) = 0;
00041 
00047     virtual void backup(size_t len) = 0;
00048 
00052     virtual void skip(size_t len) = 0;
00053 
00059     virtual size_t byteCount() const = 0;
00060 };
00061 
00062 class OutputStream : boost::noncopyable {
00063 public:
00064     OutputStream() { }
00065     virtual ~OutputStream() { }
00066 
00072     virtual bool next(uint8_t** data, size_t* len) = 0;
00073 
00078     virtual void backup(size_t len) = 0;
00079 
00085     virtual uint64_t byteCount() const = 0;
00086 
00091     virtual void flush() = 0;
00092 };
00093 
00097 std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00098 
00104 std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00105 
00113 std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00114 
00122 std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00123     size_t bufferSize = 8 * 1024);
00124 
00129 std::auto_ptr<InputStream> fileInputStream(const char* filename,
00130     size_t bufferSize = 8 * 1024);
00131 
00133 struct StreamReader {
00134     InputStream* in_;
00135     const uint8_t* next_;
00136     const uint8_t* end_;
00137 
00138     StreamReader() : in_(0), next_(0), end_(0) { }
00139 
00140     void reset(InputStream& is) {
00141         if (in_ != 0) {
00142             in_->backup(end_ - next_);
00143         }
00144         in_ = &is;
00145         next_ = end_ = 0;
00146     }
00147 
00148     uint8_t read() {
00149         if (next_ == end_) {
00150             more();
00151         }
00152         return *next_++;
00153     }
00154 
00155     void readBytes(uint8_t* b, size_t n) {
00156         while (n > 0) {
00157             if (next_ == end_) {
00158                 more();
00159             }
00160             size_t q = end_ - next_;
00161             if (q > n) {
00162                 q = n;
00163             }
00164             ::memcpy(b, next_, q);
00165             next_ += q;
00166             b += q;
00167             n -= q;
00168         }
00169     }
00170 
00171     void skipBytes(size_t n) {
00172         if (n > (end_ - next_)) {
00173             n -= end_ - next_;
00174             next_ = end_;
00175             in_->skip(n);
00176         } else {
00177             next_ += n;
00178         }
00179     }
00180 
00181     bool fill() {
00182         size_t n = 0;
00183         while (in_->next(&next_, &n)) {
00184             if (n != 0) {
00185                 end_ = next_ + n;
00186                 return true;
00187             }
00188         }
00189         return false;
00190     }
00191 
00192     void more() {
00193         if (! fill()) {
00194             throw Exception("EOF reached");
00195         }
00196     }
00197 
00198     bool hasMore() {
00199         return (next_ == end_) ? fill() : true;
00200     }
00201 };
00202 
00206 struct StreamWriter {
00207     OutputStream* out_;
00208     uint8_t* next_;
00209     uint8_t* end_;
00210 
00211     StreamWriter() : out_(0), next_(0), end_(0) { }
00212 
00213     void reset(OutputStream& os) {
00214         if (out_ != 0) {
00215             out_->backup(end_ - next_);
00216         }
00217         out_ = &os;
00218         next_ = end_;
00219     }
00220 
00221     void write(uint8_t c) {
00222         if (next_ == end_) {
00223             more();
00224         }
00225         *next_++ = c;
00226     }
00227 
00228     void writeBytes(const uint8_t* b, size_t n) {
00229         while (n > 0) {
00230             if (next_ == end_) {
00231                 more();
00232             }
00233             size_t q = end_ - next_;
00234             if (q > n) {
00235                 q = n;
00236             }
00237             ::memcpy(next_, b, q);
00238             next_ += q;
00239             b += q;
00240             n -= q;
00241         }
00242     }
00243 
00244     void more() {
00245         size_t n = 0;
00246         while (out_->next(&next_, &n)) {
00247             if (n != 0) {
00248                 end_ = next_ + n;
00249                 return;
00250             }
00251         }
00252         throw Exception("EOF reached");
00253     }
00254 
00255     void flush() {
00256         if (next_ != end_) {
00257             out_->backup(end_ - next_);
00258             next_ = end_;
00259         }
00260         out_->flush();
00261     }
00262 };
00263 }   // namespace avro
00264 #endif
00265 
00266