Skip to content

Commit 06ddc4c

Browse files
author
yanxi0227
committed
Merge branch 'v1.5.0_dev' of github.com:DTStack/flinkStreamSQL into v1.5.0_dev
2 parents 302670a + 930e919 commit 06ddc4c

10 files changed

Lines changed: 43 additions & 126 deletions

File tree

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleSink.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
22+
import com.dtstack.flink.sql.sink.rdb.format.ExtendOutputFormat;
2223
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2324
import org.apache.commons.lang3.StringUtils;
2425

@@ -43,7 +44,7 @@ public String getDriverName() {
4344

4445
@Override
4546
public RetractJDBCOutputFormat getOutputFormat() {
46-
return new OracleOutputFormat();
47+
return new ExtendOutputFormat();
4748
}
4849

4950
@Override
@@ -71,7 +72,7 @@ public String buildUpdateSql(String tableName, List<String> fieldNames, Map<Stri
7172
+ updateKeySql(realIndexes) + ") WHEN MATCHED THEN UPDATE SET "
7273
+ getUpdateSql(fieldNames, fullField, "T1", "T2", keyColList(realIndexes)) + " WHEN NOT MATCHED THEN "
7374
+ "INSERT (" + quoteColumns(fieldNames) + ") VALUES ("
74-
+ quoteColumns(fieldNames, "T2") + ");";
75+
+ quoteColumns(fieldNames, "T2") + ")";
7576
}
7677

7778

@@ -106,7 +107,7 @@ public String getUpdateSql(List<String> column, List<String> fullColumn, String
106107
String prefixRight = StringUtils.isBlank(rightTable) ? "" : quoteTable(rightTable) + ".";
107108
List<String> list = new ArrayList<>();
108109
for (String col : fullColumn) {
109-
if (keyCols == null || keyCols.size() == 0) {
110+
if (keyCols == null || keyCols.size() == 0 || keyCols.contains(col)) {
110111
continue;
111112
}
112113
if (fullColumn == null || column.contains(col)) {
@@ -152,6 +153,7 @@ public String makeValues(List<String> column) {
152153
}
153154
sb.append("? " + quoteColumn(column.get(i)));
154155
}
156+
sb.append(" FROM DUAL");
155157
return sb.toString();
156158
}
157159

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/all/RdbAllReqRow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public abstract class RdbAllReqRow extends AllReqRow {
6262

6363
private AtomicReference<Map<String, List<Map<String, Object>>>> cacheRef = new AtomicReference<>();
6464

65+
6566
public RdbAllReqRow(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, List<FieldInfo> outFieldInfoList, SideTableInfo sideTableInfo) {
6667
super(new RdbAllSideInfo(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo));
6768
}
@@ -204,7 +205,7 @@ private void loadData(Map<String, List<Map<String, Object>>> tmpCache) throws SQ
204205
Map<String, Object> oneRow = Maps.newHashMap();
205206
for (String fieldName : sideFieldNames) {
206207
Object object = resultSet.getObject(fieldName.trim());
207-
int fieldIndex = sideInfo.getRowTypeInfo().getFieldIndex(fieldName.trim());
208+
int fieldIndex = sideInfo.getSideTableInfo().getFieldList().indexOf(fieldName.trim());
208209
object = SwitchUtil.getTarget(object, fields[fieldIndex]);
209210
oneRow.put(fieldName.trim(), object);
210211
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/async/RdbAsyncReqRow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public Row fillData(Row input, Object line) {
170170
if (jsonArray == null) {
171171
row.setField(entry.getKey(), null);
172172
} else {
173-
Object object = SwitchUtil.getTarget(jsonArray.getValue(entry.getValue()), fields[entry.getKey()]);
173+
Object object = SwitchUtil.getTarget(jsonArray.getValue(entry.getValue()), fields[entry.getValue()]);
174174
row.setField(entry.getKey(), object);
175175
}
176176
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/util/MathUtil.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121

2222
import java.math.BigDecimal;
2323
import java.math.BigInteger;
24+
import java.sql.Date;
25+
import java.sql.Timestamp;
2426
import java.text.ParseException;
2527
import java.text.SimpleDateFormat;
26-
import java.util.Date;
2728

2829
/**
2930
* Date: 2017/4/21
@@ -75,6 +76,8 @@ public static Integer getIntegerVal(Object obj) {
7576
return ((Double) obj).intValue();
7677
} else if (obj instanceof BigDecimal) {
7778
return ((BigDecimal) obj).intValue();
79+
} else if (obj instanceof BigInteger) {
80+
return ((BigInteger) obj).intValue();
7881
}
7982

8083
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Integer.");
@@ -219,18 +222,28 @@ public static Date getDate(Object obj) {
219222
if (obj == null) {
220223
return null;
221224
}
222-
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
223225
if (obj instanceof String) {
224-
try {
225-
return sdf.parse((String) obj);
226-
} catch (ParseException e) {
227-
e.printStackTrace();
228-
}
229-
} else if (obj instanceof Date) {
226+
return Date.valueOf((String) obj);
227+
} else if (obj instanceof Timestamp) {
228+
return new Date(((Timestamp) obj).getTime());
229+
}else if (obj instanceof Date){
230230
return (Date) obj;
231231
}
232232
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
233233
}
234234

235+
public static Timestamp getTimestamp(Object obj) {
236+
if (obj == null) {
237+
return null;
238+
}
239+
if (obj instanceof Timestamp) {
240+
return (Timestamp) obj;
241+
} else if (obj instanceof Date) {
242+
return Timestamp.valueOf((String) obj);
243+
} else if (obj instanceof String) {
244+
return Timestamp.valueOf((String) obj);
245+
}
246+
throw new RuntimeException("not support type of " + obj.getClass() + " convert to Date.");
247+
}
235248

236249
}

rdb/rdb-side/src/main/java/com/dtstack/flink/sql/side/rdb/util/SwitchUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* @author maqi
2828
*/
2929
public class SwitchUtil {
30-
public static Object getTarget(Object obj, String targetType) {
30+
public static Object getTarget(Object obj, String targetType) {
3131
targetType = targetType.toLowerCase();
3232
switch (targetType) {
3333
case "int":
@@ -52,6 +52,8 @@ public static Object getTarget(Object obj, String targetType) {
5252
return MathUtil.getBigDecimal(obj);
5353
case "date":
5454
return MathUtil.getDate(obj);
55+
case "timestamp":
56+
return MathUtil.getTimestamp(obj);
5557
}
5658
return obj;
5759
}

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
import java.io.Serializable;
3737
import java.math.BigDecimal;
38+
import java.sql.Date;
3839
import java.sql.Timestamp;
3940
import java.sql.Types;
4041
import java.util.Arrays;
@@ -149,7 +150,9 @@ protected void buildSqlTypes(List<Class> fieldTypeArray) {
149150
String fieldType = fieldTypeArray.get(i).getName();
150151
if (fieldType.equals(Integer.class.getName())) {
151152
tmpFieldsType[i] = Types.INTEGER;
152-
} else if (fieldType.equals(Long.class.getName())) {
153+
}else if (fieldType.equals(Boolean.class.getName())) {
154+
tmpFieldsType[i] = Types.BOOLEAN;
155+
}else if (fieldType.equals(Long.class.getName())) {
153156
tmpFieldsType[i] = Types.BIGINT;
154157
} else if (fieldType.equals(Byte.class.getName())) {
155158
tmpFieldsType[i] = Types.TINYINT;
@@ -167,6 +170,8 @@ protected void buildSqlTypes(List<Class> fieldTypeArray) {
167170
tmpFieldsType[i] = Types.TIMESTAMP;
168171
} else if (fieldType.equals(BigDecimal.class.getName())) {
169172
tmpFieldsType[i] = Types.DECIMAL;
173+
} else if (fieldType.equals(Date.class.getName())) {
174+
tmpFieldsType[i] = Types.DATE;
170175
} else {
171176
throw new RuntimeException("no support field type for sql. the input type:" + fieldType);
172177
}

oracle/oracle-sink/src/main/java/com/dtstack/flink/sql/sink/oracle/OracleOutputFormat.java renamed to rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/ExtendOutputFormat.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
* limitations under the License.
1717
*/
1818

19-
package com.dtstack.flink.sql.sink.oracle;
19+
package com.dtstack.flink.sql.sink.rdb.format;
2020

21-
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2221
import org.apache.commons.lang3.StringUtils;
2322
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2423

@@ -35,7 +34,7 @@
3534
*
3635
* @author maqi
3736
*/
38-
public class OracleOutputFormat extends RetractJDBCOutputFormat {
37+
public class ExtendOutputFormat extends RetractJDBCOutputFormat {
3938

4039

4140
@Override
@@ -91,8 +90,7 @@ public void fillRealIndexes() throws SQLException {
9190
* @throws SQLException
9291
*/
9392
public void fillFullColumns() throws SQLException {
94-
String schema = null;
95-
ResultSet rs = getDbConn().getMetaData().getColumns(null, schema, getTableName(), null);
93+
ResultSet rs = getDbConn().getMetaData().getColumns(null, null, getTableName(), null);
9694
while (rs.next()) {
9795
String columnName = rs.getString("COLUMN_NAME");
9896
if (StringUtils.isNotBlank(columnName)) {

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/table/RdbTableInfo.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,6 @@
2929
*/
3030
public class RdbTableInfo extends TargetTableInfo {
3131

32-
//private static final String CURR_TYPE = "mysql";
33-
3432
public static final String URL_KEY = "url";
3533

3634
public static final String TABLE_NAME_KEY = "tableName";

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverOutputFormat.java

Lines changed: 0 additions & 103 deletions
This file was deleted.

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverSink.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
22+
import com.dtstack.flink.sql.sink.rdb.format.ExtendOutputFormat;
2223
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2324
import org.apache.commons.lang3.StringUtils;
2425

@@ -41,7 +42,7 @@ public String getDriverName() {
4142

4243
@Override
4344
public RetractJDBCOutputFormat getOutputFormat() {
44-
return new SqlserverOutputFormat();
45+
return new ExtendOutputFormat();
4546
}
4647

4748
@Override

0 commit comments

Comments
 (0)