00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021
00022 #include "Encoder.hh"
00023 #include "buffer/Buffer.hh"
00024 #include "ValidSchema.hh"
00025 #include "Specific.hh"
00026 #include "Stream.hh"
00027
00028 #include <map>
00029 #include <string>
00030 #include <vector>
00031
00032 #include "boost/array.hpp"
00033 #include "boost/utility.hpp"
00034
00035 namespace avro {
00036
00040 typedef boost::array<uint8_t, 16> DataFileSync;
00041
00047 class DataFileWriterBase : boost::noncopyable {
00048 const std::string filename_;
00049 const ValidSchema schema_;
00050 const EncoderPtr encoderPtr_;
00051 const size_t syncInterval_;
00052
00053 std::auto_ptr<OutputStream> stream_;
00054 std::auto_ptr<OutputStream> buffer_;
00055 const DataFileSync sync_;
00056 int64_t objectCount_;
00057
00058 typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00059
00060 Metadata metadata_;
00061
00062 static std::auto_ptr<OutputStream> makeStream(const char* filename);
00063 static DataFileSync makeSync();
00064
00065 void writeHeader();
00066 void setMetadata(const std::string& key, const std::string& value);
00067
00071 void sync();
00072
00073 public:
00077 Encoder& encoder() const { return *encoderPtr_; }
00078
00083 void syncIfNeeded();
00084
00088 void incr() {
00089 ++objectCount_;
00090 }
00094 DataFileWriterBase(const char* filename, const ValidSchema& schema,
00095 size_t syncInterval);
00096
00097 ~DataFileWriterBase();
00102 void close();
00103
00107 const ValidSchema& schema() const { return schema_; }
00108
00112 void flush();
00113 };
00114
00118 template <typename T>
00119 class DataFileWriter : boost::noncopyable {
00120 std::auto_ptr<DataFileWriterBase> base_;
00121 public:
00125 DataFileWriter(const char* filename, const ValidSchema& schema,
00126 size_t syncInterval = 16 * 1024) :
00127 base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00128
00132 void write(const T& datum) {
00133 base_->syncIfNeeded();
00134 avro::encode(base_->encoder(), datum);
00135 base_->incr();
00136 }
00137
00142 void close() { base_->close(); }
00143
00147 const ValidSchema& schema() const { return base_->schema(); }
00148
00152 void flush() { base_->flush(); }
00153 };
00154
00158 class DataFileReaderBase : boost::noncopyable {
00159 const std::string filename_;
00160 const std::auto_ptr<InputStream> stream_;
00161 const DecoderPtr decoder_;
00162 int64_t objectCount_;
00163
00164 ValidSchema readerSchema_;
00165 ValidSchema dataSchema_;
00166 DecoderPtr dataDecoder_;
00167 std::auto_ptr<InputStream> dataStream_;
00168 typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00169
00170 Metadata metadata_;
00171 DataFileSync sync_;
00172
00173 void readHeader();
00174
00175 bool readDataBlock();
00176 public:
00180 Decoder& decoder() { return *dataDecoder_; }
00181
00185 bool hasMore();
00186
00190 void decr() { --objectCount_; }
00191
00198 DataFileReaderBase(const char* filename);
00199
00204 void init();
00205
00213 void init(const ValidSchema& readerSchema);
00214
00218 const ValidSchema& readerSchema() { return readerSchema_; }
00219
00223 const ValidSchema& dataSchema() { return dataSchema_; }
00224
00228 void close();
00229 };
00230
00234 template <typename T>
00235 class DataFileReader : boost::noncopyable {
00236 std::auto_ptr<DataFileReaderBase> base_;
00237 public:
00242 DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00243 base_(new DataFileReaderBase(filename)) {
00244 base_->init(readerSchema);
00245 }
00246
00251 DataFileReader(const char* filename) :
00252 base_(new DataFileReaderBase(filename)) {
00253 base_->init();
00254 }
00255
00256
00266 DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00267 base_->init();
00268 }
00269
00279 DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00280 const ValidSchema& readerSchema) : base_(base) {
00281 base_->init(readerSchema);
00282 }
00283
00289 bool read(T& datum) {
00290 if (base_->hasMore()) {
00291 base_->decr();
00292 avro::decode(base_->decoder(), datum);
00293 return true;
00294 }
00295 return false;
00296 }
00297
00301 const ValidSchema& readerSchema() { return base_->readerSchema(); }
00302
00306 const ValidSchema& dataSchema() { return base_->dataSchema(); }
00307
00311 void close() { return base_->close(); }
00312 };
00313
00314 }
00315 #endif
00316
00317