sparrow-ipc 0.2.0
Loading...
Searching...
No Matches
serializer.hpp
Go to the documentation of this file.
1#include <cstddef>
2#include <numeric>
3
4#include <sparrow/record_batch.hpp>
5
10
11namespace sparrow_ipc
12{
35 {
36 public:
37
46 template <writable_stream TStream>
47 serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
48 : m_stream(stream), m_compression(compression)
49 {
50 }
51
60
/// Writes a single record batch to the serializer.
/// Only the declaration is visible in this listing; the definition lives elsewhere.
66 void write(const sparrow::record_batch& rb);
67
83 template <std::ranges::input_range R>
84 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
85 void write(const R& record_batches)
86 {
87 CompressionCache compressed_buffers_cache;
88 if (std::ranges::empty(record_batches))
89 {
90 return;
91 }
92
93 if (m_ended)
94 {
95 throw std::runtime_error("Cannot append to a serializer that has been ended");
96 }
97
98 // NOTE `reserve_function` is making us store a cache for the compressed buffers at this level.
99 // The benefit of capacity allocation should be evaluated vs storing a cache of compressed buffers of record batches.
100 const auto reserve_function = [&record_batches, &compressed_buffers_cache, this]()
101 {
102 return std::accumulate(
103 record_batches.begin(),
104 record_batches.end(),
105 m_stream.size(),
106 [&compressed_buffers_cache, this](size_t acc, const sparrow::record_batch& rb)
107 {
108 return acc + calculate_record_batch_message_size(rb, m_compression, compressed_buffers_cache);
109 }
110 )
111 + (m_schema_received ? 0 : calculate_schema_message_size(*record_batches.begin()));
112 };
113
114 m_stream.reserve(reserve_function);
115
116 if (!m_schema_received)
117 {
118 m_schema_received = true;
119 m_dtypes = get_column_dtypes(*record_batches.begin());
120 serialize_schema_message(*record_batches.begin(), m_stream);
121 }
122
123 for (const auto& rb : record_batches)
124 {
125 if (get_column_dtypes(rb) != m_dtypes)
126 {
127 throw std::invalid_argument("Record batch schema does not match serializer schema");
128 }
129 serialize_record_batch(rb, m_stream, m_compression, compressed_buffers_cache);
130 }
131 }
132
149 serializer& operator<<(const sparrow::record_batch& rb)
150 {
151 write(rb);
152 return *this;
153 }
154
173 template <std::ranges::input_range R>
174 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
175 serializer& operator<<(const R& record_batches)
176 {
177 write(record_batches);
178 return *this;
179 }
180
196 {
197 return manip(*this);
198 }
199
/// Finalizes the serialization process by writing the end-of-stream marker.
/// NOTE(review): presumably also sets `m_ended`, which makes the range
/// `write` overload throw - the definition is not visible here, confirm.
210 void end();
211
212 private:
213
/// Returns the data types used as the schema fingerprint of `rb`; `write`
/// compares the result against `m_dtypes`. Presumably one entry per column,
/// in column order - definition not visible here, confirm.
214 static std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb);
215
/// True once the schema message has been emitted; `write` emits it only while this is false.
216 bool m_schema_received{false};
/// Data types taken from the first batch written; later batches must compare equal to it.
217 std::vector<sparrow::data_type> m_dtypes;
/// Type-erased wrapper around the destination stream passed to the constructor.
218 any_output_stream m_stream;
/// When true, the range `write` overload throws `std::runtime_error`.
/// NOTE(review): presumably set by `end()` - confirm.
219 bool m_ended{false};
/// Optional compression setting forwarded to the record batch size/serialization helpers.
220 std::optional<CompressionType> m_compression;
221 };
222
224 {
225 serializer.end();
226 return serializer;
227 }
228}
Type-erased wrapper for any stream-like object.
A class for serializing Apache Arrow record batches to an output stream.
void write(const sparrow::record_batch &rb)
Writes a record batch to the serializer.
serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a serializer object with a reference to a stream.
void write(const R &record_batches)
Writes a collection of record batches to the stream.
void end()
Finalizes the serialization process by writing the end-of-stream marker.
serializer & operator<<(const sparrow::record_batch &rb)
serializer & operator<<(const R &record_batches)
~serializer()
Destructor for the serializer.
serializer & operator<<(serializer &(*manip)(serializer &))
#define SPARROW_IPC_API
Definition config.hpp:12
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes a schema message for a record batch into a byte buffer.
serializer & end_stream(serializer &serializer)
SPARROW_IPC_API std::size_t calculate_schema_message_size(const sparrow::record_batch &record_batch)
Calculates the total serialized size of a schema message.