sparrow-ipc 0.2.0
Loading...
Searching...
No Matches
deserializer_example.cpp
Go to the documentation of this file.
1
9
10#include <iostream>
11#include <span>
12#include <vector>
13
14#include <sparrow/record_batch.hpp>
15
20
21namespace sp = sparrow;
22namespace sp_ipc = sparrow_ipc;
23
27std::vector<sp::record_batch> create_sample_batches(size_t count)
28{
29 std::vector<sp::record_batch> batches;
30 for (size_t i = 0; i < count; ++i)
31 {
32 auto int_array = sp::primitive_array<int32_t>(
33 {static_cast<int32_t>(i * 10), static_cast<int32_t>(i * 10 + 1), static_cast<int32_t>(i * 10 + 2)}
34 );
35 auto string_array = sp::string_array(std::vector<std::string>{
36 "batch_" + std::to_string(i) + "_a",
37 "batch_" + std::to_string(i) + "_b",
38 "batch_" + std::to_string(i) + "_c"
39 });
40 batches.push_back(sp::record_batch(
41 {{"id", sp::array(std::move(int_array))}, {"name", sp::array(std::move(string_array))}}
42 ));
43 }
44 return batches;
45}
46
50std::vector<uint8_t> serialize_batches(const std::vector<sp::record_batch>& batches)
51{
52 std::vector<uint8_t> buffer;
53 sp_ipc::memory_output_stream stream(buffer);
54 sp_ipc::serializer ser(stream);
55 ser << batches << sp_ipc::end_stream;
56 return buffer;
57}
58
59// [example_deserialize_stream]
66std::vector<sp::record_batch> deserialize_stream_example(const std::vector<uint8_t>& stream_data)
67{
68 // Deserialize the entire stream at once
69 auto batches = sp_ipc::deserialize_stream(stream_data);
70 return batches;
71}
72
73// [example_deserialize_stream]
74
75// [example_deserializer_basic]
82void deserializer_basic_example(const std::vector<uint8_t>& stream_data)
83{
84 // Create a container to hold the deserialized batches
85 std::vector<sp::record_batch> batches;
86
87 // Create a deserializer that will append to our container
88 sp_ipc::deserializer deser(batches);
89
90 // Deserialize the stream data
91 deser.deserialize(std::span<const uint8_t>(stream_data));
92
93 // Process the accumulated batches
94 for (const auto& batch : batches)
95 {
96 std::cout << "Batch with " << batch.nb_rows() << " rows and " << batch.nb_columns() << " columns\n";
97 }
98}
99
100// [example_deserializer_basic]
101
102// [example_deserializer_incremental]
109void deserializer_incremental_example(const std::vector<std::vector<uint8_t>>& stream_chunks)
110{
111 // Container to accumulate all deserialized batches
112 std::vector<sp::record_batch> batches;
113
114 // Create a deserializer
115 sp_ipc::deserializer deser(batches);
116
117 // Deserialize chunks as they arrive using the streaming operator
118 for (const auto& chunk : stream_chunks)
119 {
120 deser << std::span<const uint8_t>(chunk);
121 std::cout << "After chunk: " << batches.size() << " batches accumulated\n";
122 }
123
124 // All batches are now available in the container
125 std::cout << "Total batches deserialized: " << batches.size() << "\n";
126}
127
128// [example_deserializer_incremental]
129
130// [example_deserializer_chaining]
137 const std::vector<uint8_t>& chunk1,
138 const std::vector<uint8_t>& chunk2,
139 const std::vector<uint8_t>& chunk3
140)
141{
142 std::vector<sp::record_batch> batches;
143 sp_ipc::deserializer deser(batches);
144
145 // Chain multiple deserializations in a single expression
146 deser << std::span<const uint8_t>(chunk1) << std::span<const uint8_t>(chunk2)
147 << std::span<const uint8_t>(chunk3);
148
149 std::cout << "Deserialized " << batches.size() << " batches from 3 chunks\n";
150}
151
152// [example_deserializer_chaining]
153
154int main()
155{
156 std::cout << "=== Sparrow IPC Deserializer Examples ===\n\n";
157
158 try
159 {
160 // Create sample data
161 auto original_batches = create_sample_batches(3);
162 auto stream_data = serialize_batches(original_batches);
163
164 std::cout << "1. Function API Example (deserialize_stream)\n";
165 std::cout << " ----------------------------------------\n";
166 auto deserialized = deserialize_stream_example(stream_data);
167 std::cout << " Deserialized " << deserialized.size() << " batches\n\n";
168
169 std::cout << "2. Basic Deserializer Class Example\n";
170 std::cout << " ---------------------------------\n";
171 deserializer_basic_example(stream_data);
172 std::cout << "\n";
173
174 std::cout << "3. Incremental Deserialization Example\n";
175 std::cout << " ------------------------------------\n";
176 // Create multiple chunks (each containing different batches)
177 std::vector<std::vector<uint8_t>> chunks;
178 for (size_t i = 0; i < 3; ++i)
179 {
180 auto batch = create_sample_batches(1);
181 chunks.push_back(serialize_batches(batch));
182 }
184 std::cout << "\n";
185
186 std::cout << "4. Chaining Example\n";
187 std::cout << " -----------------\n";
188 deserializer_chaining_example(chunks[0], chunks[1], chunks[2]);
189
190 std::cout << "\n=== All examples completed successfully! ===\n";
191 }
192 catch (const std::exception& e)
193 {
194 std::cerr << "Error: " << e.what() << "\n";
195 return 1;
196 }
197
198 return 0;
199}
void deserialize(std::span< const uint8_t > data)
sparrow_ipc::memory_output_stream
An output stream that writes data to a contiguous memory buffer.
sparrow_ipc::serializer
A class for serializing Apache Arrow record batches to an output stream.
void deserializer_chaining_example(const std::vector< uint8_t > &chunk1, const std::vector< uint8_t > &chunk2, const std::vector< uint8_t > &chunk3)
Example: Chaining multiple deserializations.
void deserializer_incremental_example(const std::vector< std::vector< uint8_t > > &stream_chunks)
Example: Incremental deserialization with the deserializer class.
void deserializer_basic_example(const std::vector< uint8_t > &stream_data)
Example: Basic usage of the deserializer class.
std::vector< sp::record_batch > create_sample_batches(size_t count)
Helper function to create sample record batches for demonstration.
std::vector< uint8_t > serialize_batches(const std::vector< sp::record_batch > &batches)
Helper function to serialize batches to a byte buffer.
std::vector< sp::record_batch > deserialize_stream_example(const std::vector< uint8_t > &stream_data)
Example: Deserialize a stream using the function API.
SPARROW_IPC_API std::vector< sparrow::record_batch > deserialize_stream(std::span< const uint8_t > data)
Deserializes an Arrow IPC stream from binary data into a vector of record batches.
serializer & end_stream(serializer &serializer)