sparrow-ipc 0.2.0
Loading...
Searching...
No Matches
stream_file_serializer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <cstddef>
4#include <numeric>
5#include <optional>
6#include <vector>
7
8#include <sparrow/record_batch.hpp>
9
16
17namespace sparrow_ipc
18{
/**
 * @brief Represents a block entry in the Arrow IPC file footer.
 *
 * Members are declared in the order (offset, metadata_length, body_length),
 * which is the order used when entries are constructed positionally.
 */
struct record_batch_block
{
    /// Offset from the start of the file to the record batch message.
    int64_t offset;
    /// Length of the metadata (FlatBuffer message).
    int32_t metadata_length;
    /// Length of the record batch body (data buffers).
    int64_t body_length;
};
30
40 const sparrow::record_batch& record_batch,
41 const std::vector<record_batch_block>& record_batch_blocks,
42 any_output_stream& stream
43 );
44
66 [[nodiscard]] SPARROW_IPC_API std::vector<sparrow::record_batch>
67 deserialize_file(std::span<const uint8_t> data);
68
92 {
93 public:
94
103 template <writable_stream TStream>
104 stream_file_serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
105 : m_stream(stream), m_compression(compression)
106 {
107 }
108
117
125 void write(const sparrow::record_batch& rb);
126
146 template <std::ranges::input_range R>
147 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
148 void write(const R& record_batches)
149 {
150 CompressionCache compressed_buffers_cache;
151 if (std::ranges::empty(record_batches))
152 {
153 return;
154 }
155
156 if (m_ended)
157 {
158 throw std::runtime_error("Cannot write to a file serializer that has been ended");
159 }
160
161 // Write file header magic on first write
162 if (!m_header_written)
163 {
165 m_stream.add_padding();
166 m_header_written = true;
167 }
168
169 // NOTE `reserve_function` is making us store a cache for the compressed buffers at this level.
170 // The benefit of capacity allocation should be evaluated vs storing a cache of compressed buffers of record batches.
171 const auto reserve_function = [&record_batches, &compressed_buffers_cache, this]()
172 {
173 return std::accumulate(
174 record_batches.begin(),
175 record_batches.end(),
176 m_stream.size(),
177 [&compressed_buffers_cache, this](size_t acc, const sparrow::record_batch& rb)
178 {
179 return acc + calculate_record_batch_message_size(rb, m_compression, compressed_buffers_cache);
180 }
181 )
182 + (m_schema_received ? 0 : calculate_schema_message_size(*record_batches.begin()));
183 };
184
185 m_stream.reserve(reserve_function);
186
188 {
189 m_schema_received = true;
190 m_first_record_batch = *record_batches.begin();
191 m_dtypes = get_column_dtypes(*record_batches.begin());
192 serialize_schema_message(*record_batches.begin(), m_stream);
193 }
194
195 for (const auto& rb : record_batches)
196 {
197 if (get_column_dtypes(rb) != m_dtypes)
198 {
199 throw std::invalid_argument("Record batch schema does not match file serializer schema");
200 }
201
202 // Offset is from the start of the file to the record batch message
203 const int64_t offset = static_cast<int64_t>(m_stream.size());
204
205 // Serialize and get block info
206 const auto info = serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
207
208 m_record_batch_blocks.emplace_back(offset, info.metadata_length, info.body_length);
209 }
210 }
211
228 stream_file_serializer& operator<<(const sparrow::record_batch& rb)
229 {
230 write(rb);
231 return *this;
232 }
233
252 template <std::ranges::input_range R>
253 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
254 stream_file_serializer& operator<<(const R& record_batches)
255 {
256 write(record_batches);
257 return *this;
258 }
259
275 {
276 return manip(*this);
277 }
278
/**
 * @brief Finalizes the file serialization by writing the footer and trailing magic bytes.
 */
void end();
296
297 bool m_header_written{false};
298 bool m_schema_received{false};
299 std::optional<sparrow::record_batch> m_first_record_batch;
300 std::vector<sparrow::data_type> m_dtypes;
302 bool m_ended{false};
303 std::optional<CompressionType> m_compression;
304 std::vector<record_batch_block> m_record_batch_blocks;
305 };
306
325}
Type-erased wrapper for any stream-like object.
A class for serializing Apache Arrow record batches to an output stream.
void end()
Finalizes the serialization process by writing the end-of-stream marker.
A class for serializing Apache Arrow record batches to the IPC file format.
std::vector< record_batch_block > m_record_batch_blocks
stream_file_serializer & operator<<(stream_file_serializer &(*manip)(stream_file_serializer &))
std::vector< sparrow::data_type > m_dtypes
std::optional< CompressionType > m_compression
void write(const R &record_batches)
Writes a collection of record batches to the file.
std::optional< sparrow::record_batch > m_first_record_batch
~stream_file_serializer()
Destructor for the stream_file_serializer.
stream_file_serializer & operator<<(const sparrow::record_batch &rb)
void write(const sparrow::record_batch &rb)
Writes a single record batch to the file.
stream_file_serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a stream_file_serializer object with a reference to a stream.
void end()
Finalizes the file serialization by writing the footer and trailing magic bytes.
stream_file_serializer & operator<<(const R &record_batches)
#define SPARROW_IPC_API
Definition config.hpp:12
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes a schema message for a record batch into an output stream.
SPARROW_IPC_API std::vector< sparrow::record_batch > deserialize_file(std::span< const uint8_t > data)
Deserializes Arrow IPC file format into a vector of record batches.
constexpr std::array< std::uint8_t, 8 > arrow_file_header_magic
Magic bytes with padding for file header (8 bytes total for alignment)
SPARROW_IPC_API size_t write_footer(const sparrow::record_batch &record_batch, const std::vector< record_batch_block > &record_batch_blocks, any_output_stream &stream)
Writes the Arrow IPC file footer.
stream_file_serializer & end_file(stream_file_serializer &serializer)
SPARROW_IPC_API std::size_t calculate_schema_message_size(const sparrow::record_batch &record_batch)
Calculates the total serialized size of a schema message.
SPARROW_IPC_API std::vector< sparrow::data_type > get_column_dtypes(const sparrow::record_batch &rb)
Represents a block entry in the Arrow IPC file footer.
int64_t body_length
Length of the record batch body (data buffers)
int32_t metadata_length
Length of the metadata (FlatBuffer message)
int64_t offset
Offset from the start of the file to the record batch message.