Avro C++
|
00001 /* 00002 * Licensed to the Apache Software Foundation (ASF) under one 00003 * or more contributor license agreements. See the NOTICE file 00004 * distributed with this work for additional information 00005 * regarding copyright ownership. The ASF licenses this file 00006 * to you under the Apache License, Version 2.0 (the 00007 * "License"); you may not use this file except in compliance 00008 * with the License. You may obtain a copy of the License at 00009 * 00010 * http://www.apache.org/licenses/LICENSE-2.0 00011 * 00012 * Unless required by applicable law or agreed to in writing, software 00013 * distributed under the License is distributed on an "AS IS" BASIS, 00014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 00015 * See the License for the specific language governing permissions and 00016 * limitations under the License. 00017 */ 00018 00019 #ifndef avro_DataFile_hh__ 00020 #define avro_DataFile_hh__ 00021 00022 #include "Config.hh" 00023 #include "Encoder.hh" 00024 #include "buffer/Buffer.hh" 00025 #include "ValidSchema.hh" 00026 #include "Specific.hh" 00027 #include "Stream.hh" 00028 00029 #include <map> 00030 #include <string> 00031 #include <vector> 00032 00033 #include "boost/array.hpp" 00034 #include "boost/utility.hpp" 00035 00036 namespace avro { 00037 00041 typedef boost::array<uint8_t, 16> DataFileSync; 00042 00048 class AVRO_DECL DataFileWriterBase : boost::noncopyable { 00049 const std::string filename_; 00050 const ValidSchema schema_; 00051 const EncoderPtr encoderPtr_; 00052 const size_t syncInterval_; 00053 00054 std::auto_ptr<OutputStream> stream_; 00055 std::auto_ptr<OutputStream> buffer_; 00056 const DataFileSync sync_; 00057 int64_t objectCount_; 00058 00059 typedef std::map<std::string, std::vector<uint8_t> > Metadata; 00060 00061 Metadata metadata_; 00062 00063 static std::auto_ptr<OutputStream> makeStream(const char* filename); 00064 static DataFileSync makeSync(); 00065 00066 void writeHeader(); 00067 void setMetadata(const std::string& key, const std::string& value); 00068 00072 void sync(); 00073 00074 public: 00078 Encoder& encoder() const { return *encoderPtr_; } 00079 00084 void syncIfNeeded(); 00085 00089 void incr() { 00090 ++objectCount_; 00091 } 00095 DataFileWriterBase(const char* filename, const ValidSchema& schema, 00096 size_t syncInterval); 00097 00098 ~DataFileWriterBase(); 00103 void close(); 00104 00108 const ValidSchema& schema() const { return schema_; } 00109 00113 void flush(); 00114 }; 00115 00119 template <typename T> 00120 class DataFileWriter : boost::noncopyable { 00121 std::auto_ptr<DataFileWriterBase> base_; 00122 public: 00126 DataFileWriter(const char* filename, const ValidSchema& schema, 00127 size_t syncInterval = 16 * 1024) : 00128 base_(new DataFileWriterBase(filename, schema, syncInterval)) { } 00129 00133 void write(const T& datum) { 00134 base_->syncIfNeeded(); 00135 avro::encode(base_->encoder(), datum); 00136 base_->incr(); 00137 } 00138 00143 void close() { base_->close(); } 00144 00148 const ValidSchema& schema() const { return base_->schema(); } 00149 00153 void flush() { base_->flush(); } 00154 }; 00155 00159 class AVRO_DECL DataFileReaderBase : boost::noncopyable { 00160 const std::string filename_; 00161 const std::auto_ptr<InputStream> stream_; 00162 const DecoderPtr decoder_; 00163 int64_t objectCount_; 00164 bool eof_; 00165 00166 ValidSchema readerSchema_; 00167 ValidSchema dataSchema_; 00168 DecoderPtr dataDecoder_; 00169 std::auto_ptr<InputStream> dataStream_; 00170 typedef std::map<std::string, std::vector<uint8_t> > Metadata; 00171 00172 Metadata metadata_; 00173 DataFileSync sync_; 00174 00175 void readHeader(); 00176 00177 bool readDataBlock(); 00178 public: 00182 Decoder& decoder() { return *dataDecoder_; } 00183 00187 bool hasMore(); 00188 00192 void decr() { --objectCount_; } 00193 00200 DataFileReaderBase(const char* filename); 00201 00206 void init(); 00207 00215 void init(const ValidSchema& readerSchema); 00216 00220 const ValidSchema& readerSchema() { return readerSchema_; } 00221 00225 const ValidSchema& dataSchema() { return dataSchema_; } 00226 00230 void close(); 00231 }; 00232 00236 template <typename T> 00237 class DataFileReader : boost::noncopyable { 00238 std::auto_ptr<DataFileReaderBase> base_; 00239 public: 00244 DataFileReader(const char* filename, const ValidSchema& readerSchema) : 00245 base_(new DataFileReaderBase(filename)) { 00246 base_->init(readerSchema); 00247 } 00248 00253 DataFileReader(const char* filename) : 00254 base_(new DataFileReaderBase(filename)) { 00255 base_->init(); 00256 } 00257 00258 00268 DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) { 00269 base_->init(); 00270 } 00271 00281 DataFileReader(std::auto_ptr<DataFileReaderBase> base, 00282 const ValidSchema& readerSchema) : base_(base) { 00283 base_->init(readerSchema); 00284 } 00285 00291 bool read(T& datum) { 00292 if (base_->hasMore()) { 00293 base_->decr(); 00294 avro::decode(base_->decoder(), datum); 00295 return true; 00296 } 00297 return false; 00298 } 00299 00303 const ValidSchema& readerSchema() { return base_->readerSchema(); } 00304 00308 const ValidSchema& dataSchema() { return base_->dataSchema(); } 00309 00313 void close() { return base_->close(); } 00314 }; 00315 00316 } // namespace avro 00317 #endif 00318 00319