Skip to content

Commit 762bcf2

Browse files
author
xuchao
committed
Merge remote-tracking branch 'origin/v1.10.0_dev' into v1.10.0_dev
2 parents 73521c2 + 70d46a1 commit 762bcf2

11 files changed

Lines changed: 89 additions & 122 deletions

File tree

cassandra/cassandra-side/cassandra-async-side/src/main/java/com/dtstack/flink/sql/side/cassandra/CassandraAsyncReqRow.java

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,46 +19,32 @@
1919

2020
package com.dtstack.flink.sql.side.cassandra;
2121

22-
import com.dtstack.flink.sql.util.RowDataComplete;
23-
import org.apache.flink.api.java.tuple.Tuple2;
24-
import org.apache.flink.api.java.typeutils.RowTypeInfo;
25-
import org.apache.flink.configuration.Configuration;
26-
import org.apache.flink.streaming.api.functions.async.ResultFuture;
27-
import org.apache.flink.table.dataformat.BaseRow;
28-
import org.apache.flink.types.Row;
29-
30-
import com.datastax.driver.core.Cluster;
31-
import com.datastax.driver.core.ConsistencyLevel;
32-
import com.datastax.driver.core.HostDistance;
33-
import com.datastax.driver.core.PoolingOptions;
34-
import com.datastax.driver.core.QueryOptions;
35-
import com.datastax.driver.core.ResultSet;
36-
import com.datastax.driver.core.Session;
37-
import com.datastax.driver.core.SocketOptions;
22+
import com.datastax.driver.core.*;
3823
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
3924
import com.datastax.driver.core.policies.RetryPolicy;
4025
import com.dtstack.flink.sql.enums.ECacheContentType;
41-
import com.dtstack.flink.sql.side.BaseAsyncReqRow;
42-
import com.dtstack.flink.sql.side.CacheMissVal;
43-
import com.dtstack.flink.sql.side.FieldInfo;
44-
import com.dtstack.flink.sql.side.JoinInfo;
45-
import com.dtstack.flink.sql.side.AbstractSideTableInfo;
26+
import com.dtstack.flink.sql.side.*;
4627
import com.dtstack.flink.sql.side.cache.CacheObj;
4728
import com.dtstack.flink.sql.side.cassandra.table.CassandraSideTableInfo;
29+
import com.dtstack.flink.sql.util.RowDataComplete;
4830
import com.google.common.base.Function;
4931
import com.google.common.collect.Lists;
5032
import com.google.common.util.concurrent.AsyncFunction;
5133
import com.google.common.util.concurrent.FutureCallback;
5234
import com.google.common.util.concurrent.Futures;
5335
import com.google.common.util.concurrent.ListenableFuture;
54-
import io.vertx.core.json.JsonArray;
5536
import org.apache.commons.lang3.StringUtils;
37+
import org.apache.flink.api.java.typeutils.RowTypeInfo;
38+
import org.apache.flink.configuration.Configuration;
39+
import org.apache.flink.streaming.api.functions.async.ResultFuture;
40+
import org.apache.flink.table.dataformat.BaseRow;
41+
import org.apache.flink.types.Row;
5642
import org.slf4j.Logger;
5743
import org.slf4j.LoggerFactory;
5844

5945
import java.net.InetAddress;
60-
import java.sql.Timestamp;
6146
import java.util.ArrayList;
47+
import java.util.Collections;
6248
import java.util.List;
6349
import java.util.Map;
6450

@@ -211,7 +197,7 @@ public void onSuccess(List<com.datastax.driver.core.Row> rows) {
211197
if (openCache()) {
212198
putCache(key, CacheMissVal.getMissKeyObj());
213199
}
214-
resultFuture.complete(null);
200+
resultFuture.complete(Collections.EMPTY_LIST);
215201
}
216202
}
217203

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,23 +25,21 @@
2525
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
2626
import com.dtstack.flink.sql.side.cache.CacheObj;
2727
import com.dtstack.flink.sql.side.cache.LRUSideCache;
28-
import com.dtstack.flink.sql.util.RowDataComplete;
29-
import com.dtstack.flink.sql.util.RowDataConvert;
3028
import com.dtstack.flink.sql.util.ReflectionUtils;
29+
import com.dtstack.flink.sql.util.RowDataComplete;
3130
import com.google.common.collect.Lists;
3231
import com.google.common.collect.Maps;
3332
import org.apache.calcite.sql.JoinType;
3433
import org.apache.commons.collections.MapUtils;
3534
import org.apache.flink.api.common.functions.RuntimeContext;
36-
import org.apache.flink.api.java.tuple.Tuple2;
3735
import org.apache.flink.configuration.Configuration;
3836
import org.apache.flink.metrics.Counter;
3937
import org.apache.flink.streaming.api.functions.async.ResultFuture;
4038
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
41-
import org.apache.flink.table.dataformat.BaseRow;
4239
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
4340
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
4441
import org.apache.flink.table.api.DataTypes;
42+
import org.apache.flink.table.dataformat.BaseRow;
4543
import org.apache.flink.types.Row;
4644
import org.slf4j.Logger;
4745
import org.slf4j.LoggerFactory;
@@ -145,7 +143,7 @@ protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
145143
dealFillDataError(input, resultFuture, e);
146144
}
147145
} else {
148-
resultFuture.complete(null);
146+
resultFuture.complete(Collections.EMPTY_LIST);
149147
}
150148
}
151149

@@ -163,14 +161,14 @@ public void timeout(Row input, ResultFuture<BaseRow> resultFuture) throws Except
163161
}
164162
timeOutNum++;
165163
if (sideInfo.getJoinType() == JoinType.LEFT) {
166-
resultFuture.complete(null);
164+
resultFuture.complete(Collections.EMPTY_LIST);
167165
return;
168166
}
169167
if (timeOutNum > sideInfo.getSideTableInfo().getAsyncFailMaxNum(Long.MAX_VALUE)) {
170168
resultFuture.completeExceptionally(new Exception("Async function call timedoutNum beyond limit."));
171169
return;
172170
}
173-
resultFuture.complete(null);
171+
resultFuture.complete(Collections.EMPTY_LIST);
174172
}
175173

176174
protected void preInvoke(Row input, ResultFuture<BaseRow> resultFuture)

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/AbstractRowKeyModeDealer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,8 @@
2323
import com.dtstack.flink.sql.side.FieldInfo;
2424
import com.dtstack.flink.sql.side.cache.AbstractSideCache;
2525
import com.dtstack.flink.sql.util.RowDataComplete;
26-
import org.apache.calcite.sql.JoinType;
2726
import com.google.common.collect.Maps;
28-
import org.apache.flink.api.java.tuple.Tuple2;
27+
import org.apache.calcite.sql.JoinType;
2928
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3029
import org.apache.flink.table.dataformat.BaseRow;
3130
import org.apache.flink.types.Row;
@@ -85,7 +84,7 @@ protected void dealMissKey(Row input, ResultFuture<BaseRow> resultFuture) {
8584
resultFuture.completeExceptionally(e);
8685
}
8786
} else {
88-
resultFuture.complete(null);
87+
resultFuture.complete(Collections.EMPTY_LIST);
8988
}
9089
}
9190

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/PreRowKeyModeDealerDealer.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,18 @@
2727
import com.dtstack.flink.sql.side.cache.CacheObj;
2828
import com.dtstack.flink.sql.side.hbase.utils.HbaseUtils;
2929
import com.dtstack.flink.sql.util.RowDataComplete;
30+
import com.google.common.collect.Lists;
3031
import com.google.common.collect.Maps;
3132
import org.apache.calcite.sql.JoinType;
32-
import com.google.common.collect.Lists;
33-
import org.apache.flink.api.java.tuple.Tuple2;
3433
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3534
import org.apache.flink.table.dataformat.BaseRow;
3635
import org.apache.flink.types.Row;
37-
import org.hbase.async.BinaryPrefixComparator;
38-
import org.hbase.async.Bytes;
39-
import org.hbase.async.CompareFilter;
40-
import org.hbase.async.HBaseClient;
41-
import org.hbase.async.KeyValue;
42-
import org.hbase.async.RowFilter;
43-
import org.hbase.async.ScanFilter;
44-
import org.hbase.async.Scanner;
36+
import org.hbase.async.*;
4537
import org.slf4j.Logger;
4638
import org.slf4j.LoggerFactory;
4739

4840
import java.util.ArrayList;
41+
import java.util.Collections;
4942
import java.util.List;
5043
import java.util.Map;
5144

@@ -130,7 +123,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
130123
resultFuture.completeExceptionally(e);
131124
}
132125
} catch (Exception e) {
133-
resultFuture.complete(null);
126+
resultFuture.complete(Collections.EMPTY_LIST);
134127
LOG.error("record:" + input);
135128
LOG.error("get side record exception:", e);
136129
}
@@ -150,7 +143,7 @@ private String dealOneRow(ArrayList<ArrayList<KeyValue>> args, String rowKeyStr,
150143
private String dealFail(Object arg2, Row input, ResultFuture<BaseRow> resultFuture){
151144
LOG.error("record:" + input);
152145
LOG.error("get side record exception:" + arg2);
153-
resultFuture.complete(null);
146+
resultFuture.complete(Collections.EMPTY_LIST);
154147
return "";
155148
}
156149
}

hbase/hbase-side/hbase-async-side/src/main/java/com/dtstack/flink/sql/side/hbase/rowkeydealer/RowKeyEqualModeDealer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@
2727
import com.dtstack.flink.sql.side.cache.CacheObj;
2828
import com.dtstack.flink.sql.side.hbase.utils.HbaseUtils;
2929
import com.dtstack.flink.sql.util.RowDataComplete;
30+
import com.google.common.collect.Lists;
3031
import com.google.common.collect.Maps;
3132
import org.apache.calcite.sql.JoinType;
32-
import com.google.common.collect.Lists;
33-
import org.apache.flink.api.java.tuple.Tuple2;
3433
import org.apache.flink.streaming.api.functions.async.ResultFuture;
3534
import org.apache.flink.table.dataformat.BaseRow;
3635
import org.apache.flink.types.Row;
@@ -118,7 +117,7 @@ public void asyncGetData(String tableName, String rowKeyStr, Row input, ResultFu
118117
}, arg2 -> {
119118
LOG.error("record:" + input);
120119
LOG.error("get side record exception:" + arg2);
121-
resultFuture.complete(null);
120+
resultFuture.complete(Collections.EMPTY_LIST);
122121
return "";
123122
});
124123
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/LauncherMain.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
7171
String udfJar = launcherOptions.getAddjar();
7272
String queue = launcherOptions.getQueue();
7373
String pluginLoadMode = launcherOptions.getPluginLoadMode();
74+
String addShipfile = launcherOptions.getAddShipfile();
7475

7576
String yarnSessionConf = URLDecoder.decode(launcherOptions.getYarnSessionConf(), Charsets.UTF_8.toString());
7677
Properties yarnSessionConfProperties = PluginUtil.jsonStrToObject(yarnSessionConf, Properties.class);
@@ -91,6 +92,7 @@ public static JobParamsInfo parseArgs(String[] args) throws Exception {
9192
.setFlinkJarPath(flinkJarPath)
9293
.setPluginLoadMode(pluginLoadMode)
9394
.setQueue(queue)
95+
.setAddShipfile(addShipfile)
9496
.build();
9597
}
9698

launcher/src/main/java/com/dtstack/flink/sql/launcher/entity/JobParamsInfo.java

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,31 @@
2222
import java.util.Properties;
2323

2424
/**
25-
* parse the original mission parameters
25+
* parse the original mission parameters
2626
* Date: 2020/3/4
2727
* Company: www.dtstack.com
28+
*
2829
* @author maqi
2930
*/
3031
public class JobParamsInfo {
3132

32-
private String mode;
33-
private String name;
34-
private String queue;
35-
private String localPluginRoot;
36-
private String flinkConfDir;
37-
private String flinkJarPath;
38-
private String yarnConfDir;
39-
private String pluginLoadMode;
40-
private String udfJar;
41-
private String[] execArgs;
42-
private Properties confProperties;
43-
private Properties yarnSessionConfProperties;
33+
private final String mode;
34+
private final String name;
35+
private final String queue;
36+
private final String localPluginRoot;
37+
private final String flinkConfDir;
38+
private final String flinkJarPath;
39+
private final String yarnConfDir;
40+
private final String pluginLoadMode;
41+
private final String udfJar;
42+
private final String[] execArgs;
43+
private final Properties confProperties;
44+
private final Properties yarnSessionConfProperties;
45+
private final String addShipFile;
4446

4547
private JobParamsInfo(String mode, String name, String queue, String localPluginRoot, String flinkConfDir, String yarnConfDir,
4648
String pluginLoadMode, String[] execArgs, Properties confProperties, Properties yarnSessionConfProperties,
47-
String udfJar, String flinkJarPath) {
49+
String udfJar, String flinkJarPath, String addShipFile) {
4850
this.mode = mode;
4951
this.name = name;
5052
this.queue = queue;
@@ -57,6 +59,7 @@ private JobParamsInfo(String mode, String name, String queue, String localPlugin
5759
this.yarnSessionConfProperties = yarnSessionConfProperties;
5860
this.udfJar = udfJar;
5961
this.flinkJarPath = flinkJarPath;
62+
this.addShipFile = addShipFile;
6063
}
6164

6265
public String getMode() {
@@ -107,6 +110,10 @@ public String getFlinkJarPath() {
107110
return flinkJarPath;
108111
}
109112

113+
public String getAddShipFile() {
114+
return addShipFile;
115+
}
116+
110117
public static JobParamsInfo.Builder builder() {
111118
return new JobParamsInfo.Builder();
112119
}
@@ -125,6 +132,7 @@ public static class Builder {
125132
private String udfJar;
126133
private Properties confProperties;
127134
private Properties yarnSessionConfProperties;
135+
private String addShipfile;
128136

129137
public JobParamsInfo.Builder setMode(String mode) {
130138
this.mode = mode;
@@ -186,9 +194,15 @@ public JobParamsInfo.Builder setFlinkJarPath(String flinkJarPath) {
186194
return this;
187195
}
188196

197+
public JobParamsInfo.Builder setAddShipfile(String addShipfile) {
198+
this.addShipfile = addShipfile;
199+
return this;
200+
}
201+
189202
public JobParamsInfo build() {
190203
return new JobParamsInfo(mode, name, queue, localPluginRoot, flinkConfDir,
191-
yarnConfDir, pluginLoadMode, execArgs, confProperties, yarnSessionConfProperties, udfJar, flinkJarPath);
204+
yarnConfDir, pluginLoadMode, execArgs, confProperties,
205+
yarnSessionConfProperties, udfJar, flinkJarPath, addShipfile);
192206
}
193207
}
194208

@@ -207,6 +221,7 @@ public String toString() {
207221
", execArgs=" + Arrays.toString(execArgs) +
208222
", confProperties=" + confProperties +
209223
", yarnSessionConfProperties=" + yarnSessionConfProperties +
224+
", addShipFile='" + addShipFile + '\'' +
210225
'}';
211226
}
212227
}

launcher/src/main/java/com/dtstack/flink/sql/launcher/executor/YarnJobClusterExecutor.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.io.File;
4040
import java.net.MalformedURLException;
4141
import java.util.ArrayList;
42+
import java.util.Arrays;
4243
import java.util.List;
4344
import java.util.Optional;
4445

@@ -74,15 +75,23 @@ public void exec() throws Exception {
7475
.createClusterDescriptor(jobParamsInfo.getYarnConfDir(), flinkConfiguration);
7576

7677
List<File> shipFiles = getShipFiles(jobParamsInfo.getFlinkJarPath(), jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor);
78+
79+
if (jobParamsInfo.getAddShipFile() != null) {
80+
List<String> addShipFilesPath = parsePathFromStr(jobParamsInfo.getAddShipFile());
81+
for (String path : addShipFilesPath) {
82+
shipFiles.addAll(getShipFiles(path, jobParamsInfo.getPluginLoadMode(), jobGraph, clusterDescriptor));
83+
}
84+
}
85+
7786
clusterDescriptor.addShipFiles(shipFiles);
7887

7988
ClusterSpecification clusterSpecification = YarnClusterClientFactory.INSTANCE.getClusterSpecification(flinkConfiguration);
8089
ClusterClientProvider<ApplicationId> applicationIdClusterClientProvider = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, true);
8190

8291
String applicationId = applicationIdClusterClientProvider.getClusterClient().getClusterId().toString();
8392
String flinkJobId = jobGraph.getJobID().toString();
84-
String tips = String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId);
85-
System.out.println(tips);
93+
94+
LOG.info(String.format("deploy per_job with appId: %s, jobId: %s", applicationId, flinkJobId));
8695
}
8796

8897
private void appendApplicationConfig(Configuration flinkConfig, JobParamsInfo jobParamsInfo) {
@@ -115,9 +124,14 @@ protected List<File> getShipFiles(String flinkJarPath, String pluginLoadMode, Jo
115124

116125
private void dealFlinkLibJar(String flinkJarPath, YarnClusterDescriptor clusterDescriptor, List<File> shipFiles) throws MalformedURLException {
117126
if (StringUtils.isEmpty(flinkJarPath) || !new File(flinkJarPath).exists()) {
118-
throw new RuntimeException("The param '-flinkJarPath' ref dir is not exist");
127+
throw new RuntimeException("path " + flinkJarPath + " is not exist");
119128
}
120129
File[] jars = new File(flinkJarPath).listFiles();
130+
131+
if (jars == null || jars.length == 0) {
132+
throw new RuntimeException(flinkJarPath + " no file exist !");
133+
}
134+
121135
for (File file : jars) {
122136
if (file.toURI().toURL().toString().contains("flink-dist")) {
123137
clusterDescriptor.setLocalJarPath(new Path(file.toURI().toURL().toString()));
@@ -161,5 +175,11 @@ private Optional<File> discoverLogConfigFile(final String configurationDirectory
161175
return logConfigFile;
162176
}
163177

178+
private static List<String> parsePathFromStr(String pathStr) {
179+
if (pathStr.length() > 2) {
180+
pathStr = pathStr.substring(1, pathStr.length() - 1).replace("\"", "");
181+
}
164182

183+
return Arrays.asList(pathStr.split(","));
184+
}
165185
}

0 commit comments

Comments
 (0)