4545import org .apache .calcite .sql .parser .SqlParseException ;
4646import org .apache .calcite .sql .parser .SqlParserPos ;
4747import org .apache .commons .collections .CollectionUtils ;
48- import org .apache .flink .api .common .typeinfo .SqlTimeTypeInfo ;
4948import org .apache .flink .api .common .typeinfo .TypeInformation ;
5049import org .apache .flink .api .java .tuple .Tuple2 ;
5150import org .apache .flink .api .java .typeutils .RowTypeInfo ;
5453import com .google .common .collect .Maps ;
5554import org .apache .flink .streaming .api .datastream .DataStream ;
5655import org .apache .flink .table .api .Table ;
57- import org .apache .flink .table .api .TableEnvironment ;
5856import org .apache .flink .table .api .java .StreamTableEnvironment ;
5957import org .apache .flink .table .typeutils .TimeIndicatorTypeInfo ;
6058import org .apache .flink .types .Row ;
6159import org .slf4j .Logger ;
6260import org .slf4j .LoggerFactory ;
6361import java .sql .Timestamp ;
62+ import java .time .LocalDateTime ;
63+ import java .util .Arrays ;
6464import java .util .Collection ;
6565import java .util .LinkedList ;
6666import java .util .List ;
@@ -128,7 +128,6 @@ public void exec(String sql, Map<String, SideTableInfo> sideTableMap, StreamTabl
128128 System .out .println ("----------real exec sql-----------" );
129129 System .out .println (pollSqlNode .toString ());
130130 FlinkSQLExec .sqlUpdate (tableEnv , pollSqlNode .toString ());
131- // tableEnv.sqlUpdate(pollSqlNode.toString());
132131 if (LOG .isInfoEnabled ()){
133132 LOG .info ("exec sql: " + pollSqlNode .toString ());
134133 }
@@ -160,15 +159,12 @@ private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, H
160159 SqlNode source = ((SqlInsert ) pollSqlNode ).getSource ();
161160 addAliasForFieldNode (source , fieldList , mappingTable );
162161 break ;
163-
164162 case AS :
165163 addAliasForFieldNode (((SqlBasicCall ) pollSqlNode ).getOperands ()[0 ], fieldList , mappingTable );
166164 break ;
167165
168166 case SELECT :
169-
170167 SqlNodeList selectList = ((SqlSelect ) pollSqlNode ).getSelectList ();
171-
172168 selectList .getList ().forEach (node -> {
173169 if (node .getKind () == IDENTIFIER ) {
174170 SqlIdentifier sqlIdentifier = (SqlIdentifier ) node ;
@@ -183,15 +179,13 @@ private void addAliasForFieldNode(SqlNode pollSqlNode, List<String> fieldList, H
183179
184180 }
185181 });
186-
187182 for (int i = 0 ; i < selectList .getList ().size (); i ++) {
188183 SqlNode node = selectList .get (i );
189184 if (node .getKind () == IDENTIFIER ) {
190185 SqlIdentifier sqlIdentifier = (SqlIdentifier ) node ;
191186 if (sqlIdentifier .names .size () == 1 ) {
192187 return ;
193188 }
194-
195189 String name = sqlIdentifier .names .get (1 );
196190 // avoid real field pv0 convert pv
197191 if (name .endsWith ("0" ) && !fieldList .contains (name ) && !fieldList .contains (name .substring (0 , name .length () - 1 ))) {
@@ -268,7 +262,7 @@ private RowTypeInfo buildLeftTableOutType(RowTypeInfo leftTypeInfo) {
268262
269263 private TypeInformation convertTimeAttributeType (TypeInformation typeInformation ) {
270264 if (typeInformation instanceof TimeIndicatorTypeInfo ) {
271- return TypeInformation .of (Timestamp .class );
265+ return TypeInformation .of (LocalDateTime .class );
272266 }
273267 return typeInformation ;
274268 }
0 commit comments