Appends a record batch using the stream insertion operator.
Appends a record batch using the stream insertion operator. This operator provides a convenient stream-like interface for appending record batches to the serializer. It delegates to the append() method and returns a reference to the serializer to enable method chaining.
#include <cstddef>
#include <numeric>
#include <sparrow/record_batch.hpp>
{
{
public:
/**
 * @brief Constructs a serializer that writes to the given stream.
 *
 * The stream is used to initialize the m_stream member (an any_output_stream)
 * and the optional compression setting is stored for later use when record
 * batches are written.
 *
 * NOTE(review): presumably any_output_stream only refers to `stream` rather
 * than copying it, so the caller would have to keep `stream` alive for the
 * lifetime of the serializer - confirm against any_output_stream.
 *
 * @tparam TStream Stream type satisfying the writable_stream concept.
 * @param stream Destination of the serialized output.
 * @param compression Compression to apply, or std::nullopt (the default) for none.
 */
template <writable_stream TStream>
serializer(TStream& stream, std::optional<CompressionType> compression = std::nullopt)
: m_stream(stream), m_compression(compression)
{
}
/**
 * @brief Destructor; declared here and defined out of line.
 *
 * NOTE(review): the definition is not visible in this header - confirm
 * whether it finalizes the stream when end() has not been called explicitly.
 */
~serializer();
/**
 * @brief Writes a single record batch to the output stream.
 *
 * Declared here and defined out of line. This is the function the
 * single-batch operator<< forwards to.
 *
 * NOTE(review): presumably applies the same ended/schema checks as the range
 * overload of write() - confirm against the definition.
 *
 * @param rb Record batch to serialize.
 */
void write(const sparrow::record_batch& rb);
/**
 * @brief Writes every record batch of a range to the output stream.
 *
 * An empty range is a no-op (this is checked before the "ended" state, so an
 * empty range never throws). Otherwise the stream is asked to reserve space
 * based on the computed message sizes, the schema message is emitted if no
 * schema has been written yet (taken from the first batch of the range), and
 * each batch is then serialized in iteration order.
 *
 * Schema validation happens batch by batch inside the serialization loop, so
 * batches that precede a mismatching one have already been written when the
 * exception is thrown.
 *
 * @tparam R Input range whose value type is sparrow::record_batch.
 * @param record_batches Batches to serialize.
 * @throws std::runtime_error if the serializer has already been ended.
 * @throws std::invalid_argument if a batch's column data types differ from
 *         the data types recorded for this serializer.
 */
template <std::ranges::input_range R>
    requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
void write(const R& record_batches)
{
    // One cache is shared by the size computation and the serialization below.
    CompressionCache compression_cache;

    if (std::ranges::empty(record_batches))
    {
        return;
    }
    if (m_ended)
    {
        throw std::runtime_error("Cannot append to a serializer that has been ended");
    }

    // Callable handed to the stream: current stream size, plus the size of every
    // record batch message, plus the schema message when it still has to be written.
    const auto compute_required_size = [&record_batches, &compression_cache, this]()
    {
        const auto batches_total = std::accumulate(
            record_batches.begin(),
            record_batches.end(),
            m_stream.size(),
            [&compression_cache, this](size_t running_total, const sparrow::record_batch& batch)
            {
                const auto batch_size = calculate_record_batch_message_size(batch, m_compression, compression_cache);
                return running_total + batch_size;
            }
        );
        const auto schema_total = m_schema_received ? 0 : calculate_schema_message_size(*record_batches.begin());
        return batches_total + schema_total;
    };
    m_stream.reserve(compute_required_size);

    // The first batch ever written defines the schema of the whole stream.
    if (!m_schema_received)
    {
        m_schema_received = true;
        m_dtypes = get_column_dtypes(*record_batches.begin());
        serialize_schema_message(*record_batches.begin(), m_stream);
    }

    for (const auto& batch : record_batches)
    {
        const bool schema_matches = (get_column_dtypes(batch) == m_dtypes);
        if (!schema_matches)
        {
            throw std::invalid_argument("Record batch schema does not match serializer schema");
        }
        serialize_record_batch(batch, m_stream, m_compression, compression_cache);
    }
}
/**
 * @brief Stream-insertion form of write() for a single record batch.
 *
 * Forwards the batch to write() and hands back this serializer so that
 * several insertions can be chained in one expression.
 *
 * @param rb Record batch to serialize.
 * @return Reference to this serializer.
 */
serializer& operator<<(const sparrow::record_batch& rb)
{
    this->write(rb);
    return *this;
}
/**
 * @brief Stream-insertion form of write() for a range of record batches.
 *
 * Forwards the range to the range overload of write() and hands back this
 * serializer so that several insertions can be chained in one expression.
 *
 * @tparam R Input range whose value type is sparrow::record_batch.
 * @param record_batches Batches to serialize.
 * @return Reference to this serializer.
 */
template <std::ranges::input_range R>
    requires std::same_as<std::ranges::range_value_t<R>, sparrow::record_batch>
serializer& operator<<(const R& record_batches)
{
    this->write(record_batches);
    return *this;
}
/**
 * @brief Applies a manipulator function to this serializer.
 *
 * Lets a free function taking and returning a serializer reference (such as
 * end_stream) be used on the right-hand side of operator<<.
 *
 * @param manip Function to invoke with this serializer.
 * @return Whatever serializer reference the manipulator returns.
 */
serializer& operator<<(serializer& (*manip)(serializer&))
{
    serializer& self = *this;
    return (*manip)(self);
}
/**
 * @brief Finalizes the serialization process by writing the end-of-stream marker.
 *
 * Declared here and defined out of line.
 *
 * NOTE(review): presumably sets m_ended, after which the range overload of
 * write() throws std::runtime_error for non-empty input - confirm against
 * the definition.
 */
void end();
private:
// Returns the data types of the record batch's columns (defined out of line).
// Used by write() to record the schema and to compare later batches against it.
static std::vector<sparrow::data_type> get_column_dtypes(const sparrow::record_batch& rb);
// True once write() has emitted the schema message.
bool m_schema_received{false};
// Column data types of the batch the schema was taken from; every batch
// written afterwards must match them.
std::vector<sparrow::data_type> m_dtypes;
// Output sink that receives the schema and record batch messages.
any_output_stream m_stream;
// When true, write() refuses non-empty input with std::runtime_error.
// NOTE(review): presumably set by end() - its definition is not visible here.
bool m_ended{false};
// Compression passed to the size computation and to record batch
// serialization; std::nullopt when none was requested at construction.
std::optional<CompressionType> m_compression;
};
/**
 * @brief Stream manipulator that ends the serialization.
 *
 * Intended for use with serializer::operator<<(serializer& (*)(serializer&)),
 * e.g. `ser << batch << end_stream;`. It calls serializer::end() on the given
 * serializer and returns it so the expression can still be chained.
 *
 * @param serializer Serializer to finalize.
 * @return Reference to the same serializer.
 */
inline serializer&
end_stream(serializer& serializer)
{
    // Without this call the manipulator would be a silent no-op and the stream
    // would never be finalized.
    serializer.end();
    return serializer;
}
}
void end()
Finalizes the serialization process by writing the end-of-stream marker.
serializer & end_stream(serializer &serializer)