Avro C++
Stream.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * https://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_Stream_hh__
20 #define avro_Stream_hh__
21 
22 #include <memory>
23 #include <string.h>
24 #include <stdint.h>
25 
26 #include "boost/utility.hpp"
27 
28 #include "Config.hh"
29 #include "Exception.hh"
30 
31 namespace avro {
32 
36 class AVRO_DECL InputStream : boost::noncopyable {
37 protected:
38 
43 
44 public:
48  virtual ~InputStream() { }
49 
56  virtual bool next(const uint8_t** data, size_t* len) = 0;
57 
63  virtual void backup(size_t len) = 0;
64 
68  virtual void skip(size_t len) = 0;
69 
75  virtual size_t byteCount() const = 0;
76 };
77 
78 typedef std::unique_ptr<InputStream> InputStreamPtr;
79 
83 class AVRO_DECL SeekableInputStream : public InputStream {
84 protected:
85 
90 
91 public:
95  virtual ~SeekableInputStream() { }
96 
102  virtual void seek(int64_t position) = 0;
103 };
104 
105 typedef std::unique_ptr<SeekableInputStream> SeekableInputStreamPtr;
106 
110 class AVRO_DECL OutputStream : boost::noncopyable {
111 protected:
112 
117 public:
118 
122  virtual ~OutputStream() { }
123 
129  virtual bool next(uint8_t** data, size_t* len) = 0;
130 
135  virtual void backup(size_t len) = 0;
136 
142  virtual uint64_t byteCount() const = 0;
143 
148  virtual void flush() = 0;
149 };
150 
151 typedef std::unique_ptr<OutputStream> OutputStreamPtr;
152 
156 AVRO_DECL OutputStreamPtr memoryOutputStream(size_t chunkSize = 4 * 1024);
157 
163 AVRO_DECL InputStreamPtr memoryInputStream(const uint8_t* data, size_t len);
164 
172 AVRO_DECL InputStreamPtr memoryInputStream(const OutputStream& source);
173 
179 AVRO_DECL std::shared_ptr<std::vector<uint8_t> > snapshot(const OutputStream& source);
180 
188 AVRO_DECL OutputStreamPtr fileOutputStream(const char* filename,
189  size_t bufferSize = 8 * 1024);
190 
195 AVRO_DECL InputStreamPtr fileInputStream(
196  const char *filename, size_t bufferSize = 8 * 1024);
197 AVRO_DECL SeekableInputStreamPtr fileSeekableInputStream(
198  const char *filename, size_t bufferSize = 8 * 1024);
199 
205 AVRO_DECL OutputStreamPtr ostreamOutputStream(std::ostream& os,
206  size_t bufferSize = 8 * 1024);
207 
213 AVRO_DECL InputStreamPtr istreamInputStream(
214  std::istream &in, size_t bufferSize = 8 * 1024);
215 
226 AVRO_DECL InputStreamPtr nonSeekableIstreamInputStream(
227  std::istream& is, size_t bufferSize = 8 * 1024);
228 
229 
231 struct StreamReader {
236 
240  const uint8_t* next_;
241 
245  const uint8_t* end_;
246 
250  StreamReader() : in_(0), next_(0), end_(0) { }
251 
255  StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
256 
261  void reset(InputStream& is) {
262  if (in_ != 0 && end_ != next_) {
263  in_->backup(end_ - next_);
264  }
265  in_ = &is;
266  next_ = end_ = 0;
267  }
268 
273  uint8_t read() {
274  if (next_ == end_) {
275  more();
276  }
277  return *next_++;
278  }
279 
284  void readBytes(uint8_t* b, size_t n) {
285  while (n > 0) {
286  if (next_ == end_) {
287  more();
288  }
289  size_t q = end_ - next_;
290  if (q > n) {
291  q = n;
292  }
293  ::memcpy(b, next_, q);
294  next_ += q;
295  b += q;
296  n -= q;
297  }
298  }
299 
304  void skipBytes(size_t n) {
305  if (n > static_cast<size_t>(end_ - next_)) {
306  n -= end_ - next_;
307  next_ = end_;
308  in_->skip(n);
309  } else {
310  next_ += n;
311  }
312  }
313 
320  bool fill() {
321  size_t n = 0;
322  while (in_->next(&next_, &n)) {
323  if (n != 0) {
324  end_ = next_ + n;
325  return true;
326  }
327  }
328  return false;
329  }
330 
334  void more() {
335  if (! fill()) {
336  throw Exception("EOF reached");
337  }
338  }
339 
343  bool hasMore() {
344  return (next_ == end_) ? fill() : true;
345  }
346 
351  void drain(bool unRead) {
352  if (unRead) {
353  --next_;
354  }
355  in_->backup(end_ - next_);
356  end_ = next_;
357  }
358 };
359 
363 struct StreamWriter {
368 
372  uint8_t* next_;
373 
377  uint8_t* end_;
378 
382  StreamWriter() : out_(0), next_(0), end_(0) { }
383 
387  StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
388 
393  void reset(OutputStream& os) {
394  if (out_ != 0 && end_ != next_) {
395  out_->backup(end_ - next_);
396  }
397  out_ = &os;
398  next_ = end_;
399  }
400 
404  void write(uint8_t c) {
405  if (next_ == end_) {
406  more();
407  }
408  *next_++ = c;
409  }
410 
414  void writeBytes(const uint8_t* b, size_t n) {
415  while (n > 0) {
416  if (next_ == end_) {
417  more();
418  }
419  size_t q = end_ - next_;
420  if (q > n) {
421  q = n;
422  }
423  ::memcpy(next_, b, q);
424  next_ += q;
425  b += q;
426  n -= q;
427  }
428  }
429 
434  void flush() {
435  if (next_ != end_) {
436  out_->backup(end_ - next_);
437  next_ = end_;
438  }
439  out_->flush();
440  }
441 
446  int64_t byteCount() const {
447  return out_->byteCount();
448  }
449 
453  void more() {
454  size_t n = 0;
455  while (out_->next(&next_, &n)) {
456  if (n != 0) {
457  end_ = next_ + n;
458  return;
459  }
460  }
461  throw Exception("EOF reached");
462  }
463 };
464 
469 inline void copy(InputStream& in, OutputStream& out)
470 {
471  const uint8_t *p = 0;
472  size_t n = 0;
473  StreamWriter w(out);
474  while (in.next(&p, &n)) {
475  w.writeBytes(p, n);
476  }
477  w.flush();
478 }
479 
480 } // namespace avro
481 #endif
482 
483 
void copy(InputStream &in, OutputStream &out)
A convenience function to copy all the contents of an input stream into an output stream...
Definition: Stream.hh:469
StreamWriter(OutputStream &out)
Constructs a new writer with the given underlying stream.
Definition: Stream.hh:387
virtual void backup(size_t len)=0
"Returns" back some of the data to the stream.
AVRO_DECL OutputStreamPtr memoryOutputStream(size_t chunkSize=4 *1024)
Returns a new OutputStream, which grows in memory chunks of specified size.
const uint8_t * next_
The next location to read from.
Definition: Stream.hh:240
int64_t byteCount() const
Return the number of bytes written so far.
Definition: Stream.hh:446
void flush()
Backs up up to the currently written data and flushes the underlying stream.
Definition: Stream.hh:434
StreamReader(InputStream &in)
Constructs a reader with the given underlying stream.
Definition: Stream.hh:255
uint8_t * end_
One past the last location one can write to.
Definition: Stream.hh:377
StreamReader()
Constructs an empty reader.
Definition: Stream.hh:250
virtual ~InputStream()
Destructor.
Definition: Stream.hh:48
virtual uint64_t byteCount() const =0
Number of bytes written so far into this stream.
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:30
void skipBytes(size_t n)
Skips the given number of bytes.
Definition: Stream.hh:304
void writeBytes(const uint8_t *b, size_t n)
Writes the specified number of bytes starting at b.
Definition: Stream.hh:414
void readBytes(uint8_t *b, size_t n)
Reads the given number of bytes from the underlying stream.
Definition: Stream.hh:284
bool hasMore()
Returns true if and only if the end of stream is not reached.
Definition: Stream.hh:343
virtual ~SeekableInputStream()
Destructor.
Definition: Stream.hh:95
void more()
Gets more space to write to.
Definition: Stream.hh:453
InputStream * in_
The underlying input stream.
Definition: Stream.hh:235
An InputStream which also supports seeking to a specific offset.
Definition: Stream.hh:83
void drain(bool unRead)
Returns unused bytes back to the underlying stream.
Definition: Stream.hh:351
void write(uint8_t c)
Writes a single byte.
Definition: Stream.hh:404
StreamWriter()
Constructs a writer with no underlying stream.
Definition: Stream.hh:382
virtual bool next(uint8_t **data, size_t *len)=0
Returns a buffer that can be written into.
void reset(OutputStream &os)
Replaces the current underlying stream with a new one.
Definition: Stream.hh:393
virtual void backup(size_t len)=0
"Returns" back to the stream some of the buffer obtained from the last call to next()...
AVRO_DECL InputStreamPtr nonSeekableIstreamInputStream(std::istream &is, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given std::istream.
AVRO_DECL OutputStreamPtr fileOutputStream(const char *filename, size_t bufferSize=8 *1024)
Returns a new OutputStream whose contents would be stored in a file.
const uint8_t * end_
One past the last valid location.
Definition: Stream.hh:245
uint8_t * next_
The next location to write to.
Definition: Stream.hh:372
void reset(InputStream &is)
Replaces the current input stream with the given one after backing up the original one if required...
Definition: Stream.hh:261
A convenience class for reading from an InputStream.
Definition: Stream.hh:231
AVRO_DECL InputStreamPtr fileInputStream(const char *filename, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given file.
SeekableInputStream()
An empty constructor.
Definition: Stream.hh:89
A no-copy output stream.
Definition: Stream.hh:110
Wrapper for std::runtime_error that provides convenience constructor for boost::format objects...
Definition: Exception.hh:31
AVRO_DECL std::shared_ptr< std::vector< uint8_t > > snapshot(const OutputStream &source)
Returns the contents written so far into the output stream, which should be a memory output stream...
AVRO_DECL InputStreamPtr istreamInputStream(std::istream &in, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given std::istream.
bool fill()
Get as many bytes from the underlying stream as possible in a single chunk.
Definition: Stream.hh:320
A no-copy input stream.
Definition: Stream.hh:36
InputStream()
An empty constructor.
Definition: Stream.hh:42
virtual void flush()=0
Flushes any data remaining in the buffer to the stream's underlying store, if any.
OutputStream()
An empty constructor.
Definition: Stream.hh:116
OutputStream * out_
The underlying output stream for this writer.
Definition: Stream.hh:367
AVRO_DECL OutputStreamPtr ostreamOutputStream(std::ostream &os, size_t bufferSize=8 *1024)
Returns a new OutputStream whose contents will be sent to the given std::ostream. ...
AVRO_DECL InputStreamPtr memoryInputStream(const uint8_t *data, size_t len)
Returns a new InputStream, with the data from the given byte array.
A convenience class to write data into an OutputStream.
Definition: Stream.hh:363
void more()
Tries to get more data and if it cannot, throws an exception.
Definition: Stream.hh:334
virtual void skip(size_t len)=0
Skips number of bytes specified by len.
virtual ~OutputStream()
Destructor.
Definition: Stream.hh:122
virtual bool next(const uint8_t **data, size_t *len)=0
Returns some of available data.
uint8_t read()
Read just one byte from the underlying stream.
Definition: Stream.hh:273