DataFile.hh

00001 /*
00002  * Licensed to the Apache Software Foundation (ASF) under one
00003  * or more contributor license agreements.  See the NOTICE file
00004  * distributed with this work for additional information
00005  * regarding copyright ownership.  The ASF licenses this file
00006  * to you under the Apache License, Version 2.0 (the
00007  * "License"); you may not use this file except in compliance
00008  * with the License.  You may obtain a copy of the License at
00009  *
00010  *     http://www.apache.org/licenses/LICENSE-2.0
00011  *
00012  * Unless required by applicable law or agreed to in writing, software
00013  * distributed under the License is distributed on an "AS IS" BASIS,
00014  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00015  * See the License for the specific language governing permissions and
00016  * limitations under the License.
00017  */
00018 
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021 
00022 #include "Encoder.hh"
00023 #include "buffer/Buffer.hh"
00024 #include "ValidSchema.hh"
00025 #include "Specific.hh"
00026 #include "Stream.hh"
00027 
00028 #include <map>
00029 #include <string>
00030 #include <vector>
00031 
00032 #include "boost/array.hpp"
00033 #include "boost/utility.hpp"
00034 
00035 namespace avro {
00036 
/**
 * Fixed-size array of 16 bytes (uint8_t).
 * Going by the name, it carries a data file's sync value; how that value is
 * produced and checked is not visible in this header.
 */
00040 typedef boost::array<uint8_t, 16> DataFileSync;
00041 
/**
 * Type-independent part of the data file writer.
 *
 * Holds the encoder, the output streams and the bookkeeping that the typed
 * DataFileWriter<T> front end drives. The class is non-copyable (it inherits
 * privately from boost::noncopyable). Every member that is only declared
 * here is defined outside this header, so its exact behaviour cannot be
 * read off this listing.
 *
 * NOTE(review): data members are initialised in declaration order, so do
 * not reorder them without checking the constructor's definition.
 */
00047 class DataFileWriterBase : boost::noncopyable {
          // Immutable after construction (all four are const members).
          // Presumably taken from / derived from the constructor arguments
          // of the same names -- confirm in the implementation file.
00048     const std::string filename_;
00049     const ValidSchema schema_;
00050     const EncoderPtr encoderPtr_;
00051     const size_t syncInterval_;
00052 
          // Two owned output streams. The names suggest "file" and "pending
          // block buffer" roles, but that is an assumption -- TODO confirm.
00053     std::auto_ptr<OutputStream> stream_;
00054     std::auto_ptr<OutputStream> buffer_;
          // Sync value of this writer; const, so fixed at construction
          // (presumably via makeSync()).
00055     const DataFileSync sync_;
          // Counter advanced by incr(); where it is reset is not shown here.
00056     int64_t objectCount_;
00057 
          // Metadata container: string key -> raw bytes.
00058     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00059 
00060     Metadata metadata_;
00061 
          // Static factories used while constructing the object: an owned
          // output stream for `filename`, and a DataFileSync value.
00062     static std::auto_ptr<OutputStream> makeStream(const char* filename);
00063     static DataFileSync makeSync();
00064 
          // Private helpers, defined out of line.
00065     void writeHeader();
          // Stores `value` under `key`; presumably into metadata_ -- confirm.
00066     void setMetadata(const std::string& key, const std::string& value);
00067 
          // Private counterpart of syncIfNeeded(); see the implementation.
00071     void sync();
00072 
00073 public:
          /**
           * Returns the encoder held by encoderPtr_.
           * The pointer is dereferenced without a null check.
           */
00077     Encoder& encoder() const { return *encoderPtr_; }
00078     
          /**
           * Called by DataFileWriter<T>::write() before each datum is
           * encoded. Presumably triggers sync() once enough data has
           * accumulated (see syncInterval_) -- confirm in the implementation.
           */
00083     void syncIfNeeded();
00084 
          /**
           * Adds one to objectCount_. DataFileWriter<T>::write() calls this
           * after each datum has been encoded.
           */
00088     void incr() {
00089         ++objectCount_;
00090     }
          /**
           * Creates a writer.
           * @param filename      path of the output file (C string).
           * @param schema        schema associated with the written data.
           * @param syncInterval  sync threshold; the unit is not visible in
           *                      this header (DataFileWriter<T> defaults it
           *                      to 16 * 1024).
           */
00094     DataFileWriterBase(const char* filename, const ValidSchema& schema,
00095         size_t syncInterval);
00096 
00097     ~DataFileWriterBase();
          /**
           * Closes the writer. DataFileWriter<T>::close() forwards here.
           */
00102     void close();
00103 
          /**
           * Returns the schema stored at construction (schema_).
           */
00107     const ValidSchema& schema() const { return schema_; }
00108 
          /**
           * Flushes the writer. DataFileWriter<T>::flush() forwards here.
           */
00112     void flush();
00113 };
00114 
/**
 * Typed front end for writing values of type T to a data file.
 *
 * Owns a DataFileWriterBase and forwards every operation to it; the only
 * type-dependent step is avro::encode(), which must be usable with T.
 * Non-copyable (inherits privately from boost::noncopyable).
 */
00118 template <typename T>
00119 class DataFileWriter : boost::noncopyable {
          // Owned type-independent implementation; created in the constructor.
00120     std::auto_ptr<DataFileWriterBase> base_;
00121 public:
          /**
           * Creates a writer for `filename` using `schema`.
           * @param syncInterval  passed straight to DataFileWriterBase;
           *                      defaults to 16 * 1024.
           */
00125     DataFileWriter(const char* filename, const ValidSchema& schema,
00126         size_t syncInterval = 16 * 1024) :
00127         base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00128 
          /**
           * Writes one datum.
           * Order matters: the base gets a chance to sync first, then the
           * datum is encoded through the base's encoder, and only then is
           * the object counter advanced.
           */
00132     void write(const T& datum) {
00133         base_->syncIfNeeded();
00134         avro::encode(base_->encoder(), datum);
00135         base_->incr();
00136     }
00137 
          /**
           * Closes the underlying writer (forwards to the base).
           */
00142     void close() { base_->close(); }
00143 
          /**
           * Returns the schema held by the underlying writer.
           */
00147     const ValidSchema& schema() const { return base_->schema(); }
00148 
          /**
           * Flushes the underlying writer (forwards to the base).
           */
00152     void flush() { base_->flush(); }
00153 };
00154 
/**
 * Type-independent part of the data file reader.
 *
 * Holds the input streams, the decoders and the reader/data schemas that
 * the typed DataFileReader<T> front end uses. Non-copyable (inherits
 * privately from boost::noncopyable). Members that are only declared here
 * are defined outside this header.
 *
 * Construction is two-phase: after the constructor, exactly one of the
 * init() overloads is expected to be called before reading (this is what
 * every DataFileReader<T> constructor does).
 *
 * NOTE(review): data members are initialised in declaration order, so do
 * not reorder them without checking the constructor's definition.
 */
00158 class DataFileReaderBase : boost::noncopyable {
00159     const std::string filename_;
          // const auto_ptr: ownership cannot be transferred away later.
00160     const std::auto_ptr<InputStream> stream_;
          // Decoder fixed at construction; distinct from dataDecoder_, which
          // is the one handed out by decoder().
00161     const DecoderPtr decoder_;
          // Counter decremented by decr(); presumably the number of objects
          // left in the current block -- confirm in the implementation.
00162     int64_t objectCount_;
00163 
00164     ValidSchema readerSchema_;
00165     ValidSchema dataSchema_;
00166     DecoderPtr dataDecoder_;
00167     std::auto_ptr<InputStream> dataStream_;
          // Metadata container: string key -> raw bytes.
00168     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00169 
00170     Metadata metadata_;
00171     DataFileSync sync_;
00172 
          // Private helpers, defined out of line.
00173     void readHeader();
00174 
00175     bool readDataBlock();
00176 public:
          /**
           * Returns the decoder used for reading data (dataDecoder_).
           * The pointer is dereferenced without a null check, so this is
           * only meaningful once dataDecoder_ has been set up.
           */
00180     Decoder& decoder() { return *dataDecoder_; }
00181 
          /**
           * Reports whether another object can be read.
           * DataFileReader<T>::read() checks this before decoding.
           */
00185     bool hasMore();
00186 
          /**
           * Subtracts one from objectCount_. DataFileReader<T>::read()
           * calls this once per object, just before decoding it.
           */
00190     void decr() { --objectCount_; }
00191 
          /**
           * Creates a reader for `filename`. One of the init() overloads
           * must follow. Declared explicit so that a bare C string never
           * converts silently into a reader object.
           */
00198     explicit DataFileReaderBase(const char* filename);
00199 
          /**
           * Initialises the reader without a caller-supplied reader schema.
           * Presumably the schema stored in the file is used for both roles
           * -- confirm in the implementation.
           */
00204     void init();
00205 
          /**
           * Initialises the reader with an explicit reader schema.
           * @param readerSchema  schema the caller wants the data read as.
           */
00213     void init(const ValidSchema& readerSchema);
00214 
          /**
           * Returns readerSchema_. Const-qualified: it only reads state.
           */
00218     const ValidSchema& readerSchema() const { return readerSchema_; }
00219 
          /**
           * Returns dataSchema_. Const-qualified: it only reads state.
           */
00223     const ValidSchema& dataSchema() const { return dataSchema_; }
00224 
          /**
           * Closes the reader. DataFileReader<T>::close() forwards here.
           */
00228     void close();
00229 };
00230 
/**
 * Typed front end for reading values of type T from a data file.
 *
 * Owns a DataFileReaderBase and forwards every operation to it; the only
 * type-dependent step is avro::decode(), which must be usable with T.
 * Every constructor finishes by calling one of the base's init() overloads.
 * Non-copyable (inherits privately from boost::noncopyable).
 */
00234 template <typename T>
00235 class DataFileReader : boost::noncopyable {
          // Owned type-independent implementation.
00236     std::auto_ptr<DataFileReaderBase> base_;
00237 public:
          /**
           * Opens `filename` and initialises the base with `readerSchema`.
           */
00242     DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00243         base_(new DataFileReaderBase(filename)) {
00244         base_->init(readerSchema);
00245     }
00246 
          /**
           * Opens `filename` and initialises the base without a
           * caller-supplied reader schema. Explicit so that a bare C string
           * never converts silently into a reader.
           */
00251     explicit DataFileReader(const char* filename) :
00252         base_(new DataFileReaderBase(filename)) {
00253         base_->init();
00254     }
00255 
00256 
          /**
           * Takes ownership of an already constructed base and initialises
           * it without a caller-supplied reader schema. The base must not
           * have been initialised yet as far as this class is concerned,
           * since init() is called here. Explicit to avoid accidental
           * ownership transfer through an implicit conversion.
           */
00266     explicit DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00267         base_->init();
00268     }
00269 
          /**
           * Takes ownership of an already constructed base and initialises
           * it with `readerSchema`.
           */
00279     DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00280         const ValidSchema& readerSchema) : base_(base) {
00281         base_->init(readerSchema);
00282     }
00283 
          /**
           * Reads the next object into `datum`.
           * @return true if an object was decoded; false if the base
           *         reports nothing more to read, in which case `datum` is
           *         left untouched.
           * The base's counter is decremented before decoding.
           */
00289     bool read(T& datum) {
00290         if (base_->hasMore()) {
00291             base_->decr();
00292             avro::decode(base_->decoder(), datum);
00293             return true;
00294         }
00295         return false;
00296     }
00297 
          /**
           * Returns the reader schema held by the base.
           */
00301     const ValidSchema& readerSchema() const { return base_->readerSchema(); }
00302 
          /**
           * Returns the data schema held by the base.
           */
00306     const ValidSchema& dataSchema() const { return base_->dataSchema(); }
00307 
          /**
           * Closes the underlying reader (forwards to the base).
           */
00311     void close() { base_->close(); }
00312 };
00313 
00314 }   // namespace avro
00315 #endif
00316 
00317