Stream.hh

00001 
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021 
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 #include "boost/utility.hpp"
00026 #include "Exception.hh"
00027 
00028 namespace avro {
00029 class InputStream : boost::noncopyable {
00030 public:
00031     InputStream() { }
00032     virtual ~InputStream() { }
00033 
00040     virtual bool next(const uint8_t** data, size_t* len) = 0;
00041 
00047     virtual void backup(size_t len) = 0;
00048 
00052     virtual void skip(size_t len) = 0;
00053 
00059     virtual size_t byteCount() const = 0;
00060 };
00061 
00062 class OutputStream : boost::noncopyable {
00063 public:
00064     OutputStream() { }
00065     virtual ~OutputStream() { }
00066 
00072     virtual bool next(uint8_t** data, size_t* len) = 0;
00073 
00078     virtual void backup(size_t len) = 0;
00079 
00085     virtual uint64_t byteCount() const = 0;
00086 
00091     virtual void flush() = 0;
00092 };
00093 
00097 std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00098 
00104 std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00105 
00113 std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00114 
00122 std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00123     size_t bufferSize = 8 * 1024);
00124 
00129 std::auto_ptr<InputStream> fileInputStream(const char* filename,
00130     size_t bufferSize = 8 * 1024);
00131 
00133 struct StreamReader {
00134     InputStream* in_;
00135     const uint8_t* next_;
00136     const uint8_t* end_;
00137 
00138     StreamReader() : in_(0), next_(0), end_(0) { }
00139     StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00140 
00141     void reset(InputStream& is) {
00142         if (in_ != 0) {
00143             in_->backup(end_ - next_);
00144         }
00145         in_ = &is;
00146         next_ = end_ = 0;
00147     }
00148 
00149     uint8_t read() {
00150         if (next_ == end_) {
00151             more();
00152         }
00153         return *next_++;
00154     }
00155 
00156     void readBytes(uint8_t* b, size_t n) {
00157         while (n > 0) {
00158             if (next_ == end_) {
00159                 more();
00160             }
00161             size_t q = end_ - next_;
00162             if (q > n) {
00163                 q = n;
00164             }
00165             ::memcpy(b, next_, q);
00166             next_ += q;
00167             b += q;
00168             n -= q;
00169         }
00170     }
00171 
00172     void skipBytes(size_t n) {
00173         if (n > (end_ - next_)) {
00174             n -= end_ - next_;
00175             next_ = end_;
00176             in_->skip(n);
00177         } else {
00178             next_ += n;
00179         }
00180     }
00181 
00182     bool fill() {
00183         size_t n = 0;
00184         while (in_->next(&next_, &n)) {
00185             if (n != 0) {
00186                 end_ = next_ + n;
00187                 return true;
00188             }
00189         }
00190         return false;
00191     }
00192 
00193     void more() {
00194         if (! fill()) {
00195             throw Exception("EOF reached");
00196         }
00197     }
00198 
00199     bool hasMore() {
00200         return (next_ == end_) ? fill() : true;
00201     }
00202 };
00203 
00207 struct StreamWriter {
00208     OutputStream* out_;
00209     uint8_t* next_;
00210     uint8_t* end_;
00211 
00212     StreamWriter() : out_(0), next_(0), end_(0) { }
00213     StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00214 
00215     void reset(OutputStream& os) {
00216         if (out_ != 0) {
00217             out_->backup(end_ - next_);
00218         }
00219         out_ = &os;
00220         next_ = end_;
00221     }
00222 
00223     void write(uint8_t c) {
00224         if (next_ == end_) {
00225             more();
00226         }
00227         *next_++ = c;
00228     }
00229 
00230     void writeBytes(const uint8_t* b, size_t n) {
00231         while (n > 0) {
00232             if (next_ == end_) {
00233                 more();
00234             }
00235             size_t q = end_ - next_;
00236             if (q > n) {
00237                 q = n;
00238             }
00239             ::memcpy(next_, b, q);
00240             next_ += q;
00241             b += q;
00242             n -= q;
00243         }
00244     }
00245 
00246     void more() {
00247         size_t n = 0;
00248         while (out_->next(&next_, &n)) {
00249             if (n != 0) {
00250                 end_ = next_ + n;
00251                 return;
00252             }
00253         }
00254         throw Exception("EOF reached");
00255     }
00256 
00257     void flush() {
00258         if (next_ != end_) {
00259             out_->backup(end_ - next_);
00260             next_ = end_;
00261         }
00262         out_->flush();
00263     }
00264 };
00265 
00270 inline void copy(InputStream& in, OutputStream& out)
00271 {
00272     const uint8_t *p = 0;
00273     size_t n = 0;
00274     StreamWriter w(out);
00275     while (in.next(&p, &n)) {
00276         w.writeBytes(p, n);
00277     }
00278     w.flush();
00279 }
00280 
00281 }   // namespace avro
00282 #endif
00283 
00284