Avro C++
DataFile.hh
00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021 
00022 #include "Config.hh"
00023 #include "Encoder.hh"
00024 #include "buffer/Buffer.hh"
00025 #include "ValidSchema.hh"
00026 #include "Specific.hh"
00027 #include "Stream.hh"
00028 
00029 #include <map>
00030 #include <string>
00031 #include <vector>
00032 
00033 #include "boost/array.hpp"
00034 #include "boost/utility.hpp"
00035 
00036 namespace avro {
00037 
/**
 * Fixed-size array of 16 bytes. It is the type of the `sync_` members of
 * DataFileWriterBase and DataFileReaderBase and the return type of
 * DataFileWriterBase::makeSync().
 * NOTE(review): presumably this holds the data file's sync marker -- confirm
 * against the implementation file, which is not visible from this header.
 */
00041 typedef boost::array<uint8_t, 16> DataFileSync;
00042 
/**
 * Type-independent part of the data file writer.
 *
 * DataFileWriter<T> owns one instance of this class and forwards its
 * write(), close(), schema() and flush() calls to it. Copying is disabled
 * by deriving from boost::noncopyable. Only encoder(), incr() and schema()
 * are defined inline in this header; every other member function is merely
 * declared here.
 */
00048 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
    // NOTE(review): filename_, schema_ and syncInterval_ presumably hold the
    // corresponding constructor arguments -- the constructor body is not in
    // this header, so confirm there.
00049     const std::string filename_;
    /// Schema returned by schema().
00050     const ValidSchema schema_;
    /// Encoder handed out by encoder().
00051     const EncoderPtr encoderPtr_;
00052     const size_t syncInterval_;
00053 
    // Two owned output streams.
    // NOTE(review): presumably stream_ is the file stream created by
    // makeStream() and buffer_ stages encoded data before it is written to
    // stream_ -- confirm in the implementation.
00054     std::auto_ptr<OutputStream> stream_;
00055     std::auto_ptr<OutputStream> buffer_;
    // NOTE(review): presumably initialised from makeSync() -- confirm.
00056     const DataFileSync sync_;
    /// Counter incremented by incr().
00057     int64_t objectCount_;
00058 
    /// Maps a metadata key to a value stored as raw bytes.
00059     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00060 
00061     Metadata metadata_;
00062 
    /// Returns an owned OutputStream for the given file name (declared only).
00063     static std::auto_ptr<OutputStream> makeStream(const char* filename);
    /// Returns a DataFileSync value (declared only; how it is produced is not
    /// visible from this header).
00064     static DataFileSync makeSync();
00065 
    // Private helpers, declared only.
    // NOTE(review): setMetadata() presumably stores `value` under `key` in
    // metadata_ -- confirm in the implementation.
00066     void writeHeader();
00067     void setMetadata(const std::string& key, const std::string& value);
00068 
00072     void sync();
00073 
00074 public:
    /**
     * Returns the encoder referenced by encoderPtr_.
     * DataFileWriter<T>::write() encodes every datum through this encoder.
     */
00078     Encoder& encoder() const { return *encoderPtr_; }
00079     
    /**
     * Called by DataFileWriter<T>::write() before each datum is encoded.
     * NOTE(review): presumably calls sync() once enough data has been
     * buffered (see syncInterval_) -- confirm in the implementation.
     */
00084     void syncIfNeeded();
00085 
    /**
     * Increments objectCount_ by one.
     * DataFileWriter<T>::write() calls this after encoding a datum.
     */
00089     void incr() {
00090         ++objectCount_;
00091     }
    /**
     * Constructs a writer from a file name, a schema and a sync interval.
     * DataFileWriter<T> passes 16 * 1024 as the interval by default.
     */
00095     DataFileWriterBase(const char* filename, const ValidSchema& schema,
00096         size_t syncInterval);
00097 
00098     ~DataFileWriterBase();
    /**
     * Closes the writer (declared only).
     * DataFileWriter<T>::close() forwards to this function.
     */
00103     void close();
00104 
    /// Returns the schema stored in schema_.
00108     const ValidSchema& schema() const { return schema_; }
00109 
    /**
     * Flushes the writer (declared only).
     * DataFileWriter<T>::flush() forwards to this function.
     */
00113     void flush();
00114 };
00115 
00119 template <typename T>
00120 class DataFileWriter : boost::noncopyable {
00121     std::auto_ptr<DataFileWriterBase> base_;
00122 public:
00126     DataFileWriter(const char* filename, const ValidSchema& schema,
00127         size_t syncInterval = 16 * 1024) :
00128         base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00129 
00133     void write(const T& datum) {
00134         base_->syncIfNeeded();
00135         avro::encode(base_->encoder(), datum);
00136         base_->incr();
00137     }
00138 
00143     void close() { base_->close(); }
00144 
00148     const ValidSchema& schema() const { return base_->schema(); }
00149 
00153     void flush() { base_->flush(); }
00154 };
00155 
/**
 * Type-independent part of the data file reader.
 *
 * DataFileReader<T> owns one instance of this class and forwards its
 * read(), readerSchema(), dataSchema() and close() calls to it. Copying is
 * disabled by deriving from boost::noncopyable. Only decoder(), decr(),
 * readerSchema() and dataSchema() are defined inline in this header; every
 * other member function is merely declared here.
 */
00159 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
    // NOTE(review): filename_ presumably holds the constructor argument and
    // stream_/decoder_ presumably read the file itself -- the constructor
    // body is not in this header, so confirm there.
00160     const std::string filename_;
00161     const std::auto_ptr<InputStream> stream_;
00162     const DecoderPtr decoder_;
    /// Counter decremented by decr().
00163     int64_t objectCount_;
00164     bool eof_;
00165 
    /// Schema returned by readerSchema().
00166     ValidSchema readerSchema_;
    /// Schema returned by dataSchema().
00167     ValidSchema dataSchema_;
    /// Decoder handed out by decoder().
00168     DecoderPtr dataDecoder_;
00169     std::auto_ptr<InputStream> dataStream_;
    /// Maps a metadata key to a value stored as raw bytes.
00170     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00171 
00172     Metadata metadata_;
00173     DataFileSync sync_;
00174 
    // Private helpers, declared only.
    // NOTE(review): presumably readHeader() fills metadata_ and sync_, and
    // readDataBlock() prepares dataStream_ and objectCount_ for the next
    // run of objects -- confirm in the implementation.
00175     void readHeader();
00176 
00177     bool readDataBlock();
00178 public:
    /**
     * Returns the decoder referenced by dataDecoder_.
     * DataFileReader<T>::read() decodes every datum through this decoder.
     * NOTE(review): dataDecoder_ is presumably set up by init(); confirm
     * before calling this on an uninitialised reader.
     */
00182     Decoder& decoder() { return *dataDecoder_; }
00183 
    /**
     * Tells whether another object can be read (declared only).
     * DataFileReader<T>::read() calls this first and decodes a datum only
     * when it returns true.
     */
00187     bool hasMore();
00188 
    /**
     * Decrements objectCount_ by one.
     * DataFileReader<T>::read() calls this before decoding a datum.
     */
00192     void decr() { --objectCount_; }
00193 
    /**
     * Constructs a reader from a file name.
     * The DataFileReader<T> constructors call one of the init() overloads
     * right after construction.
     */
00200     DataFileReaderBase(const char* filename);
00201 
    /**
     * Initialises the reader without an explicit reader schema.
     * NOTE(review): presumably the schema stored in the file is then used
     * as the reader schema -- confirm in the implementation.
     */
00206     void init();
00207 
    /**
     * Initialises the reader with the given reader schema.
     */
00215     void init(const ValidSchema& readerSchema);
00216 
    /// Returns the schema stored in readerSchema_.
00220     const ValidSchema& readerSchema() { return readerSchema_; }
00221 
    /// Returns the schema stored in dataSchema_.
00225     const ValidSchema& dataSchema() { return dataSchema_; }
00226 
    /**
     * Closes the reader (declared only).
     * DataFileReader<T>::close() forwards to this function.
     */
00230     void close();
00231 };
00232 
00236 template <typename T>
00237 class DataFileReader : boost::noncopyable {
00238     std::auto_ptr<DataFileReaderBase> base_;
00239 public:
00244     DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00245         base_(new DataFileReaderBase(filename)) {
00246         base_->init(readerSchema);
00247     }
00248 
00253     DataFileReader(const char* filename) :
00254         base_(new DataFileReaderBase(filename)) {
00255         base_->init();
00256     }
00257 
00258 
00268     DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00269         base_->init();
00270     }
00271 
00281     DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00282         const ValidSchema& readerSchema) : base_(base) {
00283         base_->init(readerSchema);
00284     }
00285 
00291     bool read(T& datum) {
00292         if (base_->hasMore()) {
00293             base_->decr();
00294             avro::decode(base_->decoder(), datum);
00295             return true;
00296         }
00297         return false;
00298     }
00299 
00303     const ValidSchema& readerSchema() { return base_->readerSchema(); }
00304 
00308     const ValidSchema& dataSchema() { return base_->dataSchema(); }
00309 
00313     void close() { return base_->close(); }
00314 };
00315 
00316 }   // namespace avro
00317 #endif
00318 
00319