Skip to content

Commit 09b84a1

Browse files
author
xuchao
committed
初步合并1.8 ==> 1.10 (Preliminary merge: Flink 1.8 ==> 1.10)
1 parent e346a80 commit 09b84a1

4 files changed

Lines changed: 22 additions & 24 deletions

File tree

core/src/main/java/com/dtstack/flink/sql/environment/MyLocalStreamEnvironment.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,11 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
101101

102102
JobGraph jobGraph = streamGraph.getJobGraph();
103103
jobGraph.setClasspaths(classpaths);
104-
jobGraph.setAllowQueuedScheduling(true);
105104

106105
Configuration configuration = new Configuration();
107106
configuration.addAll(jobGraph.getJobConfiguration());
108107

109-
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "512M");
108+
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "512M");
110109
configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
111110

112111
// add (and override) the settings with what the user defined

core/src/main/java/com/dtstack/flink/sql/exec/ExecuteProcessHelper.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package com.dtstack.flink.sql.exec;
2020

21-
import com.dtstack.flink.sql.config.CalciteConfig;
2221
import com.dtstack.flink.sql.parser.CreateFuncParser;
2322
import com.dtstack.flink.sql.parser.CreateTmpTableParser;
2423
import com.dtstack.flink.sql.parser.FlinkPlanner;
@@ -35,7 +34,6 @@
3534
import org.apache.flink.table.api.Table;
3635
import org.apache.flink.table.api.TableEnvironment;
3736
import org.apache.flink.table.api.java.StreamTableEnvironment;
38-
import org.apache.flink.table.calcite.FlinkPlannerImpl;
3937
import org.apache.flink.table.sinks.TableSink;
4038
import org.apache.flink.types.Row;
4139

@@ -151,7 +149,6 @@ public static boolean checkRemoteSqlPluginPath(String remoteSqlPluginPath, Strin
151149
public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInfo) throws Exception {
152150
StreamExecutionEnvironment env = ExecuteProcessHelper.getStreamExeEnv(paramsInfo.getConfProp(), paramsInfo.getDeployMode());
153151
StreamTableEnvironment tableEnv = getStreamTableEnv(env, paramsInfo.getConfProp());
154-
StreamQueryConfig streamQueryConfig = StreamEnvConfigManager.getStreamQueryConfig(tableEnv, paramsInfo.getConfProp());
155152

156153
SqlParser.setLocalSqlPluginRoot(paramsInfo.getLocalSqlPluginPath());
157154
SqlTree sqlTree = SqlParser.parseSql(paramsInfo.getSql());
@@ -167,7 +164,7 @@ public static StreamExecutionEnvironment getStreamExecution(ParamsInfo paramsInf
167164
// cache classPathSets
168165
ExecuteProcessHelper.registerPluginUrlToCachedFile(env, classPathSets);
169166

170-
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache, streamQueryConfig);
167+
ExecuteProcessHelper.sqlTranslation(paramsInfo.getLocalSqlPluginPath(), tableEnv, sqlTree, sideTableMap, registerTableCache);
171168

172169
if (env instanceof MyLocalStreamEnvironment) {
173170
((MyLocalStreamEnvironment) env).setClasspaths(ClassLoaderManager.getClassPath());
@@ -193,15 +190,14 @@ public static List<URL> getExternalJarUrls(String addJarListStr) throws java.io.
193190
private static void sqlTranslation(String localSqlPluginPath,
194191
StreamTableEnvironment tableEnv,
195192
SqlTree sqlTree,Map<String, AbstractSideTableInfo> sideTableMap,
196-
Map<String, Table> registerTableCache,
197-
StreamQueryConfig queryConfig) throws Exception {
193+
Map<String, Table> registerTableCache) throws Exception {
198194

199195
SideSqlExec sideSqlExec = new SideSqlExec();
200196
sideSqlExec.setLocalSqlPluginPath(localSqlPluginPath);
201197

202198
int scope = 0;
203199
for (CreateTmpTableParser.SqlParserResult result : sqlTree.getTmpSqlList()) {
204-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, result, scope + "");
200+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, result, scope + "");
205201
scope++;
206202
}
207203

@@ -218,7 +214,7 @@ private static void sqlTranslation(String localSqlPluginPath,
218214
SqlNode sqlNode = org.apache.calcite.sql.parser.SqlParser.create(realSql, CalciteConfig.MYSQL_LEX_CONFIG).parseStmt();
219215
String tmpSql = ((SqlInsert) sqlNode).getSource().toString();
220216
tmp.setExecSql(tmpSql);
221-
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, tmp, scope + "");
217+
sideSqlExec.exec(tmp.getExecSql(), sideTableMap, tableEnv, registerTableCache, tmp, scope + "");
222218
} else {
223219
for (String sourceTable : result.getSourceTableList()) {
224220
if (sideTableMap.containsKey(sourceTable)) {
@@ -228,11 +224,11 @@ private static void sqlTranslation(String localSqlPluginPath,
228224
}
229225
if (isSide) {
230226
//sql-dimensional table contains the dimension table of execution
231-
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, queryConfig, null, null);
227+
sideSqlExec.exec(result.getExecSql(), sideTableMap, tableEnv, registerTableCache, null, null);
232228
} else {
233229
LOG.info("----------exec sql without dimension join-----------");
234230
LOG.info("----------real sql exec is--------------------------\n{}", result.getExecSql());
235-
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql(), queryConfig);
231+
FlinkSQLExec.sqlUpdate(tableEnv, result.getExecSql());
236232
if (LOG.isInfoEnabled()) {
237233
LOG.info("exec sql: " + result.getExecSql());
238234
}

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,16 @@
2929
import com.google.common.collect.Lists;
3030
import com.google.common.collect.Maps;
3131
import org.apache.calcite.sql.JoinType;
32+
import org.apache.commons.collections.MapUtils;
33+
import org.apache.flink.api.common.functions.RuntimeContext;
3234
import org.apache.flink.api.java.tuple.Tuple2;
3335
import org.apache.flink.configuration.Configuration;
3436
import org.apache.flink.metrics.Counter;
3537
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3638
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
37-
import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
39+
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
40+
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
41+
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
3842
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
3943
import org.apache.flink.types.Row;
4044
import org.slf4j.Logger;
@@ -165,14 +169,14 @@ public void timeout(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>>
165169
resultFuture.complete(null);
166170
}
167171

168-
protected void preInvoke(CRow input, ResultFuture<CRow> resultFuture){
172+
protected void preInvoke(Tuple2<Boolean,Row> input, ResultFuture<Tuple2<Boolean,Row>> resultFuture){
169173
ScheduledFuture<?> timeFuture = registerTimer(input, resultFuture);
170174
cancelTimerWhenComplete(resultFuture, timeFuture);
171175
}
172176

173177
@Override
174-
public void asyncInvoke(CRow row, ResultFuture<CRow> resultFuture) throws Exception {
175-
CRow input = new CRow(Row.copy(row.row()), row.change());
178+
public void asyncInvoke(Tuple2<Boolean,Row> row, ResultFuture<Tuple2<Boolean,Row>> resultFuture) throws Exception {
179+
Tuple2<Boolean,Row> input = Tuple2.of(row.f0, Row.copy(row.f1));
176180
preInvoke(input, resultFuture);
177181
Map<String, Object> inputParams = parseInputParam(input);
178182
if(MapUtils.isEmpty(inputParams)){
@@ -186,11 +190,11 @@ public void asyncInvoke(CRow row, ResultFuture<CRow> resultFuture) throws Except
186190
handleAsyncInvoke(inputParams, input, resultFuture);
187191
}
188192

189-
private Map<String, Object> parseInputParam(CRow input){
193+
private Map<String, Object> parseInputParam(Tuple2<Boolean,Row> input){
190194
Map<String, Object> inputParams = Maps.newHashMap();
191195
for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
192196
Integer conValIndex = sideInfo.getEqualValIndex().get(i);
193-
Object equalObj = input.row().getField(conValIndex);
197+
Object equalObj = input.f1.getField(conValIndex);
194198
if(equalObj == null){
195199
return inputParams;
196200
}
@@ -213,17 +217,17 @@ private void invokeWithCache(Map<String, Object> inputParams, Tuple2<Boolean,Row
213217
return;
214218
}else if(ECacheContentType.SingleLine == val.getType()){
215219
try {
216-
Row row = fillData(input.f0, val.getContent());
217-
resultFuture.complete(Collections.singleton(new Tuple2<Boolean,Row>(row, input.change())));
220+
Row row = fillData(input.f1, val.getContent());
221+
resultFuture.complete(Collections.singleton(Tuple2.of(input.f0, row)));
218222
} catch (Exception e) {
219223
dealFillDataError(input, resultFuture, e);
220224
}
221225
} else if (ECacheContentType.MultiLine == val.getType()) {
222226
try {
223227
List<Tuple2<Boolean,Row>> rowList = Lists.newArrayList();
224228
for (Object one : (List) val.getContent()) {
225-
Row row = fillData(input.row(), one);
226-
rowList.add(new Tuple2<Boolean,Row>(row, input.change()));
229+
Row row = fillData(input.f1, one);
230+
rowList.add(Tuple2.of(input.f0, row));
227231
}
228232
resultFuture.complete(rowList);
229233
} catch (Exception e) {

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ public void exec(String sql,
9191
Map<String, AbstractSideTableInfo> sideTableMap,
9292
StreamTableEnvironment tableEnv,
9393
Map<String, Table> tableCache,
94-
StreamQueryConfig queryConfig,
9594
CreateTmpTableParser.SqlParserResult createView,
9695
String scope) throws Exception {
9796
if(localSqlPluginPath == null){
@@ -123,7 +122,7 @@ public void exec(String sql,
123122

124123

125124
if(pollSqlNode.getKind() == INSERT){
126-
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString(), queryConfig);
125+
FlinkSQLExec.sqlUpdate(tableEnv, pollSqlNode.toString());
127126
if(LOG.isInfoEnabled()){
128127
LOG.info("----------real exec sql-----------\n{}", pollSqlNode.toString());
129128
}

0 commit comments

Comments (0)