4#include <sparrow/record_batch.hpp>
46 template <writable_stream TStream>
47 serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
48 : m_stream(stream), m_compression(compression)
66 void write(
const sparrow::record_batch& rb);
83 template <std::ranges::input_range R>
84 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
85 void write(
const R& record_batches)
88 if (std::ranges::empty(record_batches))
95 throw std::runtime_error(
"Cannot append to a serializer that has been ended");
100 const auto reserve_function = [&record_batches, &compressed_buffers_cache,
this]()
102 return std::accumulate(
103 record_batches.begin(),
104 record_batches.end(),
106 [&compressed_buffers_cache,
this](
size_t acc,
const sparrow::record_batch& rb)
108 return acc + calculate_record_batch_message_size(rb, m_compression, compressed_buffers_cache);
114 m_stream.reserve(reserve_function);
116 if (!m_schema_received)
118 m_schema_received =
true;
119 m_dtypes = get_column_dtypes(*record_batches.begin());
123 for (
const auto& rb : record_batches)
125 if (get_column_dtypes(rb) != m_dtypes)
127 throw std::invalid_argument(
"Record batch schema does not match serializer schema");
173 template <std::ranges::input_range R>
174 requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
177 write(record_batches);
214 static std::vector<sparrow::data_type> get_column_dtypes(
const sparrow::record_batch& rb);
216 bool m_schema_received{
false};
217 std::vector<sparrow::data_type> m_dtypes;
220 std::optional<CompressionType> m_compression;
Type-erased wrapper for any stream-like object.
A class for serializing Apache Arrow record batches to an output stream.
void write(const sparrow::record_batch &rb)
Writes a record batch to the serializer.
serializer(TStream &stream, std::optional< CompressionType > compression=std::nullopt)
Constructs a serializer object with a reference to a stream.
void write(const R &record_batches)
Writes a collection of record batches to the stream.
void end()
Finalizes the serialization process by writing end-of-stream marker.
serializer & operator<<(const sparrow::record_batch &rb)
serializer & operator<<(const R &record_batches)
~serializer()
Destructor for the serializer.
serializer & operator<<(serializer &(*manip)(serializer &))
SPARROW_IPC_API serialized_record_batch_info serialize_record_batch(const sparrow::record_batch &record_batch, any_output_stream &stream, std::optional< CompressionType > compression, std::optional< std::reference_wrapper< CompressionCache > > cache)
Serializes a record batch into a binary format following the Arrow IPC specification.
SPARROW_IPC_API void serialize_schema_message(const sparrow::record_batch &record_batch, any_output_stream &stream)
Serializes a schema message for a record batch into a byte buffer.
serializer & end_stream(serializer &serializer)
SPARROW_IPC_API std::size_t calculate_schema_message_size(const sparrow::record_batch &record_batch)
Calculates the total serialized size of a schema message.