Avro C++
DataFile.hh
00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021 
00022 #include "Config.hh"
00023 #include "Encoder.hh"
00024 #include "buffer/Buffer.hh"
00025 #include "ValidSchema.hh"
00026 #include "Specific.hh"
00027 #include "Stream.hh"
00028 
00029 #include <map>
00030 #include <string>
00031 #include <vector>
00032 
00033 #include "boost/array.hpp"
00034 #include "boost/utility.hpp"
00035 #include <boost/iostreams/filtering_stream.hpp>
00036 #include <boost/scoped_ptr.hpp>
00037 
00038 namespace avro {
00039 
/**
 * Identifies the codec applied to the data blocks of an Avro data file.
 *
 * NULL_CODEC is the value used wherever a codec parameter is defaulted in
 * this header (the DataFileWriterBase and DataFileWriter constructors).
 */
enum Codec {
    // Default codec. Presumably the Avro "null" codec, i.e. blocks stored
    // uncompressed; confirm in DataFile.cc.
    NULL_CODEC = 0,
    // Presumably the Avro "deflate" codec. The reader keeps a
    // boost::iostreams::filtering_istream for compressed blocks.
    DEFLATE_CODEC = 1
};
00045 
00049 typedef boost::array<uint8_t, 16> DataFileSync;
00050 
00056 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
00057     const std::string filename_;
00058     const ValidSchema schema_;
00059     const EncoderPtr encoderPtr_;
00060     const size_t syncInterval_;
00061     Codec codec_;
00062 
00063     std::auto_ptr<OutputStream> stream_;
00064     std::auto_ptr<OutputStream> buffer_;
00065     const DataFileSync sync_;
00066     int64_t objectCount_;
00067 
00068     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00069 
00070     Metadata metadata_;
00071 
00072     static std::auto_ptr<OutputStream> makeStream(const char* filename);
00073     static DataFileSync makeSync();
00074 
00075     void writeHeader();
00076     void setMetadata(const std::string& key, const std::string& value);
00077 
00081     void sync();
00082 
00083 public:
00087     Encoder& encoder() const { return *encoderPtr_; }
00088 
00093     void syncIfNeeded();
00094 
00098     void incr() {
00099         ++objectCount_;
00100     }
00104     DataFileWriterBase(const char* filename, const ValidSchema& schema,
00105         size_t syncInterval, Codec codec = NULL_CODEC);
00106 
00107     ~DataFileWriterBase();
00112     void close();
00113 
00117     const ValidSchema& schema() const { return schema_; }
00118 
00122     void flush();
00123 };
00124 
00128 template <typename T>
00129 class DataFileWriter : boost::noncopyable {
00130     std::auto_ptr<DataFileWriterBase> base_;
00131 public:
00135     DataFileWriter(const char* filename, const ValidSchema& schema,
00136         size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
00137         base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { }
00138 
00142     void write(const T& datum) {
00143         base_->syncIfNeeded();
00144         avro::encode(base_->encoder(), datum);
00145         base_->incr();
00146     }
00147 
00152     void close() { base_->close(); }
00153 
00157     const ValidSchema& schema() const { return base_->schema(); }
00158 
00162     void flush() { base_->flush(); }
00163 };
00164 
00168 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
00169     const std::string filename_;
00170     const std::auto_ptr<InputStream> stream_;
00171     const DecoderPtr decoder_;
00172     int64_t objectCount_;
00173     bool eof_;
00174     Codec codec_;
00175 
00176     ValidSchema readerSchema_;
00177     ValidSchema dataSchema_;
00178     DecoderPtr dataDecoder_;
00179     std::auto_ptr<InputStream> dataStream_;
00180     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00181 
00182     Metadata metadata_;
00183     DataFileSync sync_;
00184 
00185     // for compressed buffer
00186     boost::scoped_ptr<boost::iostreams::filtering_istream> os_;
00187     std::vector<char> compressed_;
00188 
00189     void readHeader();
00190 
00191     bool readDataBlock();
00192 public:
00196     Decoder& decoder() { return *dataDecoder_; }
00197 
00201     bool hasMore();
00202 
00206     void decr() { --objectCount_; }
00207 
00214     DataFileReaderBase(const char* filename);
00215 
00220     void init();
00221 
00229     void init(const ValidSchema& readerSchema);
00230 
00234     const ValidSchema& readerSchema() { return readerSchema_; }
00235 
00239     const ValidSchema& dataSchema() { return dataSchema_; }
00240 
00244     void close();
00245 };
00246 
00250 template <typename T>
00251 class DataFileReader : boost::noncopyable {
00252     std::auto_ptr<DataFileReaderBase> base_;
00253 public:
00258     DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00259         base_(new DataFileReaderBase(filename)) {
00260         base_->init(readerSchema);
00261     }
00262 
00267     DataFileReader(const char* filename) :
00268         base_(new DataFileReaderBase(filename)) {
00269         base_->init();
00270     }
00271 
00272 
00282     DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00283         base_->init();
00284     }
00285 
00295     DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00296         const ValidSchema& readerSchema) : base_(base) {
00297         base_->init(readerSchema);
00298     }
00299 
00305     bool read(T& datum) {
00306         if (base_->hasMore()) {
00307             base_->decr();
00308             avro::decode(base_->decoder(), datum);
00309             return true;
00310         }
00311         return false;
00312     }
00313 
00317     const ValidSchema& readerSchema() { return base_->readerSchema(); }
00318 
00322     const ValidSchema& dataSchema() { return base_->dataSchema(); }
00323 
00327     void close() { return base_->close(); }
00328 };
00329 
00330 }   // namespace avro
00331 #endif