Stream.hh

00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_Stream_hh__
00020 #define avro_Stream_hh__
00021 
00022 #include <memory>
00023 #include <string.h>
00024 #include <stdint.h>
00025 
00026 #include "boost/utility.hpp"
00027 
00028 #include "Config.hh"
00029 #include "Exception.hh"
00030 
00031 namespace avro {
00032 
00036 class AVRO_DECL InputStream : boost::noncopyable {
00037 protected:
00038 
00042     InputStream() { }
00043 
00044 public:
00048     virtual ~InputStream() { }
00049 
00056     virtual bool next(const uint8_t** data, size_t* len) = 0;
00057 
00063     virtual void backup(size_t len) = 0;
00064 
00068     virtual void skip(size_t len) = 0;
00069 
00075     virtual size_t byteCount() const = 0;
00076 };
00077 
00081 class AVRO_DECL OutputStream : boost::noncopyable {
00082 protected:
00083 
00087     OutputStream() { }
00088 public:
00089 
00093     virtual ~OutputStream() { }
00094 
00100     virtual bool next(uint8_t** data, size_t* len) = 0;
00101 
00106     virtual void backup(size_t len) = 0;
00107 
00113     virtual uint64_t byteCount() const = 0;
00114 
00119     virtual void flush() = 0;
00120 };
00121 
00125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
00126 
00132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
00133 
00141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
00142 
00150 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
00151     size_t bufferSize = 8 * 1024);
00152 
00157 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename,
00158     size_t bufferSize = 8 * 1024);
00159 
00160 
00161 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in,
00162     size_t bufferSize = 8 * 1024);
00163 
00165 struct StreamReader {
00169     InputStream* in_;
00170 
00174     const uint8_t* next_;
00175 
00179     const uint8_t* end_;
00180 
00184     StreamReader() : in_(0), next_(0), end_(0) { }
00185 
00189     StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
00190 
00195     void reset(InputStream& is) {
00196         if (in_ != 0 && end_ != next_) {
00197             in_->backup(end_ - next_);
00198         }
00199         in_ = &is;
00200         next_ = end_ = 0;
00201     }
00202 
00207     uint8_t read() {
00208         if (next_ == end_) {
00209             more();
00210         }
00211         return *next_++;
00212     }
00213 
00218     void readBytes(uint8_t* b, size_t n) {
00219         while (n > 0) {
00220             if (next_ == end_) {
00221                 more();
00222             }
00223             size_t q = end_ - next_;
00224             if (q > n) {
00225                 q = n;
00226             }
00227             ::memcpy(b, next_, q);
00228             next_ += q;
00229             b += q;
00230             n -= q;
00231         }
00232     }
00233 
00238     void skipBytes(size_t n) {
00239         if (n > static_cast<size_t>(end_ - next_)) {
00240             n -= end_ - next_;
00241             next_ = end_;
00242             in_->skip(n);
00243         } else {
00244             next_ += n;
00245         }
00246     }
00247 
00254     bool fill() {
00255         size_t n = 0;
00256         while (in_->next(&next_, &n)) {
00257             if (n != 0) {
00258                 end_ = next_ + n;
00259                 return true;
00260             }
00261         }
00262         return false;
00263     }
00264 
00268     void more() {
00269         if (! fill()) {
00270             throw Exception("EOF reached");
00271         }
00272     }
00273 
00277     bool hasMore() {
00278         return (next_ == end_) ? fill() : true;
00279     }
00280 };
00281 
00285 struct StreamWriter {
00289     OutputStream* out_;
00290 
00294     uint8_t* next_;
00295     
00299     uint8_t* end_;
00300 
00304     StreamWriter() : out_(0), next_(0), end_(0) { }
00305 
00309     StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
00310 
00315     void reset(OutputStream& os) {
00316         if (out_ != 0 && end_ != next_) {
00317             out_->backup(end_ - next_);
00318         }
00319         out_ = &os;
00320         next_ = end_;
00321     }
00322 
00326     void write(uint8_t c) {
00327         if (next_ == end_) {
00328             more();
00329         }
00330         *next_++ = c;
00331     }
00332 
00336     void writeBytes(const uint8_t* b, size_t n) {
00337         while (n > 0) {
00338             if (next_ == end_) {
00339                 more();
00340             }
00341             size_t q = end_ - next_;
00342             if (q > n) {
00343                 q = n;
00344             }
00345             ::memcpy(next_, b, q);
00346             next_ += q;
00347             b += q;
00348             n -= q;
00349         }
00350     }
00351 
00356     void flush() {
00357         if (next_ != end_) {
00358             out_->backup(end_ - next_);
00359             next_ = end_;
00360         }
00361         out_->flush();
00362     }
00363 
00367     void more() {
00368         size_t n = 0;
00369         while (out_->next(&next_, &n)) {
00370             if (n != 0) {
00371                 end_ = next_ + n;
00372                 return;
00373             }
00374         }
00375         throw Exception("EOF reached");
00376     }
00377 
00378 };
00379 
00384 inline void copy(InputStream& in, OutputStream& out)
00385 {
00386     const uint8_t *p = 0;
00387     size_t n = 0;
00388     StreamWriter w(out);
00389     while (in.next(&p, &n)) {
00390         w.writeBytes(p, n);
00391     }
00392     w.flush();
00393 }
00394 
00395 }   // namespace avro
00396 #endif
00397 
00398