DataFile.hh

00001 
00019 #ifndef avro_DataFile_hh__
00020 #define avro_DataFile_hh__
00021 
00022 #include "Encoder.hh"
00023 #include "buffer/Buffer.hh"
00024 #include "ValidSchema.hh"
00025 #include "Specific.hh"
00026 #include "Stream.hh"
00027 
00028 #include <map>
00029 #include <string>
00030 #include <vector>
00031 
00032 #include "boost/array.hpp"
00033 #include "boost/utility.hpp"
00034 
00035 namespace avro {
00036 
00037 typedef boost::array<uint8_t, 16> DataFileSync; // 16-byte array type; the writer and reader base classes each hold one as their sync_ member.
00038 
00044 class DataFileWriterBase : boost::noncopyable {
00045     const std::string filename_;
00046     const ValidSchema schema_;
00047     const EncoderPtr encoderPtr_;
00048     const size_t syncInterval_;
00049 
00050     std::auto_ptr<OutputStream> stream_;
00051     std::auto_ptr<OutputStream> buffer_;
00052     const DataFileSync sync_;
00053     int64_t objectCount_;
00054 
00055     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00056 
00057     Metadata metadata_;
00058 
00059     static std::auto_ptr<OutputStream> makeStream(const char* filename);
00060     static DataFileSync makeSync();
00061 
00062     void writeHeader();
00063     void setMetadata(const std::string& key, const std::string& value);
00064 
00068     void sync();
00069 
00070 public:
00071     Encoder& encoder() const { return *encoderPtr_; }
00072     
00073     void syncIfNeeded();
00074 
00075     void incr() {
00076         ++objectCount_;
00077     }
00081     DataFileWriterBase(const char* filename, const ValidSchema& schema,
00082         size_t syncInterval);
00083 
00084     ~DataFileWriterBase();
00089     void close();
00090 
00094     const ValidSchema& schema() const { return schema_; }
00095 
00099     void flush();
00100 };
00101 
00105 template <typename T>
00106 class DataFileWriter : boost::noncopyable {
00107     std::auto_ptr<DataFileWriterBase> base_;
00108 public:
00112     DataFileWriter(const char* filename, const ValidSchema& schema,
00113         size_t syncInterval = 16 * 1024) :
00114         base_(new DataFileWriterBase(filename, schema, syncInterval)) { }
00115 
00119     void write(const T& datum) {
00120         base_->syncIfNeeded();
00121         avro::encode(base_->encoder(), datum);
00122         base_->incr();
00123     }
00124 
00129     void close() { base_->close(); }
00130 
00134     const ValidSchema& schema() const { return base_->schema(); }
00135 
00139     void flush() { base_->flush(); }
00140 };
00141 
00142 class DataFileReaderBase : boost::noncopyable {
00143     const std::string filename_;
00144     const std::auto_ptr<InputStream> stream_;
00145     const DecoderPtr decoder_;
00146     int64_t objectCount_;
00147 
00148     ValidSchema readerSchema_;
00149     ValidSchema dataSchema_;
00150     DecoderPtr dataDecoder_;
00151     std::auto_ptr<InputStream> dataStream_;
00152     typedef std::map<std::string, std::vector<uint8_t> > Metadata;
00153 
00154     Metadata metadata_;
00155     DataFileSync sync_;
00156 
00157     void readHeader();
00158 
00159     bool readDataBlock();
00160 public:
00161     Decoder& decoder() { return *dataDecoder_; }
00162 
00166     bool hasMore();
00167 
00168     void decr() { --objectCount_; }
00169 
00176     DataFileReaderBase(const char* filename);
00177 
00182     void init();
00183 
00191     void init(const ValidSchema& readerSchema);
00192 
00196     const ValidSchema& readerSchema() { return readerSchema_; }
00197 
00201     const ValidSchema& dataSchema() { return dataSchema_; }
00202 
00206     void close();
00207 };
00208 
00209 template <typename T>
00210 class DataFileReader : boost::noncopyable {
00211     std::auto_ptr<DataFileReaderBase> base_;
00212 public:
00217     DataFileReader(const char* filename, const ValidSchema& readerSchema) :
00218         base_(new DataFileReaderBase(filename)) {
00219         base_->init(readerSchema);
00220     }
00221 
00226     DataFileReader(const char* filename) :
00227         base_(new DataFileReaderBase(filename)) {
00228         base_->init();
00229     }
00230 
00231 
00241     DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
00242         base_->init();
00243     }
00244 
00254     DataFileReader(std::auto_ptr<DataFileReaderBase> base,
00255         const ValidSchema& readerSchema) : base_(base) {
00256         base_->init(readerSchema);
00257     }
00258 
00259     bool read(T& datum) {
00260         if (base_->hasMore()) {
00261             base_->decr();
00262             avro::decode(base_->decoder(), datum);
00263             return true;
00264         }
00265         return false;
00266     }
00267 
00271     const ValidSchema& readerSchema() { return base_->readerSchema(); }
00272 
00276     const ValidSchema& dataSchema() { return base_->dataSchema(); }
00277 
00281     void close() { return base_->close(); }
00282 };
00283 
00284 }   // namespace avro
00285 #endif
00286 
00287