Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 44 additions & 14 deletions src/iceberg/parquet/parquet_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,13 @@ Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
return output_array;
}

/// FIXME: Support ::arrow::LargeListArray.
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
const std::shared_ptr<::arrow::ListArray>& list_array,
const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
/// Templated implementation for projecting list arrays.
/// Works with both ListArray/ListType (32-bit offsets) and
/// LargeListArray/LargeListType (64-bit offsets).
template <typename ArrowListArrayType, typename ArrowListType>
Result<std::shared_ptr<::arrow::Array>> ProjectListArrayImpl(
const std::shared_ptr<ArrowListArrayType>& list_array,
const std::shared_ptr<ArrowListType>& output_list_type, const ListType& list_type,
std::span<const FieldProjection> projections,
const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) {
if (projections.size() != 1) {
Expand All @@ -176,12 +179,30 @@ Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
ProjectPrimitiveArray(list_array->values(), output_element_type, pool));
}

return std::make_shared<::arrow::ListArray>(
return std::make_shared<ArrowListArrayType>(
output_list_type, list_array->length(), list_array->value_offsets(),
std::move(projected_values), list_array->null_bitmap(), list_array->null_count(),
list_array->offset());
}

/// Projects a ::arrow::ListArray (32-bit offsets) onto `output_list_type`.
///
/// Thin, non-template entry point: all of the work is done by
/// ProjectListArrayImpl, instantiated here for ListArray/ListType.
Result<std::shared_ptr<::arrow::Array>> ProjectListArray(
    const std::shared_ptr<::arrow::ListArray>& list_array,
    const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type,
    std::span<const FieldProjection> projections,
    const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) {
  return ProjectListArrayImpl<::arrow::ListArray, ::arrow::ListType>(
      list_array, output_list_type, list_type, projections, metadata_context, pool);
}

/// Projects a ::arrow::LargeListArray (64-bit offsets) onto `output_list_type`.
///
/// Thin, non-template entry point: all of the work is done by
/// ProjectListArrayImpl, instantiated here for LargeListArray/LargeListType.
Result<std::shared_ptr<::arrow::Array>> ProjectLargeListArray(
    const std::shared_ptr<::arrow::LargeListArray>& list_array,
    const std::shared_ptr<::arrow::LargeListType>& output_list_type,
    const ListType& list_type, std::span<const FieldProjection> projections,
    const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) {
  return ProjectListArrayImpl<::arrow::LargeListArray, ::arrow::LargeListType>(
      list_array, output_list_type, list_type, projections, metadata_context, pool);
}

Result<std::shared_ptr<::arrow::Array>> ProjectMapArray(
const std::shared_ptr<::arrow::MapArray>& map_array,
const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type,
Expand Down Expand Up @@ -249,17 +270,26 @@ Result<std::shared_ptr<::arrow::Array>> ProjectNestedArray(
projections, metadata_context, pool);
}
case TypeId::kList: {
if (output_arrow_type->id() != ::arrow::Type::LIST) {
return InvalidSchema("Expected list type, got: {}",
output_arrow_type->ToString());
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);

if (output_arrow_type->id() == ::arrow::Type::LIST) {
auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
auto output_list_type =
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
return ProjectListArray(list_array, output_list_type, list_type, projections,
metadata_context, pool);
}

auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array);
auto output_list_type =
internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type);
const auto& list_type = internal::checked_cast<const ListType&>(nested_type);
return ProjectListArray(list_array, output_list_type, list_type, projections,
metadata_context, pool);
if (output_arrow_type->id() == ::arrow::Type::LARGE_LIST) {
auto list_array = internal::checked_pointer_cast<::arrow::LargeListArray>(array);
auto output_list_type =
internal::checked_pointer_cast<::arrow::LargeListType>(output_arrow_type);
return ProjectLargeListArray(list_array, output_list_type, list_type, projections,
metadata_context, pool);
}

return InvalidSchema("Expected list or large_list type, got: {}",
output_arrow_type->ToString());
}
case TypeId::kMap: {
if (output_arrow_type->id() != ::arrow::Type::MAP) {
Expand Down
159 changes: 159 additions & 0 deletions src/iceberg/test/parquet_data_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/

#include <arrow/array.h>
#include <arrow/array/builder_binary.h>
#include <arrow/array/builder_nested.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/c/bridge.h>
#include <arrow/json/from_string.h>
#include <arrow/record_batch.h>
Expand Down Expand Up @@ -503,4 +506,160 @@ TEST(ProjectRecordBatchTest, EmptyRecordBatch) {
VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json));
}

TEST(ProjectRecordBatchTest, LargeListOfIntegers) {
  // JSON parsing only produces regular ListArray instances, so the LargeListArray
  // input is assembled by hand with builders.
  const auto large_list_type = ::arrow::large_list(::arrow::int32());
  auto element_builder = std::make_shared<::arrow::Int32Builder>();
  ::arrow::LargeListBuilder large_list_builder(::arrow::default_memory_pool(),
                                               element_builder, large_list_type);

  // Row 0: [1, 2, 3]
  ASSERT_TRUE(large_list_builder.Append().ok());
  for (int32_t element : {1, 2, 3}) {
    ASSERT_TRUE(element_builder->Append(element).ok());
  }

  // Row 1: [4, 5]
  ASSERT_TRUE(large_list_builder.Append().ok());
  for (int32_t element : {4, 5}) {
    ASSERT_TRUE(element_builder->Append(element).ok());
  }

  auto finish_result = large_list_builder.Finish();
  ASSERT_TRUE(finish_result.ok());
  auto input_column = finish_result.ValueOrDie();

  // Wrap the column in a two-row record batch.
  auto source_arrow_schema =
      ::arrow::schema({::arrow::field("numbers", large_list_type)});
  auto source_batch =
      ::arrow::RecordBatch::Make(source_arrow_schema, 2, {input_column});

  // The Iceberg side only knows ListType; the Arrow side here uses LARGE_LIST.
  auto element_field = SchemaField::MakeRequired(2, "element", int32());
  Schema iceberg_schema({
      SchemaField::MakeRequired(1, "numbers", std::make_shared<ListType>(element_field)),
  });

  // Identity projection: the schema is projected onto itself.
  auto projection_result =
      Project(iceberg_schema, iceberg_schema, /*prune_source=*/false);
  ASSERT_THAT(projection_result, IsOk());
  auto schema_projection = std::move(projection_result.value());

  // Request a LargeListType column in the output as well.
  auto target_arrow_schema =
      ::arrow::schema({::arrow::field("numbers", large_list_type)});

  auto batch_result = ProjectRecordBatch(
      source_batch, target_arrow_schema, iceberg_schema, schema_projection,
      /*metadata_context=*/{}, ::arrow::default_memory_pool());
  ASSERT_THAT(batch_result, IsOk());
  auto projected_batch = std::move(batch_result.value());

  // The single output column must still be a LARGE_LIST.
  ASSERT_EQ(projected_batch->num_columns(), 1);
  ASSERT_EQ(projected_batch->column(0)->type()->id(), ::arrow::Type::LARGE_LIST);

  auto projected_lists =
      std::static_pointer_cast<::arrow::LargeListArray>(projected_batch->column(0));
  ASSERT_EQ(projected_lists->length(), 2);

  // Row 0 holds 1, 2, 3.
  auto row0 = projected_lists->value_slice(0);
  ASSERT_EQ(row0->length(), 3);
  auto row0_ints = std::static_pointer_cast<::arrow::Int32Array>(row0);
  for (int32_t i = 0; i < 3; ++i) {
    EXPECT_EQ(row0_ints->Value(i), i + 1);
  }

  // Row 1 holds 4, 5.
  auto row1 = projected_lists->value_slice(1);
  ASSERT_EQ(row1->length(), 2);
  auto row1_ints = std::static_pointer_cast<::arrow::Int32Array>(row1);
  for (int32_t i = 0; i < 2; ++i) {
    EXPECT_EQ(row1_ints->Value(i), i + 4);
  }
}

TEST(ProjectRecordBatchTest, LargeListOfStructs) {
  // Exercises projection of a LARGE_LIST column whose elements are structs
  // (struct<name: utf8, age: int32>). The input is built by hand with builders.
  auto name_builder = std::make_shared<::arrow::StringBuilder>();
  auto age_builder = std::make_shared<::arrow::Int32Builder>();
  std::vector<std::shared_ptr<::arrow::ArrayBuilder>> field_builders = {name_builder,
                                                                        age_builder};
  auto struct_type = ::arrow::struct_(
      {::arrow::field("name", ::arrow::utf8()), ::arrow::field("age", ::arrow::int32())});
  auto struct_builder = std::make_shared<::arrow::StructBuilder>(
      struct_type, ::arrow::default_memory_pool(), field_builders);

  ::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(), struct_builder,
                                         ::arrow::large_list(struct_type));

  // Build:
  //   [[{name: "Alice", age: 30}],
  //    [{name: "Bob", age: 25}, {name: "Carol", age: 35}]]
  ASSERT_TRUE(list_builder.Append().ok());
  ASSERT_TRUE(struct_builder->Append().ok());
  ASSERT_TRUE(name_builder->Append("Alice").ok());
  ASSERT_TRUE(age_builder->Append(30).ok());

  ASSERT_TRUE(list_builder.Append().ok());
  ASSERT_TRUE(struct_builder->Append().ok());
  ASSERT_TRUE(name_builder->Append("Bob").ok());
  ASSERT_TRUE(age_builder->Append(25).ok());
  ASSERT_TRUE(struct_builder->Append().ok());
  ASSERT_TRUE(name_builder->Append("Carol").ok());
  ASSERT_TRUE(age_builder->Append(35).ok());

  auto large_list_array_result = list_builder.Finish();
  ASSERT_TRUE(large_list_array_result.ok());
  auto large_list_array = large_list_array_result.ValueOrDie();

  // Create input record batch
  auto input_arrow_schema =
      ::arrow::schema({::arrow::field("people", ::arrow::large_list(struct_type))});
  auto input_record_batch =
      ::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array});

  // Create Iceberg schema
  Schema iceberg_schema({
      SchemaField::MakeRequired(1, "people",
                                std::make_shared<ListType>(SchemaField::MakeRequired(
                                    2, "element",
                                    std::make_shared<StructType>(std::vector<SchemaField>{
                                        SchemaField::MakeRequired(3, "name", string()),
                                        SchemaField::MakeRequired(4, "age", int32()),
                                    })))),
  });

  // Create schema projection (identity: the schema is projected onto itself)
  auto schema_projection_result =
      Project(iceberg_schema, iceberg_schema, /*prune_source=*/false);
  ASSERT_THAT(schema_projection_result, IsOk());
  auto schema_projection = std::move(schema_projection_result.value());

  // Create output Arrow schema with LargeListType
  auto output_arrow_schema =
      ::arrow::schema({::arrow::field("people", ::arrow::large_list(struct_type))});

  // Project the record batch
  auto project_result = ProjectRecordBatch(
      input_record_batch, output_arrow_schema, iceberg_schema, schema_projection,
      /*metadata_context=*/{}, ::arrow::default_memory_pool());
  ASSERT_THAT(project_result, IsOk());
  auto projected_record_batch = std::move(project_result.value());

  // Verify the result is a LargeListArray
  ASSERT_EQ(projected_record_batch->num_columns(), 1);
  ASSERT_EQ(projected_record_batch->column(0)->type()->id(), ::arrow::Type::LARGE_LIST);

  auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>(
      projected_record_batch->column(0));
  ASSERT_EQ(projected_array->length(), 2);

  // Verify first list has 1 element
  EXPECT_EQ(projected_array->value_length(0), 1);
  // Verify second list has 2 elements
  EXPECT_EQ(projected_array->value_length(1), 2);

  // Checking list lengths alone would not notice a projection that dropped or
  // reordered struct fields, so also verify the element contents. Type ids are
  // asserted before each static cast so a wrong type fails the test instead of
  // invoking undefined behavior.
  ASSERT_EQ(projected_array->values()->type()->id(), ::arrow::Type::STRUCT);
  auto projected_structs =
      std::static_pointer_cast<::arrow::StructArray>(projected_array->values());
  ASSERT_EQ(projected_structs->num_fields(), 2);
  ASSERT_EQ(projected_structs->length(), 3);

  ASSERT_EQ(projected_structs->field(0)->type()->id(), ::arrow::Type::STRING);
  auto projected_names =
      std::static_pointer_cast<::arrow::StringArray>(projected_structs->field(0));
  ASSERT_EQ(projected_names->length(), 3);
  EXPECT_EQ(projected_names->GetString(0), "Alice");
  EXPECT_EQ(projected_names->GetString(1), "Bob");
  EXPECT_EQ(projected_names->GetString(2), "Carol");

  ASSERT_EQ(projected_structs->field(1)->type()->id(), ::arrow::Type::INT32);
  auto projected_ages =
      std::static_pointer_cast<::arrow::Int32Array>(projected_structs->field(1));
  ASSERT_EQ(projected_ages->length(), 3);
  EXPECT_EQ(projected_ages->Value(0), 30);
  EXPECT_EQ(projected_ages->Value(1), 25);
  EXPECT_EQ(projected_ages->Value(2), 35);
}

} // namespace iceberg::parquet
Loading