Skip to content

Commit 8387e4b

Browse files
committed
opt main code
1 parent 8c5ea02 commit 8387e4b

4 files changed

Lines changed: 136 additions & 148 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/Main.java

Lines changed: 66 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,12 @@
1717
*/
1818

1919

20-
2120
package com.dtstack.flink.sql;
2221

2322
import com.dtstack.flink.sql.config.CalciteConfig;
2423
import com.dtstack.flink.sql.classloader.ClassLoaderManager;
25-
import com.dtstack.flink.sql.constrant.ConfigConstrant;
2624
import com.dtstack.flink.sql.enums.ClusterMode;
2725
import com.dtstack.flink.sql.enums.ECacheType;
28-
import com.dtstack.flink.sql.enums.EPluginLoadMode;
29-
//import com.dtstack.flink.sql.exec.FlinkSQLExec;
3026
import com.dtstack.flink.sql.environment.MyLocalStreamEnvironment;
3127
import com.dtstack.flink.sql.environment.StreamEnvConfigManager;
3228
import com.dtstack.flink.sql.exec.FlinkSQLExec;
@@ -44,29 +40,21 @@
4440
import com.dtstack.flink.sql.sink.StreamSinkFactory;
4541
import com.dtstack.flink.sql.source.StreamSourceFactory;
4642
import com.dtstack.flink.sql.util.DtStringUtil;
47-
import com.dtstack.flink.sql.util.PropertiesUtils;
4843
import com.dtstack.flink.sql.watermarker.WaterMarkerAssigner;
49-
import com.dtstack.flink.sql.util.FlinkUtil;
44+
import com.dtstack.flink.sql.function.FunctionManager;
5045
import com.dtstack.flink.sql.util.PluginUtil;
5146
import org.apache.calcite.sql.SqlInsert;
5247
import org.apache.calcite.sql.SqlNode;
5348
import org.apache.commons.io.Charsets;
54-
import org.apache.commons.lang3.StringUtils;
55-
import org.apache.flink.api.common.ExecutionConfig;
56-
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
57-
import org.apache.flink.api.common.time.Time;
5849
import org.apache.flink.api.common.typeinfo.TypeInformation;
5950
import org.apache.flink.api.java.tuple.Tuple2;
6051
import org.apache.flink.api.java.typeutils.RowTypeInfo;
61-
import org.apache.flink.client.program.ContextEnvironment;
62-
import org.apache.flink.configuration.Configuration;
6352
import com.google.common.base.Strings;
6453
import com.google.common.collect.Lists;
6554
import com.google.common.collect.Maps;
6655
import com.google.common.collect.Sets;
6756
import com.fasterxml.jackson.databind.ObjectMapper;
6857
import org.apache.flink.streaming.api.datastream.DataStream;
69-
import org.apache.flink.streaming.api.environment.StreamContextEnvironment;
7058
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
7159
import org.apache.flink.table.api.EnvironmentSettings;
7260
import org.apache.flink.table.api.Table;
@@ -76,18 +64,17 @@
7664
import org.apache.flink.types.Row;
7765
import org.slf4j.Logger;
7866
import org.slf4j.LoggerFactory;
67+
7968
import java.io.File;
80-
import java.lang.reflect.Field;
8169
import java.lang.reflect.InvocationTargetException;
82-
import java.lang.reflect.Method;
8370
import java.net.URL;
8471
import java.net.URLClassLoader;
8572
import java.net.URLDecoder;
8673
import java.util.List;
8774
import java.util.Map;
8875
import java.util.Properties;
8976
import java.util.Set;
90-
import java.util.concurrent.TimeUnit;
77+
9178
import com.dtstack.flink.sql.option.Options;
9279

9380
/**
@@ -122,7 +109,7 @@ public static void main(String[] args) throws Exception {
122109
SqlParser.setLocalSqlPluginRoot(localSqlPluginPath);
123110

124111
List<String> addJarFileList = Lists.newArrayList();
125-
if(!Strings.isNullOrEmpty(addJarListStr)){
112+
if (!Strings.isNullOrEmpty(addJarListStr)) {
126113
addJarListStr = URLDecoder.decode(addJarListStr, Charsets.UTF_8.name());
127114
addJarFileList = objMapper.readValue(addJarListStr, List.class);
128115
}
@@ -136,7 +123,7 @@ public static void main(String[] args) throws Exception {
136123
SqlTree sqlTree = SqlParser.parseSql(sql);
137124

138125
//Get External jar to load
139-
for(String addJarPath : addJarFileList){
126+
for (String addJarPath : addJarFileList) {
140127
File tmpFile = new File(addJarPath);
141128
jarURList.add(tmpFile.toURI().toURL());
142129
}
@@ -145,28 +132,30 @@ public static void main(String[] args) throws Exception {
145132
Map<String, Table> registerTableCache = Maps.newHashMap();
146133

147134
//register udf
148-
registerUDF(sqlTree, jarURList, tableEnv);
135+
registerUserDefinedFunction(sqlTree, jarURList, tableEnv);
149136
//register table schema
150-
registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
137+
Set<URL> classPathSets = registerTable(sqlTree, env, tableEnv, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode, sideTableMap, registerTableCache);
138+
// cache classPathSets
139+
registerPluginUrlToCachedFile(env, classPathSets);
151140

152-
sqlTranslation(localSqlPluginPath, tableEnv,sqlTree,sideTableMap,registerTableCache);
141+
sqlTranslation(localSqlPluginPath, tableEnv, sqlTree, sideTableMap, registerTableCache);
153142

154-
if(env instanceof MyLocalStreamEnvironment) {
143+
if (env instanceof MyLocalStreamEnvironment) {
155144
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
156145
}
157146

158147
env.execute(name);
159148
}
160149

161-
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv,SqlTree sqlTree,Map<String, SideTableInfo> sideTableMap,Map<String, Table> registerTableCache) throws Exception {
150+
private static void sqlTranslation(String localSqlPluginPath, StreamTableEnvironment tableEnv, SqlTree sqlTree, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
162151
SideSqlExec sideSqlExec = new SideSqlExec();
163152
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
164153
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
165154
sideSqlExec.registerTmpTable(result, sideTableMap, tableEnv, registerTableCache);
166155
}
167156

168157
for (InsertSqlParser.SqlParseResult result : sqlTree.getExecSqlList()) {
169-
if(LOG.isInfoEnabled()){
158+
if (LOG.isInfoEnabled()) {
170159
LOG.info("exe-sql:\n" + result.getExecSql());
171160
}
172161
boolean isSide = false;
@@ -180,18 +169,18 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
180169
tmp.setExecSql(tmpSql);
181170
sideSqlExec.registerTmpTable(tmp, sideTableMap, tableEnv, registerTableCache);
182171
} else {
183-
for(String sourceTable : result.getSourceTableList()){
184-
if(sideTableMap.containsKey(sourceTable)){
172+
for (String sourceTable : result.getSourceTableList()) {
173+
if (sideTableMap.containsKey(sourceTable)) {
185174
isSide = true;
186175
break;
187176
}
188177
}
189-
if(isSide){
178+
if (isSide) {
190179
//sql-dimensional table contains the dimension table of execution
191180
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache);
192-
}else{
181+
} else {
193182
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
194-
if(LOG.isInfoEnabled()){
183+
if (LOG.isInfoEnabled()) {
195184
LOG.info("exec sql: " + result.getExecSql());
196185
}
197186
}
@@ -202,26 +191,38 @@ private static void sqlTranslation(String localSqlPluginPath, StreamTableEnviron
202191

203192
}
204193

205-
private static void registerUDF(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
206-
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
207-
//register urf
194+
195+
private static void registerUserDefinedFunction(SqlTree sqlTree, List<URL> jarURList, TableEnvironment tableEnv)
196+
throws IllegalAccessException, InvocationTargetException {
208197
// udf和tableEnv须由同一个类加载器加载
209198
ClassLoader levelClassLoader = tableEnv.getClass().getClassLoader();
210199
URLClassLoader classLoader = null;
211200
List<CreateFuncParser.SqlParserResult> funcList = sqlTree.getFunctionList();
212201
for (CreateFuncParser.SqlParserResult funcInfo : funcList) {
213202
//classloader
214203
if (classLoader == null) {
215-
classLoader = FlinkUtil.loadExtraJar(jarURList, (URLClassLoader)levelClassLoader);
204+
classLoader = ClassLoaderManager.loadExtraJar(jarURList, (URLClassLoader) levelClassLoader);
216205
}
217-
FlinkUtil.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
206+
FunctionManager.registerUDF(funcInfo.getType(), funcInfo.getClassName(), funcInfo.getName(), tableEnv, classLoader);
218207
}
219208
}
220209

221-
222-
private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
223-
String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
224-
Set<URL> classPathSet = Sets.newHashSet();
210+
/**
211+
* 向Flink注册源表和结果表,返回执行时插件包的全路径
212+
* @param sqlTree
213+
* @param env
214+
* @param tableEnv
215+
* @param localSqlPluginPath
216+
* @param remoteSqlPluginPath
217+
* @param pluginLoadMode 插件加载模式 classpath or shipfile
218+
* @param sideTableMap
219+
* @param registerTableCache
220+
* @return
221+
* @throws Exception
222+
*/
223+
private static Set<URL> registerTable(SqlTree sqlTree, StreamExecutionEnvironment env, StreamTableEnvironment tableEnv, String localSqlPluginPath,
224+
String remoteSqlPluginPath, String pluginLoadMode, Map<String, SideTableInfo> sideTableMap, Map<String, Table> registerTableCache) throws Exception {
225+
Set<URL> pluginClassPatshSets = Sets.newHashSet();
225226
WaterMarkerAssigner waterMarkerAssigner = new WaterMarkerAssigner();
226227
for (TableInfo tableInfo : sqlTree.getTableInfoMap().values()) {
227228

@@ -237,63 +238,63 @@ private static void registerTable(SqlTree sqlTree, StreamExecutionEnvironment en
237238

238239
RowTypeInfo typeInfo = new RowTypeInfo(adaptTable.getSchema().getFieldTypes(), adaptTable.getSchema().getFieldNames());
239240
DataStream adaptStream = tableEnv.toRetractStream(adaptTable, typeInfo)
240-
.map((Tuple2<Boolean, Row> f0) -> { return f0.f1; })
241+
.map((Tuple2<Boolean, Row> f0) -> {
242+
return f0.f1;
243+
})
241244
.returns(typeInfo);
242245

243246
String fields = String.join(",", typeInfo.getFieldNames());
244247

245-
if(waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)){
248+
if (waterMarkerAssigner.checkNeedAssignWaterMarker(sourceTableInfo)) {
246249
adaptStream = waterMarkerAssigner.assignWaterMarker(adaptStream, typeInfo, sourceTableInfo);
247250
fields += ",ROWTIME.ROWTIME";
248-
}else{
251+
} else {
249252
fields += ",PROCTIME.PROCTIME";
250253
}
251254

252255
Table regTable = tableEnv.fromDataStream(adaptStream, fields);
253256
tableEnv.registerTable(tableInfo.getName(), regTable);
254-
if(LOG.isInfoEnabled()){
257+
if (LOG.isInfoEnabled()) {
255258
LOG.info("registe table {} success.", tableInfo.getName());
256259
}
257260
registerTableCache.put(tableInfo.getName(), regTable);
258-
classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
259-
} else if (tableInfo instanceof TargetTableInfo) {
260261

262+
URL sourceTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), SourceTableInfo.SOURCE_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
263+
pluginClassPatshSets.add(sourceTablePathUrl);
264+
} else if (tableInfo instanceof TargetTableInfo) {
261265
TableSink tableSink = StreamSinkFactory.getTableSink((TargetTableInfo) tableInfo, localSqlPluginPath);
262-
TypeInformation[] flinkTypes = FlinkUtil.transformTypes(tableInfo.getFieldClasses());
266+
TypeInformation[] flinkTypes = FunctionManager.transformTypes(tableInfo.getFieldClasses());
263267
tableEnv.registerTableSink(tableInfo.getName(), tableInfo.getFields(), flinkTypes, tableSink);
264-
classPathSet.add(buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
265-
} else if(tableInfo instanceof SideTableInfo){
266268

269+
URL sinkTablePathUrl = PluginUtil.buildSourceAndSinkPathByLoadMode(tableInfo.getType(), TargetTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
270+
pluginClassPatshSets.add(sinkTablePathUrl);
271+
} else if (tableInfo instanceof SideTableInfo) {
267272
String sideOperator = ECacheType.ALL.name().equals(((SideTableInfo) tableInfo).getCacheType()) ? "all" : "async";
268273
sideTableMap.put(tableInfo.getName(), (SideTableInfo) tableInfo);
269-
classPathSet.add(buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode));
270-
}else {
274+
275+
URL sideTablePathUrl = PluginUtil.buildSidePathByLoadMode(tableInfo.getType(), sideOperator, SideTableInfo.TARGET_SUFFIX, localSqlPluginPath, remoteSqlPluginPath, pluginLoadMode);
276+
pluginClassPatshSets.add(sideTablePathUrl);
277+
} else {
271278
throw new RuntimeException("not support table type:" + tableInfo.getType());
272279
}
273280
}
281+
return pluginClassPatshSets;
282+
}
274283

284+
/**
285+
* 将job依赖的插件包路径存储到cacheFile,在外围将插件包路径传递给jobgraph
286+
* @param env
287+
* @param classPathSet
288+
*/
289+
private static void registerPluginUrlToCachedFile(StreamExecutionEnvironment env, Set<URL> classPathSet) {
275290
int i = 0;
276-
for(URL url : classPathSet){
291+
for (URL url : classPathSet) {
277292
String classFileName = String.format(CLASS_FILE_NAME_FMT, i);
278-
env.registerCachedFile(url.getPath(), classFileName, true);
293+
env.registerCachedFile(url.getPath(), classFileName, true);
279294
i++;
280295
}
281296
}
282297

283-
private static URL buildSourceAndSinkPathByLoadMode(String type, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
284-
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
285-
return PluginUtil.getRemoteJarFilePath(type, suffix, remoteSqlPluginPath, localSqlPluginPath);
286-
}
287-
return PluginUtil.getLocalJarFilePath(type, suffix, localSqlPluginPath);
288-
}
289-
290-
private static URL buildSidePathByLoadMode(String type, String operator, String suffix, String localSqlPluginPath, String remoteSqlPluginPath, String pluginLoadMode) throws Exception {
291-
if (StringUtils.equalsIgnoreCase(pluginLoadMode, EPluginLoadMode.CLASSPATH.name())) {
292-
return PluginUtil.getRemoteSideJarFilePath(type, operator, suffix, remoteSqlPluginPath, localSqlPluginPath);
293-
}
294-
return PluginUtil.getLocalSideJarFilePath(type, operator, suffix, localSqlPluginPath);
295-
}
296-
297298
private static StreamExecutionEnvironment getStreamExeEnv(Properties confProperties, String deployMode) throws Exception {
298299
StreamExecutionEnvironment env = !ClusterMode.local.name().equals(deployMode) ?
299300
StreamExecutionEnvironment.getExecutionEnvironment() :

core/src/main/java/com/dtstack/flink/sql/classloader/ClassLoaderManager.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919
package com.dtstack.flink.sql.classloader;
2020

2121
import com.dtstack.flink.sql.util.PluginUtil;
22+
import com.dtstack.flink.sql.util.ReflectionUtils;
2223
import org.apache.commons.lang3.StringUtils;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

27+
import java.lang.reflect.InvocationTargetException;
28+
import java.lang.reflect.Method;
2629
import java.net.URL;
30+
import java.net.URLClassLoader;
2731
import java.util.ArrayList;
2832
import java.util.Arrays;
2933
import java.util.Comparator;
@@ -91,4 +95,27 @@ public static List<URL> getClassPath() {
9195
}
9296
return classPaths;
9397
}
98+
99+
100+
101+
public static URLClassLoader loadExtraJar(List<URL> jarURLList, URLClassLoader classLoader)
102+
throws IllegalAccessException, InvocationTargetException {
103+
104+
for(URL url : jarURLList){
105+
if(url.toString().endsWith(".jar")){
106+
urlClassLoaderAddUrl(classLoader, url);
107+
}
108+
}
109+
return classLoader;
110+
}
111+
112+
private static void urlClassLoaderAddUrl(URLClassLoader classLoader, URL url) throws InvocationTargetException, IllegalAccessException {
113+
Method method = ReflectionUtils.getDeclaredMethod(classLoader, "addURL", URL.class);
114+
115+
if (method == null) {
116+
throw new RuntimeException("can't not find declared method addURL, curr classLoader is " + classLoader.getClass());
117+
}
118+
method.setAccessible(true);
119+
method.invoke(classLoader, url);
120+
}
94121
}

0 commit comments

Comments
 (0)