Avro C++
DataFile.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * https://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_DataFile_hh__
20 #define avro_DataFile_hh__
21 
22 #include "Config.hh"
23 #include "Encoder.hh"
24 #include "Specific.hh"
25 #include "Stream.hh"
26 #include "ValidSchema.hh"
27 #include "buffer/Buffer.hh"
28 
29 #include <map>
30 #include <string>
31 #include <vector>
32 
33 #include "array"
34 #include "boost/utility.hpp"
35 #include <boost/iostreams/filtering_stream.hpp>
36 
37 namespace avro {
38 
/**
 * Specify type of compression to use when writing data files.
 *
 * The enumerator values are written out explicitly; they are the same
 * values the compiler would assign implicitly.
 */
enum Codec {
    NULL_CODEC = 0,
    DEFLATE_CODEC = 1,

#ifdef SNAPPY_CODEC_AVAILABLE
    // Only declared when the build defines SNAPPY_CODEC_AVAILABLE.
    SNAPPY_CODEC = 2
#endif

};
49 
/// Number of bytes in a sync value.
const int SyncSize = 16;

/// The sync value: a fixed-size array of SyncSize bytes.
using DataFileSync = std::array<uint8_t, SyncSize>;
55 
61 class AVRO_DECL DataFileWriterBase : boost::noncopyable {
62  const std::string filename_;
63  const ValidSchema schema_;
64  const EncoderPtr encoderPtr_;
65  const size_t syncInterval_;
66  Codec codec_;
67 
68  std::unique_ptr<OutputStream> stream_;
69  std::unique_ptr<OutputStream> buffer_;
70  const DataFileSync sync_;
71  int64_t objectCount_;
72 
73  typedef std::map<std::string, std::vector<uint8_t>> Metadata;
74 
75  Metadata metadata_;
76  int64_t lastSync_;
77 
78  static std::unique_ptr<OutputStream> makeStream(const char *filename);
79  static DataFileSync makeSync();
80 
81  void writeHeader();
82  void setMetadata(const std::string &key, const std::string &value);
83 
87  void sync();
88 
92  void init(const ValidSchema &schema, size_t syncInterval, const Codec &codec);
93 
94 public:
98  Encoder &encoder() const { return *encoderPtr_; }
99 
104  void syncIfNeeded();
105 
109  uint64_t getCurrentBlockStart() const;
110 
114  void incr() {
115  ++objectCount_;
116  }
120  DataFileWriterBase(const char *filename, const ValidSchema &schema,
121  size_t syncInterval, Codec codec = NULL_CODEC);
122  DataFileWriterBase(std::unique_ptr<OutputStream> outputStream,
123  const ValidSchema &schema, size_t syncInterval, Codec codec);
124 
130  void close();
131 
135  const ValidSchema &schema() const { return schema_; }
136 
140  void flush();
141 };
142 
146 template<typename T>
147 class DataFileWriter : boost::noncopyable {
148  std::unique_ptr<DataFileWriterBase> base_;
149 
150 public:
154  DataFileWriter(const char *filename, const ValidSchema &schema,
155  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(filename, schema, syncInterval, codec)) {}
156 
157  DataFileWriter(std::unique_ptr<OutputStream> outputStream, const ValidSchema &schema,
158  size_t syncInterval = 16 * 1024, Codec codec = NULL_CODEC) : base_(new DataFileWriterBase(std::move(outputStream), schema, syncInterval, codec)) {}
159 
163  void write(const T &datum) {
164  base_->syncIfNeeded();
165  avro::encode(base_->encoder(), datum);
166  base_->incr();
167  }
168 
172  uint64_t getCurrentBlockStart() { return base_->getCurrentBlockStart(); }
173 
178  void close() { base_->close(); }
179 
183  const ValidSchema &schema() const { return base_->schema(); }
184 
188  void flush() { base_->flush(); }
189 };
190 
194 class AVRO_DECL DataFileReaderBase : boost::noncopyable {
195  const std::string filename_;
196  const std::unique_ptr<InputStream> stream_;
197  const DecoderPtr decoder_;
198  int64_t objectCount_;
199  bool eof_;
200  Codec codec_;
201  int64_t blockStart_{};
202  int64_t blockEnd_{};
203 
204  ValidSchema readerSchema_;
205  ValidSchema dataSchema_;
206  DecoderPtr dataDecoder_;
207  std::unique_ptr<InputStream> dataStream_;
208  typedef std::map<std::string, std::vector<uint8_t>> Metadata;
209 
210  Metadata metadata_;
211  DataFileSync sync_{};
212 
213  // for compressed buffer
214  std::unique_ptr<boost::iostreams::filtering_istream> os_;
215  std::vector<char> compressed_;
216  std::string uncompressed;
217  void readHeader();
218 
219  void readDataBlock();
220  void doSeek(int64_t position);
221 
222 public:
226  Decoder &decoder() { return *dataDecoder_; }
227 
231  bool hasMore();
232 
236  void decr() { --objectCount_; }
237 
244  explicit DataFileReaderBase(const char *filename);
245 
246  explicit DataFileReaderBase(std::unique_ptr<InputStream> inputStream);
247 
252  void init();
253 
261  void init(const ValidSchema &readerSchema);
262 
266  const ValidSchema &readerSchema() { return readerSchema_; }
267 
271  const ValidSchema &dataSchema() { return dataSchema_; }
272 
276  void close();
277 
282  void seek(int64_t position);
283 
289  void sync(int64_t position);
290 
294  bool pastSync(int64_t position);
295 
299  int64_t previousSync() const;
300 };
301 
305 template<typename T>
306 class DataFileReader : boost::noncopyable {
307  std::unique_ptr<DataFileReaderBase> base_;
308 
309 public:
314  DataFileReader(const char *filename, const ValidSchema &readerSchema) : base_(new DataFileReaderBase(filename)) {
315  base_->init(readerSchema);
316  }
317 
318  DataFileReader(std::unique_ptr<InputStream> inputStream, const ValidSchema &readerSchema) : base_(new DataFileReaderBase(std::move(inputStream))) {
319  base_->init(readerSchema);
320  }
321 
326  explicit DataFileReader(const char *filename) : base_(new DataFileReaderBase(filename)) {
327  base_->init();
328  }
329 
330  explicit DataFileReader(std::unique_ptr<InputStream> inputStream) : base_(new DataFileReaderBase(std::move(inputStream))) {
331  base_->init();
332  }
333 
343  explicit DataFileReader(std::unique_ptr<DataFileReaderBase> base) : base_(std::move(base)) {
344  base_->init();
345  }
346 
356  DataFileReader(std::unique_ptr<DataFileReaderBase> base,
357  const ValidSchema &readerSchema) : base_(std::move(base)) {
358  base_->init(readerSchema);
359  }
360 
366  bool read(T &datum) {
367  if (base_->hasMore()) {
368  base_->decr();
369  avro::decode(base_->decoder(), datum);
370  return true;
371  }
372  return false;
373  }
374 
378  const ValidSchema &readerSchema() { return base_->readerSchema(); }
379 
383  const ValidSchema &dataSchema() { return base_->dataSchema(); }
384 
388  void close() { return base_->close(); }
389 
394  void seek(int64_t position) { base_->seek(position); }
395 
401  void sync(int64_t position) { base_->sync(position); }
402 
406  bool pastSync(int64_t position) { return base_->pastSync(position); }
407 
411  int64_t previousSync() { return base_->previousSync(); }
412 };
413 
414 } // namespace avro
415 #endif
avro::DataFileReader::DataFileReader
DataFileReader(const char *filename)
Constructs the reader for the given file and the reader is expected to use the schema that is used with data.
Definition: DataFile.hh:326
avro::DataFileReaderBase::dataSchema
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:271
avro::DataFileReader::seek
void seek(int64_t position)
Move to a specific, known synchronization point, for example one returned from previousSync().
Definition: DataFile.hh:394
avro::DataFileReaderBase
The type independent portion of reader.
Definition: DataFile.hh:194
avro::DataFileWriter::write
void write(const T &datum)
Writes the given piece of data into the file.
Definition: DataFile.hh:163
avro::Encoder
The abstract base class for all Avro encoders.
Definition: Encoder.hh:52
avro::DataFileWriterBase::incr
void incr()
Increments the object count.
Definition: DataFile.hh:114
avro::DataFileReader::readerSchema
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:378
avro::Decoder
Decoder is an interface implemented by every decoder capable of decoding Avro data.
Definition: Decoder.hh:48
avro::DecoderPtr
std::shared_ptr< Decoder > DecoderPtr
Shared pointer to Decoder.
Definition: Decoder.hh:177
avro::DataFileReader::DataFileReader
DataFileReader(std::unique_ptr< DataFileReaderBase > base)
Constructs a reader using the reader base.
Definition: DataFile.hh:343
avro::DataFileReader::pastSync
bool pastSync(int64_t position)
Return true if past the next synchronization point after a position.
Definition: DataFile.hh:406
avro::DataFileReader
Reads the contents of data file one after another.
Definition: DataFile.hh:306
avro::DataFileWriterBase::encoder
Encoder & encoder() const
Returns the current encoder for this writer.
Definition: DataFile.hh:98
avro::DataFileReader::DataFileReader
DataFileReader(std::unique_ptr< DataFileReaderBase > base, const ValidSchema &readerSchema)
Constructs a reader using the reader base.
Definition: DataFile.hh:356
avro::DataFileReader::close
void close()
Closes the reader.
Definition: DataFile.hh:388
avro::DataFileWriter
An Avro datafile that can store objects of type T.
Definition: DataFile.hh:147
avro::DataFileWriterBase
Type-independent portion of DataFileWriter.
Definition: DataFile.hh:61
avro::DataFileReader::read
bool read(T &datum)
Reads the next entry from the data file.
Definition: DataFile.hh:366
avro::DataFileWriter::DataFileWriter
DataFileWriter(const char *filename, const ValidSchema &schema, size_t syncInterval=16 *1024, Codec codec=NULL_CODEC)
Constructs a new data file.
Definition: DataFile.hh:154
avro::DataFileSync
std::array< uint8_t, SyncSize > DataFileSync
The sync value.
Definition: DataFile.hh:54
avro::DataFileReader::sync
void sync(int64_t position)
Move to the next synchronization point after a position.
Definition: DataFile.hh:401
avro
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:30
avro::EncoderPtr
std::shared_ptr< Encoder > EncoderPtr
Shared pointer to Encoder.
Definition: Encoder.hh:147
avro::ValidSchema
A ValidSchema is basically a non-mutable Schema that has passed some minimum of sanity checks.
Definition: ValidSchema.hh:40
avro::DataFileReaderBase::decr
void decr()
Decrements the number of objects yet to read.
Definition: DataFile.hh:236
avro::encode
void encode(Encoder &e, const T &t)
Generic encoder function that makes use of the codec_traits.
Definition: Specific.hh:343
avro::decode
void decode(Decoder &d, T &t)
Generic decoder function that makes use of the codec_traits.
Definition: Specific.hh:351
avro::DataFileReader::DataFileReader
DataFileReader(const char *filename, const ValidSchema &readerSchema)
Constructs the reader for the given file and the reader is expected to use the given schema.
Definition: DataFile.hh:314
Encoder.hh
avro::DataFileReader::dataSchema
const ValidSchema & dataSchema()
Returns the schema stored with the data file.
Definition: DataFile.hh:383
avro::DataFileReaderBase::decoder
Decoder & decoder()
Returns the current decoder for this reader.
Definition: DataFile.hh:226
avro::DataFileWriter::schema
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:183
avro::Codec
Codec
Specify type of compression to use when writing data files.
Definition: DataFile.hh:40
avro::DataFileReaderBase::readerSchema
const ValidSchema & readerSchema()
Returns the schema for this object.
Definition: DataFile.hh:266
avro::DataFileWriter::close
void close()
Closes the current file.
Definition: DataFile.hh:178
avro::DataFileReader::previousSync
int64_t previousSync()
Return the last synchronization point before our current position.
Definition: DataFile.hh:411
avro::DataFileWriter::getCurrentBlockStart
uint64_t getCurrentBlockStart()
Returns the byte offset (within the current file) of the start of the current block being written.
Definition: DataFile.hh:172
avro::DataFileWriter::flush
void flush()
Flushes any unwritten data into the file.
Definition: DataFile.hh:188
avro::DataFileWriterBase::schema
const ValidSchema & schema() const
Returns the schema for this data file.
Definition: DataFile.hh:135