Skip to content

Commit 969fae5

Browse files
committed
【fix】opt LocalTest PluginLoadMode
1 parent 29f41be commit 969fae5

5 files changed

Lines changed: 20 additions & 29 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
160160
// cache classPathSets
161161
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
162162

163-
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache);
163+
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), paramsInfo.getPluginLoadMode(),tableEnv, sqlTree, sideTableMap, registerTableCache);
164164

165165
if (env instanceof MyLocalStreamEnvironment) {
166166
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -184,12 +184,14 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
184184
}
185185

186186
private static void sqlTranslation(String localSqlPluginPath,
187+
String pluginLoadMode,
187188
StreamTableEnvironment tableEnv,
188189
SqlTree sqlTree,Map<String, AbstractSideTableInfo> sideTableMap,
189190
Map<String, Table> registerTableCache) throws Exception {
190191

191192
SideSqlExec sideSqlExec = new SideSqlExec();
192193
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
194+
sideSqlExec.setPluginLoadMode(pluginLoadMode);
193195

194196
int scope = 0;
195197
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public class SideSqlExec {
8787

8888
private String tmpFields = null;
8989

90+
private String pluginLoadMode = null;
91+
9092
private SidePredicatesParser sidePredicatesParser = new SidePredicatesParser();
9193

9294
private Map<String, Table> localTableCache = Maps.newHashMap();
@@ -254,15 +256,14 @@ private TypeInformation convertTimeAttributeType(TypeInformation typeInformation
254256
return typeInformation;
255257
}
256258

257-
258-
259-
260-
261-
262259
public void setLocalSqlPluginPath(String localSqlPluginPath) {
263260
this.localSqlPluginPath = localSqlPluginPath;
264261
}
265262

263+
public void setPluginLoadMode(String pluginLoadMode) {
264+
this.pluginLoadMode = pluginLoadMode;
265+
}
266+
266267
private Table getTableFromCache(Map<String, Table> localTableCache, String tableAlias, String tableName){
267268
Table table = localTableCache.get(tableAlias);
268269
if(table == null){
@@ -415,9 +416,9 @@ private void joinFun(Object pollObj,
415416

416417
DataStream<Tuple2<Boolean, Row>> dsOut = null;
417418
if(ECacheType.ALL.name().equalsIgnoreCase(sideTableInfo.getCacheType())){
418-
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
419+
dsOut = SideWithAllCacheOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo, pluginLoadMode);
419420
}else{
420-
dsOut = SideAsyncOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo);
421+
dsOut = SideAsyncOperator.getSideJoinDataStream(adaptStream, sideTableInfo.getType(), localSqlPluginPath, typeInfo, joinInfo, sideJoinFieldInfo, sideTableInfo, pluginLoadMode);
421422
}
422423

423424
RowTypeInfo sideOutTypeInfo = buildOutRowTypeInfo(sideJoinFieldInfo, mappingTable);

core/src/main/java/com/dtstack/flink/sql/side/operator/SideAsyncOperator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ public class SideAsyncOperator {
4949
private static final String ORDERED = "ordered";
5050

5151

52-
private static BaseAsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
53-
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) throws Exception {
52+
private static BaseAsyncReqRow loadAsyncReq(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
53+
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
5454
String pathOfType = String.format(PATH_FORMAT, sideType);
55-
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
55+
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir, pluginLoadMode);
5656
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
5757
return ClassLoaderManager.newInstance(pluginJarPath, (cl) ->
5858
cl.loadClass(className).asSubclass(BaseAsyncReqRow.class)
@@ -61,8 +61,8 @@ private static BaseAsyncReqRow loadAsyncReq(String sideType, String sqlRootDir,
6161
}
6262

6363
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
64-
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) throws Exception {
65-
BaseAsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
64+
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
65+
BaseAsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo, pluginLoadMode);
6666

6767
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
6868
if (ORDERED.equals(sideTableInfo.getCacheMode())){

core/src/main/java/com/dtstack/flink/sql/side/operator/SideWithAllCacheOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ public class SideWithAllCacheOperator {
4646

4747
private static BaseAllReqRow loadFlatMap(String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo,
4848
JoinInfo joinInfo, List<FieldInfo> outFieldInfoList,
49-
AbstractSideTableInfo sideTableInfo) throws Exception {
49+
AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
5050

5151
String pathOfType = String.format(PATH_FORMAT, sideType);
52-
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir);
52+
String pluginJarPath = PluginUtil.getJarFileDirPath(pathOfType, sqlRootDir, pluginLoadMode);
5353
String className = PluginUtil.getSqlSideClassName(sideType, "side", OPERATOR_TYPE);
5454

5555
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> cl.loadClass(className).asSubclass(BaseAllReqRow.class)
@@ -58,8 +58,8 @@ private static BaseAllReqRow loadFlatMap(String sideType, String sqlRootDir, Row
5858
}
5959

6060
public static DataStream getSideJoinDataStream(DataStream inputStream, String sideType, String sqlRootDir, RowTypeInfo rowTypeInfo, JoinInfo joinInfo,
61-
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo) throws Exception {
62-
BaseAllReqRow allReqRow = loadFlatMap(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
61+
List<FieldInfo> outFieldInfoList, AbstractSideTableInfo sideTableInfo, String pluginLoadMode) throws Exception {
62+
BaseAllReqRow allReqRow = loadFlatMap(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo, pluginLoadMode);
6363
return inputStream.flatMap(allReqRow);
6464
}
6565
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,18 +82,6 @@ public static String getJarFileDirPath(String type, String sqlRootDir, String pl
8282
return jarPath;
8383
}
8484

85-
public static String getJarFileDirPath(String type, String sqlRootDir) {
86-
String jarPath = sqlRootDir + SP + type;
87-
88-
File jarFile = new File(jarPath);
89-
90-
if(!jarFile.exists()){
91-
throw new RuntimeException(String.format("path %s not exists!!!", jarPath));
92-
}
93-
94-
return jarPath;
95-
}
96-
9785
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir, String pluginLoadMode) throws MalformedURLException {
9886
String dirName = sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
9987

0 commit comments

Comments
 (0)