|
23 | 23 |
|
24 | 24 | import com.dtstack.flink.sql.table.AbsTableParser; |
25 | 25 | import com.dtstack.flink.sql.table.TableInfo; |
| 26 | +import com.dtstack.flink.sql.util.DtStringUtil; |
26 | 27 | import com.dtstack.flink.sql.util.MathUtil; |
27 | 28 |
|
| 29 | +import java.util.LinkedHashMap; |
| 30 | +import java.util.List; |
28 | 31 | import java.util.Map; |
29 | 32 |
|
30 | 33 | import static com.dtstack.flink.sql.table.TableInfo.PARALLELISM_KEY; |
@@ -65,4 +68,38 @@ public TableInfo getTableInfo(String tableName, String fieldsInfo, Map<String, O |
65 | 68 | hbaseTableInfo.setRowkey(rk.split(",")); |
66 | 69 | return hbaseTableInfo; |
67 | 70 | } |
| 71 | + |
| 72 | + public void parseFieldsInfo(String fieldsInfo, HbaseTableInfo tableInfo){ |
| 73 | + List<String> fieldRows = DtStringUtil.splitIgnoreQuota(fieldsInfo, ','); |
| 74 | + Map<String, String> columnFamilies = new LinkedHashMap<>(); |
| 75 | + for(String fieldRow : fieldRows){ |
| 76 | + fieldRow = fieldRow.trim(); |
| 77 | + |
| 78 | + String[] filedInfoArr = fieldRow.split("\\s+"); |
| 79 | + if(filedInfoArr.length < 2 ){ |
| 80 | + throw new RuntimeException(String.format("table [%s] field [%s] format error.", tableInfo.getName(), fieldRow)); |
| 81 | + } |
| 82 | + |
| 83 | + boolean isMatcherKey = dealKeyPattern(fieldRow, tableInfo); |
| 84 | + if(isMatcherKey){ |
| 85 | + continue; |
| 86 | + } |
| 87 | + |
| 88 | + //Compatible situation may arise in space in the fieldName |
| 89 | + String[] filedNameArr = new String[filedInfoArr.length - 1]; |
| 90 | + System.arraycopy(filedInfoArr, 0, filedNameArr, 0, filedInfoArr.length - 1); |
| 91 | + String fieldName = String.join(" ", filedNameArr); |
| 92 | + String fieldType = filedInfoArr[filedInfoArr.length - 1 ].trim(); |
| 93 | + Class fieldClass = dbTypeConvertToJavaType(fieldType); |
| 94 | + String[] columnFamily = fieldName.trim().split(":"); |
| 95 | + columnFamilies.put(fieldName.trim(),columnFamily[1]); |
| 96 | + tableInfo.addPhysicalMappings(filedInfoArr[0],filedInfoArr[0]); |
| 97 | + tableInfo.addField(columnFamily[1]); |
| 98 | + tableInfo.addFieldClass(fieldClass); |
| 99 | + tableInfo.addFieldType(fieldType); |
| 100 | + tableInfo.addFieldExtraInfo(null); |
| 101 | + } |
| 102 | + tableInfo.setColumnNameFamily(columnFamilies); |
| 103 | + tableInfo.finish(); |
| 104 | + } |
68 | 105 | } |
0 commit comments