00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021
00022 #include "Config.hh"
00023 #include "Encoder.hh"
00024 #include "buffer/Buffer.hh"
00025 #include "ValidSchema.hh"
00026 #include "Specific.hh"
00027 #include "Stream.hh"
00028
00029 #include <map>
00030 #include <string>
00031 #include <vector>
00032
00033 #include "boost/array.hpp"
00034 #include "boost/utility.hpp"
00035
00036 namespace avro {
00037
00041 typedef boost::array<uint8_t, 16> DataFileSync;
00042
00048 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
00049 const std::string filename_;
00050 const ValidSchema schema_;
00051 const EncoderPtr encoderPtr_;
00052 const size_t syncInterval_;
00053
00054 std::auto_ptr<OutputStream> stream_;
00055 std::auto_ptr<OutputStream> buffer_;
00056 const DataFileSync sync_;
00057 int64_t objectCount_;
00058
00059 typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00060
00061 Metadata metadata_;
00062
00063 static std::auto_ptr<OutputStream> makeStream(const char* filename);
00064 static DataFileSync makeSync();
00065
00066 void writeHeader();
00067 void setMetadata(const std::string& key, const std::string& value);
00068
00072 void sync();
00073
00074 public:
00078 Encoder& encoder() const { return *encoderPtr_; }
00079
00084 void syncIfNeeded();
00085
00089 void incr() {
00090 ++objectCount_;
00091 }
00095 DataFileWriterBase(const char* filename, const ValidSchema& schema,
00096 size_t syncInterval);
00097
00098 ~DataFileWriterBase();
00103 void close();
00104
00108 const ValidSchema& schema() const { return schema_; }
00109
00113 void flush();
00114 };
00115
00119 template <typename T>
00120 class DataFileWriter : boost::noncopyable {
00121 std::auto_ptr<DataFileWriterBase> base_;
00122 public:
00126 DataFileWriter(const char* filename, const ValidSchema& schema,
00127 size_t syncInterval = 16 * 1024) :
00128 base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00129
00133 void write(const T& datum) {
00134 base_->syncIfNeeded();
00135 avro::encode(base_->encoder(), datum);
00136 base_->incr();
00137 }
00138
00143 void close() { base_->close(); }
00144
00148 const ValidSchema& schema() const { return base_->schema(); }
00149
00153 void flush() { base_->flush(); }
00154 };
00155
00159 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
00160 const std::string filename_;
00161 const std::auto_ptr<InputStream> stream_;
00162 const DecoderPtr decoder_;
00163 int64_t objectCount_;
00164
00165 ValidSchema readerSchema_;
00166 ValidSchema dataSchema_;
00167 DecoderPtr dataDecoder_;
00168 std::auto_ptr<InputStream> dataStream_;
00169 typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00170
00171 Metadata metadata_;
00172 DataFileSync sync_;
00173
00174 void readHeader();
00175
00176 bool readDataBlock();
00177 public:
00181 Decoder& decoder() { return *dataDecoder_; }
00182
00186 bool hasMore();
00187
00191 void decr() { --objectCount_; }
00192
00199 DataFileReaderBase(const char* filename);
00200
00205 void init();
00206
00214 void init(const ValidSchema& readerSchema);
00215
00219 const ValidSchema& readerSchema() { return readerSchema_; }
00220
00224 const ValidSchema& dataSchema() { return dataSchema_; }
00225
00229 void close();
00230 };
00231
00235 template <typename T>
00236 class DataFileReader : boost::noncopyable {
00237 std::auto_ptr<DataFileReaderBase> base_;
00238 public:
00243 DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00244 base_(new DataFileReaderBase(filename)) {
00245 base_->init(readerSchema);
00246 }
00247
00252 DataFileReader(const char* filename) :
00253 base_(new DataFileReaderBase(filename)) {
00254 base_->init();
00255 }
00256
00257
00267 DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00268 base_->init();
00269 }
00270
00280 DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00281 const ValidSchema& readerSchema) : base_(base) {
00282 base_->init(readerSchema);
00283 }
00284
00290 bool read(T& datum) {
00291 if (base_->hasMore()) {
00292 base_->decr();
00293 avro::decode(base_->decoder(), datum);
00294 return true;
00295 }
00296 return false;
00297 }
00298
00302 const ValidSchema& readerSchema() { return base_->readerSchema(); }
00303
00307 const ValidSchema& dataSchema() { return base_->dataSchema(); }
00308
00312 void close() { return base_->close(); }
00313 };
00314
00315 }
00316 #endif
00317
00318