Avro C++
DataFile.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * https://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_DataFile_hh__
20 #define avro_DataFile_hh__
21 
22 #include "Config.hh"
23 #include "Encoder.hh"
24 #include "buffer/Buffer.hh"
25 #include "ValidSchema.hh"
26 #include "Specific.hh"
27 #include "Stream.hh"
28 
29 #include <map>
30 #include <string>
31 #include <vector>
32 
33 #include "array"
34 #include "boost/utility.hpp"
35 #include <boost/iostreams/filtering_stream.hpp>
36 
37 namespace avro {
38 
/// Specify type of compression to use when writing data files.
///
/// Passed as the `codec` argument of the DataFileWriterBase / DataFileWriter
/// constructors declared in this header.
40 enum Codec {
    /// Default value of the `codec` constructor argument in this header.
    /// NOTE(review): presumably means "no compression" - confirm in the implementation.
41  NULL_CODEC,
    /// NOTE(review): presumably selects deflate compression - confirm in the implementation.
42  DEFLATE_CODEC,
43 
    // SNAPPY_CODEC exists only when the build defines SNAPPY_CODEC_AVAILABLE.
44 #ifdef SNAPPY_CODEC_AVAILABLE
45  SNAPPY_CODEC
46 #endif
47 
48 };
49 
/// Number of elements in a DataFileSync value.
50 const int SyncSize = 16;
/// The sync value: a fixed-size array of SyncSize uint8_t elements.
54 typedef std::array<uint8_t, SyncSize> DataFileSync;
55 
/// Type-independent portion of DataFileWriter.
///
/// Non-copyable (inherits boost::noncopyable). DataFileWriter<T> in this
/// header owns one instance and forwards to the public members below.
/// Only encoder(), incr() and schema() are defined inline; every other member
/// function is defined outside this header.
61 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
62  const std::string filename_;
    /// Schema returned by schema().
63  const ValidSchema schema_;
    /// Encoder handed out (dereferenced) by encoder().
64  const EncoderPtr encoderPtr_;
    /// Value of the `syncInterval` constructor argument.
    /// NOTE(review): presumably the threshold consulted by syncIfNeeded() - confirm.
65  const size_t syncInterval_;
66  Codec codec_;
67 
68  std::unique_ptr<OutputStream> stream_;
69  std::unique_ptr<OutputStream> buffer_;
    /// The sync value used by this writer (see DataFileSync).
70  const DataFileSync sync_;
    /// Object count; incremented by incr().
71  int64_t objectCount_;
72 
    /// Metadata container: string key mapped to a byte vector.
73  typedef std::map<std::string, std::vector<uint8_t> > Metadata;
74 
75  Metadata metadata_;
76  int64_t lastSync_;
77 
    // Static helpers. NOTE(review): presumably used to initialize stream_ and
    // sync_ respectively - confirm in the implementation file.
78  static std::unique_ptr<OutputStream> makeStream(const char* filename);
79  static DataFileSync makeSync();
80 
81  void writeHeader();
82  void setMetadata(const std::string& key, const std::string& value);
83 
87  void sync();
88 
92  void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec);
93 
94 public:
    /// Returns the current encoder for this writer.
98  Encoder& encoder() const { return *encoderPtr_; }
99 
    /// Called by DataFileWriter::write() before each datum is encoded.
    /// NOTE(review): presumably emits a sync point once enough data has been
    /// buffered - confirm in the implementation.
104  void syncIfNeeded();
105 
    /// Returns the byte offset (within the current file) of the start of the
    /// current block being written.
109  uint64_t getCurrentBlockStart();
110 
    /// Increments the object count.
114  void incr() {
115  ++objectCount_;
116  }
    /// Constructs a writer from a file name, a schema, a sync interval and an
    /// optional codec (defaults to NULL_CODEC).
120  DataFileWriterBase(const char* filename, const ValidSchema& schema,
121  size_t syncInterval, Codec codec = NULL_CODEC);
    /// Constructs a writer that takes ownership of the given output stream.
    /// Unlike the file-name overload, `codec` has no default here.
122  DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
123  const ValidSchema& schema, size_t syncInterval, Codec codec);
124 
    /// Closes the writer; DataFileWriter::close() forwards to this.
130  void close();
131 
    /// Returns the schema for this data file.
135  const ValidSchema& schema() const { return schema_; }
136 
    /// Flushes any unwritten data into the file.
140  void flush();
141 };
142 
/// An Avro datafile that can store objects of type T.
///
/// Thin, non-copyable wrapper that owns a DataFileWriterBase and forwards
/// every operation to it; only write() depends on T.
146 template <typename T>
147 class DataFileWriter : boost::noncopyable {
    /// The type-independent writer that does the actual work.
148  std::unique_ptr<DataFileWriterBase> base_;
149 public:
    /// Constructs a new data file.
    ///
    /// @param filename     forwarded to DataFileWriterBase.
    /// @param schema       forwarded to DataFileWriterBase.
    /// @param syncInterval forwarded to DataFileWriterBase; defaults to 16 * 1024.
    /// @param codec        compression codec; defaults to NULL_CODEC.
153  DataFileWriter(const char* filename, const ValidSchema& schema,
154  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
155  base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) { }
156 
    /// Same as above, but writes to a caller-supplied output stream whose
    /// ownership is moved into the underlying DataFileWriterBase.
157  DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema& schema,
158  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) :
159  base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) { }
160 
    /// Writes the given piece of data into the file.
    ///
    /// Order matters: syncIfNeeded() runs before the datum is encoded, and the
    /// object count is incremented only after avro::encode() returns.
164  void write(const T& datum) {
165  base_->syncIfNeeded();
166  avro::encode(base_->encoder(), datum);
167  base_->incr();
168  }
169 
    /// Returns the byte offset (within the current file) of the start of the
    /// current block being written.
173  uint64_t getCurrentBlockStart() { return base_->getCurrentBlockStart(); }
174 
175 
    /// Closes the current file.
180  void close() { base_->close(); }
181 
    /// Returns the schema for this data file.
185  const ValidSchema& schema() const { return base_->schema(); }
186 
    /// Flushes any unwritten data into the file.
190  void flush() { base_->flush(); }
191 };
192 
/// The type-independent portion of the reader.
///
/// Non-copyable (inherits boost::noncopyable). DataFileReader<T> in this
/// header owns one instance and forwards to the public members below.
/// Construction and initialization are two steps: DataFileReader calls one of
/// the init() overloads right after constructing the base.
196 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
197  const std::string filename_;
198  const std::unique_ptr<InputStream> stream_;
199  const DecoderPtr decoder_;
    /// Number of objects yet to read; decremented by decr().
200  int64_t objectCount_;
201  bool eof_;
202  Codec codec_;
203  int64_t blockStart_;
204  int64_t blockEnd_;
205 
    /// Schema returned by readerSchema().
206  ValidSchema readerSchema_;
    /// Schema stored with the data file; returned by dataSchema().
207  ValidSchema dataSchema_;
    /// Decoder handed out (dereferenced) by decoder(); distinct from decoder_.
208  DecoderPtr dataDecoder_;
209  std::unique_ptr<InputStream> dataStream_;
    /// Metadata container: string key mapped to a byte vector.
210  typedef std::map<std::string, std::vector<uint8_t> > Metadata;
211 
212  Metadata metadata_;
    /// The sync value for this file (see DataFileSync).
213  DataFileSync sync_;
214 
215  // for compressed buffer
    // Note: despite its name, os_ is an *input* filtering stream.
216  std::unique_ptr<boost::iostreams::filtering_istream> os_;
217  std::vector<char> compressed_;
218  std::string uncompressed;
219  void readHeader();
220 
221  void readDataBlock();
222  void doSeek(int64_t position);
223 public:
    /// Returns the current decoder for this reader.
227  Decoder& decoder() { return *dataDecoder_; }
228 
    /// Checked by DataFileReader::read() before each datum is decoded;
    /// read() returns false when this returns false.
232  bool hasMore();
233 
    /// Decrements the number of objects yet to read.
237  void decr() { --objectCount_; }
238 
    /// Constructs the reader base from a file name. init() must still be
    /// called (DataFileReader does so immediately after construction).
245  DataFileReaderBase(const char* filename);
246 
    /// Constructs the reader base from an input stream whose ownership is
    /// transferred to this object. init() must still be called afterwards.
247  DataFileReaderBase(std::unique_ptr<InputStream> inputStream);
248 
    /// Initializes the reader without an explicit reader schema.
    /// NOTE(review): presumably the schema stored in the file is then used
    /// for reading - confirm in the implementation.
253  void init();
254 
    /// Initializes the reader so that it uses the given reader schema.
262  void init(const ValidSchema& readerSchema);
263 
    /// Returns the schema for this object.
267  const ValidSchema& readerSchema() { return readerSchema_; }
268 
    /// Returns the schema stored with the data file.
272  const ValidSchema& dataSchema() { return dataSchema_; }
273 
    /// Closes the reader.
277  void close();
278 
    /// Move to a specific, known synchronization point, for example one
    /// returned from previousSync().
283  void seek(int64_t position);
284 
    /// Move to the next synchronization point after a position.
290  void sync(int64_t position);
291 
    /// Return true if past the next synchronization point after a position.
295  bool pastSync(int64_t position);
296 
    /// Return the last synchronization point before our current position.
300  int64_t previousSync();
301 };
302 
/// Reads the contents of data file one after another.
///
/// Thin, non-copyable wrapper that owns a DataFileReaderBase and forwards
/// every operation to it; only read() depends on T. Every constructor calls
/// one of the base's init() overloads before returning.
306 template <typename T>
307 class DataFileReader : boost::noncopyable {
    /// The type-independent reader that does the actual work.
308  std::unique_ptr<DataFileReaderBase> base_;
309 public:
    /// Constructs the reader for the given file; the reader is expected to
    /// use the given schema.
314  DataFileReader(const char* filename, const ValidSchema& readerSchema) :
315  base_(new DataFileReaderBase(filename)) {
316  base_->init(readerSchema);
317  }
318 
    /// Same as above, but reads from a caller-supplied input stream whose
    /// ownership is moved into the underlying DataFileReaderBase.
319  DataFileReader(std::unique_ptr<InputStream> inputStream, const ValidSchema& readerSchema) :
320  base_(new DataFileReaderBase(std::move(inputStream))) {
321  base_->init(readerSchema);
322  }
323 
    /// Constructs the reader for the given file without an explicit reader
    /// schema (calls the no-argument init()).
    /// NOTE(review): presumably the schema stored in the file is used - confirm.
328  DataFileReader(const char* filename) :
329  base_(new DataFileReaderBase(filename)) {
330  base_->init();
331  }
332 
    /// Stream-based variant of the constructor above; takes ownership of
    /// the input stream.
333  DataFileReader(std::unique_ptr<InputStream> inputStream) :
334  base_(new DataFileReaderBase(std::move(inputStream))) {
335  base_->init();
336  }
337 
    /// Constructs a reader using the reader base.
    ///
    /// Takes ownership of `base` and calls base->init() on it.
    /// NOTE(review): presumably `base` must not have been initialized
    /// already - confirm against DataFileReaderBase::init().
347  DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) {
348  base_->init();
349  }
350 
    /// Constructs a reader using the reader base and an explicit reader schema.
    ///
    /// Takes ownership of `base` and calls base->init(readerSchema) on it.
360  DataFileReader(std::unique_ptr<DataFileReaderBase> base,
361  const ValidSchema& readerSchema) : base_(std::move(base)) {
362  base_->init(readerSchema);
363  }
364 
    /// Reads the next entry from the data file.
    ///
    /// @param datum receives the decoded value.
    /// @return true if an entry was decoded into `datum`; false if the base
    ///         reports that there is nothing more to read (`datum` untouched).
370  bool read(T& datum) {
371  if (base_->hasMore()) {
    // The remaining-object count is decremented before decoding.
372  base_->decr();
373  avro::decode(base_->decoder(), datum);
374  return true;
375  }
376  return false;
377  }
378 
    /// Returns the schema for this object.
382  const ValidSchema& readerSchema() { return base_->readerSchema(); }
383 
    /// Returns the schema stored with the data file.
387  const ValidSchema& dataSchema() { return base_->dataSchema(); }
388 
    /// Closes the reader.
392  void close() { return base_->close(); }
393 
    /// Move to a specific, known synchronization point, for example one
    /// returned from previousSync().
398  void seek(int64_t position) { base_->seek(position); }
399 
    /// Move to the next synchronization point after a position.
405  void sync(int64_t position) { base_->sync(position); }
406 
    /// Return true if past the next synchronization point after a position.
410  bool pastSync(int64_t position) { return base_->pastSync(position); }
411 
    /// Return the last synchronization point before our current position.
415  int64_t previousSync() { return base_->previousSync(); }
416 };
417 
418 } // namespace avro
419 #endif
Low level support for encoding avro values.
Type-independent portion of DataFileWriter.
Definition: DataFile.hh:61
void decode(Decoder &d, T &t)
Generic decoder function that makes use of the codec_traits.
Definition: Specific.hh:339
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:135
bool read(T &datum)
Reads the next entry from the data file.
Definition: DataFile.hh:370
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:267
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:185
std::array< uint8_t, SyncSize > DataFileSync
The sync value.
Definition: DataFile.hh:54
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:30
void decr()
Decrements the number of objects yet to read.
Definition: DataFile.hh:237
void close()
Closes the current file.
Definition: DataFile.hh:180
Definition: Node.hh:202
DataFileReader(const char *filename, const ValidSchema &readerSchema)
Constructs the reader for the given file and the reader is expected to use the given schema...
Definition: DataFile.hh:314
int64_t previousSync()
Return the last synchronization point before our current position.
Definition: DataFile.hh:415
An Avro datafile that can store objects of type T.
Definition: DataFile.hh:147
void incr()
Increments the object count.
Definition: DataFile.hh:114
void flush()
Flushes any unwritten data into the file.
Definition: DataFile.hh:190
DataFileReader(const char *filename)
Constructs the reader for the given file and the reader is expected to use the schema that is used wi...
Definition: DataFile.hh:328
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:387
std::shared_ptr< Encoder > EncoderPtr
Shared pointer to Encoder.
Definition: Encoder.hh:147
Decoder & decoder()
Returns the current decoder for this reader.
Definition: DataFile.hh:227
void write(const T &datum)
Writes the given piece of data into the file.
Definition: DataFile.hh:164
std::shared_ptr< Decoder > DecoderPtr
Shared pointer to Decoder.
Definition: Decoder.hh:177
Codec
Specify type of compression to use when writing data files.
Definition: DataFile.hh:40
void encode(Encoder &e, const T &t)
Generic encoder function that makes use of the codec_traits.
Definition: Specific.hh:331
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:382
The type-independent portion of the reader.
Definition: DataFile.hh:196
uint64_t getCurrentBlockStart()
Returns the byte offset (within the current file) of the start of the current block being written...
Definition: DataFile.hh:173
A ValidSchema is basically a non-mutable Schema that has passed some minimum of sanity checks...
Definition: ValidSchema.hh:40
void seek(int64_t position)
Move to a specific, known synchronization point, for example one returned from previousSync().
Definition: DataFile.hh:398
Reads the contents of data file one after another.
Definition: DataFile.hh:307
DataFileReader(std::unique_ptr< DataFileReaderBase > base, const ValidSchema &readerSchema)
Constructs a reader using the reader base.
Definition: DataFile.hh:360
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:272
DataFileWriter(const char *filename, const ValidSchema &schema, size_t syncInterval=16 *1024, Codec codec=NULL_CODEC)
Constructs a new data file.
Definition: DataFile.hh:153
DataFileReader(std::unique_ptr< DataFileReaderBase > base)
Constructs a reader using the reader base.
Definition: DataFile.hh:347
bool pastSync(int64_t position)
Return true if past the next synchronization point after a position.
Definition: DataFile.hh:410
The abstract base class for all Avro encoders.
Definition: Encoder.hh:52
Encoder & encoder() const
Returns the current encoder for this writer.
Definition: DataFile.hh:98
void close()
Closes the reader.
Definition: DataFile.hh:392
void sync(int64_t position)
Move to the next synchronization point after a position.
Definition: DataFile.hh:405
Decoder is an interface implemented by every decoder capable of decoding Avro data.
Definition: Decoder.hh:48