Avro C++
|
00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one 00003 * or more contributor license agreements. See the NOTICE file 00004 * distributed with this work for additional information 00005 * regarding copyright ownership. The ASF licenses this file 00006 * to you under the Apache License, Version 2.0 (the 00007 * "License"); you may not use this file except in compliance 00008 * with the License. You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 00019 #ifndef avro_Stream_hh__ 00020 #define avro_Stream_hh__ 00021 00022 #include <memory> 00023 #include <string.h> 00024 #include <stdint.h> 00025 00026 #include "boost/utility.hpp" 00027 00028 #include "Config.hh" 00029 #include "Exception.hh" 00030 00031 namespace avro { 00032 00036 class AVRO_DECL InputStream : boost::noncopyable { 00037 protected: 00038 00042 InputStream() { } 00043 00044 public: 00048 virtual ~InputStream() { } 00049 00056 virtual bool next(const uint8_t** data, size_t* len) = 0; 00057 00063 virtual void backup(size_t len) = 0; 00064 00068 virtual void skip(size_t len) = 0; 00069 00075 virtual size_t byteCount() const = 0; 00076 }; 00077 00081 class AVRO_DECL OutputStream : boost::noncopyable { 00082 protected: 00083 00087 OutputStream() { } 00088 public: 00089 00093 virtual ~OutputStream() { } 00094 00100 virtual bool next(uint8_t** data, size_t* len) = 0; 00101 00106 virtual void backup(size_t len) = 0; 00107 00113 virtual uint64_t byteCount() const = 0; 00114 00119 virtual void flush() = 0; 00120 }; 00121 00125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024); 00126 00132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len); 00133 00141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source); 00142 00148 AVRO_DECL boost::shared_ptr<std::vector<uint8_t> > snapshot(const OutputStream& source); 00149 00157 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename, 00158 size_t bufferSize = 8 * 1024); 00159 00164 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename, 00165 size_t bufferSize = 8 * 1024); 00166 00172 AVRO_DECL std::auto_ptr<OutputStream> ostreamOutputStream(std::ostream& os, 00173 size_t bufferSize = 8 * 1024); 00174 00180 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in, 00181 size_t bufferSize = 8 * 1024); 00182 00184 struct StreamReader { 00188 InputStream* in_; 00189 00193 const uint8_t* next_; 00194 00198 const uint8_t* end_; 00199 00203 StreamReader() : in_(0), next_(0), end_(0) { } 00204 00208 StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); } 00209 00214 void reset(InputStream& is) { 00215 if (in_ != 0 && end_ != next_) { 00216 in_->backup(end_ - next_); 00217 } 00218 in_ = &is; 00219 next_ = end_ = 0; 00220 } 00221 00226 uint8_t read() { 00227 if (next_ == end_) { 00228 more(); 00229 } 00230 return *next_++; 00231 } 00232 00237 void readBytes(uint8_t* b, size_t n) { 00238 while (n > 0) { 00239 if (next_ == end_) { 00240 more(); 00241 } 00242 size_t q = end_ - next_; 00243 if (q > n) { 00244 q = n; 00245 } 00246 ::memcpy(b, next_, q); 00247 next_ += q; 00248 b += q; 00249 n -= q; 00250 } 00251 } 00252 00257 void skipBytes(size_t n) { 00258 if (n > static_cast<size_t>(end_ - next_)) { 00259 n -= end_ - next_; 00260 next_ = end_; 00261 in_->skip(n); 00262 } else { 00263 next_ += n; 00264 } 00265 } 00266 00273 bool fill() { 00274 size_t n = 0; 00275 while (in_->next(&next_, &n)) { 00276 if (n != 0) { 00277 end_ = next_ + n; 00278 return true; 00279 } 00280 } 00281 return false; 00282 } 00283 00287 void more() { 00288 if (! fill()) { 00289 throw Exception("EOF reached"); 00290 } 00291 } 00292 00296 bool hasMore() { 00297 return (next_ == end_) ? fill() : true; 00298 } 00299 }; 00300 00304 struct StreamWriter { 00308 OutputStream* out_; 00309 00313 uint8_t* next_; 00314 00318 uint8_t* end_; 00319 00323 StreamWriter() : out_(0), next_(0), end_(0) { } 00324 00328 StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); } 00329 00334 void reset(OutputStream& os) { 00335 if (out_ != 0 && end_ != next_) { 00336 out_->backup(end_ - next_); 00337 } 00338 out_ = &os; 00339 next_ = end_; 00340 } 00341 00345 void write(uint8_t c) { 00346 if (next_ == end_) { 00347 more(); 00348 } 00349 *next_++ = c; 00350 } 00351 00355 void writeBytes(const uint8_t* b, size_t n) { 00356 while (n > 0) { 00357 if (next_ == end_) { 00358 more(); 00359 } 00360 size_t q = end_ - next_; 00361 if (q > n) { 00362 q = n; 00363 } 00364 ::memcpy(next_, b, q); 00365 next_ += q; 00366 b += q; 00367 n -= q; 00368 } 00369 } 00370 00375 void flush() { 00376 if (next_ != end_) { 00377 out_->backup(end_ - next_); 00378 next_ = end_; 00379 } 00380 out_->flush(); 00381 } 00382 00386 void more() { 00387 size_t n = 0; 00388 while (out_->next(&next_, &n)) { 00389 if (n != 0) { 00390 end_ = next_ + n; 00391 return; 00392 } 00393 } 00394 throw Exception("EOF reached"); 00395 } 00396 00397 }; 00398 00403 inline void copy(InputStream& in, OutputStream& out) 00404 { 00405 const uint8_t *p = 0; 00406 size_t n = 0; 00407 StreamWriter w(out); 00408 while (in.next(&p, &n)) { 00409 w.writeBytes(p, n); 00410 } 00411 w.flush(); 00412 } 00413 00414 } // namespace avro 00415 #endif 00416 00417