Avro C++
DataFile.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_DataFile_hh__
20 #define avro_DataFile_hh__
21 
22 #include "Config.hh"
23 #include "Encoder.hh"
24 #include "buffer/Buffer.hh"
25 #include "ValidSchema.hh"
26 #include "Specific.hh"
27 #include "Stream.hh"
28 
29 #include <map>
30 #include <string>
31 #include <vector>
32 
33 #include "array"
34 #include "boost/utility.hpp"
35 #include <boost/iostreams/filtering_stream.hpp>
36 
37 namespace avro {
38 
40 enum Codec {
41  NULL_CODEC,
42  DEFLATE_CODEC,
43 
44 #ifdef SNAPPY_CODEC_AVAILABLE
45  SNAPPY_CODEC
46 #endif
47 
48 };
49 
50 const int SyncSize = 16;
54 typedef std::array<uint8_t, SyncSize> DataFileSync;
55 
61 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
62  const std::string filename_;
63  const ValidSchema schema_;
64  const EncoderPtr encoderPtr_;
65  const size_t syncInterval_;
66  Codec codec_;
67 
68  std::unique_ptr<OutputStream> stream_;
69  std::unique_ptr<OutputStream> buffer_;
70  const DataFileSync sync_;
71  int64_t objectCount_;
72 
73  typedef std::map<std::string, std::vector<uint8_t> > Metadata;
74 
75  Metadata metadata_;
76 
77  static std::unique_ptr<OutputStream> makeStream(const char* filename);
78  static DataFileSync makeSync();
79 
80  void writeHeader();
81  void setMetadata(const std::string& key, const std::string& value);
82 
86  void sync();
87 
91  void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec);
92 
93 public:
97  Encoder& encoder() const { return *encoderPtr_; }
98 
103  void syncIfNeeded();
104 
108  void incr() {
109  ++objectCount_;
110  }
114  DataFileWriterBase(const char* filename, const ValidSchema& schema,
115  size_t syncInterval, Codec codec = NULL_CODEC);
116  DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
117  const ValidSchema& schema, size_t syncInterval, Codec codec);
118 
124  void close();
125 
129  const ValidSchema& schema() const { return schema_; }
130 
134  void flush();
135 };
136 
140 template <typename T>
141 class DataFileWriter : boost::noncopyable {
142  std::unique_ptr<DataFileWriterBase> base_;
143 public:
147  DataFileWriter(const char* filename, const ValidSchema& schema,
148  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
149  base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { }
150 
151  DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema& schema,
152  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
153  base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) { }
154 
158  void write(const T& datum) {
159  base_->syncIfNeeded();
160  avro::encode(base_->encoder(), datum);
161  base_->incr();
162  }
163 
168  void close() { base_->close(); }
169 
173  const ValidSchema& schema() const { return base_->schema(); }
174 
178  void flush() { base_->flush(); }
179 };
180 
184 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
185  const std::string filename_;
186  const std::unique_ptr<InputStream> stream_;
187  const DecoderPtr decoder_;
188  int64_t objectCount_;
189  bool eof_;
190  Codec codec_;
191  int64_t blockStart_;
192  int64_t blockEnd_;
193 
194  ValidSchema readerSchema_;
195  ValidSchema dataSchema_;
196  DecoderPtr dataDecoder_;
197  std::unique_ptr<InputStream> dataStream_;
198  typedef std::map<std::string, std::vector<uint8_t> > Metadata;
199 
200  Metadata metadata_;
201  DataFileSync sync_;
202 
203  // for compressed buffer
204  std::unique_ptr<boost::iostreams::filtering_istream> os_;
205  std::vector<char> compressed_;
206  std::string uncompressed;
207  void readHeader();
208 
209  void readDataBlock();
210  void doSeek(int64_t position);
211 public:
215  Decoder& decoder() { return *dataDecoder_; }
216 
220  bool hasMore();
221 
225  void decr() { --objectCount_; }
226 
233  DataFileReaderBase(const char* filename);
234 
235  DataFileReaderBase(std::unique_ptr<InputStream> inputStream);
236 
241  void init();
242 
250  void init(const ValidSchema& readerSchema);
251 
255  const ValidSchema& readerSchema() { return readerSchema_; }
256 
260  const ValidSchema& dataSchema() { return dataSchema_; }
261 
265  void close();
266 
271  void seek(int64_t position);
272 
278  void sync(int64_t position);
279 
283  bool pastSync(int64_t position);
284 
288  int64_t previousSync();
289 };
290 
294 template <typename T>
295 class DataFileReader : boost::noncopyable {
296  std::unique_ptr<DataFileReaderBase> base_;
297 public:
302  DataFileReader(const char* filename, const ValidSchema& readerSchema) :
303  base_(new DataFileReaderBase(filename)) {
304  base_->init(readerSchema);
305  }
306 
307  DataFileReader(std::unique_ptr<InputStream> inputStream, const ValidSchema& readerSchema) :
308  base_(new DataFileReaderBase(std::move(inputStream))) {
309  base_->init(readerSchema);
310  }
311 
316  DataFileReader(const char* filename) :
317  base_(new DataFileReaderBase(filename)) {
318  base_->init();
319  }
320 
321  DataFileReader(std::unique_ptr<InputStream> inputStream) :
322  base_(new DataFileReaderBase(std::move(inputStream))) {
323  base_->init();
324  }
325 
335  DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) {
336  base_->init();
337  }
338 
348  DataFileReader(std::unique_ptr<DataFileReaderBase> base,
349  const ValidSchema& readerSchema) : base_(std::move(base)) {
350  base_->init(readerSchema);
351  }
352 
358  bool read(T& datum) {
359  if (base_->hasMore()) {
360  base_->decr();
361  avro::decode(base_->decoder(), datum);
362  return true;
363  }
364  return false;
365  }
366 
370  const ValidSchema& readerSchema() { return base_->readerSchema(); }
371 
375  const ValidSchema& dataSchema() { return base_->dataSchema(); }
376 
380  void close() { return base_->close(); }
381 
386  void seek(int64_t position) { base_->seek(position); }
387 
393  void sync(int64_t position) { base_->sync(position); }
394 
398  bool pastSync(int64_t position) { return base_->pastSync(position); }
399 
403  int64_t previousSync() { return base_->previousSync(); }
404 };
405 
406 } // namespace avro
407 #endif
Low level support for encoding avro values.
Type-independent portion of DataFileWriter.
Definition: DataFile.hh:61
void decode(Decoder &d, T &t)
Generic decoder function that makes use of the codec_traits.
Definition: Specific.hh:340
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:129
bool read(T &datum)
Reads the next entry from the data file.
Definition: DataFile.hh:358
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:255
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:173
std::array< uint8_t, SyncSize > DataFileSync
The sync value.
Definition: DataFile.hh:54
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:30
void decr()
Decrements the number of objects yet to read.
Definition: DataFile.hh:225
void close()
Closes the current file.
Definition: DataFile.hh:168
Definition: Node.hh:202
DataFileReader(const char *filename, const ValidSchema &readerSchema)
Constructs the reader for the given file and the reader is expected to use the given schema...
Definition: DataFile.hh:302
int64_t previousSync()
Return the last synchronization point before our current position.
Definition: DataFile.hh:403
An Avro datafile that can store objects of type T.
Definition: DataFile.hh:141
void incr()
Increments the object count.
Definition: DataFile.hh:108
void flush()
Flushes any unwritten data into the file.
Definition: DataFile.hh:178
DataFileReader(const char *filename)
Constructs the reader for the given file and the reader is expected to use the schema that is used wi...
Definition: DataFile.hh:316
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:375
std::shared_ptr< Encoder > EncoderPtr
Shared pointer to Encoder.
Definition: Encoder.hh:147
Decoder & decoder()
Returns the current decoder for this reader.
Definition: DataFile.hh:215
void write(const T &datum)
Writes the given piece of data into the file.
Definition: DataFile.hh:158
std::shared_ptr< Decoder > DecoderPtr
Shared pointer to Decoder.
Definition: Decoder.hh:177
Codec
Specify type of compression to use when writing data files.
Definition: DataFile.hh:40
void encode(Encoder &e, const T &t)
Generic encoder function that makes use of the codec_traits.
Definition: Specific.hh:332
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:370
The type independent portion of rader.
Definition: DataFile.hh:184
A ValidSchema is basically a non-mutable Schema that has passed some minumum of sanity checks...
Definition: ValidSchema.hh:40
void seek(int64_t position)
Move to a specific, known synchronization point, for example one returned from previousSync().
Definition: DataFile.hh:386
Reads the contents of data file one after another.
Definition: DataFile.hh:295
DataFileReader(std::unique_ptr< DataFileReaderBase > base, const ValidSchema &readerSchema)
Constructs a reader using the reader base.
Definition: DataFile.hh:348
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:260
DataFileWriter(const char *filename, const ValidSchema &schema, size_t syncInterval=16 *1024, Codec codec=NULL_CODEC)
Constructs a new data file.
Definition: DataFile.hh:147
DataFileReader(std::unique_ptr< DataFileReaderBase > base)
Constructs a reader using the reader base.
Definition: DataFile.hh:335
bool pastSync(int64_t position)
Return true if past the next synchronization point after a position.
Definition: DataFile.hh:398
The abstract base class for all Avro encoders.
Definition: Encoder.hh:52
Encoder & encoder() const
Returns the current encoder for this writer.
Definition: DataFile.hh:97
void close()
Closes the reader.
Definition: DataFile.hh:380
void sync(int64_t position)
Move to the next synchronization point after a position.
Definition: DataFile.hh:393
Decoder is an interface implemented by every decoder capable of decoding Avro data.
Definition: Decoder.hh:48