Avro C++
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
DataFile.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_DataFile_hh__
20 #define avro_DataFile_hh__
21 
22 #include "Config.hh"
23 #include "Encoder.hh"
24 #include "buffer/Buffer.hh"
25 #include "ValidSchema.hh"
26 #include "Specific.hh"
27 #include "Stream.hh"
28 
29 #include <map>
30 #include <string>
31 #include <vector>
32 
33 #include "boost/array.hpp"
34 #include "boost/utility.hpp"
35 #include <boost/iostreams/filtering_stream.hpp>
36 #include <boost/scoped_ptr.hpp>
37 
38 namespace avro {
39 
/// Specify type of compression to use when writing data files.
41 enum Codec {
    // NOTE(review): presumably "no compression" -- confirm against the implementation.
    // This is the default codec argument of the writer constructors in this header.
42  NULL_CODEC,
    // NOTE(review): presumably deflate compression -- confirm against the implementation.
43  DEFLATE_CODEC
44 };
45 
/// The sync value: a fixed-size array of 16 bytes.
49 typedef boost::array<uint8_t, 16> DataFileSync;
50 
/// Type-independent portion of DataFileWriter.
///
/// Non-copyable (inherits boost::noncopyable). Only the inline members are
/// defined in this header; everything else is declared here and defined
/// elsewhere.
56 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
57  const std::string filename_;
    // Schema returned by schema().
58  const ValidSchema schema_;
    // Encoder returned by encoder().
59  const EncoderPtr encoderPtr_;
    // NOTE(review): presumably the threshold used by syncIfNeeded() -- confirm in the implementation.
60  const size_t syncInterval_;
61  Codec codec_;
62 
63  std::auto_ptr<OutputStream> stream_;
64  std::auto_ptr<OutputStream> buffer_;
65  const DataFileSync sync_;
    // Counter bumped by incr().
66  int64_t objectCount_;
67 
    // Metadata entries: string key mapped to a raw byte vector.
68  typedef std::map<std::string, std::vector<uint8_t> > Metadata;
69 
70  Metadata metadata_;
71 
72  static std::auto_ptr<OutputStream> makeStream(const char* filename);
73  static DataFileSync makeSync();
74 
75  void writeHeader();
76  void setMetadata(const std::string& key, const std::string& value);
77 
    // Private helper, defined elsewhere.
    // NOTE(review): presumably writes out the buffered block -- confirm.
81  void sync();
82 
83 public:
    /// Returns the current encoder for this writer.
87  Encoder& encoder() const { return *encoderPtr_; }
88 
    /// Defined elsewhere. DataFileWriter::write() calls this before encoding
    /// each datum.
    // NOTE(review): presumably triggers sync() based on syncInterval_ -- confirm.
93  void syncIfNeeded();
94 
    /// Increments the object count.
98  void incr() {
99  ++objectCount_;
100  }
    /// Constructor, defined elsewhere.
    /// @param filename     C string naming the file (NOTE(review): presumably the output path -- confirm).
    /// @param schema       Schema for the data file.
    /// @param syncInterval No default here; DataFileWriter passes 16 * 1024 by default.
    /// @param codec        Compression codec; defaults to NULL_CODEC.
104  DataFileWriterBase(const char* filename, const ValidSchema& schema,
105  size_t syncInterval, Codec codec = NULL_CODEC);
106 
    /// Defined elsewhere. DataFileWriter::close() forwards to this.
112  void close();
113 
    /// Returns the schema for this data file.
117  const ValidSchema& schema() const { return schema_; }
118 
    /// Defined elsewhere. DataFileWriter::flush() forwards to this.
122  void flush();
123 };
124 
/// An Avro datafile that can store objects of type T.
///
/// Thin typed wrapper: every operation forwards to an owned
/// DataFileWriterBase. Non-copyable.
128 template <typename T>
129 class DataFileWriter : boost::noncopyable {
    // Owned type-independent writer that does the actual work.
130  std::auto_ptr<DataFileWriterBase> base_;
131 public:
    /// Constructs a new data file.
    /// @param filename     Forwarded to DataFileWriterBase.
    /// @param schema       Schema for the data file.
    /// @param syncInterval Forwarded to DataFileWriterBase; defaults to 16 * 1024.
    /// @param codec        Compression codec; defaults to NULL_CODEC.
135  DataFileWriter(const char* filename, const ValidSchema& schema,
136  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
137  base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { }
138 
    /// Writes the given piece of data into the file.
    /// Calls syncIfNeeded() on the base, encodes the datum with the base's
    /// encoder, then increments the base's object count.
142  void write(const T& datum) {
143  base_->syncIfNeeded();
144  avro::encode(base_->encoder(), datum);
145  base_->incr();
146  }
147 
    /// Closes the current file.
152  void close() { base_->close(); }
153 
    /// Returns the schema for this data file.
157  const ValidSchema& schema() const { return base_->schema(); }
158 
    /// Flushes any unwritten data into the file.
162  void flush() { base_->flush(); }
163 };
164 
/// The type-independent portion of the reader.
///
/// Non-copyable (inherits boost::noncopyable). Construction alone does not
/// call init(): the DataFileReader constructors in this header call one of
/// the init() overloads explicitly after constructing the base.
168 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
169  const std::string filename_;
170  const std::auto_ptr<InputStream> stream_;
171  const DecoderPtr decoder_;
    // Counter decremented by decr() (number of objects yet to read).
172  int64_t objectCount_;
173  bool eof_;
174  Codec codec_;
175 
    // Schema returned by readerSchema().
176  ValidSchema readerSchema_;
    // Schema returned by dataSchema().
177  ValidSchema dataSchema_;
    // Decoder returned by decoder().
178  DecoderPtr dataDecoder_;
179  std::auto_ptr<InputStream> dataStream_;
    // Metadata entries: string key mapped to a raw byte vector.
180  typedef std::map<std::string, std::vector<uint8_t> > Metadata;
181 
182  Metadata metadata_;
183  DataFileSync sync_;
184 
185  // for compressed buffer
    // Note: despite the "os_" name this is an input stream (filtering_istream).
186  boost::scoped_ptr<boost::iostreams::filtering_istream> os_;
187  std::vector<char> compressed_;
188 
189  void readHeader();
190 
191  bool readDataBlock();
192 public:
    /// Returns the current decoder for this reader.
196  Decoder& decoder() { return *dataDecoder_; }
197 
    /// Defined elsewhere. DataFileReader::read() only decodes a datum when
    /// this returns true.
201  bool hasMore();
202 
    /// Decrements the number of objects yet to read.
206  void decr() { --objectCount_; }
207 
    /// Constructor, defined elsewhere.
    /// @param filename C string naming the file (NOTE(review): presumably the file to read -- confirm).
214  DataFileReaderBase(const char* filename);
215 
    /// Initialization without an explicit reader schema; defined elsewhere.
    // NOTE(review): presumably uses the schema stored in the file as the reader schema -- confirm.
220  void init();
221 
    /// Initialization with the given reader schema; defined elsewhere.
229  void init(const ValidSchema& readerSchema);
230 
    /// Returns the schema for this object.
234  const ValidSchema& readerSchema() { return readerSchema_; }
235 
    /// Returns the schema stored with the data file.
239  const ValidSchema& dataSchema() { return dataSchema_; }
240 
    /// Defined elsewhere. DataFileReader::close() forwards to this.
244  void close();
245 };
246 
/// Reads the contents of data file one after another.
///
/// Thin typed wrapper around an owned DataFileReaderBase. Non-copyable.
250 template <typename T>
251 class DataFileReader : boost::noncopyable {
    // Owned type-independent reader that does the actual work.
252  std::auto_ptr<DataFileReaderBase> base_;
253 public:
    /// Constructs the reader for the given file; the reader is expected to
    /// use the given schema. Creates the base and calls init(readerSchema).
258  DataFileReader(const char* filename, const ValidSchema& readerSchema) :
259  base_(new DataFileReaderBase(filename)) {
260  base_->init(readerSchema);
261  }
262 
    /// Constructs the reader for the given file without an explicit reader
    /// schema. Creates the base and calls the no-argument init().
    // NOTE(review): presumably the reader then uses the schema stored in the file -- confirm.
267  DataFileReader(const char* filename) :
268  base_(new DataFileReaderBase(filename)) {
269  base_->init();
270  }
271 
272 
    /// Constructs a reader using the reader base.
    /// Takes over ownership of `base` (std::auto_ptr transfers ownership on
    /// copy) and calls the no-argument init() on it.
282  DataFileReader(std::auto_ptr<DataFileReaderBase> base) : base_(base) {
283  base_->init();
284  }
285 
    /// Constructs a reader using the reader base.
    /// Takes over ownership of `base` and calls init(readerSchema) on it.
295  DataFileReader(std::auto_ptr<DataFileReaderBase> base,
296  const ValidSchema& readerSchema) : base_(base) {
297  base_->init(readerSchema);
298  }
299 
    /// Reads the next entry from the data file.
    /// @param datum Output: receives the decoded value when one is read.
    /// @return true if hasMore() reported another entry and it was decoded
    ///         into `datum`; false otherwise (`datum` is left untouched).
305  bool read(T& datum) {
306  if (base_->hasMore()) {
307  base_->decr();
308  avro::decode(base_->decoder(), datum);
309  return true;
310  }
311  return false;
312  }
313 
    /// Returns the schema for this object.
317  const ValidSchema& readerSchema() { return base_->readerSchema(); }
318 
    /// Returns the schema stored with the data file.
322  const ValidSchema& dataSchema() { return base_->dataSchema(); }
323 
    /// Closes the reader.
327  void close() { return base_->close(); }
328 };
329 
330 } // namespace avro
331 #endif
Low level support for encoding avro values.
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:157
Type-independent portion of DataFileWriter.
Definition: DataFile.hh:56
void decode(Decoder &d, T &t)
Generic decoder function that makes use of the codec_traits.
Definition: Specific.hh:304
bool read(T &datum)
Reads the next entry from the data file.
Definition: DataFile.hh:305
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:234
boost::shared_ptr< Decoder > DecoderPtr
Shared pointer to Decoder.
Definition: Decoder.hh:161
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:31
void decr()
Decrements the number of objects yet to read.
Definition: DataFile.hh:206
void close()
Closes the current file.
Definition: DataFile.hh:152
DataFileReader(const char *filename, const ValidSchema &readerSchema)
Constructs the reader for the given file and the reader is expected to use the given schema...
Definition: DataFile.hh:258
DataFileReader(std::auto_ptr< DataFileReaderBase > base)
Constructs a reader using the reader base.
Definition: DataFile.hh:282
An Avro datafile that can store objects of type T.
Definition: DataFile.hh:129
void incr()
Increments the object count.
Definition: DataFile.hh:98
void flush()
Flushes any unwritten data into the file.
Definition: DataFile.hh:162
DataFileReader(const char *filename)
Constructs the reader for the given file and the reader is expected to use the schema that is used wi...
Definition: DataFile.hh:267
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:322
Decoder & decoder()
Returns the current decoder for this reader.
Definition: DataFile.hh:196
void write(const T &datum)
Writes the given piece of data into the file.
Definition: DataFile.hh:142
Codec
Specify type of compression to use when writing data files.
Definition: DataFile.hh:41
void encode(Encoder &e, const T &t)
Generic encoder function that makes use of the codec_traits.
Definition: Specific.hh:296
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:317
The type-independent portion of the reader.
Definition: DataFile.hh:168
A ValidSchema is basically a non-mutable Schema that has passed some minimum of sanity checks...
Definition: ValidSchema.hh:40
Encoder & encoder() const
Returns the current encoder for this writer.
Definition: DataFile.hh:87
Reads the contents of data file one after another.
Definition: DataFile.hh:251
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:239
boost::shared_ptr< Encoder > EncoderPtr
Shared pointer to Encoder.
Definition: Encoder.hh:144
DataFileWriter(const char *filename, const ValidSchema &schema, size_t syncInterval=16 *1024, Codec codec=NULL_CODEC)
Constructs a new data file.
Definition: DataFile.hh:135
boost::array< uint8_t, 16 > DataFileSync
The sync value.
Definition: DataFile.hh:49
DataFileReader(std::auto_ptr< DataFileReaderBase > base, const ValidSchema &readerSchema)
Constructs a reader using the reader base.
Definition: DataFile.hh:295
The abstract base class for all Avro encoders.
Definition: Encoder.hh:53
void close()
Closes the reader.
Definition: DataFile.hh:327
Decoder is an interface implemented by every decoder capable of decoding Avro data.
Definition: Decoder.hh:49
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:117