Skip to content

Commit 63fd05e

Browse files
committed
[flink] fix namespace for lineage for table path to warehouse path
1 parent d312694 commit 63fd05e

2 files changed

Lines changed: 15 additions & 11 deletions

File tree

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lineage/LineageUtils.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.flink.lineage;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.catalog.CatalogUtils;
2223
import org.apache.paimon.table.Table;
2324

2425
import org.apache.flink.api.connector.source.Boundedness;
@@ -38,8 +39,6 @@
3839
*/
3940
public class LineageUtils {
4041

41-
private static final String PAIMON_DATASET_PREFIX = "paimon://";
42-
4342
private static final Set<String> PAIMON_OPTION_KEYS =
4443
CoreOptions.getOptions().stream().map(opt -> opt.key()).collect(Collectors.toSet());
4544

@@ -49,6 +48,7 @@ public class LineageUtils {
4948
*/
5049
private static Map<String, String> buildConfigMap(Table table) {
5150
Map<String, String> config = new HashMap<>();
51+
config.put("type", "paimon");
5252
config.put("partition-keys", String.join(",", table.partitionKeys()));
5353
config.put("primary-keys", String.join(",", table.primaryKeys()));
5454

@@ -60,12 +60,12 @@ private static Map<String, String> buildConfigMap(Table table) {
6060
}
6161

6262
/**
63-
* Returns the lineage namespace for a Paimon table. The namespace uses the {@code paimon://}
64-
* scheme followed by the table's physical warehouse path, e.g. {@code
65-
* "paimon://s3://my-bucket/warehouse/mydb.db/mytable"}.
63+
* Returns the lineage namespace for a Paimon table. The namespace is the warehouse path derived
64+
* via {@link CatalogUtils#warehouse(String)}, e.g. {@code "s3://my-bucket/warehouse"} for
65+
* object stores or {@code "file:/tmp/warehouse"} for local paths.
6666
*/
6767
public static String getNamespace(Table table) {
68-
return PAIMON_DATASET_PREFIX + CoreOptions.path(table.options());
68+
return CatalogUtils.warehouse(CoreOptions.path(table.options()).toString());
6969
}
7070

7171
/**

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lineage/LineageUtilsTest.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,14 @@ class LineageUtilsTest {
5454

5555
@TempDir java.nio.file.Path temp;
5656

57+
private Path warehouse;
5758
private Path tablePath;
5859

5960
@BeforeEach
6061
void setUp() {
61-
tablePath = new Path(temp.toUri().toString());
62+
// mirror real Paimon layout: <warehouse>/<database>.db/<table>
63+
warehouse = new Path(temp.toUri().toString());
64+
tablePath = new Path(warehouse, "test_db.db/test_table");
6265
}
6366

6467
private FileStoreTable createTable(
@@ -85,8 +88,8 @@ void testGetNamespace() throws Exception {
8588

8689
String namespace = LineageUtils.getNamespace(table);
8790

88-
assertThat(namespace).startsWith("paimon://");
89-
assertThat(namespace).contains(tablePath.toString());
91+
// namespace is the warehouse root (2 levels up from the table path)
92+
assertThat(namespace).isEqualTo("file:" + warehouse.toUri().getPath());
9093
}
9194

9295
@Test
@@ -102,7 +105,7 @@ void testSourceLineageVertexBounded() throws Exception {
102105

103106
LineageDataset dataset = vertex.datasets().get(0);
104107
assertThat(dataset.name()).isEqualTo("paimon.db.src");
105-
assertThat(dataset.namespace()).startsWith("paimon://");
108+
assertThat(dataset.namespace()).isEqualTo("file:" + warehouse.toUri().getPath());
106109
}
107110

108111
@Test
@@ -128,7 +131,7 @@ void testSinkLineageVertex() throws Exception {
128131

129132
LineageDataset dataset = vertex.datasets().get(0);
130133
assertThat(dataset.name()).isEqualTo("paimon.db.sink");
131-
assertThat(dataset.namespace()).startsWith("paimon://");
134+
assertThat(dataset.namespace()).isEqualTo("file:" + warehouse.toUri().getPath());
132135
}
133136

134137
@Test
@@ -144,6 +147,7 @@ void testConfigFacetContainsPartitionAndPrimaryKeys() throws Exception {
144147

145148
DatasetConfigFacet configFacet = (DatasetConfigFacet) facets.get("config");
146149
Map<String, String> config = configFacet.config();
150+
assertThat(config).containsEntry("type", "paimon");
147151
assertThat(config).containsEntry("partition-keys", "f2");
148152
assertThat(config).containsEntry("primary-keys", "f0,f2");
149153
}

0 commit comments

Comments
 (0)