|
15 | 15 | * See the License for the specific language governing permissions and |
16 | 16 | * limitations under the License. |
17 | 17 | */ |
18 | | -package com.dtstack.flink.sql.sink.ocean; |
| 18 | +package com.dtstack.flink.sql.sink.oceanbase; |
19 | 19 |
|
20 | 20 | import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect; |
21 | 21 |
|
@@ -50,27 +50,30 @@ public Optional<String> getUpsertStatement(String schema, |
50 | 50 | String[] uniqueKeyFields, |
51 | 51 | boolean allReplace) { |
52 | 52 | return allReplace ? |
53 | | - buildReplaceIntoStatement(tableName, fieldNames) : |
54 | | - buildDuplicateUpsertStatement(tableName, fieldNames); |
| 53 | + buildReplaceIntoStatement(tableName, fieldNames) : buildDuplicateUpsertStatement(tableName, fieldNames); |
55 | 54 | } |
56 | 55 |
|
57 | | - private Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldsName) { |
58 | | - String updateClause = Arrays.stream(fieldsName).map(f -> quoteIdentifier(f) |
59 | | - + "IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")") |
60 | | - .collect(Collectors.joining(",")); |
61 | | - return Optional.of(getInsertIntoStatement("", tableName, fieldsName, null) + |
| 56 | + public Optional<String> buildDuplicateUpsertStatement(String tableName, String[] fieldNames) { |
| 57 | + String updateClause = Arrays |
| 58 | + .stream(fieldNames) |
| 59 | + .map(f -> quoteIdentifier(f) + "=IFNULL(VALUES(" + quoteIdentifier(f) + ")," + quoteIdentifier(f) + ")") |
| 60 | + .collect(Collectors.joining(", ")); |
| 61 | + |
| 62 | + return Optional.of(getInsertIntoStatement("", tableName, fieldNames, null) + |
62 | 63 | " ON DUPLICATE KEY UPDATE " + updateClause |
63 | 64 | ); |
64 | 65 | } |
65 | 66 |
|
66 | | - private Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldsNames) { |
67 | | - String columns = Arrays.stream(fieldsNames) |
| 67 | + public Optional<String> buildReplaceIntoStatement(String tableName, String[] fieldNames) { |
| 68 | + String columns = Arrays |
| 69 | + .stream(fieldNames) |
68 | 70 | .map(this::quoteIdentifier) |
69 | | - .collect(Collectors.joining(",")); |
70 | | - String placeholders = Arrays.stream(fieldsNames) |
| 71 | + .collect(Collectors.joining(", ")); |
| 72 | + String placeholders = Arrays |
| 73 | + .stream(fieldNames) |
71 | 74 | .map(f -> "?") |
72 | | - .collect(Collectors.joining(",")); |
| 75 | + .collect(Collectors.joining(", ")); |
73 | 76 | return Optional.of("REPLACE INTO " + quoteIdentifier(tableName) + |
74 | | - "(" + columns + ") VALUES (" + placeholders + ")"); |
| 77 | + "(" + columns + ")" + " VALUES (" + placeholders + ")"); |
75 | 78 | } |
76 | 79 | } |
0 commit comments