sparrow-ipc 0.2.0
Loading...
Searching...
No Matches
deserialize_decimal_array.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <span>
4
5#include <sparrow/arrow_interface/arrow_array_schema_proxy.hpp>
6#include <sparrow/buffer/buffer.hpp>
7#include <sparrow/decimal_array.hpp>
8
9#include "Message_generated.h"
13
14namespace sparrow_ipc
15{
16 template <sparrow::decimal_type T>
17 [[nodiscard]] sparrow::decimal_array<T> deserialize_non_owning_decimal(
18 const org::apache::arrow::flatbuf::RecordBatch& record_batch,
19 std::span<const uint8_t> body,
20 std::string_view name,
21 const std::optional<std::vector<sparrow::metadata_pair>>& metadata,
22 bool nullable,
23 size_t& buffer_index,
24 int32_t scale,
25 int32_t precision
26 )
27 {
28 constexpr std::size_t sizeof_decimal = sizeof(typename T::integer_type);
29 std::string format_str = "d:" + std::to_string(precision) + "," + std::to_string(scale);
30 if constexpr (sizeof_decimal != 16) // We don't need to specify the size for 128-bit
31 // decimals
32 {
33 format_str += "," + std::to_string(sizeof_decimal * 8);
34 }
35
36 // Set up flags based on nullable
37 std::optional<std::unordered_set<sparrow::ArrowFlag>> flags;
38 if (nullable)
39 {
40 flags = std::unordered_set<sparrow::ArrowFlag>{sparrow::ArrowFlag::NULLABLE};
41 }
42
43 ArrowSchema schema = make_non_owning_arrow_schema(
44 format_str,
45 name.data(),
46 metadata,
47 flags,
48 0,
49 nullptr,
50 nullptr
51 );
52
53 const auto compression = record_batch.compression();
54 std::vector<arrow_array_private_data::optionally_owned_buffer> buffers;
55
56 auto validity_buffer_span = utils::get_buffer(record_batch, body, buffer_index);
57 auto data_buffer_span = utils::get_buffer(record_batch, body, buffer_index);
58
59 if (compression)
60 {
61 buffers.push_back(utils::get_decompressed_buffer(validity_buffer_span, compression));
62
63 // For decimal types, we need to ensure proper alignment of the decompressed data.
64 // The decompressed buffer itself is aligned, but we need to copy it to ensure
65 // the decimal values (especially int128 and int256) start at a properly aligned address.
66 auto decompressed_data = utils::get_decompressed_buffer(data_buffer_span, compression);
67 std::visit([&buffers](auto&& arg) {
68 using variant_type = std::decay_t<decltype(arg)>;
69 if constexpr (std::is_same_v<variant_type, sparrow::buffer<std::uint8_t>>)
70 {
71 // Already a buffer, move it
72 buffers.emplace_back(std::move(arg));
73 }
74 else
75 {
76 // It's a span, copy to ensure alignment
77 sparrow::buffer<std::uint8_t> aligned_buffer(arg.begin(), arg.end(), sparrow::buffer<std::uint8_t>::default_allocator());
78 buffers.emplace_back(std::move(aligned_buffer));
79 }
80 }, std::move(decompressed_data));
81 }
82 else
83 {
84 buffers.emplace_back(validity_buffer_span);
85 sparrow::buffer<std::uint8_t> data_buffer_copy(data_buffer_span.begin(), data_buffer_span.end(), sparrow::buffer<std::uint8_t>::default_allocator());
86 buffers.emplace_back(std::move(data_buffer_copy));
87 }
88
89 const auto [bitmap_ptr, null_count] = utils::get_bitmap_pointer_and_null_count(
90 validity_buffer_span,
91 record_batch.length()
92 );
93
95 record_batch.length(),
96 null_count,
97 0,
98 0,
99 nullptr,
100 nullptr,
101 std::move(buffers)
102 );
103 sparrow::arrow_proxy ap{std::move(array), std::move(schema)};
104 return sparrow::decimal_array<T>(std::move(ap));
105 }
106}
std::span< const uint8_t > get_buffer(const org::apache::arrow::flatbuf::RecordBatch &record_batch, std::span< const uint8_t > body, size_t &buffer_index)
Extracts a buffer from a RecordBatch's body.
std::variant< sparrow::buffer< std::uint8_t >, std::span< const std::uint8_t > > get_decompressed_buffer(std::span< const uint8_t > buffer_span, const org::apache::arrow::flatbuf::BodyCompression *compression)
Retrieves a decompressed buffer or a view of the original buffer.
std::pair< std::uint8_t *, int64_t > get_bitmap_pointer_and_null_count(std::span< const uint8_t > validity_buffer_span, const int64_t length)
Extracts bitmap pointer and null count from a validity buffer span.
sparrow::decimal_array< T > deserialize_non_owning_decimal(const org::apache::arrow::flatbuf::RecordBatch &record_batch, std::span< const uint8_t > body, std::string_view name, const std::optional< std::vector< sparrow::metadata_pair > > &metadata, bool nullable, size_t &buffer_index, int32_t scale, int32_t precision)
ArrowSchema make_non_owning_arrow_schema(std::string_view format, const char *name, std::optional< M > metadata, std::optional< std::unordered_set< sparrow::ArrowFlag > > flags, size_t children_count, ArrowSchema **children, ArrowSchema *dictionary)
ArrowArray make_arrow_array(int64_t length, int64_t null_count, int64_t offset, size_t children_count, ArrowArray **children, ArrowArray *dictionary, Arg &&private_data_arg)