snap/
write.rs

/*!
This module provides a `std::io::Write` implementation:

- `write::FrameEncoder` wraps another `std::io::Write` implementation, and
  compresses the data written to it using the Snappy frame format. Use this
  if you have an uncompressed data source and wish to write it as compressed
  data.

It would also be possible to provide a `write::FrameDecoder`, which decompresses
data as it writes it, but it hasn't been implemented yet.
*/
11
12use std::fmt;
13use std::io::{self, Write};
14
15use crate::compress::Encoder;
16use crate::crc32::CheckSummer;
17pub use crate::error::IntoInnerError;
18use crate::frame::{
19    compress_frame, CHUNK_HEADER_AND_CRC_SIZE, MAX_COMPRESS_BLOCK_SIZE,
20    STREAM_IDENTIFIER,
21};
22use crate::MAX_BLOCK_SIZE;
23
24/// A writer for compressing a Snappy stream.
25///
26/// This `FrameEncoder` wraps any other writer that implements `io::Write`.
27/// Bytes written to this writer are compressed using the [Snappy frame
28/// format](https://github.com/google/snappy/blob/master/framing_format.txt)
29/// (file extension `sz`, MIME type `application/x-snappy-framed`).
30///
31/// Writes are buffered automatically, so there's no need to wrap the given
32/// writer in a `std::io::BufWriter`.
33///
34/// The writer will be flushed automatically when it is dropped. If an error
35/// occurs, it is ignored.
36pub struct FrameEncoder<W: io::Write> {
37    /// Our main internal state, split out for borrowck reasons (happily paid).
38    ///
39    /// Also, it's an `Option` so we can move out of it even though
40    /// `FrameEncoder` impls `Drop`.
41    inner: Option<Inner<W>>,
42    /// Our buffer of uncompressed bytes. This isn't part of `inner` because
43    /// we may write bytes directly from the caller if the given buffer was
44    /// big enough. As a result, the main `write` implementation needs to
45    /// accept either the internal buffer or the caller's bytes directly. Since
46    /// `write` requires a mutable borrow, we satisfy the borrow checker by
47    /// separating `src` from the rest of the state.
48    src: Vec<u8>,
49}
50
51struct Inner<W> {
52    /// The underlying writer.
53    w: W,
54    /// An encoder that we reuse that does the actual block based compression.
55    enc: Encoder,
56    /// A CRC32 checksummer that is configured to either use the portable
57    /// fallback version or the SSE4.2 accelerated version when the right CPU
58    /// features are available.
59    checksummer: CheckSummer,
60    /// The compressed bytes buffer. Bytes are compressed from src (usually)
61    /// to dst before being written to w.
62    dst: Vec<u8>,
63    /// When false, the stream identifier (with magic bytes) must precede the
64    /// next write.
65    wrote_stream_ident: bool,
66    /// Space for writing the header of a chunk before writing it to the
67    /// underlying writer.
68    chunk_header: [u8; 8],
69}
70
71impl<W: io::Write> FrameEncoder<W> {
72    /// Create a new writer for streaming Snappy compression.
73    pub fn new(wtr: W) -> FrameEncoder<W> {
74        FrameEncoder {
75            inner: Some(Inner {
76                w: wtr,
77                enc: Encoder::new(),
78                checksummer: CheckSummer::new(),
79                dst: vec![0; MAX_COMPRESS_BLOCK_SIZE],
80                wrote_stream_ident: false,
81                chunk_header: [0; CHUNK_HEADER_AND_CRC_SIZE],
82            }),
83            src: Vec::with_capacity(MAX_BLOCK_SIZE),
84        }
85    }
86
87    /// Returns the underlying stream, consuming and flushing this writer.
88    ///
89    /// If flushing the writer caused an error, then an `IntoInnerError` is
90    /// returned, which contains both the writer and the original writer.
91    pub fn into_inner(mut self) -> Result<W, IntoInnerError<FrameEncoder<W>>> {
92        match self.flush() {
93            Ok(()) => Ok(self.inner.take().unwrap().w),
94            Err(err) => Err(IntoInnerError::new(self, err)),
95        }
96    }
97
98    /// Gets a reference to the underlying writer in this encoder.
99    pub fn get_ref(&self) -> &W {
100        &self.inner.as_ref().unwrap().w
101    }
102
103    /// Gets a reference to the underlying writer in this encoder.
104    ///
105    /// Note that mutating the output/input state of the stream may corrupt
106    /// this encoder, so care must be taken when using this method.
107    pub fn get_mut(&mut self) -> &mut W {
108        &mut self.inner.as_mut().unwrap().w
109    }
110}
111
112impl<W: io::Write> Drop for FrameEncoder<W> {
113    fn drop(&mut self) {
114        if self.inner.is_some() {
115            // Ignore errors because we can't conceivably return an error and
116            // panicing in a dtor is bad juju.
117            let _ = self.flush();
118        }
119    }
120}
121
122impl<W: io::Write> io::Write for FrameEncoder<W> {
123    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
124        let mut total = 0;
125        // If there isn't enough room to add buf to src, then add only a piece
126        // of it, flush it and mush on.
127        loop {
128            let free = self.src.capacity() - self.src.len();
129            // n is the number of bytes extracted from buf.
130            let n = if buf.len() <= free {
131                break;
132            } else if self.src.is_empty() {
133                // If buf is bigger than our entire buffer then avoid
134                // the indirection and write the buffer directly.
135                self.inner.as_mut().unwrap().write(buf)?
136            } else {
137                self.src.extend_from_slice(&buf[0..free]);
138                self.flush()?;
139                free
140            };
141            buf = &buf[n..];
142            total += n;
143        }
144        // We're only here if buf.len() will fit within the available space of
145        // self.src.
146        debug_assert!(buf.len() <= (self.src.capacity() - self.src.len()));
147        self.src.extend_from_slice(buf);
148        total += buf.len();
149        // We should never expand or contract self.src.
150        debug_assert!(self.src.capacity() == MAX_BLOCK_SIZE);
151        Ok(total)
152    }
153
154    fn flush(&mut self) -> io::Result<()> {
155        if self.src.is_empty() {
156            return Ok(());
157        }
158        self.inner.as_mut().unwrap().write(&self.src)?;
159        self.src.truncate(0);
160        Ok(())
161    }
162}
163
164impl<W: io::Write> Inner<W> {
165    fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
166        let mut total = 0;
167        if !self.wrote_stream_ident {
168            self.wrote_stream_ident = true;
169            self.w.write_all(STREAM_IDENTIFIER)?;
170        }
171        while !buf.is_empty() {
172            // Advance buf and get our block.
173            let mut src = buf;
174            if src.len() > MAX_BLOCK_SIZE {
175                src = &src[0..MAX_BLOCK_SIZE];
176            }
177            buf = &buf[src.len()..];
178
179            let frame_data = compress_frame(
180                &mut self.enc,
181                self.checksummer,
182                src,
183                &mut self.chunk_header,
184                &mut self.dst,
185                false,
186            )?;
187            self.w.write_all(&self.chunk_header)?;
188            self.w.write_all(frame_data)?;
189            total += src.len();
190        }
191        Ok(total)
192    }
193}
194
195impl<W: fmt::Debug + io::Write> fmt::Debug for FrameEncoder<W> {
196    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
197        f.debug_struct("FrameEncoder")
198            .field("inner", &self.inner)
199            .field("src", &"[...]")
200            .finish()
201    }
202}
203
204impl<W: fmt::Debug + io::Write> fmt::Debug for Inner<W> {
205    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
206        f.debug_struct("Inner")
207            .field("w", &self.w)
208            .field("enc", &self.enc)
209            .field("checksummer", &self.checksummer)
210            .field("dst", &"[...]")
211            .field("wrote_stream_ident", &self.wrote_stream_ident)
212            .field("chunk_header", &self.chunk_header)
213            .finish()
214    }
215}