Skip to content

Commit 29f41be

Browse files
committed
【fix】opt LocalTest PluginLoadMode
1 parent 954246c commit 29f41be

7 files changed

Lines changed: 48 additions & 31 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
147147

148148

149149
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
150-
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
150+
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql(), paramsInfo.getPluginLoadMode());
151151

152152
Map<String, AbstractSideTableInfo> sideTableMap = Maps.newHashMap();
153153
Map<String, Table> registerTableCache = Maps.newHashMap();
@@ -273,7 +273,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
273273
if (tableInfo instanceof AbstractSourceTableInfo) {
274274

275275
AbstractSourceTableInfo sourceTableInfo = (AbstractSourceTableInfo) tableInfo;
276-
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath);
276+
Table table = StreamSourceFactory.getStreamSource(sourceTableInfo, env, tableEnv, localSqlPluginPath, pluginLoadMode);
277277
tableEnv.registerTable(sourceTableInfo.getAdaptName(), table);
278278
//Note --- parameter conversion function can not be used inside a function of the type of polymerization
279279
//Create table in which the function is arranged only need adaptation sql
@@ -303,7 +303,7 @@ public static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment
303303
pluginClassPathSets.add(sourceTablePathUrl);
304304
} else if (tableInfo instanceof AbstractTargetTableInfo) {
305305

306-
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath);
306+
TableSink tableSink = StreamSinkFactory.getTableSink((AbstractTargetTableInfo) tableInfo, localSqlPluginPath, pluginLoadMode);
307307
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
308308
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
309309

core/src/main/java/com/dtstack/flink/sql/parser/SqlParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public static void setLocalSqlPluginRoot(String localSqlPluginRoot){
5858
* insert into tb1 select * from tb2;
5959
* @param sql
6060
*/
61-
public static SqlTree parseSql(String sql) throws Exception {
61+
public static SqlTree parseSql(String sql, String pluginLoadMode) throws Exception {
6262

6363
if(StringUtils.isBlank(sql)){
6464
throw new RuntimeException("sql is not null");
@@ -110,7 +110,7 @@ public static SqlTree parseSql(String sql) throws Exception {
110110
}
111111

112112
AbstractTableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
113-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
113+
createTableResult, LOCAL_SQL_PLUGIN_ROOT, pluginLoadMode);
114114
sqlTree.addTableInfo(tableName, tableInfo);
115115
}
116116
}
@@ -123,7 +123,7 @@ public static SqlTree parseSql(String sql) throws Exception {
123123
}
124124

125125
AbstractTableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SINK.getType(),
126-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
126+
createTableResult, LOCAL_SQL_PLUGIN_ROOT, pluginLoadMode);
127127
sqlTree.addTableInfo(tableName, tableInfo);
128128
}
129129
}
@@ -141,7 +141,7 @@ public static SqlTree parseSql(String sql) throws Exception {
141141
}
142142
} else {
143143
AbstractTableInfo tableInfo = tableInfoParser.parseWithTableType(ETableType.SOURCE.getType(),
144-
createTableResult, LOCAL_SQL_PLUGIN_ROOT);
144+
createTableResult, LOCAL_SQL_PLUGIN_ROOT, pluginLoadMode);
145145
sqlTree.addTableInfo(tableName, tableInfo);
146146
}
147147
}

core/src/main/java/com/dtstack/flink/sql/side/StreamSideFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ public class StreamSideFactory {
3737

3838
private static final String CURR_TYPE = "side";
3939

40-
public static AbstractTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType) throws Exception {
40+
public static AbstractTableParser getSqlParser(String pluginType, String sqlRootDir, String cacheType, String pluginLoadMode) throws Exception {
4141

4242
String sideOperator = ECacheType.ALL.name().equalsIgnoreCase(cacheType) ? "all" : "async";
43-
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir);
43+
String pluginJarPath = PluginUtil.getSideJarFileDirPath(pluginType, sideOperator, "side", sqlRootDir, pluginLoadMode);
4444
String className = PluginUtil.getSqlParserClassName(pluginType, CURR_TYPE);
4545

4646
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {

core/src/main/java/com/dtstack/flink/sql/sink/StreamSinkFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ public class StreamSinkFactory {
4040

4141
private static final String DIR_NAME_FORMAT = "%ssink";
4242

43-
public static AbstractTableParser getSqlParser(String pluginType, String sqlRootDir) throws Exception {
44-
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
43+
public static AbstractTableParser getSqlParser(String pluginType, String sqlRootDir, String pluginLoadMode) throws Exception {
44+
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir, pluginLoadMode);
4545
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
4646
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
4747

@@ -54,9 +54,9 @@ public static AbstractTableParser getSqlParser(String pluginType, String sqlRoot
5454
});
5555
}
5656

57-
public static TableSink getTableSink(AbstractTargetTableInfo targetTableInfo, String localSqlRootDir) throws Exception {
57+
public static TableSink getTableSink(AbstractTargetTableInfo targetTableInfo, String localSqlRootDir, String pluginLoadMode) throws Exception {
5858
String pluginType = targetTableInfo.getType();
59-
String pluginJarDirPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), localSqlRootDir);
59+
String pluginJarDirPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), localSqlRootDir, pluginLoadMode);
6060
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
6161
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
6262

core/src/main/java/com/dtstack/flink/sql/source/StreamSourceFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ public class StreamSourceFactory {
4343

4444
private static final String DIR_NAME_FORMAT = "%ssource";
4545

46-
public static AbstractSourceParser getSqlParser(String pluginType, String sqlRootDir) throws Exception {
46+
public static AbstractSourceParser getSqlParser(String pluginType, String sqlRootDir, String pluginLoadMode) throws Exception {
4747

48-
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir);
48+
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, pluginType), sqlRootDir, pluginLoadMode);
4949
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(pluginType);
5050
String className = PluginUtil.getSqlParserClassName(typeNoVersion, CURR_TYPE);
5151
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {
@@ -63,11 +63,11 @@ public static AbstractSourceParser getSqlParser(String pluginType, String sqlRoo
6363
* @return
6464
*/
6565
public static Table getStreamSource(AbstractSourceTableInfo sourceTableInfo, StreamExecutionEnvironment env,
66-
StreamTableEnvironment tableEnv, String sqlRootDir) throws Exception {
66+
StreamTableEnvironment tableEnv, String sqlRootDir, String pluginLoadMode) throws Exception {
6767

6868
String sourceTypeStr = sourceTableInfo.getType();
6969
String typeNoVersion = DtStringUtil.getPluginTypeWithoutVersion(sourceTypeStr);
70-
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, sourceTypeStr), sqlRootDir);
70+
String pluginJarPath = PluginUtil.getJarFileDirPath(String.format(DIR_NAME_FORMAT, sourceTypeStr), sqlRootDir, pluginLoadMode);
7171
String className = PluginUtil.getGenerClassName(typeNoVersion, CURR_TYPE);
7272

7373
return ClassLoaderManager.newInstance(pluginJarPath, (cl) -> {

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfoParser.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class AbstractTableInfoParser {
5858

5959
//Parsing loaded plugin
6060
public AbstractTableInfo parseWithTableType(int tableType, CreateTableParser.SqlParserResult parserResult,
61-
String localPluginRoot) throws Exception {
61+
String localPluginRoot, String pluginLoadMode) throws Exception {
6262
AbstractTableParser absTableParser = null;
6363
Map<String, Object> props = parserResult.getPropMap();
6464
String type = MathUtil.getString(props.get(TYPE_KEY));
@@ -73,22 +73,22 @@ public AbstractTableInfo parseWithTableType(int tableType, CreateTableParser.Sql
7373
if(!isSideTable){
7474
absTableParser = sourceTableInfoMap.get(type);
7575
if(absTableParser == null){
76-
absTableParser = StreamSourceFactory.getSqlParser(type, localPluginRoot);
76+
absTableParser = StreamSourceFactory.getSqlParser(type, localPluginRoot, pluginLoadMode);
7777
sourceTableInfoMap.put(type, absTableParser);
7878
}
7979
}else{
8080
absTableParser = sideTableInfoMap.get(type);
8181
if(absTableParser == null){
8282
String cacheType = MathUtil.getString(props.get(AbstractSideTableInfo.CACHE_KEY));
83-
absTableParser = StreamSideFactory.getSqlParser(type, localPluginRoot, cacheType);
83+
absTableParser = StreamSideFactory.getSqlParser(type, localPluginRoot, cacheType, pluginLoadMode);
8484
sideTableInfoMap.put(type + cacheType, absTableParser);
8585
}
8686
}
8787

8888
}else if(tableType == ETableType.SINK.getType()){
8989
absTableParser = targetTableInfoMap.get(type);
9090
if(absTableParser == null){
91-
absTableParser = StreamSinkFactory.getSqlParser(type, localPluginRoot);
91+
absTableParser = StreamSinkFactory.getSqlParser(type, localPluginRoot, pluginLoadMode);
9292
targetTableInfoMap.put(type, absTableParser);
9393
}
9494
}

core/src/main/java/com/dtstack/flink/sql/util/PluginUtil.java

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,32 +74,49 @@ public static URL buildSidePathByLoadMode(String type, String operator, String s
7474
return getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath, pluginLoadMode);
7575
}
7676

77-
public static String getJarFileDirPath(String type, String sqlRootDir){
77+
public static String getJarFileDirPath(String type, String sqlRootDir, String pluginLoadMode){
7878
String jarPath = sqlRootDir + SP + type;
7979

80-
checkJarFileDirPath(sqlRootDir, jarPath);
80+
checkJarFileDirPath(sqlRootDir, jarPath, pluginLoadMode);
8181

8282
return jarPath;
8383
}
8484

85-
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir) throws MalformedURLException {
85+
public static String getJarFileDirPath(String type, String sqlRootDir) {
86+
String jarPath = sqlRootDir + SP + type;
87+
88+
File jarFile = new File(jarPath);
89+
90+
if(!jarFile.exists()){
91+
throw new RuntimeException(String.format("path %s not exists!!!", jarPath));
92+
}
93+
94+
return jarPath;
95+
}
96+
97+
public static String getSideJarFileDirPath(String pluginType, String sideOperator, String tableType, String sqlRootDir, String pluginLoadMode) throws MalformedURLException {
8698
String dirName = sqlRootDir + SP + pluginType + sideOperator + tableType.toLowerCase();
8799

88-
checkJarFileDirPath(sqlRootDir, dirName);
100+
checkJarFileDirPath(sqlRootDir, dirName, pluginLoadMode);
89101

90102
return dirName;
91103
}
92104

93-
private static void checkJarFileDirPath(String sqlRootDir, String dirName) {
94-
if (sqlRootDir == null || sqlRootDir.isEmpty()) {
95-
LOG.warn("be sure you are not in LocalTest mode, if not, check the sqlRootDir");
96-
return;
105+
private static void checkJarFileDirPath(String sqlRootDir, String path, String pluginLoadMode) {
106+
107+
if (sqlRootDir == null || sqlRootDir.isEmpty()){
108+
if (pluginLoadMode.equalsIgnoreCase(EPluginLoadMode.LOCALTEST.name())) {
109+
LOG.warn("be sure you are not in LocalTest mode, if not, check the sqlRootDir");
110+
return;
111+
}
112+
113+
throw new RuntimeException("sqlPlugin is empty !");
97114
}
98115

99-
File jarFile = new File(dirName);
116+
File jarFile = new File(path);
100117

101118
if(!jarFile.exists()){
102-
throw new RuntimeException(String.format("path %s not exists!!!", dirName));
119+
throw new RuntimeException(String.format("path %s not exists!!!", path));
103120
}
104121
}
105122

0 commit comments

Comments
 (0)