Avro C++
Stream.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * https://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_Stream_hh__
20 #define avro_Stream_hh__
21 
22 #include <cstdint>
23 #include <cstring>
24 #include <memory>
25 
26 #include "boost/utility.hpp"
27 
28 #include "Config.hh"
29 #include "Exception.hh"
30 
31 namespace avro {
32 
36 class AVRO_DECL InputStream : boost::noncopyable {
37 protected:
41  InputStream() = default;
42 
43 public:
47  virtual ~InputStream() = default;
48 
55  virtual bool next(const uint8_t **data, size_t *len) = 0;
56 
62  virtual void backup(size_t len) = 0;
63 
67  virtual void skip(size_t len) = 0;
68 
74  virtual size_t byteCount() const = 0;
75 };
76 
77 typedef std::unique_ptr<InputStream> InputStreamPtr;
78 
82 class AVRO_DECL SeekableInputStream : public InputStream {
83 protected:
87  SeekableInputStream() = default;
88 
89 public:
93  ~SeekableInputStream() override = default;
94 
100  virtual void seek(int64_t position) = 0;
101 };
102 
103 typedef std::unique_ptr<SeekableInputStream> SeekableInputStreamPtr;
104 
108 class AVRO_DECL OutputStream : boost::noncopyable {
109 protected:
113  OutputStream() = default;
114 
115 public:
119  virtual ~OutputStream() = default;
120 
126  virtual bool next(uint8_t **data, size_t *len) = 0;
127 
132  virtual void backup(size_t len) = 0;
133 
139  virtual uint64_t byteCount() const = 0;
140 
145  virtual void flush() = 0;
146 };
147 
148 typedef std::unique_ptr<OutputStream> OutputStreamPtr;
149 
153 AVRO_DECL OutputStreamPtr memoryOutputStream(size_t chunkSize = 4 * 1024);
154 
160 AVRO_DECL InputStreamPtr memoryInputStream(const uint8_t *data, size_t len);
161 
169 AVRO_DECL InputStreamPtr memoryInputStream(const OutputStream &source);
170 
176 AVRO_DECL std::shared_ptr<std::vector<uint8_t>> snapshot(const OutputStream &source);
177 
185 AVRO_DECL OutputStreamPtr fileOutputStream(const char *filename,
186  size_t bufferSize = 8 * 1024);
187 
192 AVRO_DECL InputStreamPtr fileInputStream(
193  const char *filename, size_t bufferSize = 8 * 1024);
194 AVRO_DECL SeekableInputStreamPtr fileSeekableInputStream(
195  const char *filename, size_t bufferSize = 8 * 1024);
196 
202 AVRO_DECL OutputStreamPtr ostreamOutputStream(std::ostream &os,
203  size_t bufferSize = 8 * 1024);
204 
210 AVRO_DECL InputStreamPtr istreamInputStream(
211  std::istream &in, size_t bufferSize = 8 * 1024);
212 
223 AVRO_DECL InputStreamPtr nonSeekableIstreamInputStream(
224  std::istream &is, size_t bufferSize = 8 * 1024);
225 
227 struct StreamReader {
232 
236  const uint8_t *next_;
237 
241  const uint8_t *end_;
242 
246  StreamReader() : in_(nullptr), next_(nullptr), end_(nullptr) {}
247 
251  explicit StreamReader(InputStream &in) : in_(nullptr), next_(nullptr), end_(nullptr) { reset(in); }
252 
257  void reset(InputStream &is) {
258  if (in_ != nullptr && end_ != next_) {
259  in_->backup(end_ - next_);
260  }
261  in_ = &is;
262  next_ = end_ = nullptr;
263  }
264 
269  uint8_t read() {
270  if (next_ == end_) {
271  more();
272  }
273  return *next_++;
274  }
275 
280  void readBytes(uint8_t *b, size_t n) {
281  while (n > 0) {
282  if (next_ == end_) {
283  more();
284  }
285  size_t q = end_ - next_;
286  if (q > n) {
287  q = n;
288  }
289  ::memcpy(b, next_, q);
290  next_ += q;
291  b += q;
292  n -= q;
293  }
294  }
295 
300  void skipBytes(size_t n) {
301  if (n > static_cast<size_t>(end_ - next_)) {
302  n -= end_ - next_;
303  next_ = end_;
304  in_->skip(n);
305  } else {
306  next_ += n;
307  }
308  }
309 
316  bool fill() {
317  size_t n = 0;
318  while (in_->next(&next_, &n)) {
319  if (n != 0) {
320  end_ = next_ + n;
321  return true;
322  }
323  }
324  return false;
325  }
326 
330  void more() {
331  if (!fill()) {
332  throw Exception("EOF reached");
333  }
334  }
335 
339  bool hasMore() {
340  return next_ != end_ || fill();
341  }
342 
347  void drain(bool unRead) {
348  if (unRead) {
349  --next_;
350  }
351  in_->backup(end_ - next_);
352  end_ = next_;
353  }
354 };
355 
359 struct StreamWriter {
364 
368  uint8_t *next_;
369 
373  uint8_t *end_;
374 
378  StreamWriter() : out_(nullptr), next_(nullptr), end_(nullptr) {}
379 
383  explicit StreamWriter(OutputStream &out) : out_(nullptr), next_(nullptr), end_(nullptr) { reset(out); }
384 
389  void reset(OutputStream &os) {
390  if (out_ != nullptr && end_ != next_) {
391  out_->backup(end_ - next_);
392  }
393  out_ = &os;
394  next_ = end_;
395  }
396 
400  void write(uint8_t c) {
401  if (next_ == end_) {
402  more();
403  }
404  *next_++ = c;
405  }
406 
410  void writeBytes(const uint8_t *b, size_t n) {
411  while (n > 0) {
412  if (next_ == end_) {
413  more();
414  }
415  size_t q = end_ - next_;
416  if (q > n) {
417  q = n;
418  }
419  ::memcpy(next_, b, q);
420  next_ += q;
421  b += q;
422  n -= q;
423  }
424  }
425 
430  void flush() {
431  if (next_ != end_) {
432  out_->backup(end_ - next_);
433  next_ = end_;
434  }
435  out_->flush();
436  }
437 
442  int64_t byteCount() const {
443  return out_->byteCount();
444  }
445 
449  void more() {
450  size_t n = 0;
451  while (out_->next(&next_, &n)) {
452  if (n != 0) {
453  end_ = next_ + n;
454  return;
455  }
456  }
457  throw Exception("EOF reached");
458  }
459 };
460 
465 inline void copy(InputStream &in, OutputStream &out) {
466  const uint8_t *p = nullptr;
467  size_t n = 0;
468  StreamWriter w(out);
469  while (in.next(&p, &n)) {
470  w.writeBytes(p, n);
471  }
472  w.flush();
473 }
474 
475 } // namespace avro
476 #endif
avro::StreamReader::hasMore
bool hasMore()
Returns true if and only if the end of stream is not reached.
Definition: Stream.hh:339
avro::StreamWriter::reset
void reset(OutputStream &os)
Replaces the current underlying stream with a new one.
Definition: Stream.hh:389
avro::StreamWriter::writeBytes
void writeBytes(const uint8_t *b, size_t n)
Writes the specified number of bytes starting at b.
Definition: Stream.hh:410
avro::OutputStream
A no-copy output stream.
Definition: Stream.hh:108
avro::StreamReader::fill
bool fill()
Gets as many bytes as possible from the underlying stream in a single chunk.
Definition: Stream.hh:316
avro::SeekableInputStream
An InputStream which also supports seeking to a specific offset.
Definition: Stream.hh:82
avro::OutputStream::flush
virtual void flush()=0
Flushes any data remaining in the buffer to the stream's underlying store, if any.
avro::StreamReader::in_
InputStream * in_
The underlying input stream.
Definition: Stream.hh:231
avro::fileOutputStream
AVRO_DECL OutputStreamPtr fileOutputStream(const char *filename, size_t bufferSize=8 *1024)
Returns a new OutputStream whose contents would be stored in a file.
avro::nonSeekableIstreamInputStream
AVRO_DECL InputStreamPtr nonSeekableIstreamInputStream(std::istream &is, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given std::istream.
avro::memoryInputStream
AVRO_DECL InputStreamPtr memoryInputStream(const uint8_t *data, size_t len)
Returns a new InputStream, with the data from the given byte array.
avro::ostreamOutputStream
AVRO_DECL OutputStreamPtr ostreamOutputStream(std::ostream &os, size_t bufferSize=8 *1024)
Returns a new OutputStream whose contents will be sent to the given std::ostream.
avro::StreamWriter::flush
void flush()
Backs up to the currently written data and flushes the underlying stream.
Definition: Stream.hh:430
avro::memoryOutputStream
AVRO_DECL OutputStreamPtr memoryOutputStream(size_t chunkSize=4 *1024)
Returns a new OutputStream, which grows in memory chunks of specified size.
avro::StreamReader::skipBytes
void skipBytes(size_t n)
Skips the given number of bytes.
Definition: Stream.hh:300
avro::StreamReader::reset
void reset(InputStream &is)
Replaces the current input stream with the given one after backing up the original one if required.
Definition: Stream.hh:257
avro::InputStream
A no-copy input stream.
Definition: Stream.hh:36
avro::StreamWriter::end_
uint8_t * end_
One past the last location one can write to.
Definition: Stream.hh:373
avro::StreamWriter::next_
uint8_t * next_
The next location to write to.
Definition: Stream.hh:368
avro::InputStream::skip
virtual void skip(size_t len)=0
Skips number of bytes specified by len.
avro::InputStream::next
virtual bool next(const uint8_t **data, size_t *len)=0
Returns some of available data.
avro::StreamReader
A convenience class for reading from an InputStream.
Definition: Stream.hh:227
avro
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:30
avro::StreamWriter::byteCount
int64_t byteCount() const
Return the number of bytes written so far.
Definition: Stream.hh:442
avro::StreamReader::more
void more()
Tries to get more data and if it cannot, throws an exception.
Definition: Stream.hh:330
avro::fileInputStream
AVRO_DECL InputStreamPtr fileInputStream(const char *filename, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given file.
avro::StreamWriter::StreamWriter
StreamWriter()
Constructs a writer with no underlying stream.
Definition: Stream.hh:378
avro::snapshot
AVRO_DECL std::shared_ptr< std::vector< uint8_t > > snapshot(const OutputStream &source)
Returns the contents written so far into the output stream, which should be a memory output stream.
avro::StreamReader::drain
void drain(bool unRead)
Returns unused bytes back to the underlying stream.
Definition: Stream.hh:347
avro::StreamReader::next_
const uint8_t * next_
The next location to read from.
Definition: Stream.hh:236
avro::StreamReader::end_
const uint8_t * end_
One past the last valid location.
Definition: Stream.hh:241
avro::StreamWriter::more
void more()
Gets more space to write to.
Definition: Stream.hh:449
avro::istreamInputStream
AVRO_DECL InputStreamPtr istreamInputStream(std::istream &in, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given std::istream.
avro::StreamReader::readBytes
void readBytes(uint8_t *b, size_t n)
Reads the given number of bytes from the underlying stream.
Definition: Stream.hh:280
avro::OutputStream::backup
virtual void backup(size_t len)=0
"Returns" back to the stream some of the buffer obtained from in the last call to next().
avro::StreamReader::StreamReader
StreamReader(InputStream &in)
Constructs a reader with the given underlying stream.
Definition: Stream.hh:251
avro::copy
void copy(InputStream &in, OutputStream &out)
A convenience function to copy all the contents of an input stream into an output stream.
Definition: Stream.hh:465
avro::OutputStream::next
virtual bool next(uint8_t **data, size_t *len)=0
Returns a buffer that can be written into.
avro::StreamWriter
A convenience class to write data into an OutputStream.
Definition: Stream.hh:359
avro::StreamWriter::out_
OutputStream * out_
The underlying output stream for this writer.
Definition: Stream.hh:363
avro::StreamReader::StreamReader
StreamReader()
Constructs an empty reader.
Definition: Stream.hh:246
avro::InputStream::backup
virtual void backup(size_t len)=0
"Returns" back some of the data to the stream.
avro::OutputStream::byteCount
virtual uint64_t byteCount() const =0
Number of bytes written so far into this stream.
avro::StreamWriter::write
void write(uint8_t c)
Writes a single byte.
Definition: Stream.hh:400
avro::StreamWriter::StreamWriter
StreamWriter(OutputStream &out)
Constructs a new writer with the given underlying stream.
Definition: Stream.hh:383
avro::Exception
Wrapper for std::runtime_error that provides convenience constructor for boost::format objects.
Definition: Exception.hh:31
avro::StreamReader::read
uint8_t read()
Read just one byte from the underlying stream.
Definition: Stream.hh:269