Skip to content

Commit 5b1a824

Browse files
committed
[fix] sink add parallelism
1 parent 25fadc4 commit 5b1a824

12 files changed

Lines changed: 65 additions & 15 deletions

File tree

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
5858
protected Integer readTimeoutMillis;
5959
protected Integer connectTimeoutMillis;
6060
protected Integer poolTimeoutMillis;
61+
protected Integer parallelism = -1;
62+
protected String registerTableName;
6163

6264
public CassandraSink() {
6365
// TO DO NOTHING
@@ -78,6 +80,8 @@ public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
7880
this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
7981
this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
8082
this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
83+
this.parallelism = cassandraTableInfo.getParallelism();
84+
this.registerTableName = cassandraTableInfo.getTableName();
8185
return this;
8286
}
8387

@@ -106,7 +110,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
106110

107111
CassandraOutputFormat outputFormat = builder.finish();
108112
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
109-
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
113+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
114+
.setParallelism(parallelism)
115+
.name(registerTableName);
110116
return dataStreamSink;
111117
}
112118

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@
3434
import com.google.common.collect.Lists;
3535
import com.google.common.collect.Maps;
3636
import com.google.common.collect.Sets;
37-
import org.apache.calcite.sql.*;
37+
import org.apache.calcite.sql.SqlBasicCall;
38+
import org.apache.calcite.sql.SqlIdentifier;
39+
import org.apache.calcite.sql.SqlKind;
40+
import org.apache.calcite.sql.SqlNode;
41+
import org.apache.calcite.sql.SqlSelect;
42+
import org.apache.calcite.sql.SqlWithItem;
3843
import org.apache.calcite.sql.parser.SqlParseException;
3944
import org.apache.commons.collections.CollectionUtils;
4045
import org.apache.commons.lang3.StringUtils;

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

@@ -45,6 +45,7 @@
4545
import java.util.HashMap;
4646
import java.util.List;
4747
import java.util.Map;
48+
import java.util.Objects;
4849

4950
/**
5051
* table output elastic5plugin
@@ -77,6 +78,8 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
7778

7879
private int parallelism = -1;
7980

81+
protected String registerTableName;
82+
8083
private ElasticsearchTableInfo esTableInfo;
8184

8285

@@ -135,8 +138,6 @@ private RichSinkFunction createEsSinkFunction(){
135138

136139
boolean authMesh = esTableInfo.isAuthMesh();
137140
if (authMesh) {
138-
String username = esTableInfo.getUserName();
139-
String password = esTableInfo.getPassword();
140141
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
141142
userConfig.put("xpack.security.user", authPassword);
142143
}
@@ -154,18 +155,14 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
154155
@Override
155156
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
156157
RichSinkFunction richSinkFunction = createEsSinkFunction();
157-
DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
158+
DataStreamSink streamSink = dataStream.addSink(richSinkFunction).name(registerTableName);
158159
if(parallelism > 0){
159160
streamSink.setParallelism(parallelism);
160161
}
161162

162163
return streamSink;
163164
}
164165

165-
public void setParallelism(int parallelism) {
166-
this.parallelism = parallelism;
167-
}
168-
169166
public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
170167
this.bulkFlushMaxActions = bulkFlushMaxActions;
171168
}
@@ -183,6 +180,9 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
183180
String id = elasticsearchTableInfo.getId();
184181
String[] idField = StringUtils.split(id, ",");
185182
idIndexList = new ArrayList<>();
183+
registerTableName = elasticsearchTableInfo.getName();
184+
parallelism = Objects.isNull(elasticsearchTableInfo.getParallelism()) ?
185+
parallelism : elasticsearchTableInfo.getParallelism();
186186

187187
for(int i = 0; i < idField.length; ++i) {
188188
idIndexList.add(Integer.valueOf(idField[i]));

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
5050

5151
private static final String KEY_ES_PASSWORD = "password";
5252

53+
private static final String KEY_ES_PARALLELISM = "parallelism";
54+
5355
@Override
5456
protected boolean fieldNameNeedsUpperCase() {
5557
return false;
@@ -65,6 +67,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6567
elasticsearchTableInfo.setId((String) props.get(KEY_ES_ID_FIELD_INDEX_LIST.toLowerCase()));
6668
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES_INDEX.toLowerCase()));
6769
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES_TYPE.toLowerCase()));
70+
elasticsearchTableInfo.setParallelism(MathUtil.getIntegerVal(props.getOrDefault(KEY_ES_PARALLELISM.toLowerCase(), 1)));
6871

6972
String authMeshStr = (String)props.get(KEY_ES_AUTHMESH.toLowerCase());
7073
if (authMeshStr != null & "true".equals(authMeshStr)) {

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import com.google.common.collect.Maps;
3636
import org.apache.commons.lang.StringUtils;
3737
import org.apache.http.HttpHost;
38+
3839
import java.util.Arrays;
3940
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Objects;
4143
import java.util.stream.Collectors;
4244

4345
/**
@@ -69,6 +71,8 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6971

7072
private int parallelism = -1;
7173

74+
protected String registerTableName;
75+
7276
private ElasticsearchTableInfo esTableInfo;
7377

7478

@@ -142,6 +146,8 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
142146
columnTypes = esTableInfo.getFieldTypes();
143147
esAddressList = Arrays.asList(esTableInfo.getAddress().split(","));
144148
String id = esTableInfo.getId();
149+
registerTableName = esTableInfo.getName();
150+
parallelism = Objects.isNull(esTableInfo.getParallelism()) ? parallelism : esTableInfo.getParallelism();
145151

146152
if (!StringUtils.isEmpty(id)) {
147153
idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
4949

5050
private static final String KEY_TRUE = "true";
5151

52+
private static final String KEY_ES6_PARALLELISM = "parallelism";
53+
5254
@Override
5355
protected boolean fieldNameNeedsUpperCase() {
5456
return false;
@@ -64,6 +66,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6466
elasticsearchTableInfo.setId((String) props.get(KEY_ES6_ID_FIELD_INDEX_LIST.toLowerCase()));
6567
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES6_INDEX.toLowerCase()));
6668
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES6_TYPE.toLowerCase()));
69+
elasticsearchTableInfo.setParallelism(MathUtil.getIntegerVal(props.getOrDefault(KEY_ES6_PARALLELISM.toLowerCase(), 1)));
6770

6871
String authMeshStr = (String) props.get(KEY_ES6_AUTHMESH.toLowerCase());
6972
if (authMeshStr != null && StringUtils.equalsIgnoreCase(KEY_TRUE, authMeshStr)) {

hbase/hbase-sink/src/main/java/com/dtstack/flink/sql/sink/hbase/HbaseSink.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.sink.hbase;
2222

@@ -121,7 +121,12 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
121121

122122
HbaseOutputFormat outputFormat = builder.finish();
123123
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
124-
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
124+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction).name(registerTabName);
125+
126+
if (parallelism > 0) {
127+
dataStreamSink.setParallelism(parallelism);
128+
}
129+
125130
return dataStreamSink;
126131
}
127132

kafka-base/kafka-base-sink/src/main/java/com/dtstack/flink/sql/sink/kafka/AbstractKafkaSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public abstract class AbstractKafkaSink implements RetractStreamTableSink<Row>,
5858
protected String[] partitionKeys;
5959
protected String sinkOperatorName;
6060
protected Properties properties;
61-
protected int parallelism;
61+
protected int parallelism = -1;
6262
protected String topic;
6363
protected String tableName;
6464
protected String updateMode;

kudu/kudu-sink/src/main/java/com/dtstack/flink/sql/sink/kudu/KuduSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public KuduSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
5656
this.keytab = kuduTableInfo.getKeytab();
5757
this.krb5conf = kuduTableInfo.getKrb5conf();
5858
this.enableKrb = kuduTableInfo.isEnableKrb();
59+
this.parallelism = kuduTableInfo.getParallelism();
5960

6061
return this;
6162
}

mongo/mongo-sink/src/main/java/com/dtstack/flink/sql/sink/mongo/MongoSink.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.flink.table.sinks.TableSink;
3535
import org.apache.flink.types.Row;
3636

37+
import java.util.Objects;
38+
3739
/**
3840
* Reason:
3941
* Date: 2018/11/6
@@ -49,6 +51,8 @@ public class MongoSink implements RetractStreamTableSink<Row>, IStreamSinkGener<
4951
protected String userName;
5052
protected String password;
5153
protected String database;
54+
protected Integer parallelism = 1;
55+
protected String registerTableName;
5256

5357
public MongoSink() {
5458
// TO DO NOTHING
@@ -62,6 +66,9 @@ public MongoSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
6266
this.userName = mongoTableInfo.getUserName();
6367
this.password = mongoTableInfo.getPassword();
6468
this.database = mongoTableInfo.getDatabase();
69+
this.parallelism = Objects.isNull(mongoTableInfo.getParallelism()) ?
70+
parallelism : mongoTableInfo.getParallelism();
71+
this.registerTableName = mongoTableInfo.getName();
6572
return this;
6673
}
6774

@@ -83,7 +90,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
8390

8491
MongoOutputFormat outputFormat = builder.finish();
8592
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
86-
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
93+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
94+
.setParallelism(parallelism)
95+
.name(registerTableName);
8796
return dataStreamSink;
8897
}
8998

0 commit comments

Comments
 (0)