|
19 | 19 |
|
20 | 20 | package com.dtstack.flink.sql.table; |
21 | 21 |
|
| 22 | +import com.dtstack.flink.sql.side.AbstractSideTableInfo; |
22 | 23 | import com.dtstack.flink.sql.util.ClassUtil; |
23 | 24 | import com.dtstack.flink.sql.util.DtStringUtil; |
24 | 25 | import com.google.common.base.Preconditions; |
25 | 26 | import com.google.common.collect.Maps; |
26 | 27 | import org.apache.commons.lang3.StringUtils; |
27 | 28 | import org.apache.flink.api.java.tuple.Tuple2; |
28 | 29 |
|
| 30 | +import java.util.ArrayList; |
29 | 31 | import java.util.Arrays; |
30 | 32 | import java.util.List; |
31 | 33 | import java.util.Map; |
@@ -105,30 +107,50 @@ public void parseFieldsInfo(String fieldsInfo, AbstractTableInfo tableInfo) { |
105 | 107 | continue; |
106 | 108 | } |
107 | 109 |
|
108 | | - Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName()); |
109 | | - String fieldName = t.f0; |
110 | | - String fieldType = t.f1; |
| 110 | + handleKeyNotHaveAlias(fieldRow, tableInfo); |
| 111 | + } |
111 | 112 |
|
112 | | - Class fieldClass; |
113 | | - AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null; |
| 113 | + /* |
| 114 | + * check whether filed list contains pks and then add pks into field list. |
| 115 | + * because some no-sql database is not primary key. eg :redis、hbase etc... |
| 116 | + */ |
| 117 | + if (tableInfo instanceof AbstractSideTableInfo) { |
| 118 | + List<String> pks = new ArrayList<>(); |
| 119 | + tableInfo.getPrimaryKeys().stream().forEach(pk -> { |
| 120 | + if (!tableInfo.getFieldList().contains(pk)) { |
| 121 | + pks.add(String.format("%s varchar", pk)); |
| 122 | + } |
| 123 | + }); |
| 124 | + pks.stream().forEach(pk -> { |
| 125 | + handleKeyNotHaveAlias(pk, tableInfo); |
| 126 | + }); |
| 127 | + } |
114 | 128 |
|
115 | | - Matcher matcher = charTypePattern.matcher(fieldType); |
116 | | - if (matcher.find()) { |
117 | | - fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); |
118 | | - fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo(); |
119 | | - fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1))); |
120 | | - } else { |
121 | | - fieldClass = dbTypeConvertToJavaType(fieldType); |
122 | | - } |
| 129 | + tableInfo.finish(); |
| 130 | + } |
123 | 131 |
|
124 | | - tableInfo.addPhysicalMappings(fieldName, fieldName); |
125 | | - tableInfo.addField(fieldName); |
126 | | - tableInfo.addFieldClass(fieldClass); |
127 | | - tableInfo.addFieldType(fieldType); |
128 | | - tableInfo.addFieldExtraInfo(fieldExtraInfo); |
| 132 | + private void handleKeyNotHaveAlias(String fieldRow, AbstractTableInfo tableInfo) { |
| 133 | + Tuple2<String, String> t = extractType(fieldRow, tableInfo.getName()); |
| 134 | + String fieldName = t.f0; |
| 135 | + String fieldType = t.f1; |
| 136 | + |
| 137 | + Class fieldClass; |
| 138 | + AbstractTableInfo.FieldExtraInfo fieldExtraInfo = null; |
| 139 | + |
| 140 | + Matcher matcher = charTypePattern.matcher(fieldType); |
| 141 | + if (matcher.find()) { |
| 142 | + fieldClass = dbTypeConvertToJavaType(CHAR_TYPE_NO_LENGTH); |
| 143 | + fieldExtraInfo = new AbstractTableInfo.FieldExtraInfo(); |
| 144 | + fieldExtraInfo.setLength(Integer.parseInt(matcher.group(1))); |
| 145 | + } else { |
| 146 | + fieldClass = dbTypeConvertToJavaType(fieldType); |
129 | 147 | } |
130 | 148 |
|
131 | | - tableInfo.finish(); |
| 149 | + tableInfo.addPhysicalMappings(fieldName, fieldName); |
| 150 | + tableInfo.addField(fieldName); |
| 151 | + tableInfo.addFieldClass(fieldClass); |
| 152 | + tableInfo.addFieldType(fieldType); |
| 153 | + tableInfo.addFieldExtraInfo(fieldExtraInfo); |
132 | 154 | } |
133 | 155 |
|
134 | 156 | private Tuple2<String, String> extractType(String fieldRow, String tableName) { |
|
0 commit comments