Avro C++
Stream.hh
00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021 
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 
00026 #include "boost/utility.hpp"
00027 
00028 #include "Config.hh"
00029 #include "Exception.hh"
00030 
00031 namespace avro {
00032 
00036 class AVRO_DECL InputStream : boost::noncopyable {
00037 protected:
00038 
00042     InputStream() { }
00043 
00044 public:
00048     virtual ~InputStream() { }
00049 
00056     virtual bool next(const uint8_t** data, size_t* len) = 0;
00057 
00063     virtual void backup(size_t len) = 0;
00064 
00068     virtual void skip(size_t len) = 0;
00069 
00075     virtual size_t byteCount() const = 0;
00076 };
00077 
00081 class AVRO_DECL OutputStream : boost::noncopyable {
00082 protected:
00083 
00087     OutputStream() { }
00088 public:
00089 
00093     virtual ~OutputStream() { }
00094 
00100     virtual bool next(uint8_t** data, size_t* len) = 0;
00101 
00106     virtual void backup(size_t len) = 0;
00107 
00113     virtual uint64_t byteCount() const = 0;
00114 
00119     virtual void flush() = 0;
00120 };
00121 
00125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00126 
00132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00133 
00141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00142 
00148 AVRO_DECL boost::shared_ptr<std::vector<uint8_t> > snapshot(const OutputStream& source);
00149 
00157 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00158     size_t bufferSize = 8 * 1024);
00159 
00164 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename,
00165     size_t bufferSize = 8 * 1024);
00166 
00172 AVRO_DECL std::auto_ptr<OutputStream> ostreamOutputStream(std::ostream& os,
00173     size_t bufferSize = 8 * 1024);
00174 
00180 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in,
00181     size_t bufferSize = 8 * 1024);
00182 
00184 struct StreamReader {
00188     InputStream* in_;
00189 
00193     const uint8_t* next_;
00194 
00198     const uint8_t* end_;
00199 
00203     StreamReader() : in_(0), next_(0), end_(0) { }
00204 
00208     StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00209 
00214     void reset(InputStream& is) {
00215         if (in_ != 0 && end_ != next_) {
00216             in_->backup(end_ - next_);
00217         }
00218         in_ = &is;
00219         next_ = end_ = 0;
00220     }
00221 
00226     uint8_t read() {
00227         if (next_ == end_) {
00228             more();
00229         }
00230         return *next_++;
00231     }
00232 
00237     void readBytes(uint8_t* b, size_t n) {
00238         while (n > 0) {
00239             if (next_ == end_) {
00240                 more();
00241             }
00242             size_t q = end_ - next_;
00243             if (q > n) {
00244                 q = n;
00245             }
00246             ::memcpy(b, next_, q);
00247             next_ += q;
00248             b += q;
00249             n -= q;
00250         }
00251     }
00252 
00257     void skipBytes(size_t n) {
00258         if (n > static_cast<size_t>(end_ - next_)) {
00259             n -= end_ - next_;
00260             next_ = end_;
00261             in_->skip(n);
00262         } else {
00263             next_ += n;
00264         }
00265     }
00266 
00273     bool fill() {
00274         size_t n = 0;
00275         while (in_->next(&next_, &n)) {
00276             if (n != 0) {
00277                 end_ = next_ + n;
00278                 return true;
00279             }
00280         }
00281         return false;
00282     }
00283 
00287     void more() {
00288         if (! fill()) {
00289             throw Exception("EOF reached");
00290         }
00291     }
00292 
00296     bool hasMore() {
00297         return (next_ == end_) ? fill() : true;
00298     }
00299 };
00300 
00304 struct StreamWriter {
00308     OutputStream* out_;
00309 
00313     uint8_t* next_;
00314     
00318     uint8_t* end_;
00319 
00323     StreamWriter() : out_(0), next_(0), end_(0) { }
00324 
00328     StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00329 
00334     void reset(OutputStream& os) {
00335         if (out_ != 0 && end_ != next_) {
00336             out_->backup(end_ - next_);
00337         }
00338         out_ = &os;
00339         next_ = end_;
00340     }
00341 
00345     void write(uint8_t c) {
00346         if (next_ == end_) {
00347             more();
00348         }
00349         *next_++ = c;
00350     }
00351 
00355     void writeBytes(const uint8_t* b, size_t n) {
00356         while (n > 0) {
00357             if (next_ == end_) {
00358                 more();
00359             }
00360             size_t q = end_ - next_;
00361             if (q > n) {
00362                 q = n;
00363             }
00364             ::memcpy(next_, b, q);
00365             next_ += q;
00366             b += q;
00367             n -= q;
00368         }
00369     }
00370 
00375     void flush() {
00376         if (next_ != end_) {
00377             out_->backup(end_ - next_);
00378             next_ = end_;
00379         }
00380         out_->flush();
00381     }
00382 
00386     void more() {
00387         size_t n = 0;
00388         while (out_->next(&next_, &n)) {
00389             if (n != 0) {
00390                 end_ = next_ + n;
00391                 return;
00392             }
00393         }
00394         throw Exception("EOF reached");
00395     }
00396 
00397 };
00398 
00403 inline void copy(InputStream& in, OutputStream& out)
00404 {
00405     const uint8_t *p = 0;
00406     size_t n = 0;
00407     StreamWriter w(out);
00408     while (in.next(&p, &n)) {
00409         w.writeBytes(p, n);
00410     }
00411     w.flush();
00412 }
00413 
00414 }   // namespace avro
00415 #endif
00416 
00417