sparrow-ipc 0.2.0
Loading...
Searching...
No Matches
chunk_memory_serializer.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <numeric>
4#include <optional>
5#include <ranges>
6#include <stdexcept>
7#include <vector>
8
9#include <sparrow/record_batch.hpp>
10
18
19namespace sparrow_ipc
20{
37 {
38 public:
39
46 chunk_serializer(chunked_memory_output_stream<std::vector<std::vector<uint8_t>>>& stream, std::optional<CompressionType> compression = std::nullopt);
47
58 void write(const sparrow::record_batch& rb);
59
72 template <std::ranges::input_range R>
73 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
74 void write(const R& record_batches);
75
92 chunk_serializer& operator<<(const sparrow::record_batch& rb);
93
112 template <std::ranges::input_range R>
113 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
114 chunk_serializer& operator<<(const R& record_batches);
115
124 void end();
125
126 private:
127
128 bool m_schema_received{false};
129 std::vector<sparrow::data_type> m_dtypes;
131 bool m_ended{false};
132 std::optional<CompressionType> m_compression;
133 };
134
135 // Implementation
136
137 template <std::ranges::input_range R>
138 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
139 void chunk_serializer::write(const R& record_batches)
140 {
141 if (m_ended)
142 {
143 throw std::runtime_error("Cannot append record batches to a serializer that has been ended");
144 }
145
146 m_pstream->reserve((m_schema_received ? 0 : 1) + m_pstream->size() + record_batches.size());
147
148 if (!m_schema_received)
149 {
150 m_schema_received = true;
151 m_dtypes = get_column_dtypes(*record_batches.begin());
152 std::vector<uint8_t> schema_buffer;
153 memory_output_stream stream(schema_buffer);
154 any_output_stream astream(stream);
155 serialize_schema_message(*record_batches.begin(), astream);
156 m_pstream->write(std::move(schema_buffer));
157 }
158
159 for (const auto& rb : record_batches)
160 {
161 if (get_column_dtypes(rb) != m_dtypes)
162 {
163 throw std::invalid_argument("Record batch schema does not match serializer schema");
164 }
165 std::vector<uint8_t> buffer;
166 memory_output_stream stream(buffer);
167 any_output_stream astream(stream);
168 CompressionCache compressed_buffers_cache;
169 serialize_record_batch(rb, astream, m_compression, compressed_buffers_cache);
170 m_pstream->write(std::move(buffer));
171 }
172 }
173
174 inline chunk_serializer& chunk_serializer::operator<<(const sparrow::record_batch& rb)
175 {
176 write(rb);
177 return *this;
178 }
179
180 template <std::ranges::input_range R>
181 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
183 {
184 write(record_batches);
185 return *this;
186 }
187}
Type-erased wrapper for any stream-like object.
void write(const sparrow::record_batch &rb)
Writes a single record batch to the chunked stream.
chunk_serializer & operator<<(const sparrow::record_batch &rb)
chunk_serializer(chunked_memory_output_stream< std::vector< std::vector< uint8_t > > > &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a chunk serializer with a reference to a chunked memory output stream.
void end()
Finalizes the chunk serialization by writing an end-of-stream marker.
An output stream that writes data into separate memory chunks.
void reserve(std::size_t size)
Reserves capacity in the chunk container.
An output stream that writes data to a contiguous memory buffer.
#define SPARROW_IPC_API
Definition config.hpp:12
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes a schema message for a record batch into a byte buffer.
SPARROW_IPC_API std::vector< sparrow::data_type > get_column_dtypes(const sparrow::record_batch &rb)