00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 #include "boost/utility.hpp"
00026 #include "Exception.hh"
00027
00028 namespace avro {
00029
00033 class InputStream : boost::noncopyable {
00034 protected:
00035
00039 InputStream() { }
00040
00041 public:
00045 virtual ~InputStream() { }
00046
00053 virtual bool next(const uint8_t** data, size_t* len) = 0;
00054
00060 virtual void backup(size_t len) = 0;
00061
00065 virtual void skip(size_t len) = 0;
00066
00072 virtual size_t byteCount() const = 0;
00073 };
00074
00078 class OutputStream : boost::noncopyable {
00079 protected:
00080
00084 OutputStream() { }
00085 public:
00086
00090 virtual ~OutputStream() { }
00091
00097 virtual bool next(uint8_t** data, size_t* len) = 0;
00098
00103 virtual void backup(size_t len) = 0;
00104
00110 virtual uint64_t byteCount() const = 0;
00111
00116 virtual void flush() = 0;
00117 };
00118
00122 std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00123
00129 std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00130
00138 std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00139
00147 std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00148 size_t bufferSize = 8 * 1024);
00149
00154 std::auto_ptr<InputStream> fileInputStream(const char* filename,
00155 size_t bufferSize = 8 * 1024);
00156
00158 struct StreamReader {
00162 InputStream* in_;
00163
00167 const uint8_t* next_;
00168
00172 const uint8_t* end_;
00173
00177 StreamReader() : in_(0), next_(0), end_(0) { }
00178
00182 StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00183
00188 void reset(InputStream& is) {
00189 if (in_ != 0 && end_ != next_) {
00190 in_->backup(end_ - next_);
00191 }
00192 in_ = &is;
00193 next_ = end_ = 0;
00194 }
00195
00200 uint8_t read() {
00201 if (next_ == end_) {
00202 more();
00203 }
00204 return *next_++;
00205 }
00206
00211 void readBytes(uint8_t* b, size_t n) {
00212 while (n > 0) {
00213 if (next_ == end_) {
00214 more();
00215 }
00216 size_t q = end_ - next_;
00217 if (q > n) {
00218 q = n;
00219 }
00220 ::memcpy(b, next_, q);
00221 next_ += q;
00222 b += q;
00223 n -= q;
00224 }
00225 }
00226
00231 void skipBytes(size_t n) {
00232 if (n > static_cast<size_t>(end_ - next_)) {
00233 n -= end_ - next_;
00234 next_ = end_;
00235 in_->skip(n);
00236 } else {
00237 next_ += n;
00238 }
00239 }
00240
00247 bool fill() {
00248 size_t n = 0;
00249 while (in_->next(&next_, &n)) {
00250 if (n != 0) {
00251 end_ = next_ + n;
00252 return true;
00253 }
00254 }
00255 return false;
00256 }
00257
00261 void more() {
00262 if (! fill()) {
00263 throw Exception("EOF reached");
00264 }
00265 }
00266
00270 bool hasMore() {
00271 return (next_ == end_) ? fill() : true;
00272 }
00273 };
00274
00278 struct StreamWriter {
00282 OutputStream* out_;
00283
00287 uint8_t* next_;
00288
00292 uint8_t* end_;
00293
00297 StreamWriter() : out_(0), next_(0), end_(0) { }
00298
00302 StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00303
00308 void reset(OutputStream& os) {
00309 if (out_ != 0 && end_ != next_) {
00310 out_->backup(end_ - next_);
00311 }
00312 out_ = &os;
00313 next_ = end_;
00314 }
00315
00319 void write(uint8_t c) {
00320 if (next_ == end_) {
00321 more();
00322 }
00323 *next_++ = c;
00324 }
00325
00329 void writeBytes(const uint8_t* b, size_t n) {
00330 while (n > 0) {
00331 if (next_ == end_) {
00332 more();
00333 }
00334 size_t q = end_ - next_;
00335 if (q > n) {
00336 q = n;
00337 }
00338 ::memcpy(next_, b, q);
00339 next_ += q;
00340 b += q;
00341 n -= q;
00342 }
00343 }
00344
00349 void flush() {
00350 if (next_ != end_) {
00351 out_->backup(end_ - next_);
00352 next_ = end_;
00353 }
00354 out_->flush();
00355 }
00356
00360 void more() {
00361 size_t n = 0;
00362 while (out_->next(&next_, &n)) {
00363 if (n != 0) {
00364 end_ = next_ + n;
00365 return;
00366 }
00367 }
00368 throw Exception("EOF reached");
00369 }
00370
00371 };
00372
00377 inline void copy(InputStream& in, OutputStream& out)
00378 {
00379 const uint8_t *p = 0;
00380 size_t n = 0;
00381 StreamWriter w(out);
00382 while (in.next(&p, &n)) {
00383 w.writeBytes(p, n);
00384 }
00385 w.flush();
00386 }
00387
00388 }
00389 #endif
00390
00391