diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 86a0efd7e..9a49997d2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -84,6 +84,7 @@ set(ICEBERG_SOURCES update/expire_snapshots.cc update/pending_update.cc update/snapshot_update.cc + update/set_snapshot.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 87f508cd5..858f187a4 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -104,6 +104,7 @@ iceberg_sources = files( 'type.cc', 'update/expire_snapshots.cc', 'update/pending_update.cc', + 'update/set_snapshot.cc', 'update/snapshot_update.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index c79ac53fb..e85a77925 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -192,6 +192,13 @@ Result> Table::NewExpireSnapshots() { return transaction->NewExpireSnapshots(); } +Result> Table::NewSetSnapshot() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewSetSnapshot(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index cc9482486..9aa8532ee 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -151,6 +151,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewExpireSnapshots(); + /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a + /// previous snapshot and commit the changes. + virtual Result> NewSetSnapshot(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1f6ab5521..a80bc1d15 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -171,6 +171,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES expire_snapshots_test.cc + set_snapshot_test.cc transaction_test.cc update_partition_spec_test.cc update_properties_test.cc diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc new file mode 100644 index 000000000..c25c3cc97 --- /dev/null +++ b/src/iceberg/test/set_snapshot_test.cc @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/set_snapshot.h" + +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" + +namespace iceberg { + +// Test fixture for SetSnapshot tests +class SetSnapshotTest : public UpdateTestBase { + protected: + // Snapshot IDs from TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotId = 3051729675574597004; + static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; + + // Timestamps from TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770; + static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770; +}; + +TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot); + + set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); + + // Commit and verify the change was persisted + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId); + + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Try to set to a non-existent snapshot + int64_t invalid_snapshot_id = 9999999999999999; + set_snapshot->SetCurrentSnapshot(invalid_snapshot_id); + + // Should fail during Apply + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("is not found")); +} + +TEST_F(SetSnapshotTest, RollbackToValid) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Rollback to the oldest snapshot (which is an ancestor) + set_snapshot->RollbackTo(kOldestSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Try to rollback to a non-existent snapshot + int64_t invalid_snapshot_id = 9999999999999999; + set_snapshot->RollbackTo(invalid_snapshot_id); + + // Should fail during Apply + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); +} + +TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Rollback to a time between the two snapshots + // This should select the oldest snapshot + int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2; + set_snapshot->RollbackToTime(time_between); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Try to rollback to a time before any snapshot + int64_t time_before_all = kOldestSnapshotTimestamp - 1000000; + set_snapshot->RollbackToTime(time_before_all); + + // Should fail - no snapshot older than the specified time + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than")); +} + +TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Rollback to a timestamp just after the oldest snapshot + // This should return the oldest snapshot (the latest one before this time) + int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1; + set_snapshot->RollbackToTime(time_just_after_oldest); + + // Apply and verify - should return the oldest snapshot + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, ApplyWithoutChanges) { + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + // Apply without making any changes (NOOP) + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + + // Should return current snapshot + EXPECT_EQ(snapshot_id, kCurrentSnapshotId); + + // Commit NOOP and verify nothing changed + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index f763c5670..8f6a49b35 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -24,6 +24,7 @@ #include "iceberg/catalog.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -32,6 +33,7 @@ #include "iceberg/table_update.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" @@ -183,6 +185,12 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove)); } } break; + case PendingUpdate::Kind::kSetSnapshot: { + auto& set_snapshot = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply()); + metadata_builder_->SetBranchSnapshot(snapshot_id, + std::string(SnapshotRef::kMainBranch)); + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -280,4 +288,11 @@ Result> Transaction::NewExpireSnapshots() { return expire_snapshots; } +Result> Transaction::NewSetSnapshot() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr set_snapshot, + SetSnapshot::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot)); + return set_snapshot; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 057a27a90..fe5c2cc95 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -26,6 +26,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" +#include "iceberg/update/set_snapshot.h" namespace iceberg { @@ -82,6 +83,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewExpireSnapshots(); + /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a + /// previous snapshot and commit the changes. + Result> NewSetSnapshot(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index c8854031d..61e5eb21d 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -194,6 +194,7 @@ class UpdateProperties; class UpdateSchema; class UpdateSortOrder; class ExpireSnapshots; +class SetSnapshot; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4238e0222..4ce209d76 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'pending_update.h', + 'set_snapshot.h', 'snapshot_update.h', 'update_partition_spec.h', 'update_schema.h', diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 8a8329eee..c6c5fc760 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: enum class Kind : uint8_t { kExpireSnapshots, + kSetSnapshot, kUpdatePartitionSpec, kUpdateProperties, kUpdateSchema, diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc new file mode 100644 index 000000000..21ebbd710 --- /dev/null +++ b/src/iceberg/update/set_snapshot.cc @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/set_snapshot.h" + +#include +#include +#include + +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +Result> SetSnapshot::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create SetSnapshot without a transaction"); + return std::shared_ptr(new SetSnapshot(std::move(transaction))); +} + +SetSnapshot::SetSnapshot(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +SetSnapshot::~SetSnapshot() = default; + +SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) { + // Validate that the snapshot exists + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, base().SnapshotById(snapshot_id)); + ICEBERG_BUILDER_CHECK(snapshot != nullptr, + "Cannot roll back to unknown snapshot id: {}", snapshot_id); + target_snapshot_id_ = snapshot_id; + return *this; +} + +SetSnapshot& SetSnapshot::RollbackToTime(int64_t timestamp_ms) { + // Find the latest snapshot by timestamp older than timestamp_ms + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot_opt, + FindLatestAncestorOlderThan(timestamp_ms)); + + ICEBERG_BUILDER_CHECK(snapshot_opt.has_value(), + "Cannot roll back, no valid snapshot older than: {}", + timestamp_ms); + + target_snapshot_id_ = snapshot_opt.value()->snapshot_id; + is_rollback_ = true; + + return *this; +} + +SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { + // Validate that the snapshot exists + auto snapshot_result = base().SnapshotById(snapshot_id); + ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", snapshot_id); + + // Validate that the snapshot is an ancestor of the current state + ICEBERG_BUILDER_ASSIGN_OR_RETURN(bool is_ancestor, + SnapshotUtil::IsAncestorOf(base(), snapshot_id)); + ICEBERG_BUILDER_CHECK( + is_ancestor, + "Cannot roll back to snapshot, not an ancestor of the current state: {}", + snapshot_id); + + return SetCurrentSnapshot(snapshot_id); +} + +Result SetSnapshot::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + const TableMetadata& base_metadata = transaction_->current(); + + // If no target snapshot was configured, return current state (NOOP) + if (!target_snapshot_id_.has_value()) { + ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, base_metadata.Snapshot()); + return current_snapshot->snapshot_id; + } + + // Validate that the snapshot exists + auto snapshot_result = base_metadata.SnapshotById(target_snapshot_id_.value()); + ICEBERG_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", + target_snapshot_id_.value()); + + // If this is a rollback, validate that the target is still an ancestor + if (is_rollback_) { + ICEBERG_ASSIGN_OR_RAISE( + bool is_ancestor, + SnapshotUtil::IsAncestorOf(base_metadata, target_snapshot_id_.value())); + ICEBERG_CHECK(is_ancestor, + "Cannot roll back to {}: not an ancestor of the current table state", + target_snapshot_id_.value()); + } + + return target_snapshot_id_.value(); +} + +Result>> SetSnapshot::FindLatestAncestorOlderThan( + int64_t timestamp_ms) const { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, SnapshotUtil::CurrentAncestors(base())); + + TimePointMs target_timestamp = TimePointMsFromUnixMs(timestamp_ms); + TimePointMs latest_timestamp = TimePointMsFromUnixMs(0); + std::shared_ptr result = nullptr; + + for (auto& snapshot : ancestors) { + if (snapshot == nullptr) { + continue; + } + auto current_timestamp = snapshot->timestamp_ms; + if (current_timestamp < target_timestamp && current_timestamp > latest_timestamp) { + latest_timestamp = current_timestamp; // Save timestamp before move + result = std::move(snapshot); + } + } + + if (result == nullptr) { + return std::nullopt; + } + + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h new file mode 100644 index 000000000..c7abf7034 --- /dev/null +++ b/src/iceberg/update/set_snapshot.h @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/set_snapshot.h +/// \brief Sets the current snapshot directly or by rolling back. + +namespace iceberg { + +/// \brief Sets the current snapshot directly or by rolling back. +class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~SetSnapshot() override; + + /// \brief Sets the table's current state to a specific Snapshot identified by id. + SetSnapshot& SetCurrentSnapshot(int64_t snapshot_id); + + /// \brief Rolls back the table's state to the last Snapshot before the given timestamp. + SetSnapshot& RollbackToTime(int64_t timestamp_ms); + + /// \brief Rollback table's state to a specific Snapshot identified by id. + SetSnapshot& RollbackTo(int64_t snapshot_id); + + Kind kind() const final { return Kind::kSetSnapshot; } + + /// \brief Apply the pending changes and return the target snapshot ID. + Result Apply(); + + private: + explicit SetSnapshot(std::shared_ptr transaction); + + /// \brief Find the latest snapshot whose timestamp is before the provided timestamp. + /// + /// \param timestamp_ms Lookup snapshots before this timestamp + /// \return The snapshot that was current at the given timestamp, or nullopt + Result>> FindLatestAncestorOlderThan( + int64_t timestamp_ms) const; + + std::optional target_snapshot_id_; + bool is_rollback_{false}; +}; + +} // namespace iceberg diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index c3b93be8b..f6b8de055 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -69,6 +69,22 @@ Result SnapshotUtil::IsAncestorOf(const Table& table, return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id); } +Result SnapshotUtil::IsAncestorOf(const TableMetadata& metadata, + int64_t ancestor_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot()); + ICEBERG_CHECK(current != nullptr, "Current snapshot is null"); + + // Create a lookup function that uses the metadata + auto lookup = [&metadata](int64_t id) -> Result> { + return metadata.SnapshotById(id); + }; + + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(current->snapshot_id, lookup)); + return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) { + return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id; + }); +} + Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_parent_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); @@ -86,6 +102,19 @@ Result>> SnapshotUtil::CurrentAncestors( return AncestorsOf(table, current_result.value()); } +Result>> SnapshotUtil::CurrentAncestors( + const TableMetadata& metadata) { + auto current_result = metadata.Snapshot(); + ICEBERG_ACTION_FOR_NOT_FOUND(current_result, return {}); + + // Create a lookup function that uses the metadata + auto lookup = [&metadata](int64_t id) -> Result> { + return metadata.SnapshotById(id); + }; + + return AncestorsOf(current_result.value(), lookup); +} + Result> SnapshotUtil::CurrentAncestorIds(const Table& table) { return CurrentAncestors(table).and_then(ToIds); } @@ -116,7 +145,7 @@ Result>> SnapshotUtil::OldestAncestorAft std::optional> last_snapshot = std::nullopt; ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current)); - for (const auto& snapshot : ancestors) { + for (auto& snapshot : ancestors) { auto snapshot_timestamp_ms = snapshot->timestamp_ms; if (snapshot_timestamp_ms < timestamp_ms) { return last_snapshot; diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index ca106cb33..0a000c691 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -70,6 +70,15 @@ class ICEBERG_EXPORT SnapshotUtil { /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot static Result IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id); + /// \brief Returns whether ancestor_snapshot_id is an ancestor of the metadata's current + /// state. + /// + /// \param metadata The table metadata to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot + static Result IsAncestorOf(const TableMetadata& metadata, + int64_t ancestor_snapshot_id); + /// \brief Returns whether some ancestor of snapshot_id has parentId matches /// ancestor_parent_snapshot_id. /// @@ -88,6 +97,14 @@ class ICEBERG_EXPORT SnapshotUtil { static Result>> CurrentAncestors( const Table& table); + /// \brief Returns a vector that traverses the metadata's snapshots from the current to + /// the last known ancestor. + /// + /// \param metadata The table metadata + /// \return A vector from the metadata's current snapshot to its last known ancestor + static Result>> CurrentAncestors( + const TableMetadata& metadata); + /// \brief Returns the snapshot IDs for the ancestors of the current table state. /// /// Ancestor IDs are ordered by commit time, descending. The first ID is the current