Stream.hh

00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021 
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 #include "boost/utility.hpp"
00026 #include "Exception.hh"
00027 
00028 namespace avro {
00029 
00033 class InputStream : boost::noncopyable {
00034 protected:
00035 
00039     InputStream() { }
00040 
00041 public:
00045     virtual ~InputStream() { }
00046 
00053     virtual bool next(const uint8_t** data, size_t* len) = 0;
00054 
00060     virtual void backup(size_t len) = 0;
00061 
00065     virtual void skip(size_t len) = 0;
00066 
00072     virtual size_t byteCount() const = 0;
00073 };
00074 
00078 class OutputStream : boost::noncopyable {
00079 protected:
00080 
00084     OutputStream() { }
00085 public:
00086 
00090     virtual ~OutputStream() { }
00091 
00097     virtual bool next(uint8_t** data, size_t* len) = 0;
00098 
00103     virtual void backup(size_t len) = 0;
00104 
00110     virtual uint64_t byteCount() const = 0;
00111 
00116     virtual void flush() = 0;
00117 };
00118 
00122 std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00123 
00129 std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00130 
00138 std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00139 
00147 std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00148     size_t bufferSize = 8 * 1024);
00149 
00154 std::auto_ptr<InputStream> fileInputStream(const char* filename,
00155     size_t bufferSize = 8 * 1024);
00156 
00158 struct StreamReader {
00162     InputStream* in_;
00163 
00167     const uint8_t* next_;
00168 
00172     const uint8_t* end_;
00173 
00177     StreamReader() : in_(0), next_(0), end_(0) { }
00178 
00182     StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00183 
00188     void reset(InputStream& is) {
00189         if (in_ != 0 && end_ != next_) {
00190             in_->backup(end_ - next_);
00191         }
00192         in_ = &is;
00193         next_ = end_ = 0;
00194     }
00195 
00200     uint8_t read() {
00201         if (next_ == end_) {
00202             more();
00203         }
00204         return *next_++;
00205     }
00206 
00211     void readBytes(uint8_t* b, size_t n) {
00212         while (n > 0) {
00213             if (next_ == end_) {
00214                 more();
00215             }
00216             size_t q = end_ - next_;
00217             if (q > n) {
00218                 q = n;
00219             }
00220             ::memcpy(b, next_, q);
00221             next_ += q;
00222             b += q;
00223             n -= q;
00224         }
00225     }
00226 
00231     void skipBytes(size_t n) {
00232         if (n > static_cast<size_t>(end_ - next_)) {
00233             n -= end_ - next_;
00234             next_ = end_;
00235             in_->skip(n);
00236         } else {
00237             next_ += n;
00238         }
00239     }
00240 
00247     bool fill() {
00248         size_t n = 0;
00249         while (in_->next(&next_, &n)) {
00250             if (n != 0) {
00251                 end_ = next_ + n;
00252                 return true;
00253             }
00254         }
00255         return false;
00256     }
00257 
00261     void more() {
00262         if (! fill()) {
00263             throw Exception("EOF reached");
00264         }
00265     }
00266 
00270     bool hasMore() {
00271         return (next_ == end_) ? fill() : true;
00272     }
00273 };
00274 
00278 struct StreamWriter {
00282     OutputStream* out_;
00283 
00287     uint8_t* next_;
00288     
00292     uint8_t* end_;
00293 
00297     StreamWriter() : out_(0), next_(0), end_(0) { }
00298 
00302     StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00303 
00308     void reset(OutputStream& os) {
00309         if (out_ != 0 && end_ != next_) {
00310             out_->backup(end_ - next_);
00311         }
00312         out_ = &os;
00313         next_ = end_;
00314     }
00315 
00319     void write(uint8_t c) {
00320         if (next_ == end_) {
00321             more();
00322         }
00323         *next_++ = c;
00324     }
00325 
00329     void writeBytes(const uint8_t* b, size_t n) {
00330         while (n > 0) {
00331             if (next_ == end_) {
00332                 more();
00333             }
00334             size_t q = end_ - next_;
00335             if (q > n) {
00336                 q = n;
00337             }
00338             ::memcpy(next_, b, q);
00339             next_ += q;
00340             b += q;
00341             n -= q;
00342         }
00343     }
00344 
00349     void flush() {
00350         if (next_ != end_) {
00351             out_->backup(end_ - next_);
00352             next_ = end_;
00353         }
00354         out_->flush();
00355     }
00356 
00360     void more() {
00361         size_t n = 0;
00362         while (out_->next(&next_, &n)) {
00363             if (n != 0) {
00364                 end_ = next_ + n;
00365                 return;
00366             }
00367         }
00368         throw Exception("EOF reached");
00369     }
00370 
00371 };
00372 
00377 inline void copy(InputStream& in, OutputStream& out)
00378 {
00379     const uint8_t *p = 0;
00380     size_t n = 0;
00381     StreamWriter w(out);
00382     while (in.next(&p, &n)) {
00383         w.writeBytes(p, n);
00384     }
00385     w.flush();
00386 }
00387 
00388 }   // namespace avro
00389 #endif
00390 
00391