1use std::{io::Write, marker::PhantomData, ops::RangeInclusive};
19
20use serde::Serialize;
21
22use crate::encode::encode_internal;
23use crate::serde::ser_schema::SchemaAwareWriteSerializer;
24use crate::{
25 AvroResult, AvroSchema, Schema,
26 error::Details,
27 headers::{HeaderBuilder, RabinFingerprintHeader},
28 schema::ResolvedOwnedSchema,
29 types::Value,
30};
31
32pub struct GenericSingleObjectWriter {
36 buffer: Vec<u8>,
37 resolved: ResolvedOwnedSchema,
38}
39
40impl GenericSingleObjectWriter {
41 pub fn new_with_capacity(
42 schema: &Schema,
43 initial_buffer_cap: usize,
44 ) -> AvroResult<GenericSingleObjectWriter> {
45 let header_builder = RabinFingerprintHeader::from_schema(schema);
46 Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
47 }
48
49 pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
50 schema: &Schema,
51 initial_buffer_cap: usize,
52 header_builder: HB,
53 ) -> AvroResult<GenericSingleObjectWriter> {
54 let mut buffer = Vec::with_capacity(initial_buffer_cap);
55 let header = header_builder.build_header();
56 buffer.extend_from_slice(&header);
57
58 Ok(GenericSingleObjectWriter {
59 buffer,
60 resolved: ResolvedOwnedSchema::try_from(schema.clone())?,
61 })
62 }
63
64 const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
65
66 pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
68 let original_length = self.buffer.len();
69 if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
70 Err(Details::IllegalSingleObjectWriterState.into())
71 } else {
72 write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
73 writer
74 .write_all(&self.buffer)
75 .map_err(Details::WriteBytes)?;
76 let len = self.buffer.len();
77 self.buffer.truncate(original_length);
78 Ok(len)
79 }
80 }
81
82 pub fn write_value<W: Write>(&mut self, v: Value, writer: &mut W) -> AvroResult<usize> {
84 self.write_value_ref(&v, writer)
85 }
86}
87
88pub struct SpecificSingleObjectWriter<T>
90where
91 T: AvroSchema,
92{
93 resolved: ResolvedOwnedSchema,
94 header: Vec<u8>,
95 _model: PhantomData<T>,
96}
97
98impl<T> SpecificSingleObjectWriter<T>
99where
100 T: AvroSchema,
101{
102 pub fn new() -> AvroResult<Self> {
103 let schema = T::get_schema();
104 let header = RabinFingerprintHeader::from_schema(&schema).build_header();
105 let resolved = ResolvedOwnedSchema::new(schema)?;
106 Ok(Self {
108 resolved,
109 header,
110 _model: PhantomData,
111 })
112 }
113
114 pub fn new_with_header_builder(header_builder: impl HeaderBuilder) -> AvroResult<Self> {
115 let header = header_builder.build_header();
116 let resolved = ResolvedOwnedSchema::new(T::get_schema())?;
117 Ok(Self {
118 resolved,
119 header,
120 _model: PhantomData,
121 })
122 }
123
124 #[deprecated(since = "0.22.0", note = "Use new() instead")]
126 pub fn with_capacity(_buffer_cap: usize) -> AvroResult<Self> {
127 Self::new()
128 }
129}
130
131impl<T> SpecificSingleObjectWriter<T>
132where
133 T: AvroSchema + Into<Value>,
134{
135 pub fn write_value<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
142 writer
143 .write_all(&self.header)
144 .map_err(Details::WriteBytes)?;
145 let value: Value = data.into();
146 let bytes = write_value_ref_owned_resolved(&self.resolved, &value, writer)?;
147 Ok(bytes + self.header.len())
148 }
149}
150
151impl<T> SpecificSingleObjectWriter<T>
152where
153 T: AvroSchema + Serialize,
154{
155 pub fn write_ref<W: Write>(&self, data: &T, writer: &mut W) -> AvroResult<usize> {
162 writer
163 .write_all(&self.header)
164 .map_err(Details::WriteBytes)?;
165
166 let mut serializer = SchemaAwareWriteSerializer::new(
167 writer,
168 self.resolved.get_root_schema(),
169 self.resolved.get_names(),
170 None,
171 );
172 let bytes = data.serialize(&mut serializer)?;
173
174 Ok(bytes + self.header.len())
175 }
176
177 pub fn write<W: Write>(&self, data: T, writer: &mut W) -> AvroResult<usize> {
184 self.write_ref(&data, writer)
185 }
186}
187
188fn write_value_ref_owned_resolved<W: Write>(
189 resolved_schema: &ResolvedOwnedSchema,
190 value: &Value,
191 writer: &mut W,
192) -> AvroResult<usize> {
193 let root_schema = resolved_schema.get_root_schema();
194 if let Some(reason) = value.validate_internal(
195 root_schema,
196 resolved_schema.get_names(),
197 root_schema.namespace(),
198 ) {
199 return Err(Details::ValidationWithReason {
200 value: value.clone(),
201 schema: root_schema.clone(),
202 reason,
203 }
204 .into());
205 }
206 encode_internal(
207 value,
208 root_schema,
209 resolved_schema.get_names(),
210 root_schema.namespace(),
211 writer,
212 )
213}
214
// Unit tests for the single-object writers: header layout, payload bytes, and
// byte-for-byte parity between the generic and specific writer paths.
#[cfg(test)]
mod tests {
    use apache_avro_test_helper::TestResult;
    use uuid::Uuid;

    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin};

    use super::*;

    /// Fixture record: a long, a double and an array of strings.
    #[derive(Serialize, Clone)]
    struct TestSingleObjectWriter {
        a: i64,
        b: f64,
        c: Vec<String>,
    }

    /// Hand-written schema whose fields mirror `TestSingleObjectWriter`.
    impl AvroSchema for TestSingleObjectWriter {
        fn get_schema() -> Schema {
            // NOTE(review): the record name contains a typo ("Wrtier"). It is
            // harmless here because the assertions derive the expected fingerprint
            // from this same schema rather than hard-coding it.
            let schema = r#"
            {
                "type":"record",
                "name":"TestSingleObjectWrtierSerialize",
                "fields":[
                    {
                        "name":"a",
                        "type":"long"
                    },
                    {
                        "name":"b",
                        "type":"double"
                    },
                    {
                        "name":"c",
                        "type":{
                            "type":"array",
                            "items":"string"
                        }
                    }
                ]
            }
            "#;
            Schema::parse_str(schema).unwrap()
        }
    }

    /// Manual conversion to a `Value::Record`, used by the `Value`-based write paths.
    impl From<TestSingleObjectWriter> for Value {
        fn from(obj: TestSingleObjectWriter) -> Value {
            Value::Record(vec![
                ("a".into(), obj.a.into()),
                ("b".into(), obj.b.into()),
                (
                    "c".into(),
                    Value::Array(obj.c.into_iter().map(|s| s.into()).collect()),
                ),
            ])
        }
    }

    /// The generic writer with the default (Rabin) header emits: 0xC3 0x01, the
    /// 8-byte Rabin fingerprint, then exactly the bytes plain `encode` produces.
    #[test]
    fn test_single_object_writer() -> TestResult {
        let mut buf: Vec<u8> = Vec::new();
        let obj = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };
        let mut writer = GenericSingleObjectWriter::new_with_capacity(
            &TestSingleObjectWriter::get_schema(),
            1024,
        )
        .expect("Should resolve schema");
        let value = obj.into();
        let written_bytes = writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        assert!(buf.len() > 10, "no bytes written");
        // The reported size must match what actually reached the output.
        assert_eq!(buf.len(), written_bytes);
        // Two-byte marker at the start of the header.
        assert_eq!(buf[0], 0xC3);
        assert_eq!(buf[1], 0x01);
        // Bytes 2..10 are the schema's Rabin fingerprint.
        assert_eq!(
            &buf[2..10],
            &TestSingleObjectWriter::get_schema()
                .fingerprint::<Rabin>()
                .bytes[..]
        );
        // Everything after the 10-byte header is the ordinary binary encoding.
        let mut msg_binary = Vec::new();
        encode(
            &value,
            &TestSingleObjectWriter::get_schema(),
            &mut msg_binary,
        )
        .expect("encode should have failed by here as a dependency of any writing");
        assert_eq!(&buf[10..], &msg_binary[..]);

        Ok(())
    }

    /// With a Glue schema-UUID header builder, the output starts with 0x03 0x00
    /// followed by the 16 UUID bytes.
    #[test]
    fn test_single_object_writer_with_header_builder() -> TestResult {
        let mut buf: Vec<u8> = Vec::new();
        let obj = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };
        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
            &TestSingleObjectWriter::get_schema(),
            1024,
            header_builder,
        )
        .expect("Should resolve schema");
        let value = obj.into();
        writer
            .write_value_ref(&value, &mut buf)
            .expect("Error serializing properly");

        assert_eq!(buf[0], 0x03);
        assert_eq!(buf[1], 0x00);
        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
        Ok(())
    }

    /// All four write paths must produce identical bytes for the same record:
    /// specific `write_ref` twice (the writer is reusable), specific `write_value`,
    /// and generic `write_value`.
    #[test]
    fn test_writer_parity() -> TestResult {
        let obj1 = TestSingleObjectWriter {
            a: 300,
            b: 34.555,
            c: vec!["cat".into(), "dog".into()],
        };

        let mut buf1: Vec<u8> = Vec::new();
        let mut buf2: Vec<u8> = Vec::new();
        let mut buf3: Vec<u8> = Vec::new();
        let mut buf4: Vec<u8> = Vec::new();

        let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
            &TestSingleObjectWriter::get_schema(),
            1024,
        )
        .expect("Should resolve schema");
        let specific_writer = SpecificSingleObjectWriter::<TestSingleObjectWriter>::new()
            .expect("Resolved should pass");
        specific_writer
            .write_ref(&obj1, &mut buf1)
            .expect("Serialization expected");
        specific_writer
            .write_ref(&obj1, &mut buf2)
            .expect("Serialization expected");
        specific_writer
            .write_value(obj1.clone(), &mut buf3)
            .expect("Serialization expected");

        generic_writer
            .write_value(obj1.into(), &mut buf4)
            .expect("Serialization expected");

        assert_eq!(buf1, buf2);
        assert_eq!(buf2, buf3);
        assert_eq!(buf3, buf4);

        Ok(())
    }

    /// Regression test (avro-rs #439): the serde-based specific writer must handle
    /// a self-referencing record schema.
    #[test]
    fn avro_rs_439_specific_single_object_writer_ref() -> TestResult {
        #[derive(Serialize)]
        struct Recursive {
            field: bool,
            recurse: Option<Box<Recursive>>,
        }

        impl AvroSchema for Recursive {
            fn get_schema() -> Schema {
                Schema::parse_str(
                    r#"{
                    "name": "Recursive",
                    "type": "record",
                    "fields": [
                        { "name": "field", "type": "boolean" },
                        { "name": "recurse", "type": ["null", "Recursive"] }
                    ]
                }"#,
                )
                .unwrap()
            }
        }

        let mut buffer = Vec::new();
        let writer = SpecificSingleObjectWriter::new()?;

        writer.write(
            Recursive {
                field: true,
                recurse: Some(Box::new(Recursive {
                    field: false,
                    recurse: None,
                })),
            },
            &mut buffer,
        )?;
        // The first 10 bytes are the header (195, 1 == 0xC3 0x01 marker, then the
        // 8-byte fingerprint). The last 4 bytes are the record body: true (1),
        // union branch 1 zig-zag encoded (2), inner false (0), inner union
        // branch 0 / null (0).
        assert_eq!(
            buffer,
            &[195, 1, 83, 223, 43, 26, 181, 179, 227, 224, 1, 2, 0, 0][..]
        );

        Ok(())
    }
}