Skip to content

Commit 9ca53f2

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 445d37b + 6e3fe25 commit 9ca53f2

8 files changed

Lines changed: 198 additions & 37 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/GetPlan.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.dtstack.flink.sql.exec.ParamsInfo;
2424
import org.apache.commons.lang.exception.ExceptionUtils;
2525
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
2628

2729
/**
2830
* local模式获取sql任务的执行计划
@@ -32,15 +34,19 @@
3234
*/
3335
public class GetPlan {
3436

37+
private static final Logger LOG = LoggerFactory.getLogger(GetPlan.class);
38+
3539
public static String getExecutionPlan(String[] args) {
3640
try {
3741
long start = System.currentTimeMillis();
3842
ParamsInfo paramsInfo = ExecuteProcessHelper.parseParams(args);
43+
paramsInfo.setGetPlan(true);
3944
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExecution(paramsInfo);
4045
String executionPlan = env.getExecutionPlan();
4146
long end = System.currentTimeMillis();
4247
return ApiResult.createSuccessResultJsonStr(executionPlan, end - start);
4348
} catch (Exception e) {
49+
LOG.error("Get plan error", e);
4450
return ApiResult.createErrorResultJsonStr(ExceptionUtils.getFullStackTrace(e));
4551
}
4652
}

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,6 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21-
import com.dtstack.flink.sql.parser.CreateFuncParser;
22-
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
23-
import com.dtstack.flink.sql.parser.FlinkPlanner;
24-
import com.dtstack.flink.sql.parser.InsertSqlParser;
25-
import com.dtstack.flink.sql.parser.SqlParser;
26-
import com.dtstack.flink.sql.parser.SqlTree;
27-
import org.apache.flink.api.common.typeinfo.TypeInformation;
28-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
29-
import org.apache.flink.streaming.api.datastream.DataStream;
30-
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
31-
import org.apache.flink.table.api.*;
32-
import org.apache.flink.table.api.java.StreamTableEnvironment;
33-
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
34-
import org.apache.flink.table.sinks.TableSink;
35-
3621
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
3722
import com.dtstack.flink.sql.enums.ClusterMode;
3823
import com.dtstack.flink.sql.enums.ECacheType;
@@ -42,8 +27,14 @@
4227
import com.dtstack.flink.sql.function.FunctionManager;
4328
import com.dtstack.flink.sql.option.OptionParser;
4429
import com.dtstack.flink.sql.option.Options;
45-
import com.dtstack.flink.sql.side.SideSqlExec;
30+
import com.dtstack.flink.sql.parser.CreateFuncParser;
31+
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
32+
import com.dtstack.flink.sql.parser.FlinkPlanner;
33+
import com.dtstack.flink.sql.parser.InsertSqlParser;
34+
import com.dtstack.flink.sql.parser.SqlParser;
35+
import com.dtstack.flink.sql.parser.SqlTree;
4636
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
37+
import com.dtstack.flink.sql.side.SideSqlExec;
4738
import com.dtstack.flink.sql.sink.StreamSinkFactory;
4839
import com.dtstack.flink.sql.source.StreamSourceFactory;
4940
import com.dtstack.flink.sql.table.AbstractSourceTableInfo;
@@ -62,6 +53,17 @@
6253
import org.apache.calcite.sql.SqlNode;
6354
import org.apache.commons.io.Charsets;
6455
import org.apache.commons.lang3.StringUtils;
56+
import org.apache.flink.api.common.typeinfo.TypeInformation;
57+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
58+
import org.apache.flink.streaming.api.datastream.DataStream;
59+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
60+
import org.apache.flink.table.api.EnvironmentSettings;
61+
import org.apache.flink.table.api.Table;
62+
import org.apache.flink.table.api.TableConfig;
63+
import org.apache.flink.table.api.TableEnvironment;
64+
import org.apache.flink.table.api.java.StreamTableEnvironment;
65+
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
66+
import org.apache.flink.table.sinks.TableSink;
6567
import org.slf4j.Logger;
6668
import org.slf4j.LoggerFactory;
6769

@@ -71,13 +73,13 @@
7173
import java.net.URLClassLoader;
7274
import java.net.URLDecoder;
7375
import java.time.ZoneId;
76+
import java.util.ArrayList;
7477
import java.util.Arrays;
7578
import java.util.List;
7679
import java.util.Map;
7780
import java.util.Properties;
7881
import java.util.Set;
7982
import java.util.TimeZone;
80-
import java.util.ArrayList;
8183

8284
/**
8385
* 任务执行时的流程方法
@@ -158,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
158160
Map<String, Table> registerTableCache = Maps.newHashMap();
159161

160162
//register udf
161-
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv);
163+
ExecuteProcessHelper.registerUserDefinedFunction(sqlTree, paramsInfo.getJarUrlList(), tableEnv, paramsInfo.isGetPlan());
162164
//register table schema
163165
Set<URL> classPathSets = ExecuteProcessHelper.registerTable(sqlTree, env, tableEnv, paramsInfo.getLocalSqlPluginPath(),
164166
paramsInfo.getRemoteSqlPluginPath(), paramsInfo.getPluginLoadMode(), sideTableMap, registerTableCache);
@@ -243,13 +245,19 @@ private static void sqlTranslation(String localSqlPluginPath,
243245
}
244246
}
245247

246-
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv)
248+
public static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarUrlList, TableEnvironment tableEnv, boolean getPlan)
247249
throws IllegalAccessException, InvocationTargetException {
248250
// udf和tableEnv须由同一个类加载器加载
249251
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
250252
URLClassLoader classLoader = null;
251253
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
252254
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
255+
// 构建plan的情况下,udf和tableEnv不需要是同一个类加载器
256+
if (getPlan) {
257+
URL[] urls = jarUrlList.toArray(new URL[0]);
258+
classLoader = URLClassLoader.newInstance(urls);
259+
}
260+
253261
//classloader
254262
if (classLoader == null) {
255263
classLoader = ClassLoaderManager.loadExtraJar(jarUrlList, (URLClassLoader) levelClassLoader);

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.net.URL;
2323
import java.util.List;
24+
import java.util.Objects;
2425
import java.util.Properties;
2526

2627
/**
@@ -39,6 +40,7 @@ public class ParamsInfo {
3940
private String pluginLoadMode;
4041
private String deployMode;
4142
private Properties confProp;
43+
private boolean getPlan = false;
4244

4345
public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSqlPluginPath,
4446
String remoteSqlPluginPath, String pluginLoadMode, String deployMode, Properties confProp) {
@@ -52,6 +54,14 @@ public ParamsInfo(String sql, String name, List<URL> jarUrlList, String localSql
5254
this.confProp = confProp;
5355
}
5456

57+
public boolean isGetPlan() {
58+
return getPlan;
59+
}
60+
61+
public void setGetPlan(boolean getPlan) {
62+
this.getPlan = getPlan;
63+
}
64+
5565
public String getSql() {
5666
return sql;
5767
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.commons.collections.CollectionUtils;
4545
import org.apache.commons.lang3.StringUtils;
4646
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
47+
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
4748
import org.apache.flink.api.common.typeinfo.TypeInformation;
4849
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4950
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -54,9 +55,11 @@
5455
import org.apache.flink.table.catalog.ObjectIdentifier;
5556
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
5657
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
58+
import org.apache.flink.table.runtime.typeutils.LegacyLocalDateTimeTypeInfo;
5759
import org.apache.flink.table.types.logical.DecimalType;
5860
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
5961
import org.apache.flink.table.types.logical.LogicalType;
62+
import org.apache.flink.table.types.logical.TimestampType;
6063
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
6164
import org.apache.flink.types.Row;
6265
import org.slf4j.Logger;
@@ -387,6 +390,11 @@ private void joinFun(Object pollObj,
387390
((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
388391
logicalTypes[i] = new DecimalType(38, 18);
389392
}
393+
394+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
395+
(((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(LegacyLocalDateTimeTypeInfo.class))) {
396+
logicalTypes[i] = new TimestampType(TimestampType.MAX_PRECISION);
397+
}
390398
}
391399

392400
BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -425,11 +433,15 @@ private void joinFun(Object pollObj,
425433
targetTable = localTableCache.get(joinInfo.getLeftTableName());
426434
}
427435

428-
TypeInformation[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
436+
TypeInformation<?>[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
429437
for (int i = 0; i < fieldDataTypes.length; i++) {
430438
if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
431439
fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
432440
}
441+
442+
if (fieldDataTypes[i].getClass().equals(LegacyLocalDateTimeTypeInfo.class)) {
443+
fieldDataTypes[i] = LocalTimeTypeInfo.LOCAL_DATE_TIME;
444+
}
433445
}
434446

435447
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());

core/src/main/java/com/dtstack/flink/sql/util/RowDataConvert.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.sql.Time;
3232
import java.sql.Timestamp;
3333
import java.time.LocalDate;
34+
import java.time.LocalDateTime;
3435
import java.time.LocalTime;
3536

3637
/**
@@ -52,6 +53,8 @@ public static BaseRow convertToBaseRow(Row row) {
5253
} else if (row.getField(i) instanceof Timestamp) {
5354
SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp) row.getField(i)));
5455
genericRow.setField(i, newTimestamp);
56+
} else if (row.getField(i) instanceof LocalDateTime) {
57+
genericRow.setField(i, SqlTimestamp.fromLocalDateTime((LocalDateTime) row.getField(i)));
5558
} else if (row.getField(i) instanceof Time) {
5659
genericRow.setField(i, DataFormatConverters.TimeConverter.INSTANCE.toInternal((Time) row.getField(i)));
5760
} else if (row.getField(i) instanceof Double || row.getField(i).getClass().equals(double.class)) {

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/table/KafkaSourceParser.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.dtstack.flink.sql.util.MathUtil;
2727

2828
import java.util.Map;
29+
import java.util.UUID;
2930
import java.util.stream.Collectors;
3031

3132
/**
@@ -46,7 +47,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
4647
kafkaSourceTableInfo.setType(MathUtil.getString(props.get(KafkaSourceTableInfo.TYPE_KEY.toLowerCase())));
4748
kafkaSourceTableInfo.setParallelism(MathUtil.getIntegerVal(props.get(KafkaSourceTableInfo.PARALLELISM_KEY.toLowerCase())));
4849
kafkaSourceTableInfo.setBootstrapServers(MathUtil.getString(props.get(KafkaSourceTableInfo.BOOTSTRAPSERVERS_KEY.toLowerCase())));
49-
kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.get(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase())));
50+
kafkaSourceTableInfo.setGroupId(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.GROUPID_KEY.toLowerCase(), UUID.randomUUID().toString().replace("-", ""))));
5051
kafkaSourceTableInfo.setTopic(MathUtil.getString(props.get(KafkaSourceTableInfo.TOPIC_KEY.toLowerCase())));
5152
kafkaSourceTableInfo.setTopicIsPattern(MathUtil.getBoolean(props.get(KafkaSourceTableInfo.TOPICISPATTERN_KEY.toLowerCase()), false));
5253
kafkaSourceTableInfo.setOffsetReset(MathUtil.getString(props.getOrDefault(KafkaSourceTableInfo.OFFSETRESET_KEY.toLowerCase(), EKafkaOffset.LATEST.name().toLowerCase())));

0 commit comments

Comments
 (0)