Skip to content

Commit f487182

Browse files
author
yanxi0227
committed
modify structure
1 parent 25cc345 commit f487182

5 files changed

Lines changed: 21 additions & 133 deletions

File tree

mysql/mysql-sink/src/main/java/com/dtstack/flink/sql/sink/mysql/MysqlSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2424
import com.dtstack.flink.sql.sink.rdb.RdbSink;
25+
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2526

2627
import java.util.List;
2728
import java.util.Map;
@@ -40,6 +41,11 @@ public class MysqlSink extends RdbSink implements IStreamSinkGener<RdbSink> {
4041
public MysqlSink() {
4142
}
4243

44+
@Override
45+
public RetractJDBCOutputFormat getOutputFormat() {
46+
return new RetractJDBCOutputFormat();
47+
}
48+
4349
@Override
4450
public void buildSql(String tableName, List<String> fields) {
4551
buildInsertSql(tableName, fields);

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/RdbSink.java

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,8 @@
1717
*/
1818
package com.dtstack.flink.sql.sink.rdb;
1919

20-
import com.dtstack.flink.sql.enums.EDatabaseType;
2120
import com.dtstack.flink.sql.sink.IStreamSinkGener;
22-
import com.dtstack.flink.sql.sink.rdb.format.OracleOutputFormat;
2321
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
24-
import com.dtstack.flink.sql.sink.rdb.format.SqlserverOutputFormat;
2522
import com.dtstack.flink.sql.sink.rdb.table.RdbTableInfo;
2623
import com.dtstack.flink.sql.table.TargetTableInfo;
2724
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -85,7 +82,7 @@ public RichSinkFunction createJdbcSinkFunc() {
8582
throw new RuntimeException("any of params in(driverName, dbURL, userName, password, type, tableName) " +
8683
" must not be null. please check it!!!");
8784
}
88-
RetractJDBCOutputFormat outputFormat = getOutputFormat(dbType);
85+
RetractJDBCOutputFormat outputFormat = getOutputFormat();
8986
outputFormat.setDbURL(dbURL);
9087
outputFormat.setDrivername(driverName);
9188
outputFormat.setUsername(userName);
@@ -102,15 +99,7 @@ public RichSinkFunction createJdbcSinkFunc() {
10299
return outputFormatSinkFunc;
103100
}
104101

105-
protected RetractJDBCOutputFormat getOutputFormat(String dbType) {
106-
if (dbType.equalsIgnoreCase(EDatabaseType.SQLSERVER.name())) {
107-
return new SqlserverOutputFormat();
108-
} else if (dbType.equalsIgnoreCase(EDatabaseType.ORACLE.name())) {
109-
return new OracleOutputFormat();
110-
} else {
111-
return new RetractJDBCOutputFormat();
112-
}
113-
}
102+
public abstract RetractJDBCOutputFormat getOutputFormat();
114103

115104

116105
@Override

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/OracleOutputFormat.java

Lines changed: 0 additions & 115 deletions
This file was deleted.

rdb/rdb-sink/src/main/java/com/dtstack/flink/sql/sink/rdb/format/SqlserverOutputFormat.java renamed to sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverOutputFormat.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,24 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
55
* regarding copyright ownership. The ASF licenses this file
66
* to you under the Apache License, Version 2.0 (the
77
* "License"); you may not use this file except in compliance
88
* with the License. You may obtain a copy of the License at
9-
* <p>
10-
* http://www.apache.org/licenses/LICENSE-2.0
11-
* <p>
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
1212
* Unless required by applicable law or agreed to in writing, software
1313
* distributed under the License is distributed on an "AS IS" BASIS,
1414
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18-
package com.dtstack.flink.sql.sink.rdb.format;
1918

19+
package com.dtstack.flink.sql.sink.sqlserver;
20+
21+
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2022
import org.apache.commons.lang3.StringUtils;
2123
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
2224
import java.sql.ResultSet;

sqlserver/sqlserver-sink/src/main/java/com/dtstack/flink/sql/sink/sqlserver/SqlserverSink.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import com.dtstack.flink.sql.sink.IStreamSinkGener;
2121
import com.dtstack.flink.sql.sink.rdb.RdbSink;
22+
import com.dtstack.flink.sql.sink.rdb.format.RetractJDBCOutputFormat;
2223
import org.apache.commons.lang3.StringUtils;
2324

2425
import java.util.*;
@@ -38,6 +39,11 @@ public String getDriverName() {
3839
return SQLSERVER_DRIVER;
3940
}
4041

42+
@Override
43+
public RetractJDBCOutputFormat getOutputFormat() {
44+
return new SqlserverOutputFormat();
45+
}
46+
4147
@Override
4248
public void buildSql(String tableName, List<String> fields) {
4349
buildInsertSql(tableName, fields);

0 commit comments

Comments
 (0)