Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
update/update_statistics.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
Expand Down
14 changes: 14 additions & 0 deletions src/iceberg/json_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ constexpr std::string_view kActionSetSnapshotRef = "set-snapshot-ref";
constexpr std::string_view kActionSetProperties = "set-properties";
constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
constexpr std::string_view kActionSetStatistics = "set-statistics";
constexpr std::string_view kActionRemoveStatistics = "remove-statistics";

// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
Expand Down Expand Up @@ -1399,6 +1401,18 @@ nlohmann::json ToJson(const TableUpdate& update) {
json[kLocation] = u.location();
break;
}
case TableUpdate::Kind::kSetStatistics: {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also add FromJson equivalents and their test cases.

const auto& u = internal::checked_cast<const table::SetStatistics&>(update);
json[kAction] = kActionSetStatistics;
json[kStatistics] = ToJson(*u.statistics_file());
break;
}
case TableUpdate::Kind::kRemoveStatistics: {
const auto& u = internal::checked_cast<const table::RemoveStatistics&>(update);
json[kAction] = kActionRemoveStatistics;
json[kSnapshotId] = u.snapshot_id();
break;
}
}
return json;
}
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ iceberg_sources = files(
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_sort_order.cc',
'update/update_statistics.cc',
'util/bucket_util.cc',
'util/content_file_util.cc',
'util/conversions.cc',
Expand Down
8 changes: 8 additions & 0 deletions src/iceberg/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_statistics.h"
#include "iceberg/util/macros.h"

namespace iceberg {
Expand Down Expand Up @@ -192,6 +193,13 @@ Result<std::shared_ptr<ExpireSnapshots>> Table::NewExpireSnapshots() {
return transaction->NewExpireSnapshots();
}

/// Creates an UpdateStatistics operation for this table.
///
/// A new transaction of kind `kUpdate` is created over this table with
/// auto-commit enabled, and the UpdateStatistics it hands out is returned.
/// If creating the transaction fails, that error is propagated to the caller.
Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
  ICEBERG_ASSIGN_OR_RAISE(auto txn,
                          Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
                                            /*auto_commit=*/true));
  return txn->NewUpdateStatistics();
}

Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
4 changes: 4 additions & 0 deletions src/iceberg/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();

/// \brief Create a new UpdateStatistics to update the table statistics and commit the
/// changes.
virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();

protected:
Table(TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
Expand Down
57 changes: 51 additions & 6 deletions src/iceberg/table_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_update.h"
#include "iceberg/util/checked_cast.h"
Expand Down Expand Up @@ -612,14 +613,16 @@ class TableMetadataBuilder::Impl {
Status SetCurrentSchema(int32_t schema_id);
Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
void SetLocation(std::string_view location);
Status SetLocation(std::string_view location);
Status AddSnapshot(std::shared_ptr<Snapshot> snapshot);
Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const std::string& branch);
Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
Status RemoveRef(const std::string& name);
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
Status SetStatistics(const std::shared_ptr<StatisticsFile>& statistics_file);
Status RemoveStatistics(int64_t snapshot_id);

Result<std::unique_ptr<TableMetadata>> Build();

Expand Down Expand Up @@ -1032,12 +1035,13 @@ Result<int32_t> TableMetadataBuilder::Impl::AddSchema(const Schema& schema,
return new_schema_id;
}

void TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
Status TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change this? We don't need to return a Status if the function can never fail.

if (location == metadata_.location) {
return;
return {};
}
metadata_.location = std::string(location);
changes_.push_back(std::make_unique<table::SetLocation>(std::string(location)));
return {};
}

Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot> snapshot) {
Expand Down Expand Up @@ -1173,6 +1177,45 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
return {};
}

Status TableMetadataBuilder::Impl::SetStatistics(
const std::shared_ptr<StatisticsFile>& statistics_file) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const std::shared_ptr<StatisticsFile>& statistics_file) {
std::shared_ptr<StatisticsFile> statistics_file) {

ICEBERG_CHECK(statistics_file != nullptr, "Cannot set null statistics file");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ICEBERG_CHECK(statistics_file != nullptr, "Cannot set null statistics file");
ICEBERG_PRECHECK(statistics_file != nullptr, "Cannot set null statistics file");

Note that PRECHECK returns InvalidArgument while CHECK returns ValidationFailed.


// Find and replace existing statistics for the same snapshot_id, or add new one
auto it = std::ranges::find_if(
metadata_.statistics,
[snapshot_id = statistics_file->snapshot_id](const auto& stat) {
return stat && stat->snapshot_id == snapshot_id;
});

if (it != metadata_.statistics.end()) {
*it = statistics_file;
} else {
metadata_.statistics.push_back(statistics_file);
}

changes_.push_back(std::make_unique<table::SetStatistics>(statistics_file));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use std::move

return {};
}

/// Removes the statistics entries recorded for `snapshot_id`, if any.
///
/// \param snapshot_id Snapshot whose statistics should be dropped.
/// \return Always an OK status. When no entry matches, the metadata and the
///         pending change list are left untouched (the call is a no-op).
Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
  // std::erase_if reports how many elements it removed, so one pass both
  // detects whether anything matched and performs the removal. Null entries
  // are skipped rather than dereferenced.
  const auto removed_count =
      std::erase_if(metadata_.statistics, [snapshot_id](const auto& stat) {
        return stat && stat->snapshot_id == snapshot_id;
      });

  // Only record a change when something was actually removed.
  if (removed_count == 0) {
    return {};
  }

  changes_.push_back(std::make_unique<table::RemoveStatistics>(snapshot_id));
  return {};
}

std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
int64_t current_snapshot_id) const {
std::unordered_set<int64_t> added_snapshot_ids;
Expand Down Expand Up @@ -1590,11 +1633,13 @@ TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {

/// Sets the statistics file on the metadata being built.
///
/// Delegates to the builder implementation; a failure reported by it is
/// handled by ICEBERG_BUILDER_RETURN_IF_ERROR. The stale "not implemented"
/// throw that used to precede the delegation is removed so the real
/// implementation is reachable.
///
/// \param statistics_file Statistics file to set.
/// \return This builder, to allow call chaining.
TableMetadataBuilder& TableMetadataBuilder::SetStatistics(
    const std::shared_ptr<StatisticsFile>& statistics_file) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetStatistics(statistics_file));
  return *this;
}

/// Removes the statistics recorded for `snapshot_id` from the metadata being built.
///
/// Delegates to the builder implementation; a failure reported by it is
/// handled by ICEBERG_BUILDER_RETURN_IF_ERROR. The stale "not implemented"
/// throw that used to precede the delegation is removed so the real
/// implementation is reachable.
///
/// \param snapshot_id Snapshot whose statistics should be removed.
/// \return This builder, to allow call chaining.
TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id) {
  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveStatistics(snapshot_id));
  return *this;
}

TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
Expand All @@ -1620,7 +1665,7 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties(
}

TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) {
impl_->SetLocation(location);
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetLocation(location));
return *this;
}

Expand Down
47 changes: 47 additions & 0 deletions src/iceberg/table_update.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "iceberg/exception.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
#include "iceberg/statistics_file.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirements.h"

Expand Down Expand Up @@ -446,4 +447,50 @@ std::unique_ptr<TableUpdate> SetLocation::Clone() const {
return std::make_unique<SetLocation>(location_);
}

// SetStatistics

/// Returns the snapshot id stored in the statistics file held by this update.
/// The pointer is dereferenced without a check, so it must be non-null here.
int64_t SetStatistics::snapshot_id() const {
  const auto& file = *statistics_file_;
  return file.snapshot_id;
}

/// Applies this update by handing the held statistics file to
/// `builder.SetStatistics`. The builder's return value is not inspected here.
void SetStatistics::ApplyTo(TableMetadataBuilder& builder) const {
  const auto& file = statistics_file_;
  builder.SetStatistics(file);
}

void SetStatistics::GenerateRequirements(TableUpdateContext& context) const {
// SetStatistics doesn't generate any requirements
}

/// Compares this update with another TableUpdate.
///
/// \param other Update to compare against.
/// \return true only when `other` is also a SetStatistics update and both
///         hold equal statistics files. Two updates that both hold a null
///         file compare equal; a null file never equals a non-null one.
bool SetStatistics::Equals(const TableUpdate& other) const {
  if (other.kind() != Kind::kSetStatistics) {
    return false;
  }
  // kind() matched, so the downcast targets the right dynamic type.
  const auto& other_set = static_cast<const SetStatistics&>(other);

  // Guard against dereferencing a null statistics file on either side.
  if (statistics_file_ == nullptr || other_set.statistics_file_ == nullptr) {
    return statistics_file_ == other_set.statistics_file_;
  }
  return *statistics_file_ == *other_set.statistics_file_;
}

/// Produces a new SetStatistics update built from the same shared_ptr, so the
/// clone shares the statistics file object with this update rather than
/// copying it.
std::unique_ptr<TableUpdate> SetStatistics::Clone() const {
  std::unique_ptr<TableUpdate> copy = std::make_unique<SetStatistics>(statistics_file_);
  return copy;
}

// RemoveStatistics

/// Applies this update by passing the stored snapshot id to
/// `builder.RemoveStatistics`. The builder's return value is not inspected here.
void RemoveStatistics::ApplyTo(TableMetadataBuilder& builder) const {
  const int64_t target_snapshot = snapshot_id_;
  builder.RemoveStatistics(target_snapshot);
}

void RemoveStatistics::GenerateRequirements(TableUpdateContext& context) const {
// RemoveStatistics doesn't generate any requirements
}

bool RemoveStatistics::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveStatistics) {
return false;
}
const auto& other_remove = static_cast<const RemoveStatistics&>(other);
return snapshot_id_ == other_remove.snapshot_id_;
}

/// Produces a new RemoveStatistics update carrying the same snapshot id.
std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
  std::unique_ptr<TableUpdate> copy = std::make_unique<RemoveStatistics>(snapshot_id_);
  return copy;
}

} // namespace iceberg::table
49 changes: 49 additions & 0 deletions src/iceberg/table_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ class ICEBERG_EXPORT TableUpdate {
kSetProperties,
kRemoveProperties,
kSetLocation,
kSetStatistics,
kRemoveStatistics,
};

virtual ~TableUpdate();
Expand Down Expand Up @@ -509,6 +511,53 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate {
std::string location_;
};

/// \brief Table update that sets a statistics file for a snapshot.
///
/// The update holds shared ownership of the StatisticsFile it was built with.
class ICEBERG_EXPORT SetStatistics : public TableUpdate {
 public:
  /// \brief Construct the update, taking shared ownership of `statistics_file`.
  explicit SetStatistics(std::shared_ptr<StatisticsFile> statistics_file)
      : statistics_file_{std::move(statistics_file)} {}

  /// \brief Snapshot id of the held statistics file (defined out of line).
  int64_t snapshot_id() const;

  /// \brief The statistics file carried by this update.
  const std::shared_ptr<StatisticsFile>& statistics_file() const {
    return statistics_file_;
  }

  /// \brief Apply this update to the given metadata builder.
  void ApplyTo(TableMetadataBuilder& builder) const override;

  /// \brief Contribute this update's requirements to `context`.
  void GenerateRequirements(TableUpdateContext& context) const override;

  /// \brief Always Kind::kSetStatistics.
  Kind kind() const override { return Kind::kSetStatistics; }

  /// \brief Compare with another update.
  bool Equals(const TableUpdate& other) const override;

  /// \brief Create a copy of this update.
  std::unique_ptr<TableUpdate> Clone() const override;

 private:
  /// Statistics file to set; supplied by the constructor.
  std::shared_ptr<StatisticsFile> statistics_file_;
};

/// \brief Table update that removes the statistics of a snapshot.
///
/// The snapshot is identified solely by the id given at construction.
class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
 public:
  /// \brief Construct the update for the snapshot identified by `snapshot_id`.
  explicit RemoveStatistics(int64_t snapshot_id) : snapshot_id_{snapshot_id} {}

  /// \brief Id of the snapshot whose statistics are removed.
  int64_t snapshot_id() const { return snapshot_id_; }

  /// \brief Apply this update to the given metadata builder.
  void ApplyTo(TableMetadataBuilder& builder) const override;

  /// \brief Contribute this update's requirements to `context`.
  void GenerateRequirements(TableUpdateContext& context) const override;

  /// \brief Always Kind::kRemoveStatistics.
  Kind kind() const override { return Kind::kRemoveStatistics; }

  /// \brief Compare with another update.
  bool Equals(const TableUpdate& other) const override;

  /// \brief Create a copy of this update.
  std::unique_ptr<TableUpdate> Clone() const override;

 private:
  /// Snapshot id supplied by the constructor.
  int64_t snapshot_id_;
};

} // namespace table

} // namespace iceberg
3 changes: 2 additions & 1 deletion src/iceberg/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ if(ICEBERG_BUILD_BUNDLE)
update_partition_spec_test.cc
update_properties_test.cc
update_schema_test.cc
update_sort_order_test.cc)
update_sort_order_test.cc
update_statistics_test.cc)

add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)

Expand Down
Loading
Loading