Skip to content

Commit b02da6b

Browse files
committed
Merge branch '1.10_release_4.2.x' into feat_1.10_4.2.x_sqlserver
2 parents 8e32d1d + 112c528 commit b02da6b

20 files changed

Lines changed: 125 additions & 50 deletions

File tree

cassandra/cassandra-side/cassandra-side-core/src/main/java/com/dtstack/flink/sql/side/cassandra/table/CassandraSideParser.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
9191
cassandraSideTableInfo.setConnectTimeoutMillis(MathUtil.getIntegerVal(props.get(CONNECT_TIMEOUT_MILLIS_KEY.toLowerCase())));
9292
cassandraSideTableInfo.setPoolTimeoutMillis(MathUtil.getIntegerVal(props.get(POOL_TIMEOUT_MILLIS_KEY.toLowerCase())));
9393

94+
if (MathUtil.getLongVal(props.get(cassandraSideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
95+
cassandraSideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(cassandraSideTableInfo.ERROR_LIMIT.toLowerCase())));
96+
}
97+
9498
return cassandraSideTableInfo;
9599
}
96100

core/src/main/java/com/dtstack/flink/sql/dirtyManager/manager/DirtyDataManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,12 @@ public static String buildDefaultDirty() {
140140
* 脏数据收集任务停止,任务停止之前,需要将队列中所有的数据清空
141141
*/
142142
public void close() {
143-
if (checkConsumer()) {
143+
if (consumer != null && checkConsumer()) {
144144
LOG.info("dirty consumer is closing ...");
145145
consumer.close();
146+
}
147+
148+
if (dirtyDataConsumer != null) {
146149
dirtyDataConsumer.shutdownNow();
147150
}
148151
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public class ExecuteProcessHelper {
104104
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
105105

106106
private static final String TIME_ZONE = "timezone";
107-
private static final String PLUGIN_PATH_STR = "pluginPath";
107+
private static final String PLUGIN_LOCAL_STR = "pluginPath";
108108
private static final String PLUGIN_LOAD_STR = "pluginLoadMode";
109109

110110
public static FlinkPlanner flinkPlanner = new FlinkPlanner();
@@ -137,8 +137,8 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
137137
dirtyProperties.put(PLUGIN_LOAD_STR, pluginLoadMode);
138138
}
139139

140-
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_PATH_STR))) {
141-
dirtyProperties.put(PLUGIN_PATH_STR, localSqlPluginPath);
140+
if (!pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name()) && Objects.isNull(dirtyProperties.get(PLUGIN_LOCAL_STR))) {
141+
dirtyProperties.put(PLUGIN_LOCAL_STR, localSqlPluginPath);
142142
}
143143

144144
List<URL> jarUrlList = getExternalJarUrls(options.getAddjar());
@@ -393,9 +393,10 @@ public static Set<URL> registerTable(
393393
return Sets.newHashSet();
394394
}
395395
pluginClassPathSets.add(PluginUtil.buildDirtyPluginUrl(
396-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_PATH_STR)),
398-
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_LOAD_MODE_STR))
396+
String.valueOf(dirtyProperties.get(DirtyKeys.PLUGIN_TYPE_STR)),
397+
localSqlPluginPath,
398+
remoteSqlPluginPath,
399+
pluginLoadMode
399400
));
400401
return pluginClassPathSets;
401402
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public abstract class AbstractTableInfo implements Serializable {
7171
* error data limit. Task will failed once {@link AbstractDtRichOutputFormat#outDirtyRecords}
7272
* count over limit. Default 1000L;
7373
*/
74-
private Long errorLimit = 0L;
74+
private Long errorLimit = Long.MAX_VALUE;
7575

7676
public String[] getFieldTypes() {
7777
return fieldTypes;

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.io.File;
3333
import java.io.IOException;
3434
import java.net.MalformedURLException;
35+
import java.net.URI;
3536
import java.net.URL;
3637
import java.nio.charset.StandardCharsets;
3738
import java.util.ArrayList;
@@ -49,12 +50,14 @@
4950

5051
public class PluginUtil {
5152

52-
private static String SP = File.separator;
53+
private static final String SP = File.separator;
5354

5455
private static final String JAR_SUFFIX = ".jar";
5556

5657
private static final String CLASS_PRE_STR = "com.dtstack.flink.sql";
5758

59+
private static final String DIRTY_DATA_PRE = "dirtyData";
60+
5861
private static final Logger LOG = LoggerFactory.getLogger(PluginUtil.class);
5962

6063
private static final ObjectMapper objectMapper = new ObjectMapper();
@@ -161,27 +164,32 @@ public static URL buildFinalJarFilePath(String pluginType, String tableType, Str
161164
* build dirty data url from plugin path
162165
*
163166
* @param dirtyType type of dirty type
164-
* @param pluginPath plugin path
167+
* @param localPath local path
168+
* @param remotePath remote path
165169
* @param pluginLoadMode load plugin mode
166170
* @return dirty plugin url
167171
* @throws Exception exception
168172
*/
169173
public static URL buildDirtyPluginUrl(
170-
String dirtyType,
171-
String pluginPath,
172-
String pluginLoadMode) throws Exception {
174+
String dirtyType,
175+
String localPath,
176+
String remotePath,
177+
String pluginLoadMode) throws Exception {
173178
if (Objects.isNull(dirtyType)) {
174179
dirtyType = DirtyConsumerFactory.DEFAULT_DIRTY_TYPE;
175180
}
176181
String prefix = String.format("dirtyConsumer-%s", dirtyType.toLowerCase()).toLowerCase();
177182
String consumerType = DirtyConsumerFactory.DIRTY_CONSUMER_PATH + File.separator + dirtyType;
178-
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, pluginPath, pluginLoadMode);
183+
String consumerJar = PluginUtil.getJarFileDirPath(consumerType, localPath, pluginLoadMode);
179184
String jarFileName = PluginUtil.getCoreJarFileName(
180-
consumerJar,
181-
prefix,
182-
pluginLoadMode
185+
consumerJar,
186+
prefix,
187+
pluginLoadMode
183188
);
184-
return new URL("file:" + consumerJar + SP + jarFileName);
189+
String path = remotePath == null ? localPath : remotePath;
190+
String dirtyPath = path + SP + DIRTY_DATA_PRE + SP + dirtyType;
191+
URI uri = URI.create("file:" + dirtyPath + SP + jarFileName);
192+
return uri.toURL();
185193
}
186194

187195
public static URL getRemoteSideJarFilePath(String pluginType, String sideOperator, String tableType, String remoteSqlRootDir, String localSqlPluginPath, String pluginLoadMode) throws Exception {

elasticsearch6/elasticsearch6-side/elasticsearch6-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch6/table/Elasticsearch6SideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7171
elasticsearch6SideTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES6_USERNAME.toLowerCase())));
7272
elasticsearch6SideTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES6_PASSWORD.toLowerCase())));
7373
}
74+
75+
if (MathUtil.getLongVal(props.get(elasticsearch6SideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
76+
elasticsearch6SideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(elasticsearch6SideTableInfo.ERROR_LIMIT.toLowerCase())));
77+
}
78+
7479
elasticsearch6SideTableInfo.check();
7580
return elasticsearch6SideTableInfo;
7681
}

elasticsearch7/elasticsearch7-side/elasticsearch7-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AllSideInfo.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import com.dtstack.flink.sql.util.ParseUtils;
2727
import org.apache.calcite.sql.SqlNode;
2828
import org.apache.commons.collections.CollectionUtils;
29-
import org.apache.commons.compress.utils.Lists;
3029
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3130

31+
import java.util.ArrayList;
3232
import java.util.List;
3333

3434
/**
@@ -53,7 +53,7 @@ public void parseSelectFields(JoinInfo joinInfo) {
5353

5454
String sideTableName = joinInfo.getSideTableName();
5555
String nonSideTableName = joinInfo.getNonSideTable();
56-
List<String> fields = Lists.newArrayList();
56+
List<String> fields = new ArrayList<>();
5757

5858
int sideIndex = 0;
5959
for (int i = 0; i< outFieldInfoList.size(); i++) {
@@ -77,7 +77,7 @@ public void parseSelectFields(JoinInfo joinInfo) {
7777

7878
SqlNode conditionNode = joinInfo.getCondition();
7979

80-
List<SqlNode> sqlNodeList = Lists.newArrayList();
80+
List<SqlNode> sqlNodeList = new ArrayList<>();
8181

8282
ParseUtils.parseAnd(conditionNode, sqlNodeList);
8383

elasticsearch7/elasticsearch7-side/elasticsearch7-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AsyncReqRow.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.dtstack.flink.sql.util.ParseUtils;
2828
import com.dtstack.flink.sql.util.RowDataComplete;
2929
import org.apache.calcite.sql.SqlNode;
30-
import org.apache.commons.compress.utils.Lists;
3130
import org.apache.commons.lang3.StringUtils;
3231
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3332
import org.apache.flink.configuration.Configuration;
@@ -50,6 +49,7 @@
5049
import java.io.IOException;
5150
import java.io.Serializable;
5251
import java.sql.Timestamp;
52+
import java.util.ArrayList;
5353
import java.util.List;
5454
import java.util.Map;
5555

@@ -64,7 +64,7 @@ public class Elasticsearch7AsyncReqRow extends BaseAsyncReqRow implements Serial
6464
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch7AsyncReqRow.class);
6565
private transient RestHighLevelClient rhlClient;
6666
private SearchRequest searchRequest;
67-
private List<String> sqlJoinCompareOperate = Lists.newArrayList();
67+
private List<String> sqlJoinCompareOperate = new ArrayList<>();
6868

6969
public Elasticsearch7AsyncReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) {
7070
super(new Elasticsearch7AsyncSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
@@ -97,8 +97,8 @@ public void handleAsyncInvoke(Map<String, Object> inputParams, BaseRow input, Re
9797
@Override
9898
public void onResponse(SearchResponse searchResponse) {
9999

100-
List<Object> cacheContent = Lists.newArrayList();
101-
List<BaseRow> rowList = Lists.newArrayList();
100+
List<Object> cacheContent = new ArrayList<>();
101+
List<BaseRow> rowList = new ArrayList<>();
102102
SearchHit[] searchHits = searchResponse.getHits().getHits();
103103
if (searchHits.length > 0) {
104104
Elasticsearch7SideTableInfo tableInfo = null;

elasticsearch7/elasticsearch7-side/elasticsearch7-side-core/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/table/Elasticsearch7SideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7171
elasticsearch7SideTableInfo.setUserName(MathUtil.getString(props.get(KEY_ES7_USERNAME.toLowerCase())));
7272
elasticsearch7SideTableInfo.setPassword(MathUtil.getString(props.get(KEY_ES7_PASSWORD.toLowerCase())));
7373
}
74+
75+
if (MathUtil.getLongVal(props.get(elasticsearch7SideTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
76+
elasticsearch7SideTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(elasticsearch7SideTableInfo.ERROR_LIMIT.toLowerCase())));
77+
}
78+
7479
elasticsearch7SideTableInfo.check();
7580
return elasticsearch7SideTableInfo;
7681
}

hbase/hbase-side/hbase-side-core/src/main/java/com/dtstack/flink/sql/side/hbase/table/HbaseSideParser.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
7272
hbaseTableInfo.setPreRowKey(MathUtil.getBoolean(props.get(PRE_ROW_KEY.toLowerCase()), false));
7373
hbaseTableInfo.setCacheType((String) props.get(CACHE));
7474
hbaseTableInfo.setKerberosAuthEnable(MathUtil.getBoolean(props.get(KERBEROS_ENABLE), false));
75+
76+
if (MathUtil.getLongVal(props.get(hbaseTableInfo.ERROR_LIMIT.toLowerCase())) != null) {
77+
hbaseTableInfo.setErrorLimit(MathUtil.getLongVal(props.get(hbaseTableInfo.ERROR_LIMIT.toLowerCase())));
78+
}
79+
7580
props.entrySet().stream()
7681
.filter(entity -> entity.getKey().contains("."))
7782
.map(entity -> hbaseTableInfo.getHbaseConfig().put(entity.getKey(), String.valueOf(entity.getValue())))

0 commit comments

Comments
 (0)