00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025
00026 #include "boost/utility.hpp"
00027
00028 #include "Config.hh"
00029 #include "Exception.hh"
00030
00031 namespace avro {
00032
00036 class AVRO_DECL InputStream : boost::noncopyable {
00037 protected:
00038
00042 InputStream() { }
00043
00044 public:
00048 virtual ~InputStream() { }
00049
00056 virtual bool next(const uint8_t** data, size_t* len) = 0;
00057
00063 virtual void backup(size_t len) = 0;
00064
00068 virtual void skip(size_t len) = 0;
00069
00075 virtual size_t byteCount() const = 0;
00076 };
00077
00081 class AVRO_DECL OutputStream : boost::noncopyable {
00082 protected:
00083
00087 OutputStream() { }
00088 public:
00089
00093 virtual ~OutputStream() { }
00094
00100 virtual bool next(uint8_t** data, size_t* len) = 0;
00101
00106 virtual void backup(size_t len) = 0;
00107
00113 virtual uint64_t byteCount() const = 0;
00114
00119 virtual void flush() = 0;
00120 };
00121
00125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00126
00132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00133
00141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00142
00150 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00151 size_t bufferSize = 8 * 1024);
00152
00157 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename,
00158 size_t bufferSize = 8 * 1024);
00159
00160
00161 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in,
00162 size_t bufferSize = 8 * 1024);
00163
00165 struct StreamReader {
00169 InputStream* in_;
00170
00174 const uint8_t* next_;
00175
00179 const uint8_t* end_;
00180
00184 StreamReader() : in_(0), next_(0), end_(0) { }
00185
00189 StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00190
00195 void reset(InputStream& is) {
00196 if (in_ != 0 && end_ != next_) {
00197 in_->backup(end_ - next_);
00198 }
00199 in_ = &is;
00200 next_ = end_ = 0;
00201 }
00202
00207 uint8_t read() {
00208 if (next_ == end_) {
00209 more();
00210 }
00211 return *next_++;
00212 }
00213
00218 void readBytes(uint8_t* b, size_t n) {
00219 while (n > 0) {
00220 if (next_ == end_) {
00221 more();
00222 }
00223 size_t q = end_ - next_;
00224 if (q > n) {
00225 q = n;
00226 }
00227 ::memcpy(b, next_, q);
00228 next_ += q;
00229 b += q;
00230 n -= q;
00231 }
00232 }
00233
00238 void skipBytes(size_t n) {
00239 if (n > static_cast<size_t>(end_ - next_)) {
00240 n -= end_ - next_;
00241 next_ = end_;
00242 in_->skip(n);
00243 } else {
00244 next_ += n;
00245 }
00246 }
00247
00254 bool fill() {
00255 size_t n = 0;
00256 while (in_->next(&next_, &n)) {
00257 if (n != 0) {
00258 end_ = next_ + n;
00259 return true;
00260 }
00261 }
00262 return false;
00263 }
00264
00268 void more() {
00269 if (! fill()) {
00270 throw Exception("EOF reached");
00271 }
00272 }
00273
00277 bool hasMore() {
00278 return (next_ == end_) ? fill() : true;
00279 }
00280 };
00281
00285 struct StreamWriter {
00289 OutputStream* out_;
00290
00294 uint8_t* next_;
00295
00299 uint8_t* end_;
00300
00304 StreamWriter() : out_(0), next_(0), end_(0) { }
00305
00309 StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00310
00315 void reset(OutputStream& os) {
00316 if (out_ != 0 && end_ != next_) {
00317 out_->backup(end_ - next_);
00318 }
00319 out_ = &os;
00320 next_ = end_;
00321 }
00322
00326 void write(uint8_t c) {
00327 if (next_ == end_) {
00328 more();
00329 }
00330 *next_++ = c;
00331 }
00332
00336 void writeBytes(const uint8_t* b, size_t n) {
00337 while (n > 0) {
00338 if (next_ == end_) {
00339 more();
00340 }
00341 size_t q = end_ - next_;
00342 if (q > n) {
00343 q = n;
00344 }
00345 ::memcpy(next_, b, q);
00346 next_ += q;
00347 b += q;
00348 n -= q;
00349 }
00350 }
00351
00356 void flush() {
00357 if (next_ != end_) {
00358 out_->backup(end_ - next_);
00359 next_ = end_;
00360 }
00361 out_->flush();
00362 }
00363
00367 void more() {
00368 size_t n = 0;
00369 while (out_->next(&next_, &n)) {
00370 if (n != 0) {
00371 end_ = next_ + n;
00372 return;
00373 }
00374 }
00375 throw Exception("EOF reached");
00376 }
00377
00378 };
00379
00384 inline void copy(InputStream& in, OutputStream& out)
00385 {
00386 const uint8_t *p = 0;
00387 size_t n = 0;
00388 StreamWriter w(out);
00389 while (in.next(&p, &n)) {
00390 w.writeBytes(p, n);
00391 }
00392 w.flush();
00393 }
00394
00395 }
00396 #endif
00397
00398