Avro C++
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator
Stream.hh
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #ifndef avro_Stream_hh__
20 #define avro_Stream_hh__
21 
22 #include <memory>
23 #include <string.h>
24 #include <stdint.h>
25 
26 #include "boost/utility.hpp"
27 
28 #include "Config.hh"
29 #include "Exception.hh"
30 
31 namespace avro {
32 
36 class AVRO_DECL InputStream : boost::noncopyable {
37 protected:
38 
43 
44 public:
48  virtual ~InputStream() { }
49 
56  virtual bool next(const uint8_t** data, size_t* len) = 0;
57 
63  virtual void backup(size_t len) = 0;
64 
68  virtual void skip(size_t len) = 0;
69 
75  virtual size_t byteCount() const = 0;
76 };
77 
81 class AVRO_DECL OutputStream : boost::noncopyable {
82 protected:
83 
88 public:
89 
93  virtual ~OutputStream() { }
94 
100  virtual bool next(uint8_t** data, size_t* len) = 0;
101 
106  virtual void backup(size_t len) = 0;
107 
113  virtual uint64_t byteCount() const = 0;
114 
119  virtual void flush() = 0;
120 };
121 
125 AVRO_DECL std::auto_ptr<OutputStream> memoryOutputStream(size_t chunkSize = 4 * 1024);
126 
132 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const uint8_t* data, size_t len);
133 
141 AVRO_DECL std::auto_ptr<InputStream> memoryInputStream(const OutputStream& source);
142 
148 AVRO_DECL boost::shared_ptr<std::vector<uint8_t> > snapshot(const OutputStream& source);
149 
157 AVRO_DECL std::auto_ptr<OutputStream> fileOutputStream(const char* filename,
158  size_t bufferSize = 8 * 1024);
159 
164 AVRO_DECL std::auto_ptr<InputStream> fileInputStream(const char* filename,
165  size_t bufferSize = 8 * 1024);
166 
172 AVRO_DECL std::auto_ptr<OutputStream> ostreamOutputStream(std::ostream& os,
173  size_t bufferSize = 8 * 1024);
174 
180 AVRO_DECL std::auto_ptr<InputStream> istreamInputStream(std::istream& in,
181  size_t bufferSize = 8 * 1024);
182 
184 struct StreamReader {
189 
193  const uint8_t* next_;
194 
198  const uint8_t* end_;
199 
203  StreamReader() : in_(0), next_(0), end_(0) { }
204 
208  StreamReader(InputStream& in) : in_(0), next_(0), end_(0) { reset(in); }
209 
214  void reset(InputStream& is) {
215  if (in_ != 0 && end_ != next_) {
216  in_->backup(end_ - next_);
217  }
218  in_ = &is;
219  next_ = end_ = 0;
220  }
221 
226  uint8_t read() {
227  if (next_ == end_) {
228  more();
229  }
230  return *next_++;
231  }
232 
237  void readBytes(uint8_t* b, size_t n) {
238  while (n > 0) {
239  if (next_ == end_) {
240  more();
241  }
242  size_t q = end_ - next_;
243  if (q > n) {
244  q = n;
245  }
246  ::memcpy(b, next_, q);
247  next_ += q;
248  b += q;
249  n -= q;
250  }
251  }
252 
257  void skipBytes(size_t n) {
258  if (n > static_cast<size_t>(end_ - next_)) {
259  n -= end_ - next_;
260  next_ = end_;
261  in_->skip(n);
262  } else {
263  next_ += n;
264  }
265  }
266 
273  bool fill() {
274  size_t n = 0;
275  while (in_->next(&next_, &n)) {
276  if (n != 0) {
277  end_ = next_ + n;
278  return true;
279  }
280  }
281  return false;
282  }
283 
287  void more() {
288  if (! fill()) {
289  throw Exception("EOF reached");
290  }
291  }
292 
296  bool hasMore() {
297  return (next_ == end_) ? fill() : true;
298  }
299 };
300 
304 struct StreamWriter {
309 
313  uint8_t* next_;
314 
318  uint8_t* end_;
319 
323  StreamWriter() : out_(0), next_(0), end_(0) { }
324 
328  StreamWriter(OutputStream& out) : out_(0), next_(0), end_(0) { reset(out); }
329 
334  void reset(OutputStream& os) {
335  if (out_ != 0 && end_ != next_) {
336  out_->backup(end_ - next_);
337  }
338  out_ = &os;
339  next_ = end_;
340  }
341 
345  void write(uint8_t c) {
346  if (next_ == end_) {
347  more();
348  }
349  *next_++ = c;
350  }
351 
355  void writeBytes(const uint8_t* b, size_t n) {
356  while (n > 0) {
357  if (next_ == end_) {
358  more();
359  }
360  size_t q = end_ - next_;
361  if (q > n) {
362  q = n;
363  }
364  ::memcpy(next_, b, q);
365  next_ += q;
366  b += q;
367  n -= q;
368  }
369  }
370 
375  void flush() {
376  if (next_ != end_) {
377  out_->backup(end_ - next_);
378  next_ = end_;
379  }
380  out_->flush();
381  }
382 
386  void more() {
387  size_t n = 0;
388  while (out_->next(&next_, &n)) {
389  if (n != 0) {
390  end_ = next_ + n;
391  return;
392  }
393  }
394  throw Exception("EOF reached");
395  }
396 
397 };
398 
403 inline void copy(InputStream& in, OutputStream& out)
404 {
405  const uint8_t *p = 0;
406  size_t n = 0;
407  StreamWriter w(out);
408  while (in.next(&p, &n)) {
409  w.writeBytes(p, n);
410  }
411  w.flush();
412 }
413 
414 } // namespace avro
415 #endif
416 
417 
void copy(InputStream &in, OutputStream &out)
A convenience function to copy all the contents of an input stream into an output stream...
Definition: Stream.hh:403
StreamWriter(OutputStream &out)
Constructs a new writer with the given underlying stream.
Definition: Stream.hh:328
virtual void backup(size_t len)=0
"Returns" back some of the data to the stream.
const uint8_t * next_
The next location to read from.
Definition: Stream.hh:193
void flush()
backs up upto the currently written data and flushes the underlying stream.
Definition: Stream.hh:375
StreamReader(InputStream &in)
Constructs a reader with the given underlying stream.
Definition: Stream.hh:208
uint8_t * end_
One past the last location one can write to.
Definition: Stream.hh:318
StreamReader()
Constructs an empty reader.
Definition: Stream.hh:203
virtual ~InputStream()
Destructor.
Definition: Stream.hh:48
A bunch of templates and specializations for encoding and decoding specific types.
Definition: AvroParse.hh:31
void skipBytes(size_t n)
Skips the given number of bytes.
Definition: Stream.hh:257
AVRO_DECL std::auto_ptr< OutputStream > memoryOutputStream(size_t chunkSize=4 *1024)
Returns a new OutputStream, which grows in memory chunks of specified size.
void writeBytes(const uint8_t *b, size_t n)
Writes the specified number of bytes starting at b.
Definition: Stream.hh:355
void readBytes(uint8_t *b, size_t n)
Reads the given number of bytes from the underlying stream.
Definition: Stream.hh:237
bool hasMore()
Returns true if and only if the end of stream is not reached.
Definition: Stream.hh:296
void more()
Gets more space to write to.
Definition: Stream.hh:386
InputStream * in_
The underlying input stream.
Definition: Stream.hh:188
void write(uint8_t c)
Writes a single byte.
Definition: Stream.hh:345
StreamWriter()
Constructs a writer with no underlying stream.
Definition: Stream.hh:323
virtual bool next(uint8_t **data, size_t *len)=0
Returns a buffer that can be written into.
void reset(OutputStream &os)
Replaces the current underlying stream with a new one.
Definition: Stream.hh:334
AVRO_DECL std::auto_ptr< InputStream > istreamInputStream(std::istream &in, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given std::istream.
AVRO_DECL std::auto_ptr< InputStream > fileInputStream(const char *filename, size_t bufferSize=8 *1024)
Returns a new InputStream whose contents come from the given file.
AVRO_DECL std::auto_ptr< InputStream > memoryInputStream(const uint8_t *data, size_t len)
Returns a new InputStream, with the data from the given byte array.
virtual void backup(size_t len)=0
"Returns" back to the stream some of the buffer obtained from in the last call to next()...
const uint8_t * end_
One past the last valid location.
Definition: Stream.hh:198
AVRO_DECL boost::shared_ptr< std::vector< uint8_t > > snapshot(const OutputStream &source)
Returns the contents written so far into the output stream, which should be a memory output stream...
uint8_t * next_
The next location to write to.
Definition: Stream.hh:313
void reset(InputStream &is)
Replaces the current input stream with the given one after backing up the original one if required...
Definition: Stream.hh:214
A convenience class for reading from an InputStream.
Definition: Stream.hh:184
A no-copy output stream.
Definition: Stream.hh:81
Wrapper for std::runtime_error that provides convenience constructor for boost::format objects...
Definition: Exception.hh:31
bool fill()
Get as many byes from the underlying stream as possible in a single chunk.
Definition: Stream.hh:273
A no-copy input stream.
Definition: Stream.hh:36
InputStream()
An empty constuctor.
Definition: Stream.hh:42
virtual void flush()=0
Flushes any data remaining in the buffer to the stream's underlying store, if any.
OutputStream()
An empty constuctor.
Definition: Stream.hh:87
OutputStream * out_
The underlying output stream for this writer.
Definition: Stream.hh:308
AVRO_DECL std::auto_ptr< OutputStream > ostreamOutputStream(std::ostream &os, size_t bufferSize=8 *1024)
Returns a new OutputStream whose contents will be sent to the given std::ostream. ...
A convinience class to write data into an OutputStream.
Definition: Stream.hh:304
AVRO_DECL std::auto_ptr< OutputStream > fileOutputStream(const char *filename, size_t bufferSize=8 *1024)
Returns a new OutputStream whose contents would be stored in a file.
void more()
Tries to get more data and if it cannot, throws an exception.
Definition: Stream.hh:287
virtual void skip(size_t len)=0
Skips number of bytes specified by len.
virtual ~OutputStream()
Destructor.
Definition: Stream.hh:93
virtual bool next(const uint8_t **data, size_t *len)=0
Returns some of available data.
uint8_t read()
Read just one byte from the underlying stream.
Definition: Stream.hh:226