Skip to content

Commit 930e919

Browse files
committed
add extendOutputFormat
1 parent d798386 commit 930e919

1 file changed

Lines changed: 103 additions & 0 deletions

File tree

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.dtstack.flink.sql.sink.rdb.format;
20+
21+
import org.apache.commons.lang3.StringUtils;
22+
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
23+
24+
import java.sql.ResultSet;
25+
import java.sql.SQLException;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import java.util.Map;
29+
30+
/**
31+
* Reason:
32+
* Date: 2018/11/30
33+
* Company: www.dtstack.com
34+
*
35+
* @author maqi
36+
*/
37+
public class ExtendOutputFormat extends RetractJDBCOutputFormat {
38+
39+
40+
@Override
41+
public boolean isReplaceInsertQuery() throws SQLException {
42+
fillRealIndexes();
43+
fillFullColumns();
44+
45+
if (!getRealIndexes().isEmpty()) {
46+
for (List<String> value : getRealIndexes().values()) {
47+
for (String fieldName : getDbSink().getFieldNames()) {
48+
if (value.contains(fieldName)) {
49+
return true;
50+
}
51+
}
52+
}
53+
}
54+
return false;
55+
}
56+
57+
/**
58+
* get db all index
59+
*
60+
* @throws SQLException
61+
*/
62+
public void fillRealIndexes() throws SQLException {
63+
Map<String, List<String>> map = Maps.newHashMap();
64+
ResultSet rs = getDbConn().getMetaData().getIndexInfo(null, null, getTableName(), true, false);
65+
66+
while (rs.next()) {
67+
String indexName = rs.getString("INDEX_NAME");
68+
if (!map.containsKey(indexName)) {
69+
map.put(indexName, new ArrayList<>());
70+
}
71+
String column_name = rs.getString("COLUMN_NAME");
72+
if (StringUtils.isNotBlank(column_name)) {
73+
column_name = column_name.toUpperCase();
74+
}
75+
map.get(indexName).add(column_name);
76+
}
77+
78+
for (Map.Entry<String, List<String>> entry : map.entrySet()) {
79+
String k = entry.getKey();
80+
List<String> v = entry.getValue();
81+
if (v != null && v.size() != 0 && v.get(0) != null) {
82+
getRealIndexes().put(k, v);
83+
}
84+
}
85+
}
86+
87+
/**
88+
* get db all column name
89+
*
90+
* @throws SQLException
91+
*/
92+
public void fillFullColumns() throws SQLException {
93+
ResultSet rs = getDbConn().getMetaData().getColumns(null, null, getTableName(), null);
94+
while (rs.next()) {
95+
String columnName = rs.getString("COLUMN_NAME");
96+
if (StringUtils.isNotBlank(columnName)) {
97+
getFullField().add(columnName.toUpperCase());
98+
}
99+
}
100+
}
101+
102+
103+
}

0 commit comments

Comments
 (0)