Skip to content

Commit 9bdcfa3

Browse files
committed
Merge branch 'hotfix_1.10_4.0.x_30807' into '1.10_release_4.0.x'
Hotfix 1.10 4.0.x 30807 See merge request dt-insight-engine/flinkStreamSQL!171
2 parents eb8ad94 + 5e3a38b commit 9bdcfa3

15 files changed

Lines changed: 74 additions & 22 deletions

File tree

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
5858
protected Integer readTimeoutMillis;
5959
protected Integer connectTimeoutMillis;
6060
protected Integer poolTimeoutMillis;
61+
protected Integer parallelism = 1;
62+
protected String registerTableName;
6163

6264
public CassandraSink() {
6365
// TO DO NOTHING
@@ -78,6 +80,8 @@ public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
7880
this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
7981
this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
8082
this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
83+
this.parallelism = cassandraTableInfo.getParallelism();
84+
this.registerTableName = cassandraTableInfo.getTableName();
8185
return this;
8286
}
8387

@@ -106,7 +110,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
106110

107111
CassandraOutputFormat outputFormat = builder.finish();
108112
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
109-
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
113+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
114+
.setParallelism(parallelism)
115+
.name(registerTableName);
110116
return dataStreamSink;
111117
}
112118

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@
3434
import com.google.common.collect.Lists;
3535
import com.google.common.collect.Maps;
3636
import com.google.common.collect.Sets;
37-
import org.apache.calcite.sql.*;
37+
import org.apache.calcite.sql.SqlBasicCall;
38+
import org.apache.calcite.sql.SqlIdentifier;
39+
import org.apache.calcite.sql.SqlKind;
40+
import org.apache.calcite.sql.SqlNode;
41+
import org.apache.calcite.sql.SqlSelect;
42+
import org.apache.calcite.sql.SqlWithItem;
3843
import org.apache.calcite.sql.parser.SqlParseException;
3944
import org.apache.commons.collections.CollectionUtils;
4045
import org.apache.commons.lang3.StringUtils;

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class AbstractTableInfo implements Serializable {
5959

6060
private List<String> primaryKeys;
6161

62-
private Integer parallelism = -1;
62+
private Integer parallelism = 1;
6363

6464
public String[] getFieldTypes() {
6565
return fieldTypes;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

@@ -45,6 +45,7 @@
4545
import java.util.HashMap;
4646
import java.util.List;
4747
import java.util.Map;
48+
import java.util.Objects;
4849

4950
/**
5051
* table output elastic5plugin
@@ -75,7 +76,9 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
7576

7677
private TypeInformation[] fieldTypes;
7778

78-
private int parallelism = -1;
79+
private int parallelism = 1;
80+
81+
protected String registerTableName;
7982

8083
private ElasticsearchTableInfo esTableInfo;
8184

@@ -135,8 +138,6 @@ private RichSinkFunction createEsSinkFunction(){
135138

136139
boolean authMesh = esTableInfo.isAuthMesh();
137140
if (authMesh) {
138-
String username = esTableInfo.getUserName();
139-
String password = esTableInfo.getPassword();
140141
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
141142
userConfig.put("xpack.security.user", authPassword);
142143
}
@@ -154,18 +155,14 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
154155
@Override
155156
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
156157
RichSinkFunction richSinkFunction = createEsSinkFunction();
157-
DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
158+
DataStreamSink streamSink = dataStream.addSink(richSinkFunction).name(registerTableName);
158159
if(parallelism > 0){
159160
streamSink.setParallelism(parallelism);
160161
}
161162

162163
return streamSink;
163164
}
164165

165-
public void setParallelism(int parallelism) {
166-
this.parallelism = parallelism;
167-
}
168-
169166
public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
170167
this.bulkFlushMaxActions = bulkFlushMaxActions;
171168
}
@@ -183,6 +180,9 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
183180
String id = elasticsearchTableInfo.getId();
184181
String[] idField = StringUtils.split(id, ",");
185182
idIndexList = new ArrayList<>();
183+
registerTableName = elasticsearchTableInfo.getName();
184+
parallelism = Objects.isNull(elasticsearchTableInfo.getParallelism()) ?
185+
parallelism : elasticsearchTableInfo.getParallelism();
186186

187187
for(int i = 0; i < idField.length; ++i) {
188188
idIndexList.add(Integer.valueOf(idField[i]));

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
5050

5151
private static final String KEY_ES_PASSWORD = "password";
5252

53+
private static final String KEY_ES_PARALLELISM = "parallelism";
54+
5355
@Override
5456
protected boolean fieldNameNeedsUpperCase() {
5557
return false;
@@ -65,6 +67,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6567
elasticsearchTableInfo.setId((String) props.get(KEY_ES_ID_FIELD_INDEX_LIST.toLowerCase()));
6668
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES_INDEX.toLowerCase()));
6769
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES_TYPE.toLowerCase()));
70+
elasticsearchTableInfo.setParallelism(MathUtil.getIntegerVal(props.getOrDefault(KEY_ES_PARALLELISM.toLowerCase(), 1)));
6871

6972
String authMeshStr = (String)props.get(KEY_ES_AUTHMESH.toLowerCase());
7073
if (authMeshStr != null & "true".equals(authMeshStr)) {

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import com.google.common.collect.Maps;
3636
import org.apache.commons.lang.StringUtils;
3737
import org.apache.http.HttpHost;
38+
3839
import java.util.Arrays;
3940
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Objects;
4143
import java.util.stream.Collectors;
4244

4345
/**
@@ -67,7 +69,9 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6769

6870
private TypeInformation[] fieldTypes;
6971

70-
private int parallelism = -1;
72+
private int parallelism = 1;
73+
74+
protected String registerTableName;
7175

7276
private ElasticsearchTableInfo esTableInfo;
7377

@@ -142,6 +146,8 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
142146
columnTypes = esTableInfo.getFieldTypes();
143147
esAddressList = Arrays.asList(esTableInfo.getAddress().split(","));
144148
String id = esTableInfo.getId();
149+
registerTableName = esTableInfo.getName();
150+
parallelism = Objects.isNull(esTableInfo.getParallelism()) ? parallelism : esTableInfo.getParallelism();
145151

146152
if (!StringUtils.isEmpty(id)) {
147153
idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
4949

5050
private static final String KEY_TRUE = "true";
5151

52+
private static final String KEY_ES6_PARALLELISM = "parallelism";
53+
5254
@Override
5355
protected boolean fieldNameNeedsUpperCase() {
5456
return false;
@@ -64,6 +66,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6466
elasticsearchTableInfo.setId((String) props.get(KEY_ES6_ID_FIELD_INDEX_LIST.toLowerCase()));
6567
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES6_INDEX.toLowerCase()));
6668
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES6_TYPE.toLowerCase()));
69+
elasticsearchTableInfo.setParallelism(MathUtil.getIntegerVal(props.getOrDefault(KEY_ES6_PARALLELISM.toLowerCase(), 1)));
6770

6871
String authMeshStr = (String) props.get(KEY_ES6_AUTHMESH.toLowerCase());
6972
if (authMeshStr != null && StringUtils.equalsIgnoreCase(KEY_TRUE, authMeshStr)) {

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.sink.hbase;
2222

@@ -63,7 +63,7 @@ public class HbaseSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
6363

6464
private String clientPrincipal;
6565
private String clientKeytabFile;
66-
private int parallelism = -1;
66+
private int parallelism = 1;
6767

6868

6969
public HbaseSink() {
@@ -121,7 +121,12 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
121121

122122
HbaseOutputFormat outputFormat = builder.finish();
123123
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
124-
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
124+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction).name(registerTabName);
125+
126+
if (parallelism > 0) {
127+
dataStreamSink.setParallelism(parallelism);
128+
}
129+
125130
return dataStreamSink;
126131
}
127132

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>,
5858
protected String[] partitionKeys;
5959
protected String sinkOperatorName;
6060
protected Properties properties;
61-
protected int parallelism;
61+
protected int parallelism = 1;
6262
protected String topic;
6363
protected String tableName;
6464
protected String updateMode;

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/table/KafkaSinkParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
5757
kafkaSinkTableInfo.setPartitionKeys(MathUtil.getString(props.get(KafkaSinkTableInfo.PARTITION_KEY.toLowerCase())));
5858
kafkaSinkTableInfo.setUpdateMode(MathUtil.getString(props.getOrDefault(KafkaSinkTableInfo.UPDATE_KEY.toLowerCase(), EUpdateMode.APPEND.name())));
5959

60-
Integer parallelism = MathUtil.getIntegerVal(props.get(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase()));
60+
Integer parallelism = MathUtil.getIntegerVal(props.getOrDefault(KafkaSinkTableInfo.PARALLELISM_KEY.toLowerCase(), 1));
6161
kafkaSinkTableInfo.setParallelism(parallelism);
6262

6363
for (String key : props.keySet()) {

0 commit comments

Comments
 (0)