-
Notifications
You must be signed in to change notification settings - Fork 17
Export partition to apache iceberg #1618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: antalya-26.1
Are you sure you want to change the base?
Changes from 39 commits
be1fd1d
b82ce39
2687a20
4175e33
d9a27c7
fdf6bf7
bca4aeb
843961c
80753da
fc93f76
abbc77d
57f3a6a
1ee1b34
52a812a
da5b6be
3664749
a878a1e
b5bd0eb
5b0e833
998a992
1aa1b31
f21d66a
26827e2
6c61948
99ce30f
4259ec3
c9dd096
b180a3b
1f73187
2f5643d
6a19f92
569803b
20aa695
c80182c
763dec1
18eb4d0
d9564ac
498ed04
6750617
3e89a7b
c75576a
1cb9321
28b6695
7c2f69b
59f4134
cc850d6
3b12e1e
8143c67
f8bacec
d10e165
5e907c4
7a56e5c
397a2d0
e09291a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| #pragma once | ||
|
|
||
| #include <base/types.h> | ||
| #include <Common/Exception.h> | ||
| #include <Interpreters/StorageID.h> | ||
| #include <Poco/JSON/Object.h> | ||
| #include <Poco/JSON/Array.h> | ||
|
|
@@ -118,6 +119,8 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; | ||
| String filename_pattern; | ||
| bool lock_inside_the_task; /// todo temporary | ||
| bool write_full_path_in_iceberg_metadata = false; | ||
| String iceberg_metadata_json; | ||
|
|
||
| std::string toJsonString() const | ||
| { | ||
|
|
@@ -129,7 +132,12 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| json.set("destination_table", destination_table); | ||
| json.set("source_replica", source_replica); | ||
| json.set("number_of_parts", number_of_parts); | ||
|
|
||
|
|
||
| if (!iceberg_metadata_json.empty()) | ||
| { | ||
| json.set("iceberg_metadata_json", iceberg_metadata_json); | ||
| } | ||
|
|
||
| Poco::JSON::Array::Ptr parts_array = new Poco::JSON::Array(); | ||
| for (const auto & part : parts) | ||
| parts_array->add(part); | ||
|
|
@@ -145,6 +153,7 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| json.set("max_retries", max_retries); | ||
| json.set("ttl_seconds", ttl_seconds); | ||
| json.set("lock_inside_the_task", lock_inside_the_task); | ||
| json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); | ||
| std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM | ||
| oss.exceptions(std::ios::failbit); | ||
| Poco::JSON::Stringifier::stringify(json, oss); | ||
|
|
@@ -166,6 +175,12 @@ struct ExportReplicatedMergeTreePartitionManifest | |
| manifest.source_replica = json->getValue<String>("source_replica"); | ||
| manifest.number_of_parts = json->getValue<size_t>("number_of_parts"); | ||
| manifest.max_retries = json->getValue<size_t>("max_retries"); | ||
|
|
||
| if (json->has("iceberg_metadata_json")) | ||
| { | ||
| manifest.iceberg_metadata_json = json->getValue<String>("iceberg_metadata_json"); | ||
| } | ||
|
|
||
| auto parts_array = json->getArray("parts"); | ||
| for (size_t i = 0; i < parts_array->size(); ++i) | ||
| manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i))); | ||
|
|
@@ -192,6 +207,8 @@ struct ExportReplicatedMergeTreePartitionManifest | |
|
|
||
| manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task"); | ||
|
|
||
| manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata"); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Treating Useful? React with 👍 / 👎. |
||
|
|
||
| return manifest; | ||
| } | ||
| }; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,7 @@ | |
| #include <Common/ZooKeeper/Types.h> | ||
| #include <Common/ZooKeeper/ZooKeeper.h> | ||
| #include <Common/ProfileEvents.h> | ||
| #include <Common/FailPoint.h> | ||
| #include <Interpreters/DatabaseCatalog.h> | ||
|
|
||
| namespace ProfileEvents | ||
|
|
@@ -21,6 +22,17 @@ namespace ProfileEvents | |
|
|
||
| namespace DB | ||
| { | ||
|
|
||
| namespace ErrorCodes | ||
| { | ||
| extern const int FAULT_INJECTED; | ||
| } | ||
|
|
||
| namespace FailPoints | ||
| { | ||
| extern const char export_partition_status_change_throw[]; | ||
| } | ||
|
|
||
| namespace | ||
| { | ||
| /* | ||
|
|
@@ -34,7 +46,8 @@ namespace | |
| const zkutil::ZooKeeperPtr & zk, | ||
| const std::string & entry_path, | ||
| const LoggerPtr & log, | ||
| const ContextPtr & context, | ||
| const ContextPtr & storage_context, | ||
| StorageReplicatedMergeTree & storage, | ||
| const std::string & key, | ||
| const ExportReplicatedMergeTreePartitionManifest & metadata, | ||
| const time_t now, | ||
|
|
@@ -58,6 +71,8 @@ namespace | |
| } | ||
| else if (is_pending) | ||
| { | ||
| auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage_context, metadata); | ||
|
|
||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); | ||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren); | ||
| std::vector<std::string> parts_in_processing_or_pending; | ||
|
|
@@ -81,7 +96,18 @@ namespace | |
| } | ||
|
|
||
| /// it sounds like a replica exported the last part, but was not able to commit the export. Try to fix it | ||
| ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context); | ||
| try | ||
| { | ||
| ExportPartitionUtils::commit(metadata, destination_storage, zk, log, entry_path, context, storage); | ||
| } | ||
| catch (const Exception & e) | ||
| { | ||
| LOG_WARNING(log, | ||
| "ExportPartition Manifest Updating Task: " | ||
| "Caught exception while committing export for {}: {}", | ||
| entry_path, e.message()); | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
|
|
@@ -558,6 +584,7 @@ void ExportPartitionManifestUpdatingTask::poll() | |
| entry_path, | ||
| storage.log.load(), | ||
| storage.getContext(), | ||
| storage, | ||
| key, | ||
| metadata, | ||
| now, | ||
|
|
@@ -656,62 +683,110 @@ void ExportPartitionManifestUpdatingTask::addStatusChange(const std::string & ke | |
|
|
||
| void ExportPartitionManifestUpdatingTask::handleStatusChanges() | ||
| { | ||
| std::lock_guard lock(status_changes_mutex); | ||
| std::lock_guard task_entries_lock(storage.export_merge_tree_partition_mutex); | ||
| auto zk = storage.getZooKeeper(); | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status changes. Number of status changes: {}", status_changes.size()); | ||
| /// copy the events to a local queue to avoid holding the status_changes_mutex while also holding export_merge_tree_partition_mutex | ||
| std::queue<std::string> local_status_changes; | ||
| { | ||
| std::lock_guard lock(status_changes_mutex); | ||
| std::swap(status_changes, local_status_changes); | ||
| } | ||
|
Comment on lines +779 to +781
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Swapping all pending status changes into a local queue means any exception during processing drops the remaining events. Useful? React with 👍 / 👎.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in 18eb4d0 |
||
|
|
||
| while (!status_changes.empty()) | ||
| try | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status change for task {}", status_changes.front()); | ||
| const auto key = status_changes.front(); | ||
| status_changes.pop(); | ||
| std::lock_guard task_entries_lock(storage.export_merge_tree_partition_mutex); | ||
| auto zk = storage.getZooKeeper(); | ||
|
|
||
| auto it = storage.export_merge_tree_partition_task_entries_by_key.find(key); | ||
| if (it == storage.export_merge_tree_partition_task_entries_by_key.end()) | ||
| continue; | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status changes. Number of status changes: {}", local_status_changes.size()); | ||
|
|
||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); | ||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); | ||
| /// get new status from zk | ||
| std::string new_status_string; | ||
| if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string)) | ||
| while (!local_status_changes.empty()) | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to get new status for task {}, skipping", key); | ||
| continue; | ||
| } | ||
| const auto & key = local_status_changes.front(); | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: handling status change for task {}", key); | ||
|
|
||
| const auto new_status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(new_status_string); | ||
| if (!new_status) | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", new_status_string, key); | ||
| continue; | ||
| } | ||
| fiu_do_on(FailPoints::export_partition_status_change_throw, | ||
| { | ||
| throw Exception(ErrorCodes::FAULT_INJECTED, | ||
| "Failpoint: simulating exception during status change handling for key {}", key); | ||
| }); | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data()); | ||
| auto it = storage.export_merge_tree_partition_task_entries_by_key.find(key); | ||
| if (it == storage.export_merge_tree_partition_task_entries_by_key.end()) | ||
| { | ||
| local_status_changes.pop(); | ||
| continue; | ||
| } | ||
|
|
||
| /// If status changed to KILLED, cancel local export operations | ||
| if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED) | ||
| { | ||
| try | ||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); | ||
| ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet); | ||
| /// get new status from zk | ||
| std::string new_status_string; | ||
| if (!zk->tryGet(fs::path(storage.zookeeper_path) / "exports" / key / "status", new_status_string)) | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Failed to get new status for task {}, skipping", key); | ||
| local_status_changes.pop(); | ||
| continue; | ||
| } | ||
|
|
||
| const auto new_status = magic_enum::enum_cast<ExportReplicatedMergeTreePartitionTaskEntry::Status>(new_status_string); | ||
| if (!new_status) | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key); | ||
| storage.killExportPart(it->manifest.transaction_id); | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating Task: Invalid status {} for task {}, skipping", new_status_string, key); | ||
| local_status_changes.pop(); | ||
| continue; | ||
| } | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: status changed for task {}. New status: {}", key, magic_enum::enum_name(*new_status).data()); | ||
|
|
||
| /// If status changed to KILLED, cancel local export operations | ||
| if (*new_status == ExportReplicatedMergeTreePartitionTaskEntry::Status::KILLED) | ||
| { | ||
| try | ||
| { | ||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: killing export partition for task {}", key); | ||
| storage.killExportPart(it->manifest.transaction_id); | ||
| } | ||
| catch (...) | ||
| { | ||
| tryLogCurrentException(storage.log, __PRETTY_FUNCTION__); | ||
| } | ||
| } | ||
| catch (...) | ||
|
|
||
| it->status = *new_status; | ||
|
|
||
| if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) | ||
| { | ||
| tryLogCurrentException(storage.log, __PRETTY_FUNCTION__); | ||
| /// we no longer need to keep the data parts alive | ||
| it->part_references.clear(); | ||
| } | ||
|
|
||
| local_status_changes.pop(); | ||
| } | ||
| } | ||
| catch (...) | ||
| { | ||
| tryLogCurrentException(storage.log, __PRETTY_FUNCTION__); | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: exception thrown while handling status changes, enqueuing remaining status changes back to the status_changes queue. Number of remaining status changes: {}", local_status_changes.size()); | ||
|
|
||
| it->status = *new_status; | ||
| std::lock_guard lock(status_changes_mutex); | ||
|
|
||
| if (it->status != ExportReplicatedMergeTreePartitionTaskEntry::Status::PENDING) | ||
| /// It is possible that an exception is thrown while handling the status. In this scenario | ||
| /// we need to enqueue the remaining status changes back to the status_changes queue not to lose them. | ||
| /// The other solution to this problem would be to ignore it and schedule a poll - maybe it is simpler? | ||
| if (!local_status_changes.empty()) | ||
| { | ||
| /// we no longer need to keep the data parts alive | ||
| it->part_references.clear(); | ||
| // Prepend remaining items before any newly-arrived items | ||
| while (!status_changes.empty()) | ||
| { | ||
| local_status_changes.push(std::move(status_changes.front())); | ||
| status_changes.pop(); | ||
| } | ||
|
|
||
| std::swap(status_changes, local_status_changes); | ||
| } | ||
|
|
||
| LOG_INFO(storage.log, "ExportPartition Manifest Updating task: The new number of pending status after enqueueing unprocessed ones is {}", status_changes.size()); | ||
|
|
||
| throw; | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.