sparrow-ipc 0.2.0
Loading...
Searching...
No Matches
write_and_read_streams.cpp
Go to the documentation of this file.
1#include <algorithm>
2#include <cstdlib>
3#include <filesystem>
4#include <fstream>
5#include <iostream>
6#include <random>
7#include <vector>
8
12
13#include <sparrow/record_batch.hpp>
14
15const std::filesystem::path arrow_testing_data_dir = ARROW_TESTING_DATA_DIR;
16const std::filesystem::path tests_resources_files_path = arrow_testing_data_dir / "data" / "arrow-ipc-stream"
17 / "integration" / "cpp-21.0.0";
18
19
20namespace sp = sparrow;
21
// File-scope random number source.
// `rd` provides the seed value and `gen` is the Mersenne Twister engine that the
// distributions in utils::create_random_record_batch draw from.
std::random_device rd;
std::mt19937 gen(rd());
25
26namespace utils
27{
32 sp::record_batch create_random_record_batch(size_t num_rows)
33 {
34 // Helper lambda to generate a vector with random values
35 auto generate_vector = [num_rows](auto generator)
36 {
37 using T = decltype(generator());
38 std::vector<T> values(num_rows);
39 std::generate(values.begin(), values.end(), generator);
40 return values;
41 };
42
43 // Create integer column with random values
44 std::uniform_int_distribution<int32_t> int_dist(0, 1000);
45 auto int_array = sp::primitive_array<int32_t>(generate_vector(
46 [&]()
47 {
48 return int_dist(gen);
49 }
50 ));
51
52 // Create float column with random values
53 std::uniform_real_distribution<float> float_dist(-100.0f, 100.0f);
54 auto float_array = sp::primitive_array<float>(generate_vector(
55 [&]()
56 {
57 return float_dist(gen);
58 }
59 ));
60
61 // Create boolean column with random values
62 std::uniform_int_distribution<int> bool_dist(0, 1);
63 auto bool_array = sp::primitive_array<bool>(generate_vector(
64 [&]()
65 {
66 return static_cast<bool>(bool_dist(gen));
67 }
68 ));
69
70 // Create string column with random values
71 const std::vector<std::string> sample_strings =
72 {"alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta", "iota", "kappa"};
73 std::uniform_int_distribution<size_t> str_dist(0, sample_strings.size() - 1);
74 size_t counter = 0;
75 auto string_array = sp::string_array(generate_vector(
76 [&]()
77 {
78 return sample_strings[str_dist(gen)] + "_" + std::to_string(counter++);
79 }
80 ));
81
82 // Create record batch with named columns (same schema for all batches)
83 return sp::record_batch(
84 {{"id", sp::array(std::move(int_array))},
85 {"value", sp::array(std::move(float_array))},
86 {"flag", sp::array(std::move(bool_array))},
87 {"name", sp::array(std::move(string_array))}}
88 );
89 }
90
96 const std::vector<sp::record_batch>& original_batches,
97 const std::vector<sp::record_batch>& deserialized_batches
98 )
99 {
100 if (original_batches.size() != deserialized_batches.size())
101 {
102 std::cerr << "ERROR: Batch count mismatch! Original: " << original_batches.size()
103 << ", Deserialized: " << deserialized_batches.size() << "\n";
104 return false;
105 }
106
107 bool all_match = true;
108 for (size_t batch_idx = 0; batch_idx < original_batches.size(); ++batch_idx)
109 {
110 const auto& original = original_batches[batch_idx];
111 const auto& deserialized = deserialized_batches[batch_idx];
112
113 // Check basic structure
114 if (original.nb_columns() != deserialized.nb_columns()
115 || original.nb_rows() != deserialized.nb_rows())
116 {
117 std::cerr << "ERROR: Batch " << batch_idx << " structure mismatch!\n";
118 all_match = false;
119 continue;
120 }
121
122 // Check column names
123 if (!std::ranges::equal(original.names(), deserialized.names()))
124 {
125 std::cerr << "WARNING: Batch " << batch_idx << " column names mismatch!\n";
126 }
127
128 // Check column data
129 for (size_t col_idx = 0; col_idx < original.nb_columns(); ++col_idx)
130 {
131 const auto& orig_col = original.get_column(col_idx);
132 const auto& deser_col = deserialized.get_column(col_idx);
133
134 if (orig_col.data_type() != deser_col.data_type())
135 {
136 std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx << " type mismatch!\n";
137 all_match = false;
138 continue;
139 }
140
141 // Check values
142 for (size_t row_idx = 0; row_idx < orig_col.size(); ++row_idx)
143 {
144 if (orig_col[row_idx] != deser_col[row_idx])
145 {
146 std::cerr << "ERROR: Batch " << batch_idx << ", column " << col_idx << ", row "
147 << row_idx << " value mismatch!\n";
148 std::cerr << " Original: " << orig_col[row_idx]
149 << ", Deserialized: " << deser_col[row_idx] << "\n";
150 all_match = false;
151 }
152 }
153 }
154 }
155
156 return all_match;
157 }
158}
159
163std::vector<sp::record_batch> create_record_batches(size_t num_batches, size_t rows_per_batch)
164{
165 std::cout << "1. Creating " << num_batches << " record batches with random values...\n";
166 std::cout << " Each batch has the same schema: (id: int32, value: float, flag: bool, name: string)\n";
167
168 std::vector<sp::record_batch> batches;
169 batches.reserve(num_batches);
170
171 for (size_t i = 0; i < num_batches; ++i)
172 {
173 batches.push_back(utils::create_random_record_batch(rows_per_batch));
174 }
175
176 std::cout << " Created " << batches.size() << " record batches\n";
177 for (size_t i = 0; i < batches.size(); ++i)
178 {
179 std::cout << std::format("{}\n\n", batches[i]);
180 }
181
182 return batches;
183}
184
185// [example_serialize_to_stream]
189std::vector<uint8_t> serialize_batches_to_stream(const std::vector<sp::record_batch>& batches)
190{
191 std::cout << "\n2. Serializing record batches to stream...\n";
192
193 std::vector<uint8_t> stream_data;
194 sparrow_ipc::memory_output_stream stream(stream_data);
195 sparrow_ipc::serializer serializer(stream);
196
197 // Serialize all batches using the streaming operator
198 serializer << batches << sparrow_ipc::end_stream;
199
200 std::cout << " Serialized stream size: " << stream_data.size() << " bytes\n";
201
202 return stream_data;
203}
204// [example_serialize_to_stream]
205
206// [example_deserialize_from_stream]
210std::vector<sp::record_batch> deserialize_stream_to_batches(const std::vector<uint8_t>& stream_data)
211{
212 std::cout << "\n3. Deserializing stream back to record batches...\n";
213
214 auto batches = sparrow_ipc::deserialize_stream(stream_data);
215
216 std::cout << " Deserialized " << batches.size() << " record batches\n";
217
218 return batches;
219}
220// [example_deserialize_from_stream]
221
222// [example_serialize_individual]
227 const std::vector<sp::record_batch>& batches,
228 const std::vector<uint8_t>& batch_stream_data
229)
230{
231 std::cout << "\n6. Demonstrating individual vs batch serialization...\n";
232
233 // Serialize individual batches one by one
234 std::vector<uint8_t> individual_stream_data;
235 sparrow_ipc::memory_output_stream individual_stream(individual_stream_data);
236 sparrow_ipc::serializer individual_serializer(individual_stream);
237
238 for (const auto& batch : batches)
239 {
240 individual_serializer << batch;
241 }
242 individual_serializer << sparrow_ipc::end_stream;
243
244 std::cout << " Individual serialization size: " << individual_stream_data.size() << " bytes\n";
245 std::cout << " Batch serialization size: " << batch_stream_data.size() << " bytes\n";
246
247 // Both should produce the same result
248 auto individual_deserialized = sparrow_ipc::deserialize_stream(individual_stream_data);
249
250 if (individual_deserialized.size() == batches.size())
251 {
252 std::cout << " ✓ Individual and batch serialization produce equivalent results\n";
253 }
254 else
255 {
256 std::cerr << " ✗ Individual and batch serialization mismatch!\n";
257 }
258}
259// [example_serialize_individual]
260
264bool verify_schema_consistency(const std::vector<sp::record_batch>& batches)
265{
266 std::cout << "\n7. Verifying schema consistency across all batches...\n";
267
268 if (batches.empty())
269 {
270 std::cout << " No batches to verify\n";
271 return true;
272 }
273
274 bool schema_consistent = true;
275 for (size_t i = 1; i < batches.size(); ++i)
276 {
277 if (batches[0].nb_columns() != batches[i].nb_columns())
278 {
279 std::cerr << " ERROR: Batch " << i << " has different number of columns!\n";
280 schema_consistent = false;
281 }
282
283 for (size_t col_idx = 0; col_idx < batches[0].nb_columns() && col_idx < batches[i].nb_columns();
284 ++col_idx)
285 {
286 const auto& col0 = batches[0].get_column(col_idx);
287 const auto& col_i = batches[i].get_column(col_idx);
288
289 if (col0.data_type() != col_i.data_type())
290 {
291 std::cerr << " ERROR: Batch " << i << ", column " << col_idx << " has different type!\n";
292 schema_consistent = false;
293 }
294
295 if (col0.name() != col_i.name())
296 {
297 std::cerr << " ERROR: Batch " << i << ", column " << col_idx << " has different name!\n";
298 schema_consistent = false;
299 }
300 }
301 }
302
303 if (schema_consistent)
304 {
305 std::cout << " ✓ All batches have consistent schema!\n";
306 }
307 else
308 {
309 std::cerr << " ✗ Schema inconsistency detected!\n";
310 }
311
312 return schema_consistent;
313}
314
319{
320 std::cout << "\n8. Reading a primitive stream file from test resources...\n";
321
322 const std::filesystem::path primitive_stream_file = tests_resources_files_path
323 / "generated_primitive.stream";
324
325 if (std::filesystem::exists(primitive_stream_file))
326 {
327 std::cout << " Reading file: " << primitive_stream_file << "\n";
328
329 // Read the stream file
330 std::ifstream stream_file(primitive_stream_file, std::ios::in | std::ios::binary);
331 if (!stream_file.is_open())
332 {
333 std::cerr << " ERROR: Could not open stream file!\n";
334 }
335 else
336 {
337 const std::vector<uint8_t> file_stream_data(
338 (std::istreambuf_iterator<char>(stream_file)),
339 (std::istreambuf_iterator<char>())
340 );
341 stream_file.close();
342
343 std::cout << " File size: " << file_stream_data.size() << " bytes\n";
344
345 // Deserialize the stream
346 auto file_batches = sparrow_ipc::deserialize_stream(file_stream_data);
347
348 std::cout << " Deserialized " << file_batches.size() << " record batch(es) from file\n";
349
350 // Display the first batch
351 if (!file_batches.empty())
352 {
353 std::cout << " First batch from file:\n";
354 std::cout << std::format("{}\n", file_batches[0]);
355 }
356 }
357 }
358 else
359 {
360 std::cout << " Note: Test resource file not found at " << primitive_stream_file << "\n";
361 std::cout << " This is expected if test data is not available.\n";
362 }
363}
364
365int main()
366{
367 std::cout << "=== Sparrow IPC Stream Write and Read Example ===\n";
368 std::cout << "Note: All record batches in a stream must have the same schema.\n\n";
369
370 try
371 {
372 // Configuration
373 constexpr size_t num_batches = 5;
374 constexpr size_t rows_per_batch = 10;
375
376 // Step 1: Create several record batches with the SAME schema but random values
377 auto original_batches = create_record_batches(num_batches, rows_per_batch);
378
379 // Step 2: Serialize the record batches to a stream
380 auto stream_data = serialize_batches_to_stream(original_batches);
381
382 // Step 3: Deserialize the stream back to record batches
383 auto deserialized_batches = deserialize_stream_to_batches(stream_data);
384
385 // Step 4: Verify that original and deserialized data match
386 std::cout << "\n4. Verifying data integrity...\n";
387
388 if (utils::verify_batches_match(original_batches, deserialized_batches))
389 {
390 std::cout << " ✓ All data matches perfectly!\n";
391 }
392 else
393 {
394 std::cerr << " ✗ Data verification failed!\n";
395 return 1;
396 }
397
398 // Step 5: Display sample data from the first batch
399 std::cout << "\n5. Sample data from the first batch:\n";
400 std::cout << std::format("{}\n", original_batches[0]);
401
402 // Step 6: Demonstrate individual serialization vs batch serialization
403 demonstrate_serialization_methods(original_batches, stream_data);
404
405 // Step 7: Verify schema consistency
406 verify_schema_consistency(deserialized_batches);
407
408 // Step 8: Read and display a primitive stream file from test resources
410
411 std::cout << "\n=== Example completed successfully! ===\n";
412 }
413 catch (const std::exception& e)
414 {
415 std::cerr << "Error: " << e.what() << "\n";
416 return EXIT_FAILURE;
417 }
418
419 return EXIT_SUCCESS;
420}
An output stream that writes data to a contiguous memory buffer.
A class for serializing Apache Arrow record batches to an output stream.
SPARROW_IPC_API std::vector< sparrow::record_batch > deserialize_stream(std::span< const uint8_t > data)
Deserializes an Arrow IPC stream from binary data into a vector of record batches.
serializer & end_stream(serializer &serializer)
bool verify_batches_match(const std::vector< sp::record_batch > &original_batches, const std::vector< sp::record_batch > &deserialized_batches)
Verify that two sets of record batches are identical. Returns true if all batches match, false otherwise.
sp::record_batch create_random_record_batch(size_t num_rows)
Helper function to create a record batch with the same schema but random values. All batches have: id (int32), value (float), flag (bool), name (string).
std::vector< uint8_t > serialize_batches_to_stream(const std::vector< sp::record_batch > &batches)
Serialize record batches to a stream.
std::mt19937 gen(rd())
void demonstrate_serialization_methods(const std::vector< sp::record_batch > &batches, const std::vector< uint8_t > &batch_stream_data)
Demonstrate individual vs batch serialization.
const std::filesystem::path arrow_testing_data_dir
bool verify_schema_consistency(const std::vector< sp::record_batch > &batches)
Verify schema consistency across all batches.
std::random_device rd
std::vector< sp::record_batch > deserialize_stream_to_batches(const std::vector< uint8_t > &stream_data)
Deserialize stream back to record batches.
void read_and_display_test_file()
Read and display a primitive stream file from test resources.
std::vector< sp::record_batch > create_record_batches(size_t num_batches, size_t rows_per_batch)
Create multiple record batches with the same schema but random values.
const std::filesystem::path tests_resources_files_path