Skip to content

Commit 445d37b

Browse files
author
gituser
committed
Merge branch '1.10_release_4.0.x' into 1.10_release_4.1.x
2 parents 66babbc + aed2e2d commit 445d37b

25 files changed

Lines changed: 218 additions & 60 deletions

File tree

cassandra/cassandra-sink/src/main/java/com/dtstack/flink/sql/sink/cassandra/CassandraSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public class CassandraSink implements RetractStreamTableSink<Row>, IStreamSinkGe
5858
protected Integer readTimeoutMillis;
5959
protected Integer connectTimeoutMillis;
6060
protected Integer poolTimeoutMillis;
61+
protected Integer parallelism = 1;
62+
protected String registerTableName;
6163

6264
public CassandraSink() {
6365
// TO DO NOTHING
@@ -78,6 +80,8 @@ public CassandraSink genStreamSink(AbstractTargetTableInfo targetTableInfo) {
7880
this.readTimeoutMillis = cassandraTableInfo.getReadTimeoutMillis();
7981
this.connectTimeoutMillis = cassandraTableInfo.getConnectTimeoutMillis();
8082
this.poolTimeoutMillis = cassandraTableInfo.getPoolTimeoutMillis();
83+
this.parallelism = cassandraTableInfo.getParallelism();
84+
this.registerTableName = cassandraTableInfo.getTableName();
8185
return this;
8286
}
8387

@@ -106,7 +110,9 @@ public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<
106110

107111
CassandraOutputFormat outputFormat = builder.finish();
108112
RichSinkFunction richSinkFunction = new OutputFormatSinkFunction(outputFormat);
109-
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction);
113+
DataStreamSink dataStreamSink = dataStream.addSink(richSinkFunction)
114+
.setParallelism(parallelism)
115+
.name(registerTableName);
110116
return dataStreamSink;
111117
}
112118

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,16 @@
3434
import com.google.common.collect.Lists;
3535
import com.google.common.collect.Maps;
3636
import com.google.common.collect.Sets;
37-
import org.apache.calcite.sql.*;
37+
import org.apache.calcite.sql.SqlBasicCall;
38+
import org.apache.calcite.sql.SqlIdentifier;
39+
import org.apache.calcite.sql.SqlKind;
40+
import org.apache.calcite.sql.SqlNode;
41+
import org.apache.calcite.sql.SqlSelect;
42+
import org.apache.calcite.sql.SqlWithItem;
3843
import org.apache.calcite.sql.parser.SqlParseException;
3944
import org.apache.commons.collections.CollectionUtils;
4045
import org.apache.commons.lang3.StringUtils;
46+
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
4147
import org.apache.flink.api.common.typeinfo.TypeInformation;
4248
import org.apache.flink.api.java.typeutils.RowTypeInfo;
4349
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -47,16 +53,27 @@
4753
import org.apache.flink.table.api.java.StreamTableEnvironment;
4854
import org.apache.flink.table.catalog.ObjectIdentifier;
4955
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
56+
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
57+
import org.apache.flink.table.types.logical.DecimalType;
58+
import org.apache.flink.table.types.logical.LegacyTypeInformationType;
5059
import org.apache.flink.table.types.logical.LogicalType;
5160
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo;
5261
import org.apache.flink.types.Row;
5362
import org.slf4j.Logger;
5463
import org.slf4j.LoggerFactory;
5564

5665
import java.time.LocalDateTime;
57-
import java.util.*;
58-
59-
import static org.apache.calcite.sql.SqlKind.*;
66+
import java.util.Arrays;
67+
import java.util.LinkedList;
68+
import java.util.List;
69+
import java.util.Map;
70+
import java.util.Queue;
71+
import java.util.Set;
72+
73+
import static org.apache.calcite.sql.SqlKind.AS;
74+
import static org.apache.calcite.sql.SqlKind.INSERT;
75+
import static org.apache.calcite.sql.SqlKind.SELECT;
76+
import static org.apache.calcite.sql.SqlKind.WITH_ITEM;
6077

6178
/**
6279
* Reason:
@@ -279,10 +296,7 @@ private Table getTableFromCache(Map<String, Table> localTableCache, String table
279296
*/
280297
private boolean checkJoinCondition(SqlNode conditionNode, String sideTableAlias, AbstractSideTableInfo sideTableInfo) {
281298
List<String> conditionFields = getConditionFields(conditionNode, sideTableAlias, sideTableInfo);
282-
if (CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo))) {
283-
return true;
284-
}
285-
return false;
299+
return CollectionUtils.isEqualCollection(conditionFields, convertPrimaryAlias(sideTableInfo));
286300
}
287301

288302
private List<String> convertPrimaryAlias(AbstractSideTableInfo sideTableInfo) {
@@ -367,8 +381,12 @@ private void joinFun(Object pollObj,
367381

368382
int length = leftTable.getSchema().getFieldDataTypes().length;
369383
LogicalType[] logicalTypes = new LogicalType[length];
370-
for(int i=0; i<length; i++){
384+
for (int i = 0; i < length; i++) {
371385
logicalTypes[i] = leftTable.getSchema().getFieldDataTypes()[i].getLogicalType();
386+
if (logicalTypes[i] instanceof LegacyTypeInformationType &&
387+
((LegacyTypeInformationType<?>) logicalTypes[i]).getTypeInformation().getClass().equals(BigDecimalTypeInfo.class)) {
388+
logicalTypes[i] = new DecimalType(38, 18);
389+
}
372390
}
373391

374392
BaseRowTypeInfo leftBaseTypeInfo = new BaseRowTypeInfo(logicalTypes, leftTable.getSchema().getFieldNames());
@@ -407,7 +425,14 @@ private void joinFun(Object pollObj,
407425
targetTable = localTableCache.get(joinInfo.getLeftTableName());
408426
}
409427

410-
RowTypeInfo typeInfo = new RowTypeInfo(targetTable.getSchema().getFieldTypes(), targetTable.getSchema().getFieldNames());
428+
TypeInformation[] fieldDataTypes = targetTable.getSchema().getFieldTypes();
429+
for (int i = 0; i < fieldDataTypes.length; i++) {
430+
if (fieldDataTypes[i].getClass().equals(BigDecimalTypeInfo.class)) {
431+
fieldDataTypes[i] = BasicTypeInfo.BIG_DEC_TYPE_INFO;
432+
}
433+
}
434+
435+
RowTypeInfo typeInfo = new RowTypeInfo(fieldDataTypes, targetTable.getSchema().getFieldNames());
411436

412437
DataStream adaptStream = tableEnv.toRetractStream(targetTable, typeInfo)
413438
.filter(f -> f.f0)
@@ -479,9 +504,7 @@ private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Tab
479504
String fieldType = filed[filed.length - 1].trim();
480505
Class fieldClass = ClassUtil.stringConvertClass(fieldType);
481506
Class tableField = table.getSchema().getFieldType(i).get().getTypeClass();
482-
if (fieldClass == tableField) {
483-
continue;
484-
} else {
507+
if (fieldClass != tableField) {
485508
return false;
486509
}
487510
}

core/src/main/java/com/dtstack/flink/sql/table/AbstractTableInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public abstract class AbstractTableInfo implements Serializable {
5959

6060
private List<String> primaryKeys;
6161

62-
private Integer parallelism = -1;
62+
private Integer parallelism = 1;
6363

6464
public String[] getFieldTypes() {
6565
return fieldTypes;

core/src/main/java/com/dtstack/flink/sql/util/DateUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ public class DateUtil {
5959

6060
private static final Pattern DATETIME = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d{3,9})?Z$");
6161
private static final Pattern DATE = Pattern.compile("^\\d{4}-(?:0[0-9]|1[0-2])-[0-9]{2}$");
62+
private static final Pattern TIME = Pattern.compile("^\\d{2}:\\d{2}:\\d{2}(\\.\\d{3,9})?Z$");
6263
private static final int MILLIS_PER_SECOND = 1000;
6364

6465

@@ -832,4 +833,18 @@ public static java.sql.Date getDateFromStr(String dateStr) {
832833
return null == date ? null : new java.sql.Date(date.getTime());
833834
}
834835

836+
public static java.sql.Time getTimeFromStr(String dateStr) {
837+
if (TIME.matcher(dateStr).matches()) {
838+
dateStr = dateStr.substring(0,dateStr.length()-1);
839+
Instant instant = LocalTime.parse(dateStr).atDate(LocalDate.now()).toInstant(ZoneOffset.UTC);
840+
return new java.sql.Time(instant.toEpochMilli());
841+
} else if (DATETIME.matcher(dateStr).matches()) {
842+
Instant instant = Instant.from(ISO_INSTANT.parse(dateStr));
843+
return new java.sql.Time(instant.toEpochMilli());
844+
}
845+
Date date = stringToDate(dateStr);
846+
return null == date ? null : new java.sql.Time(date.getTime());
847+
}
848+
849+
835850
}

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.math.BigDecimal;
2323
import java.math.BigInteger;
2424
import java.sql.Date;
25+
import java.sql.Time;
2526
import java.sql.Timestamp;
2627

2728
/**
@@ -234,7 +235,19 @@ public static Date getDate(Object obj) {
234235
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
235236
}
236237

237-
238+
public static Time getTime(Object obj) {
239+
if (obj == null) {
240+
return null;
241+
}
242+
if (obj instanceof String) {
243+
return DateUtil.getTimeFromStr((String) obj);
244+
} else if (obj instanceof Timestamp) {
245+
return new Time(((Timestamp) obj).getTime());
246+
} else if (obj instanceof Time) {
247+
return (Time) obj;
248+
}
249+
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Time.");
250+
}
238251

239252
public static Timestamp getTimestamp(Object obj) {
240253
if (obj == null) {

core/src/main/java/com/dtstack/flink/sql/util/RowDataConvert.java

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,17 @@
2121
import org.apache.flink.table.dataformat.BaseRow;
2222
import org.apache.flink.table.dataformat.BinaryString;
2323
import org.apache.flink.table.dataformat.DataFormatConverters;
24+
import org.apache.flink.table.dataformat.Decimal;
2425
import org.apache.flink.table.dataformat.GenericRow;
2526
import org.apache.flink.table.dataformat.SqlTimestamp;
2627
import org.apache.flink.types.Row;
2728

29+
import java.math.BigDecimal;
2830
import java.sql.Date;
2931
import java.sql.Time;
3032
import java.sql.Timestamp;
3133
import java.time.LocalDate;
34+
import java.time.LocalTime;
3235

3336
/**
3437
* Company: www.dtstack.com
@@ -42,28 +45,46 @@ public static BaseRow convertToBaseRow(Row row) {
4245
int length = row.getArity();
4346
GenericRow genericRow = new GenericRow(length);
4447
for (int i = 0; i < length; i++) {
45-
if (row.getField(i) instanceof String) {
48+
if (row.getField(i) == null) {
49+
genericRow.setField(i, row.getField(i));
50+
} else if (row.getField(i) instanceof String) {
4651
genericRow.setField(i, BinaryString.fromString((String) row.getField(i)));
4752
} else if (row.getField(i) instanceof Timestamp) {
4853
SqlTimestamp newTimestamp = SqlTimestamp.fromTimestamp(((Timestamp) row.getField(i)));
4954
genericRow.setField(i, newTimestamp);
5055
} else if (row.getField(i) instanceof Time) {
5156
genericRow.setField(i, DataFormatConverters.TimeConverter.INSTANCE.toInternal((Time) row.getField(i)));
52-
} else if (row.getField(i) instanceof Double) {
57+
} else if (row.getField(i) instanceof Double || row.getField(i).getClass().equals(double.class)) {
5358
genericRow.setField(i, DataFormatConverters.DoubleConverter.INSTANCE.toInternal((Double) row.getField(i)));
54-
} else if (row.getField(i) instanceof Float) {
59+
} else if (row.getField(i) instanceof Float || row.getField(i).getClass().equals(float.class)) {
5560
genericRow.setField(i, DataFormatConverters.FloatConverter.INSTANCE.toInternal((Float) row.getField(i)));
56-
} else if (row.getField(i) instanceof Long) {
61+
} else if (row.getField(i) instanceof Long || row.getField(i).getClass().equals(long.class)) {
5762
genericRow.setField(i, DataFormatConverters.LongConverter.INSTANCE.toInternal((Long) row.getField(i)));
63+
} else if (row.getField(i) instanceof Boolean || row.getField(i).getClass().equals(boolean.class)) {
64+
genericRow.setField(i, DataFormatConverters.BooleanConverter.INSTANCE.toInternal((Boolean) row.getField(i)));
65+
} else if (row.getField(i) instanceof Integer || row.getField(i).getClass().equals(int.class)) {
66+
genericRow.setField(i, DataFormatConverters.IntConverter.INSTANCE.toInternal((Integer) row.getField(i)));
67+
} else if (row.getField(i) instanceof Short || row.getField(i).getClass().equals(short.class)) {
68+
genericRow.setField(i, DataFormatConverters.ShortConverter.INSTANCE.toInternal((Short) row.getField(i)));
69+
} else if (row.getField(i) instanceof Byte || row.getField(i).getClass().equals(byte.class)) {
70+
genericRow.setField(i, DataFormatConverters.ByteConverter.INSTANCE.toInternal((Byte) row.getField(i)));
5871
} else if (row.getField(i) instanceof Date) {
5972
genericRow.setField(i, DataFormatConverters.DateConverter.INSTANCE.toInternal((Date) row.getField(i)));
6073
} else if (row.getField(i) instanceof LocalDate) {
6174
genericRow.setField(i, DataFormatConverters.LocalDateConverter.INSTANCE.toInternal((LocalDate) row.getField(i)));
75+
} else if (row.getField(i) instanceof LocalTime) {
76+
genericRow.setField(i, DataFormatConverters.LocalTimeConverter.INSTANCE.toInternal((LocalTime) row.getField(i)));
77+
} else if (row.getField(i) instanceof BigDecimal) {
78+
BigDecimal tempDecimal = (BigDecimal) row.getField(i);
79+
int precision = ((BigDecimal) row.getField(i)).precision();
80+
int scale = ((BigDecimal) row.getField(i)).scale();
81+
DataFormatConverters.DecimalConverter decimalConverter = new DataFormatConverters.DecimalConverter(precision, scale);
82+
genericRow.setField(i, decimalConverter.toExternal(Decimal.fromBigDecimal(tempDecimal, precision, scale)));
6283
} else {
6384
genericRow.setField(i, row.getField(i));
6485
}
6586
}
6687

6788
return genericRow;
6889
}
69-
}
90+
}

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* limitations under the License.
1717
*/
1818

19-
19+
2020

2121
package com.dtstack.flink.sql.sink.elasticsearch;
2222

@@ -45,6 +45,7 @@
4545
import java.util.HashMap;
4646
import java.util.List;
4747
import java.util.Map;
48+
import java.util.Objects;
4849

4950
/**
5051
* table output elastic5plugin
@@ -75,7 +76,9 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
7576

7677
private TypeInformation[] fieldTypes;
7778

78-
private int parallelism = -1;
79+
private int parallelism = 1;
80+
81+
protected String registerTableName;
7982

8083
private ElasticsearchTableInfo esTableInfo;
8184

@@ -135,8 +138,6 @@ private RichSinkFunction createEsSinkFunction(){
135138

136139
boolean authMesh = esTableInfo.isAuthMesh();
137140
if (authMesh) {
138-
String username = esTableInfo.getUserName();
139-
String password = esTableInfo.getPassword();
140141
String authPassword = esTableInfo.getUserName() + ":" + esTableInfo.getPassword();
141142
userConfig.put("xpack.security.user", authPassword);
142143
}
@@ -154,18 +155,14 @@ public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
154155
@Override
155156
public DataStreamSink<Tuple2<Boolean, Row>> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
156157
RichSinkFunction richSinkFunction = createEsSinkFunction();
157-
DataStreamSink streamSink = dataStream.addSink(richSinkFunction);
158+
DataStreamSink streamSink = dataStream.addSink(richSinkFunction).name(registerTableName);
158159
if(parallelism > 0){
159160
streamSink.setParallelism(parallelism);
160161
}
161162

162163
return streamSink;
163164
}
164165

165-
public void setParallelism(int parallelism) {
166-
this.parallelism = parallelism;
167-
}
168-
169166
public void setBulkFlushMaxActions(int bulkFlushMaxActions) {
170167
this.bulkFlushMaxActions = bulkFlushMaxActions;
171168
}
@@ -183,6 +180,9 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
183180
String id = elasticsearchTableInfo.getId();
184181
String[] idField = StringUtils.split(id, ",");
185182
idIndexList = new ArrayList<>();
183+
registerTableName = elasticsearchTableInfo.getName();
184+
parallelism = Objects.isNull(elasticsearchTableInfo.getParallelism()) ?
185+
parallelism : elasticsearchTableInfo.getParallelism();
186186

187187
for(int i = 0; i < idField.length; ++i) {
188188
idIndexList.add(Integer.valueOf(idField[i]));

elasticsearch5/elasticsearch5-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
5050

5151
private static final String KEY_ES_PASSWORD = "password";
5252

53+
private static final String KEY_ES_PARALLELISM = "parallelism";
54+
5355
@Override
5456
protected boolean fieldNameNeedsUpperCase() {
5557
return false;
@@ -65,6 +67,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6567
elasticsearchTableInfo.setId((String) props.get(KEY_ES_ID_FIELD_INDEX_LIST.toLowerCase()));
6668
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES_INDEX.toLowerCase()));
6769
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES_TYPE.toLowerCase()));
70+
elasticsearchTableInfo.setParallelism(MathUtil.getIntegerVal(props.getOrDefault(KEY_ES_PARALLELISM.toLowerCase(), 1)));
6871

6972
String authMeshStr = (String)props.get(KEY_ES_AUTHMESH.toLowerCase());
7073
if (authMeshStr != null & "true".equals(authMeshStr)) {

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ElasticsearchSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
import com.google.common.collect.Maps;
3636
import org.apache.commons.lang.StringUtils;
3737
import org.apache.http.HttpHost;
38+
3839
import java.util.Arrays;
3940
import java.util.List;
4041
import java.util.Map;
42+
import java.util.Objects;
4143
import java.util.stream.Collectors;
4244

4345
/**
@@ -67,7 +69,9 @@ public class ElasticsearchSink implements RetractStreamTableSink<Row>, IStreamSi
6769

6870
private TypeInformation[] fieldTypes;
6971

70-
private int parallelism = -1;
72+
private int parallelism = 1;
73+
74+
protected String registerTableName;
7175

7276
private ElasticsearchTableInfo esTableInfo;
7377

@@ -142,6 +146,8 @@ public ElasticsearchSink genStreamSink(AbstractTargetTableInfo targetTableInfo)
142146
columnTypes = esTableInfo.getFieldTypes();
143147
esAddressList = Arrays.asList(esTableInfo.getAddress().split(","));
144148
String id = esTableInfo.getId();
149+
registerTableName = esTableInfo.getName();
150+
parallelism = Objects.isNull(esTableInfo.getParallelism()) ? parallelism : esTableInfo.getParallelism();
145151

146152
if (!StringUtils.isEmpty(id)) {
147153
idIndexList = Arrays.stream(StringUtils.split(id, ",")).map(Integer::valueOf).collect(Collectors.toList());

elasticsearch6/elasticsearch6-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchSinkParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public class ElasticsearchSinkParser extends AbstractTableParser {
4949

5050
private static final String KEY_TRUE = "true";
5151

52+
private static final String KEY_ES6_PARALLELISM = "parallelism";
53+
5254
@Override
5355
protected boolean fieldNameNeedsUpperCase() {
5456
return false;
@@ -64,6 +66,7 @@ public AbstractTableInfo getTableInfo(String tableName, String fieldsInfo, Map<S
6466
elasticsearchTableInfo.setId((String) props.get(KEY_ES6_ID_FIELD_INDEX_LIST.toLowerCase()));
6567
elasticsearchTableInfo.setIndex((String) props.get(KEY_ES6_INDEX.toLowerCase()));
6668
elasticsearchTableInfo.setEsType((String) props.get(KEY_ES6_TYPE.toLowerCase()));
69+
elasticsearchTableInfo.setParallelism(MathUtil.getIntegerVal(props.getOrDefault(KEY_ES6_PARALLELISM.toLowerCase(), 1)));
6770

6871
String authMeshStr = (String) props.get(KEY_ES6_AUTHMESH.toLowerCase());
6972
if (authMeshStr != null && StringUtils.equalsIgnoreCase(KEY_TRUE, authMeshStr)) {

0 commit comments

Comments (0)