DataFile.hh

00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021 
00022 #include "Config.hh"
00023 #include "Encoder.hh"
00024 #include "buffer/Buffer.hh"
00025 #include "ValidSchema.hh"
00026 #include "Specific.hh"
00027 #include "Stream.hh"
00028 
00029 #include <map>
00030 #include <string>
00031 #include <vector>
00032 
00033 #include "boost/array.hpp"
00034 #include "boost/utility.hpp"
00035 
00036 namespace avro {
00037 
00041 typedef boost::array<uint8_t, 16> DataFileSync;
00042 
00048 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
00049     const std::string filename_;
00050     const ValidSchema schema_;
00051     const EncoderPtr encoderPtr_;
00052     const size_t syncInterval_;
00053 
00054     std::auto_ptr<OutputStream> stream_;
00055     std::auto_ptr<OutputStream> buffer_;
00056     const DataFileSync sync_;
00057     int64_t objectCount_;
00058 
00059     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00060 
00061     Metadata metadata_;
00062 
00063     static std::auto_ptr<OutputStream> makeStream(const char* filename);
00064     static DataFileSync makeSync();
00065 
00066     void writeHeader();
00067     void setMetadata(const std::string& key, const std::string& value);
00068 
00072     void sync();
00073 
00074 public:
00078     Encoder& encoder() const { return *encoderPtr_; }
00079     
00084     void syncIfNeeded();
00085 
00089     void incr() {
00090         ++objectCount_;
00091     }
00095     DataFileWriterBase(const char* filename, const ValidSchema& schema,
00096         size_t syncInterval);
00097 
00098     ~DataFileWriterBase();
00103     void close();
00104 
00108     const ValidSchema& schema() const { return schema_; }
00109 
00113     void flush();
00114 };
00115 
00119 template <typename T>
00120 class DataFileWriter : boost::noncopyable {
00121     std::auto_ptr<DataFileWriterBase> base_;
00122 public:
00126     DataFileWriter(const char* filename, const ValidSchema& schema,
00127         size_t syncInterval = 16 * 1024) :
00128         base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00129 
00133     void write(const T& datum) {
00134         base_->syncIfNeeded();
00135         avro::encode(base_->encoder(), datum);
00136         base_->incr();
00137     }
00138 
00143     void close() { base_->close(); }
00144 
00148     const ValidSchema& schema() const { return base_->schema(); }
00149 
00153     void flush() { base_->flush(); }
00154 };
00155 
00159 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
00160     const std::string filename_;
00161     const std::auto_ptr<InputStream> stream_;
00162     const DecoderPtr decoder_;
00163     int64_t objectCount_;
00164 
00165     ValidSchema readerSchema_;
00166     ValidSchema dataSchema_;
00167     DecoderPtr dataDecoder_;
00168     std::auto_ptr<InputStream> dataStream_;
00169     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00170 
00171     Metadata metadata_;
00172     DataFileSync sync_;
00173 
00174     void readHeader();
00175 
00176     bool readDataBlock();
00177 public:
00181     Decoder& decoder() { return *dataDecoder_; }
00182 
00186     bool hasMore();
00187 
00191     void decr() { --objectCount_; }
00192 
00199     DataFileReaderBase(const char* filename);
00200 
00205     void init();
00206 
00214     void init(const ValidSchema& readerSchema);
00215 
00219     const ValidSchema& readerSchema() { return readerSchema_; }
00220 
00224     const ValidSchema& dataSchema() { return dataSchema_; }
00225 
00229     void close();
00230 };
00231 
00235 template <typename T>
00236 class DataFileReader : boost::noncopyable {
00237     std::auto_ptr<DataFileReaderBase> base_;
00238 public:
00243     DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00244         base_(new DataFileReaderBase(filename)) {
00245         base_->init(readerSchema);
00246     }
00247 
00252     DataFileReader(const char* filename) :
00253         base_(new DataFileReaderBase(filename)) {
00254         base_->init();
00255     }
00256 
00257 
00267     DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00268         base_->init();
00269     }
00270 
00280     DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00281         const ValidSchema& readerSchema) : base_(base) {
00282         base_->init(readerSchema);
00283     }
00284 
00290     bool read(T& datum) {
00291         if (base_->hasMore()) {
00292             base_->decr();
00293             avro::decode(base_->decoder(), datum);
00294             return true;
00295         }
00296         return false;
00297     }
00298 
00302     const ValidSchema& readerSchema() { return base_->readerSchema(); }
00303 
00307     const ValidSchema& dataSchema() { return base_->dataSchema(); }
00308 
00312     void close() { return base_->close(); }
00313 };
00314 
00315 }   // namespace avro
00316 #endif
00317 
00318