Skip to content

Commit 63cde11

Browse files
committed
Merge branch 'feat_1.10_addLocalTest' into 'v1.10.0_dev'
Feat 1.10: add local test. See merge request dt-insight-engine/flinkStreamSQL!67
2 parents 622b4b8 + 969fae5 commit 63cde11

22 files changed

Lines changed: 334 additions & 135 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ private static DtClassLoader retrieveClassLoad(String pluginJarPath) {
6565
LOG.info("pluginJarPath:{} create ClassLoad successful...", pluginJarPath);
6666
return classLoader;
6767
} catch (Throwable e) {
68-
LOG.error("retrieve ClassLoad happens error:{}", e);
68+
LOG.error("retrieve ClassLoad happens error", e);
6969
throw new RuntimeException("retrieve ClassLoad happens error");
7070
}
7171
});

core/src/main/java/com/dtstack/flink/sql/enums/EPluginLoadMode.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ public enum EPluginLoadMode {
3333
/**
3434
* 1:shipfile
3535
*/
36-
SHIPFILE(1);
36+
SHIPFILE(1),
37+
38+
/**
39+
* 2:localTest
40+
*/
41+
LOCALTEST(3);
3742

3843
private int type;
3944

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,15 @@
2525
import com.dtstack.flink.sql.parser.SqlParser;
2626
import com.dtstack.flink.sql.parser.SqlTree;
2727
import org.apache.flink.api.common.typeinfo.TypeInformation;
28-
import org.apache.flink.api.java.tuple.Tuple2;
2928
import org.apache.flink.api.java.typeutils.RowTypeInfo;
3029
import org.apache.flink.streaming.api.datastream.DataStream;
3130
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3231
import org.apache.flink.table.api.*;
33-
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
3432
import org.apache.flink.table.api.java.StreamTableEnvironment;
3533
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
3634
import org.apache.flink.table.sinks.TableSink;
37-
import org.apache.flink.types.Row;
3835

3936
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
40-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
4137
import com.dtstack.flink.sql.enums.ClusterMode;
4238
import com.dtstack.flink.sql.enums.ECacheType;
4339
import com.dtstack.flink.sql.enums.EPluginLoadMode;
@@ -108,7 +104,6 @@ public static ParamsInfo parseParams(String[] args) throws Exception {
108104
String remoteSqlPluginPath = options.getRemoteSqlPluginPath();
109105
String pluginLoadMode = options.getPluginLoadMode();
110106
String deployMode = options.getMode();
111-
String logLevel = options.getLogLevel();
112107

113108
Preconditions.checkArgument(checkRemoteSqlPluginPath(remoteSqlPluginPath, deployMode, pluginLoadMode),
114109
"Non-local mode or shipfile deployment mode, remoteSqlPluginPath is required");
@@ -152,7 +147,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
152147

153148

154149
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
155-
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
150+
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql(), paramsInfo.getPluginLoadMode());
156151

157152
Map<String, AbstractSideTableInfo> sideTableMap = Maps.newHashMap();
158153
Map<String, Table> registerTableCache = Maps.newHashMap();
@@ -165,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
165160
// cache classPathSets
166161
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
167162

168-
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache);
163+
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), paramsInfo.getPluginLoadMode(),tableEnv, sqlTree, sideTableMap, registerTableCache);
169164

170165
if (env instanceof MyLocalStreamEnvironment) {
171166
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -189,12 +184,14 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
189184
}
190185

191186
private static void sqlTranslation(String localSqlPluginPath,
187+
String pluginLoadMode,
192188
StreamTableEnvironment tableEnv,
193189
SqlTree sqlTree,Map<String, AbstractSideTableInfo> sideTableMap,
194190
Map<String, Table> registerTableCache) throws Exception {
195191

196192
SideSqlExec sideSqlExec = new SideSqlExec();
197193
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
194+
sideSqlExec.setPluginLoadMode(pluginLoadMode);
198195

199196
int scope = 0;
200197
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
@@ -278,7 +275,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
278275
if (tableInfo instanceof AbstractSourceTableInfo) {
279276

280277
AbstractSourceTableInfo sourceTableInfo = (AbstractSourceTableInfo) tableInfo;
281-
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
278+
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath, pluginLoadMode);
282279
tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
283280
//Note --- parameter conversion function can not be used inside a function of the type of polymerization
284281
//Create table in which the function is arranged only need adaptation sql
@@ -308,7 +305,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
308305
pluginClassPathSets.add(sourceTablePathUrl);
309306
} else if (tableInfo instanceof AbstractTargetTableInfo) {
310307

311-
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath);
308+
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath, pluginLoadMode);
312309
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
313310
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
314311

@@ -324,6 +321,9 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
324321
throw new RuntimeException("not support table type:" + tableInfo.getType());
325322
}
326323
}
324+
if (localSqlPluginPath == null || localSqlPluginPath.isEmpty()) {
325+
return Sets.newHashSet();
326+
}
327327
return pluginClassPathSets;
328328
}
329329

@@ -351,7 +351,7 @@ public static StreamExecutionEnvironment getStreamExeEnv(Properties confProperti
351351
}
352352

353353

354-
private static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, Properties confProperties) {
354+
public static StreamTableEnvironment getStreamTableEnv(StreamExecutionEnvironment env, Properties confProperties) {
355355
// use blink and streammode
356356
EnvironmentSettings settings = EnvironmentSettings.newInstance()
357357
.useBlinkPlanner()

core/src/main/java/com/dtstack/flink/sql/exec/ParamsInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,8 @@ public static class Builder {
114114
private String remoteSqlPluginPath;
115115
private String pluginLoadMode;
116116
private String deployMode;
117-
private String logLevel;
118117
private Properties confProp;
119118

120-
121119
public ParamsInfo.Builder setSql(String sql) {
122120
this.sql = sql;
123121
return this;

core/src/main/java/com/dtstack/flink/sql/option/Options.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,10 @@ public class Options {
4545
@OptionRequired(description = "Yarn and Hadoop configuration directory")
4646
private String yarnconf;
4747

48-
@OptionRequired(required = true,description = "Sql local plugin root")
48+
@OptionRequired(description = "Sql local plugin root")
4949
private String localSqlPluginPath;
5050

51-
@OptionRequired(required = false,description = "Sql remote plugin root")
51+
@OptionRequired(description = "Sql remote plugin root")
5252
private String remoteSqlPluginPath ;
5353

5454
@OptionRequired(description = "sql ext jar,eg udf jar")
@@ -69,9 +69,6 @@ public class Options {
6969
@OptionRequired(description = "plugin load mode, by classpath or shipfile")
7070
private String pluginLoadMode = EPluginLoadMode.CLASSPATH.name();
7171

72-
@OptionRequired(description = "log level")
73-
private String logLevel = "info";
74-
7572
@OptionRequired(description = "file add to ship file")
7673
private String addShipfile;
7774

@@ -180,14 +177,6 @@ public void setPluginLoadMode(String pluginLoadMode) {
180177
this.pluginLoadMode = pluginLoadMode;
181178
}
182179

183-
public String getLogLevel() {
184-
return logLevel;
185-
}
186-
187-
public void setLogLevel(String logLevel) {
188-
this.logLevel = logLevel;
189-
}
190-
191180
public String getAddShipfile() {
192181
return addShipfile;
193182
}

core/src/main/java/com/dtstack/flink/sql/parser/CreateTableParser.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ private Map parseProp(String propsStr){
7474
propsStr = propsStr.replaceAll("'\\s*,", "'|");
7575
String[] strs = propsStr.trim().split("\\|");
7676
Map<String, Object> propMap = Maps.newHashMap();
77-
for(int i=0; i<strs.length; i++){
78-
List<String> ss = DtStringUtil.splitIgnoreQuota(strs[i], '=');
77+
for (String str : strs) {
78+
List<String> ss = DtStringUtil.splitIgnoreQuota(str, '=');
7979
String key = ss.get(0).trim();
8080
String value = extractValue(ss.get(1).trim());
8181
propMap.put(key, value);

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,12 @@ public static void setLocalSqlPluginRoot(String localSqlPluginRoot){
5858
* insert into tb1 select * from tb2;
5959
* @param sql
6060
*/
61-
public static SqlTree parseSql(String sql) throws Exception {
61+
public static SqlTree parseSql(String sql, String pluginLoadMode) throws Exception {
6262

6363
if(StringUtils.isBlank(sql)){
6464
throw new RuntimeException("sql is not null");
6565
}
6666

67-
if(LOCAL_SQL_PLUGIN_ROOT == null){
68-
throw new RuntimeException("need to set local sql plugin root");
69-
}
70-
7167
sql = DtStringUtil.dealSqlComment(sql)
7268
.replaceAll("\r\n", " ")
7369
.replaceAll("\n", " ")
@@ -114,7 +110,7 @@ public static SqlTree parseSql(String sql) throws Exception {
114110
}
115111

116112
AbstractTableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
117-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
113+
createTableResult, LOCAL_SQL_PLUGIN_ROOT, pluginLoadMode);
118114
sqlTree.addTableInfo(tableName, tableInfo);
119115
}
120116
}
@@ -127,7 +123,7 @@ public static SqlTree parseSql(String sql) throws Exception {
127123
}
128124

129125
AbstractTableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SINK.getType(),
130-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
126+
createTableResult, LOCAL_SQL_PLUGIN_ROOT, pluginLoadMode);
131127
sqlTree.addTableInfo(tableName, tableInfo);
132128
}
133129
}
@@ -145,7 +141,7 @@ public static SqlTree parseSql(String sql) throws Exception {
145141
}
146142
} else {
147143
AbstractTableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
148-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
144+
createTableResult, LOCAL_SQL_PLUGIN_ROOT, pluginLoadMode);
149145
sqlTree.addTableInfo(tableName, tableInfo);
150146
}
151147
}

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class SideSqlExec {
8787

8888
private String tmpFields = null;
8989

90+
private String pluginLoadMode = null;
91+
9092
private SidePredicatesParser sidePredicatesParser = new SidePredicatesParser();
9193

9294
private Map<String, Table> localTableCache = Maps.newHashMap();
@@ -97,10 +99,6 @@ public void exec(String sql,
9799
Map<String, Table> tableCache,
98100
CreateTmpTableParser.SqlParserResult createView,
99101
String scope) throws Exception {
100-
if(localSqlPluginPath == null){
101-
throw new RuntimeException("need to set localSqlPluginPath");
102-
}
103-
104102
localTableCache.putAll(tableCache);
105103
try {
106104
sidePredicatesParser.fillPredicatesForSideTable(sql, sideTableMap);
@@ -258,15 +256,14 @@ private TypeInformation convertTimeAttributeType(TypeInformation typeInformation
258256
return typeInformation;
259257
}
260258

261-
262-
263-
264-
265-
266259
public void setLocalSqlPluginPath(String localSqlPluginPath) {
267260
this.localSqlPluginPath = localSqlPluginPath;
268261
}
269262

263+
public void setPluginLoadMode(String pluginLoadMode) {
264+
this.pluginLoadMode = pluginLoadMode;
265+
}
266+
270267
private Table getTableFromCache(Map<String, Table> localTableCache, String tableAlias, String tableName){
271268
Table table = localTableCache.get(tableAlias);
272269
if(table == null){
@@ -419,9 +416,9 @@ private void joinFun(Object pollObj,
419416

420417
DataStream<Tuple2<Boolean, Row>> dsOut = null;
421418
if(ECacheType.ALL.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
422-
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
419+
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo, pluginLoadMode);
423420
}else{
424-
dsOut = SideAsyncOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
421+
dsOut = SideAsyncOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo, pluginLoadMode);
425422
}
426423

427424
RowTypeInfo sideOutTypeInfo = buildOutRowTypeInfo(sideJoinFieldInfo, mappingTable);

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ public class StreamSideFactory {
3737

3838
private static final String CURR_TYPE = "side";
3939

40-
public static AbstractTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
40+
public static AbstractTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType, String pluginLoadMode) throws Exception {
4141

4242
String sideOperator = ECacheType.ALL.name().equalsIgnoreCase(cacheType) ? "all" : "async";
43-
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
43+
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir, pluginLoadMode);
4444
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4545

4646
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ public class SideAsyncOperator {
4949
private static final String ORDERED = "ordered";
5050

5151

52-
private static BaseAsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
53-
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) throws Exception {
52+
private static BaseAsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
53+
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
5454
String pathOfType = String.format(PATH_FORMAT, sideType);
55-
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
55+
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir, pluginLoadMode);
5656
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
5757
return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
5858
cl.loadClass(className).asSubclass(BaseAsyncReqRow.class)
@@ -61,8 +61,8 @@ private static BaseAsyncReqRow loadAsyncReq(String sideType, String sqlRootDir,
6161
}
6262

6363
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
64-
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) throws Exception {
65-
BaseAsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
64+
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
65+
BaseAsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo, pluginLoadMode);
6666

6767
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
6868
if (ORDERED.equals(sideTableInfo.getCacheMode())){

0 commit comments

Comments (0)