diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 4a8b2cf52..14d20ff9e 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -148,10 +148,13 @@ Result> ProjectStructArray( return output_array; } -/// FIXME: Support ::arrow::LargeListArray. -Result> ProjectListArray( - const std::shared_ptr<::arrow::ListArray>& list_array, - const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type, +/// Templated implementation for projecting list arrays. +/// Works with both ListArray/ListType (32-bit offsets) and +/// LargeListArray/LargeListType (64-bit offsets). +template +Result> ProjectListArrayImpl( + const std::shared_ptr& list_array, + const std::shared_ptr& output_list_type, const ListType& list_type, std::span projections, const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) { if (projections.size() != 1) { @@ -176,12 +179,30 @@ Result> ProjectListArray( ProjectPrimitiveArray(list_array->values(), output_element_type, pool)); } - return std::make_shared<::arrow::ListArray>( + return std::make_shared( output_list_type, list_array->length(), list_array->value_offsets(), std::move(projected_values), list_array->null_bitmap(), list_array->null_count(), list_array->offset()); } +Result> ProjectListArray( + const std::shared_ptr<::arrow::ListArray>& list_array, + const std::shared_ptr<::arrow::ListType>& output_list_type, const ListType& list_type, + std::span projections, + const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) { + return ProjectListArrayImpl(list_array, output_list_type, list_type, projections, + metadata_context, pool); +} + +Result> ProjectLargeListArray( + const std::shared_ptr<::arrow::LargeListArray>& list_array, + const std::shared_ptr<::arrow::LargeListType>& output_list_type, + const ListType& list_type, std::span projections, + const arrow::MetadataColumnContext& metadata_context, ::arrow::MemoryPool* pool) { + return ProjectListArrayImpl(list_array, output_list_type, list_type, projections, + metadata_context, pool); +} + Result> ProjectMapArray( const std::shared_ptr<::arrow::MapArray>& map_array, const std::shared_ptr<::arrow::MapType>& output_map_type, const MapType& map_type, @@ -249,17 +270,26 @@ Result> ProjectNestedArray( projections, metadata_context, pool); } case TypeId::kList: { - if (output_arrow_type->id() != ::arrow::Type::LIST) { - return InvalidSchema("Expected list type, got: {}", - output_arrow_type->ToString()); + const auto& list_type = internal::checked_cast(nested_type); + + if (output_arrow_type->id() == ::arrow::Type::LIST) { + auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array); + auto output_list_type = + internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type); + return ProjectListArray(list_array, output_list_type, list_type, projections, + metadata_context, pool); } - auto list_array = internal::checked_pointer_cast<::arrow::ListArray>(array); - auto output_list_type = - internal::checked_pointer_cast<::arrow::ListType>(output_arrow_type); - const auto& list_type = internal::checked_cast(nested_type); - return ProjectListArray(list_array, output_list_type, list_type, projections, - metadata_context, pool); + if (output_arrow_type->id() == ::arrow::Type::LARGE_LIST) { + auto list_array = internal::checked_pointer_cast<::arrow::LargeListArray>(array); + auto output_list_type = + internal::checked_pointer_cast<::arrow::LargeListType>(output_arrow_type); + return ProjectLargeListArray(list_array, output_list_type, list_type, projections, + metadata_context, pool); + } + + return InvalidSchema("Expected list or large_list type, got: {}", + output_arrow_type->ToString()); } case TypeId::kMap: { if (output_arrow_type->id() != ::arrow::Type::MAP) { diff --git a/src/iceberg/test/parquet_data_test.cc b/src/iceberg/test/parquet_data_test.cc index d7b5a6ed8..9ed28114e 100644 --- a/src/iceberg/test/parquet_data_test.cc +++ b/src/iceberg/test/parquet_data_test.cc @@ -18,6 +18,9 @@ */ #include +#include +#include +#include #include #include #include @@ -503,4 +506,160 @@ TEST(ProjectRecordBatchTest, EmptyRecordBatch) { VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json)); } +TEST(ProjectRecordBatchTest, LargeListOfIntegers) { + // Create a LargeListArray manually (JSON parsing creates regular ListArray) + auto value_builder = std::make_shared<::arrow::Int32Builder>(); + ::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(), value_builder, + ::arrow::large_list(::arrow::int32())); + + // Build: [[1, 2, 3], [4, 5]] + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(value_builder->Append(1).ok()); + ASSERT_TRUE(value_builder->Append(2).ok()); + ASSERT_TRUE(value_builder->Append(3).ok()); + + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(value_builder->Append(4).ok()); + ASSERT_TRUE(value_builder->Append(5).ok()); + + auto large_list_array_result = list_builder.Finish(); + ASSERT_TRUE(large_list_array_result.ok()); + auto large_list_array = large_list_array_result.ValueOrDie(); + + // Create input record batch with the LargeListArray + auto input_arrow_schema = + ::arrow::schema({::arrow::field("numbers", ::arrow::large_list(::arrow::int32()))}); + auto input_record_batch = + ::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array}); + + // Create Iceberg schema (uses ListType which maps to both LIST and LARGE_LIST) + Schema iceberg_schema({ + SchemaField::MakeRequired( + 1, "numbers", + std::make_shared(SchemaField::MakeRequired(2, "element", int32()))), + }); + + // Create schema projection + auto schema_projection_result = + Project(iceberg_schema, iceberg_schema, /*prune_source=*/false); + ASSERT_THAT(schema_projection_result, IsOk()); + auto schema_projection = std::move(schema_projection_result.value()); + + // Create output Arrow schema with LargeListType + auto output_arrow_schema = + ::arrow::schema({::arrow::field("numbers", ::arrow::large_list(::arrow::int32()))}); + + // Project the record batch + auto project_result = ProjectRecordBatch( + input_record_batch, output_arrow_schema, iceberg_schema, schema_projection, + /*metadata_context=*/{}, ::arrow::default_memory_pool()); + ASSERT_THAT(project_result, IsOk()); + auto projected_record_batch = std::move(project_result.value()); + + // Verify the result is a LargeListArray + ASSERT_EQ(projected_record_batch->num_columns(), 1); + ASSERT_EQ(projected_record_batch->column(0)->type()->id(), ::arrow::Type::LARGE_LIST); + + // Verify the values + auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>( + projected_record_batch->column(0)); + ASSERT_EQ(projected_array->length(), 2); + + // First list: [1, 2, 3] + auto first_list = projected_array->value_slice(0); + ASSERT_EQ(first_list->length(), 3); + auto first_values = std::static_pointer_cast<::arrow::Int32Array>(first_list); + EXPECT_EQ(first_values->Value(0), 1); + EXPECT_EQ(first_values->Value(1), 2); + EXPECT_EQ(first_values->Value(2), 3); + + // Second list: [4, 5] + auto second_list = projected_array->value_slice(1); + ASSERT_EQ(second_list->length(), 2); + auto second_values = std::static_pointer_cast<::arrow::Int32Array>(second_list); + EXPECT_EQ(second_values->Value(0), 4); + EXPECT_EQ(second_values->Value(1), 5); +} + +TEST(ProjectRecordBatchTest, LargeListOfStructs) { + // Create a LargeListArray with struct elements + auto name_builder = std::make_shared<::arrow::StringBuilder>(); + auto age_builder = std::make_shared<::arrow::Int32Builder>(); + std::vector> field_builders = {name_builder, + age_builder}; + auto struct_type = ::arrow::struct_( + {::arrow::field("name", ::arrow::utf8()), ::arrow::field("age", ::arrow::int32())}); + auto struct_builder = std::make_shared<::arrow::StructBuilder>( + struct_type, ::arrow::default_memory_pool(), field_builders); + + ::arrow::LargeListBuilder list_builder(::arrow::default_memory_pool(), struct_builder, + ::arrow::large_list(struct_type)); + + // Build: [[{name: "Alice", age: 30}], [{name: "Bob", age: 25}, {name: "Carol", age: + // 35}]] + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(struct_builder->Append().ok()); + ASSERT_TRUE(name_builder->Append("Alice").ok()); + ASSERT_TRUE(age_builder->Append(30).ok()); + + ASSERT_TRUE(list_builder.Append().ok()); + ASSERT_TRUE(struct_builder->Append().ok()); + ASSERT_TRUE(name_builder->Append("Bob").ok()); + ASSERT_TRUE(age_builder->Append(25).ok()); + ASSERT_TRUE(struct_builder->Append().ok()); + ASSERT_TRUE(name_builder->Append("Carol").ok()); + ASSERT_TRUE(age_builder->Append(35).ok()); + + auto large_list_array_result = list_builder.Finish(); + ASSERT_TRUE(large_list_array_result.ok()); + auto large_list_array = large_list_array_result.ValueOrDie(); + + // Create input record batch + auto input_arrow_schema = + ::arrow::schema({::arrow::field("people", ::arrow::large_list(struct_type))}); + auto input_record_batch = + ::arrow::RecordBatch::Make(input_arrow_schema, 2, {large_list_array}); + + // Create Iceberg schema + Schema iceberg_schema({ + SchemaField::MakeRequired(1, "people", + std::make_shared(SchemaField::MakeRequired( + 2, "element", + std::make_shared(std::vector{ + SchemaField::MakeRequired(3, "name", string()), + SchemaField::MakeRequired(4, "age", int32()), + })))), + }); + + // Create schema projection + auto schema_projection_result = + Project(iceberg_schema, iceberg_schema, /*prune_source=*/false); + ASSERT_THAT(schema_projection_result, IsOk()); + auto schema_projection = std::move(schema_projection_result.value()); + + // Create output Arrow schema with LargeListType + auto output_arrow_schema = + ::arrow::schema({::arrow::field("people", ::arrow::large_list(struct_type))}); + + // Project the record batch + auto project_result = ProjectRecordBatch( + input_record_batch, output_arrow_schema, iceberg_schema, schema_projection, + /*metadata_context=*/{}, ::arrow::default_memory_pool()); + ASSERT_THAT(project_result, IsOk()); + auto projected_record_batch = std::move(project_result.value()); + + // Verify the result is a LargeListArray + ASSERT_EQ(projected_record_batch->num_columns(), 1); + ASSERT_EQ(projected_record_batch->column(0)->type()->id(), ::arrow::Type::LARGE_LIST); + + auto projected_array = std::static_pointer_cast<::arrow::LargeListArray>( + projected_record_batch->column(0)); + ASSERT_EQ(projected_array->length(), 2); + + // Verify first list has 1 element + EXPECT_EQ(projected_array->value_length(0), 1); + // Verify second list has 2 elements + EXPECT_EQ(projected_array->value_length(1), 2); +} + } // namespace iceberg::parquet