Skip to content
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
be1fd1d
prior to llm build fix
arthurpassos Mar 19, 2026
b82ce39
making progress
arthurpassos Mar 25, 2026
2687a20
fix version
arthurpassos Mar 25, 2026
4175e33
export part working now
arthurpassos Mar 26, 2026
d9a27c7
progress before refactor
arthurpassos Mar 31, 2026
fdf6bf7
checkpoint, commented out partitioning..
arthurpassos Mar 31, 2026
bca4aeb
compatibility check seems ok
arthurpassos Mar 31, 2026
843961c
simplify code
arthurpassos Mar 31, 2026
80753da
fix test that was not working for some super odd reason
arthurpassos Mar 31, 2026
fc93f76
fix compatibility check for year vs years
arthurpassos Apr 1, 2026
abbc77d
fix
arthurpassos Apr 1, 2026
57f3a6a
progress
arthurpassos Apr 1, 2026
1ee1b34
do not even recompute partition value, just use it from the source
arthurpassos Apr 1, 2026
52a812a
somehow fix the concurrency problem
arthurpassos Apr 1, 2026
da5b6be
make it actually transactional
arthurpassos Apr 2, 2026
3664749
Merge branch 'antalya-26.1' into export_partition_iceberg
arthurpassos Apr 2, 2026
a878a1e
add test for crash during 2phase commit
arthurpassos Apr 2, 2026
b5bd0eb
not quite good
arthurpassos Apr 3, 2026
5b0e833
put writefullpath in zk and add some comments
arthurpassos Apr 6, 2026
998a992
try to fix fast_test
arthurpassos Apr 6, 2026
1aa1b31
again
arthurpassos Apr 6, 2026
f21d66a
again
arthurpassos Apr 6, 2026
26827e2
again
arthurpassos Apr 6, 2026
6c61948
partially fix path bug
arthurpassos Apr 6, 2026
99ce30f
some more improvements
arthurpassos Apr 6, 2026
4259ec3
vibe coded fix for catalog concurrent writes
arthurpassos Apr 7, 2026
c9dd096
simplify code, calculate partition values on the fly to avoid complex…
arthurpassos Apr 7, 2026
b180a3b
vibe coded tests
arthurpassos Apr 8, 2026
1f73187
some vibe coded tests
arthurpassos Apr 8, 2026
2f5643d
interesting stuff
arthurpassos Apr 8, 2026
6a19f92
is this the culprit
arthurpassos Apr 8, 2026
569803b
remove unused method
arthurpassos Apr 9, 2026
20aa695
one more temp fix
arthurpassos Apr 10, 2026
c80182c
fix possible deadlock
arthurpassos Apr 10, 2026
763dec1
Merge branch 'antalya-26.1' into export_partition_iceberg
arthurpassos Apr 10, 2026
18eb4d0
interesting fix
arthurpassos Apr 13, 2026
d9564ac
check if export has already been committed before checking for partit…
arthurpassos Apr 13, 2026
498ed04
add docs around storageobjectstoragecluster cast
arthurpassos Apr 13, 2026
6750617
lock before reading variable.. come arthur, you are better than this
arthurpassos Apr 13, 2026
3e89a7b
do not cleanup data files on commit failure
arthurpassos Apr 15, 2026
c75576a
sidecar name
arthurpassos Apr 15, 2026
1cb9321
simplify partition columns extraction code
arthurpassos Apr 15, 2026
28b6695
make tests more debuggable by using better table names
arthurpassos Apr 16, 2026
7c2f69b
check clickhouse-export-tid upon commit retry to prevent duplicates
arthurpassos Apr 16, 2026
59f4134
add export part tests
arthurpassos Apr 16, 2026
cc850d6
simplify tests
arthurpassos Apr 17, 2026
3b12e1e
uncomment test, remove stale code, and deal with export TOCTOU when th…
arthurpassos Apr 17, 2026
8143c67
fix post iceberg commit code exceptions that could lead to missing files
arthurpassos Apr 17, 2026
f8bacec
fix possible int64 -> int32 overflow on record count
arthurpassos Apr 17, 2026
d10e165
address possibility of task remaining in commit state forever
arthurpassos Apr 17, 2026
5e907c4
Merge branch 'antalya-26.1' into export_partition_iceberg
arthurpassos Apr 18, 2026
7a56e5c
add task timeout
arthurpassos Apr 20, 2026
397a2d0
settingschangehistory
arthurpassos Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ static struct InitFiu
ONCE(disk_object_storage_fail_precommit_metadata_transaction) \
REGULAR(slowdown_parallel_replicas_local_plan_read) \
ONCE(iceberg_writes_cleanup) \
ONCE(iceberg_export_after_commit_before_zk_completed) \
ONCE(backup_add_empty_memory_table) \
PAUSEABLE_ONCE(backup_pause_on_start) \
PAUSEABLE_ONCE(restore_pause_on_start) \
Expand Down
46 changes: 45 additions & 1 deletion src/Storages/ExportReplicatedMergeTreePartitionManifest.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <base/types.h>
#include <Core/Field.h>
#include <Interpreters/StorageID.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Array.h>
Expand Down Expand Up @@ -118,6 +119,14 @@ struct ExportReplicatedMergeTreePartitionManifest
MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy;
String filename_pattern;
bool lock_inside_the_task; /// todo temporary
bool write_full_path_in_iceberg_metadata = false;
String iceberg_metadata_json;
/// Iceberg-only: JSON array of partition column values (after transforms) for this partition.
/// Columns and types are derived at commit time from iceberg_metadata_json; only values are persisted.
String partition_values_json;
/// Transient: parsed form of partition_values_json, populated by fromJsonString.
/// Not serialized to ZooKeeper. Used at commit time to avoid re-parsing JSON on each commit attempt.
std::vector<Field> partition_values;

std::string toJsonString() const
{
Expand All @@ -129,7 +138,17 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("destination_table", destination_table);
json.set("source_replica", source_replica);
json.set("number_of_parts", number_of_parts);


if (!iceberg_metadata_json.empty())
{
json.set("iceberg_metadata_json", iceberg_metadata_json);
}

if (!partition_values_json.empty())
{
json.set("partition_values_json", partition_values_json);
}

Poco::JSON::Array::Ptr parts_array = new Poco::JSON::Array();
for (const auto & part : parts)
parts_array->add(part);
Expand All @@ -145,6 +164,7 @@ struct ExportReplicatedMergeTreePartitionManifest
json.set("max_retries", max_retries);
json.set("ttl_seconds", ttl_seconds);
json.set("lock_inside_the_task", lock_inside_the_task);
json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
Expand All @@ -166,6 +186,28 @@ struct ExportReplicatedMergeTreePartitionManifest
manifest.source_replica = json->getValue<String>("source_replica");
manifest.number_of_parts = json->getValue<size_t>("number_of_parts");
manifest.max_retries = json->getValue<size_t>("max_retries");

if (json->has("iceberg_metadata_json"))
{
manifest.iceberg_metadata_json = json->getValue<String>("iceberg_metadata_json");
}

if (json->has("partition_values_json"))
{
manifest.partition_values_json = json->getValue<String>("partition_values_json");

Poco::JSON::Parser val_parser;
auto arr = val_parser.parse(manifest.partition_values_json).extract<Poco::JSON::Array::Ptr>();
for (size_t i = 0; i < arr->size(); ++i)
{
Poco::Dynamic::Var var = arr->get(static_cast<unsigned int>(i));
if (var.isString())
manifest.partition_values.push_back(Field(var.extract<String>()));
else
manifest.partition_values.push_back(Field(var.convert<Int64>()));
}
}

auto parts_array = json->getArray("parts");
for (size_t i = 0; i < parts_array->size(); ++i)
manifest.parts.push_back(parts_array->getElement<String>(static_cast<unsigned int>(i)));
Expand All @@ -192,6 +234,8 @@ struct ExportReplicatedMergeTreePartitionManifest

manifest.lock_inside_the_task = json->getValue<bool>("lock_inside_the_task");

manifest.write_full_path_in_iceberg_metadata = json->getValue<bool>("write_full_path_in_iceberg_metadata");
Comment thread
arthurpassos marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve backward compatibility for manifest parsing

Treating write_full_path_in_iceberg_metadata as mandatory breaks reading manifests that were written before this field existed. During rolling upgrades, existing ZooKeeper metadata.json entries for in-flight exports won't have this key, so getValue<bool> throws and status/polling paths that call fromJsonString cannot process those tasks. Please make this field optional with a default (false) when absent.

Useful? React with 👍 / 👎.


return manifest;
}
};
Expand Down
16 changes: 15 additions & 1 deletion src/Storages/IStorage.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <Core/Field.h>
#include <Core/Names.h>
#include <Core/QueryProcessingStage.h>
#include <Databases/IDatabase.h>
Expand All @@ -20,6 +21,7 @@
#include <Common/RWLock.h>
#include <Common/TypePromotion.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <Poco/JSON/Object.h>

#include <expected>
#include <optional>
Expand Down Expand Up @@ -476,7 +478,7 @@ class IStorage : public std::enable_shared_from_this<IStorage>, public TypePromo
ContextPtr /*context*/,
bool /*async_insert*/);

virtual bool supportsImport() const
virtual bool supportsImport(ContextPtr) const
{
return false;
}
Expand All @@ -493,16 +495,28 @@ It is currently only implemented in StorageObjectStorage.
bool /* overwrite_if_exists */,
std::size_t /* max_bytes_per_file */,
std::size_t /* max_rows_per_file */,
const std::optional<std::string> & /* iceberg_metadata_json_string */,
const std::optional<FormatSettings> & /* format_settings */,
ContextPtr /* context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Import is not implemented for storage {}", getName());
}

/// Arguments passed to commitExportPartitionTransaction when the destination
/// is an Iceberg table. Plain data carrier; both fields are filled by callers.
struct IcebergCommitExportPartitionArguments
{
/// Iceberg table metadata serialized as a JSON string (callers copy it from
/// the manifest's iceberg_metadata_json field).
std::string metadata_json_string;
/// Partition column values (after transforms). Callers are responsible for
/// populating this: the partition-export path parses them from the persisted
/// JSON string, while the direct EXPORT PART path reads them from the part's
/// partition key.
std::vector<Field> partition_values;
};

virtual void commitExportPartitionTransaction(
const String & /* transaction_id */,
const String & /* partition_id */,
const Strings & /* exported_paths */,
const IcebergCommitExportPartitionArguments & /* iceberg_commit_export_partition_arguments */,
ContextPtr /* local_context */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "commitExportPartitionTransaction is not implemented for storage type {}", getName());
Expand Down
19 changes: 19 additions & 0 deletions src/Storages/MergeTree/ExportPartTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Common/ProfileEventsScope.h>
#include <Databases/DatabaseReplicated.h>
#include <Storages/MergeTree/ExportList.h>
#include <Storages/IStorage.h>
#include <Formats/FormatFactory.h>
#include <Databases/enableAllExperimentalSettings.h>
#include <Processors/Sinks/SinkToStorage.h>
Expand Down Expand Up @@ -218,6 +219,7 @@ bool ExportPartTask::executeStep()
manifest.file_already_exists_policy == MergeTreePartExportManifest::FileAlreadyExistsPolicy::overwrite,
manifest.settings[Setting::export_merge_tree_part_max_bytes_per_file],
manifest.settings[Setting::export_merge_tree_part_max_rows_per_file],
manifest.iceberg_metadata_json,
getFormatSettings(local_context),
local_context);

Expand Down Expand Up @@ -295,6 +297,23 @@ bool ExportPartTask::executeStep()
throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Export part was cancelled");
}

/// For the direct EXPORT PART → Iceberg path there is no deferred-commit callback
/// (the partition-export path provides one that writes to ZooKeeper).
/// Commit the Iceberg metadata inline here so the rows become visible immediately.
if (destination_storage->isDataLake() && !manifest.completion_callback)
{
IStorage::IcebergCommitExportPartitionArguments iceberg_args;
iceberg_args.metadata_json_string = manifest.iceberg_metadata_json;
iceberg_args.partition_values = manifest.partition_values;

destination_storage->commitExportPartitionTransaction(
manifest.transaction_id,
manifest.data_part->info.getPartitionId(),
(*exports_list_entry)->destination_file_paths,
iceberg_args,
local_context);
}

std::lock_guard inner_lock(storage.export_manifests_mutex);
storage.writePartLog(
PartLogElement::Type::EXPORT_PART,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace
const zkutil::ZooKeeperPtr & zk,
const std::string & entry_path,
const LoggerPtr & log,
const ContextPtr & context,
const ContextPtr & storage_context,
const std::string & key,
const ExportReplicatedMergeTreePartitionManifest & metadata,
const time_t now,
Expand All @@ -58,6 +58,8 @@ namespace
}
else if (is_pending)
{
auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage_context, metadata);

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGetChildren);
std::vector<std::string> parts_in_processing_or_pending;
Expand Down
138 changes: 79 additions & 59 deletions src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,57 @@ namespace ErrorCodes

namespace
{
ContextPtr getContextCopyWithTaskSettings(const ContextPtr & context, const ExportReplicatedMergeTreePartitionManifest & manifest)


Coordination::Requests getErrorRequests(
const std::filesystem::path & export_path,
const std::string & replica_name,
const zkutil::ZooKeeperPtr & zk,
const LoggerPtr & log,
const std::string & part_name,
const std::optional<Exception> & exception
)
{
auto context_copy = Context::createCopy(context);
context_copy->makeQueryContextForExportPart();
context_copy->setCurrentQueryId(manifest.query_id);
context_copy->setSetting("output_format_parallel_formatting", manifest.parallel_formatting);
context_copy->setSetting("output_format_parquet_parallel_encoding", manifest.parquet_parallel_encoding);
context_copy->setSetting("max_threads", manifest.max_threads);
context_copy->setSetting("export_merge_tree_part_file_already_exists_policy", String(magic_enum::enum_name(manifest.file_already_exists_policy)));
context_copy->setSetting("export_merge_tree_part_max_bytes_per_file", manifest.max_bytes_per_file);
context_copy->setSetting("export_merge_tree_part_max_rows_per_file", manifest.max_rows_per_file);

/// always skip pending mutations and patch parts because we already validated the parts during query processing
context_copy->setSetting("export_merge_tree_part_throw_on_pending_mutations", false);
context_copy->setSetting("export_merge_tree_part_throw_on_pending_patch_parts", false);

context_copy->setSetting("export_merge_tree_part_filename_pattern", manifest.filename_pattern);

return context_copy;
Coordination::Requests ops;

const auto exceptions_per_replica_path = export_path / "exceptions_per_replica" / replica_name;
const auto count_path = exceptions_per_replica_path / "count";
const auto last_exception_path = exceptions_per_replica_path / "last_exception";

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists);
if (zk->exists(exceptions_per_replica_path))
{
LOG_INFO(log, "ExportPartition scheduler task: Exceptions per replica path exists, no need to create it");
std::string num_exceptions_string;
if (zk->tryGet(count_path, num_exceptions_string))
{
const auto num_exceptions = parse<size_t>(num_exceptions_string) + 1;
ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1));
}
else
{
/// TODO maybe we should find a better way to handle this case, not urgent
LOG_INFO(log, "ExportPartition scheduler task: Failed to get number of exceptions, will not increment it");
}

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);

ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1));
ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1));
}
else
{
LOG_INFO(log, "ExportPartition scheduler task: Exceptions per replica path does not exist, will create it");
ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(count_path, "1", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception->message(), zkutil::CreateMode::Persistent));
}

return ops;
}
}

Expand Down Expand Up @@ -199,7 +231,7 @@ void ExportPartitionTaskScheduler::run()

LOG_INFO(storage.log, "ExportPartition scheduler task: Scheduling part export: {}", zk_part_name);

auto context = getContextCopyWithTaskSettings(storage.getContext(), manifest);
auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest);

/// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly.
/// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being
Expand All @@ -217,11 +249,13 @@ void ExportPartitionTaskScheduler::run()
context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value,
context->getSettingsCopy(),
storage.getInMemoryMetadataPtr(),
manifest.iceberg_metadata_json,
[this, key, zk_part_name, manifest, destination_storage]
(MergeTreePartExportManifest::CompletionCallbackResult result)
{
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
});
},
manifest.partition_values);

part_export_manifest.task = std::make_shared<ExportPartFromPartitionExportTask>(storage, key, part_export_manifest);

Expand Down Expand Up @@ -263,13 +297,15 @@ void ExportPartitionTaskScheduler::run()
part->name,
destination_storage_id,
manifest.transaction_id,
getContextCopyWithTaskSettings(storage.getContext(), manifest),
context,
manifest.iceberg_metadata_json,
/*allow_outdated_parts*/ true,
[this, key, zk_part_name, manifest, destination_storage]
(MergeTreePartExportManifest::CompletionCallbackResult result)
{
handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result);
});
},
manifest.partition_values);

scheduled_exports_count++;
}
Expand Down Expand Up @@ -342,7 +378,25 @@ void ExportPartitionTaskScheduler::handlePartExportSuccess(

LOG_INFO(storage.log, "ExportPartition scheduler task: All parts are processed, will try to commit export partition");

ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, storage.getContext());
try
{
auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest);
ExportPartitionUtils::commit(manifest, destination_storage, zk, storage.log.load(), export_path, context);
}
catch (const Exception & e)
{
const auto error_requests = getErrorRequests(export_path, storage.replica_name, zk, storage.log.load(), part_name, e);

LOG_INFO(storage.log, "ExportPartition scheduler task: Caught exception while committing export partition, {}", e.message());

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
Coordination::Responses responses;
if (Coordination::Error::ZOK != zk->tryMulti(error_requests, responses))
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to update zookeeper with the commit exception");
}
}
}

void ExportPartitionTaskScheduler::handlePartExportFailure(
Expand Down Expand Up @@ -450,42 +504,8 @@ void ExportPartitionTaskScheduler::handlePartExportFailure(
LOG_INFO(storage.log, "ExportPartition scheduler task: Retry count limit not exceeded for part {}, will increment retry count", part_name);
}

const auto exceptions_per_replica_path = export_path / "exceptions_per_replica" / storage.replica_name;
const auto count_path = exceptions_per_replica_path / "count";
const auto last_exception_path = exceptions_per_replica_path / "last_exception";

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperExists);
if (zk->exists(exceptions_per_replica_path))
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Exceptions per replica path exists, no need to create it");
std::string num_exceptions_string;
if (zk->tryGet(count_path, num_exceptions_string))
{
const auto num_exceptions = parse<size_t>(num_exceptions_string) + 1;
ops.emplace_back(zkutil::makeSetRequest(count_path, std::to_string(num_exceptions), -1));
}
else
{
/// TODO maybe we should find a better way to handle this case, not urgent
LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to get number of exceptions, will not increment it");
}

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperGet);

ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "part", part_name, -1));
ops.emplace_back(zkutil::makeSetRequest(last_exception_path / "exception", exception->message(), -1));
}
else
{
LOG_INFO(storage.log, "ExportPartition scheduler task: Exceptions per replica path does not exist, will create it");
ops.emplace_back(zkutil::makeCreateRequest(exceptions_per_replica_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(count_path, "1", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(last_exception_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "part", part_name, zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(last_exception_path / "exception", exception->message(), zkutil::CreateMode::Persistent));
}
auto error_requests = getErrorRequests(export_path, storage.replica_name, zk, storage.log.load(), part_name, exception);
ops.insert(ops.end(), error_requests.begin(), error_requests.end());

ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests);
ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperMulti);
Expand Down
Loading
Loading