00001
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 #include "boost/utility.hpp"
00026 #include "Exception.hh"
00027
00028 namespace avro {
/**
 * Abstract, non-copyable source of bytes that is consumed one chunk at a time.
 *
 * NOTE(review): no implementation is visible in this header; the per-method
 * notes describe how StreamReader (later in this file) uses each call and
 * should be confirmed against the concrete stream classes.
 */
00029 class InputStream : boost::noncopyable {
00030 public:
00031 InputStream() { }
/** Virtual so that concrete streams can be destroyed through an InputStream pointer. */
00032 virtual ~InputStream() { }
00033
/**
 * Hands out the next chunk of readable data.
 *
 * \param data receives a pointer to the first byte of the chunk.
 * \param len  receives the number of bytes in the chunk.
 * \return true when a chunk was produced, false when no more data can be
 *         obtained. StreamReader treats a true result with *len == 0 as
 *         "ask again" and a false result as end of input.
 */
00040 virtual bool next(const uint8_t** data, size_t* len) = 0;
00041
/**
 * Gives bytes back to the stream. StreamReader passes the number of bytes
 * of the most recent chunk that it fetched but did not consume.
 * NOTE(review): presumably len must not exceed the size of the last chunk
 * returned by next() -- confirm against the implementations.
 */
00047 virtual void backup(size_t len) = 0;
00048
/**
 * Skips len bytes. StreamReader calls this with the bytes that remain to be
 * skipped after its buffered chunk has been exhausted.
 */
00052 virtual void skip(size_t len) = 0;
00053
/**
 * Returns a byte count for this stream.
 * NOTE(review): presumably the number of bytes consumed so far (bytes handed
 * out by next() minus bytes returned with backup()); not exercised in this
 * header -- confirm.
 */
00059 virtual size_t byteCount() const = 0;
00060 };
00061
/**
 * Abstract, non-copyable sink of bytes that is filled one buffer at a time.
 *
 * NOTE(review): no implementation is visible in this header; the per-method
 * notes describe how StreamWriter (later in this file) uses each call and
 * should be confirmed against the concrete stream classes.
 */
00062 class OutputStream : boost::noncopyable {
00063 public:
00064 OutputStream() { }
/** Virtual so that concrete streams can be destroyed through an OutputStream pointer. */
00065 virtual ~OutputStream() { }
00066
/**
 * Hands out the next writable buffer.
 *
 * \param data receives a pointer to the first writable byte.
 * \param len  receives the number of writable bytes.
 * \return true when a buffer was produced, false otherwise. StreamWriter
 *         treats a true result with *len == 0 as "ask again" and a false
 *         result as an error ("EOF reached").
 */
00072 virtual bool next(uint8_t** data, size_t* len) = 0;
00073
/**
 * Gives unused space back to the stream. StreamWriter passes the number of
 * bytes at the end of the most recent buffer that it did not write.
 * NOTE(review): presumably len must not exceed the size of the last buffer
 * returned by next() -- confirm against the implementations.
 */
00078 virtual void backup(size_t len) = 0;
00079
/**
 * Returns a byte count for this stream as a 64-bit value.
 * NOTE(review): presumably the number of bytes written so far; not exercised
 * in this header -- confirm.
 */
00085 virtual uint64_t byteCount() const = 0;
00086
/**
 * Flushes the stream. StreamWriter::flush() calls this after returning its
 * unused buffer space with backup().
 * NOTE(review): presumably pushes buffered data to the underlying medium.
 */
00091 virtual void flush() = 0;
00092 };
00093
/**
 * Factory for an OutputStream; ownership of the new stream passes to the
 * caller through the returned std::auto_ptr.
 * NOTE(review): presumably an in-memory stream that grows in blocks of
 * chunkSize bytes (default 4 KiB) -- the definition is not in this header.
 */
00097 std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00098
/**
 * Factory for an InputStream over the len bytes starting at data; ownership
 * of the new stream passes to the caller.
 * NOTE(review): presumably the bytes are not copied, so data would have to
 * outlive the returned stream -- confirm against the definition.
 */
00104 std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00105
/**
 * Factory for an InputStream built from an existing OutputStream; ownership
 * of the new stream passes to the caller.
 * NOTE(review): presumably source must be a stream created by
 * memoryOutputStream() and the result reads back what was written to it --
 * confirm against the definition.
 */
00113 std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00114
/**
 * Factory for an OutputStream associated with the file named filename;
 * ownership of the new stream passes to the caller.
 * NOTE(review): presumably bufferSize (default 8 KiB) is the size of the
 * internal write buffer -- confirm against the definition.
 */
00122 std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00123 size_t bufferSize = 8 * 1024);
00124
/**
 * Factory for an InputStream associated with the file named filename;
 * ownership of the new stream passes to the caller.
 * NOTE(review): presumably bufferSize (default 8 KiB) is the size of the
 * internal read buffer -- confirm against the definition.
 */
00129 std::auto_ptr<InputStream> fileInputStream(const char* filename,
00130 size_t bufferSize = 8 * 1024);
00131
00133 struct StreamReader {
00134 InputStream* in_;
00135 const uint8_t* next_;
00136 const uint8_t* end_;
00137
00138 StreamReader() : in_(0), next_(0), end_(0) { }
00139
00140 void reset(InputStream& is) {
00141 if (in_ != 0) {
00142 in_->backup(end_ - next_);
00143 }
00144 in_ = &is;
00145 next_ = end_ = 0;
00146 }
00147
00148 uint8_t read() {
00149 if (next_ == end_) {
00150 more();
00151 }
00152 return *next_++;
00153 }
00154
00155 void readBytes(uint8_t* b, size_t n) {
00156 while (n > 0) {
00157 if (next_ == end_) {
00158 more();
00159 }
00160 size_t q = end_ - next_;
00161 if (q > n) {
00162 q = n;
00163 }
00164 ::memcpy(b, next_, q);
00165 next_ += q;
00166 b += q;
00167 n -= q;
00168 }
00169 }
00170
00171 void skipBytes(size_t n) {
00172 if (n > (end_ - next_)) {
00173 n -= end_ - next_;
00174 next_ = end_;
00175 in_->skip(n);
00176 } else {
00177 next_ += n;
00178 }
00179 }
00180
00181 bool fill() {
00182 size_t n = 0;
00183 while (in_->next(&next_, &n)) {
00184 if (n != 0) {
00185 end_ = next_ + n;
00186 return true;
00187 }
00188 }
00189 return false;
00190 }
00191
00192 void more() {
00193 if (! fill()) {
00194 throw Exception("EOF reached");
00195 }
00196 }
00197
00198 bool hasMore() {
00199 return (next_ == end_) ? fill() : true;
00200 }
00201 };
00202
00206 struct StreamWriter {
00207 OutputStream* out_;
00208 uint8_t* next_;
00209 uint8_t* end_;
00210
00211 StreamWriter() : out_(0), next_(0), end_(0) { }
00212
00213 void reset(OutputStream& os) {
00214 if (out_ != 0) {
00215 out_->backup(end_ - next_);
00216 }
00217 out_ = &os;
00218 next_ = end_;
00219 }
00220
00221 void write(uint8_t c) {
00222 if (next_ == end_) {
00223 more();
00224 }
00225 *next_++ = c;
00226 }
00227
00228 void writeBytes(const uint8_t* b, size_t n) {
00229 while (n > 0) {
00230 if (next_ == end_) {
00231 more();
00232 }
00233 size_t q = end_ - next_;
00234 if (q > n) {
00235 q = n;
00236 }
00237 ::memcpy(next_, b, q);
00238 next_ += q;
00239 b += q;
00240 n -= q;
00241 }
00242 }
00243
00244 void more() {
00245 size_t n = 0;
00246 while (out_->next(&next_, &n)) {
00247 if (n != 0) {
00248 end_ = next_ + n;
00249 return;
00250 }
00251 }
00252 throw Exception("EOF reached");
00253 }
00254
00255 void flush() {
00256 if (next_ != end_) {
00257 out_->backup(end_ - next_);
00258 next_ = end_;
00259 }
00260 out_->flush();
00261 }
00262 };
00263 }
00264 #endif
00265
00266