4747import java .util .concurrent .ScheduledFuture ;
4848import java .util .concurrent .ScheduledThreadPoolExecutor ;
4949import java .util .concurrent .TimeUnit ;
50+ import java .util .concurrent .atomic .AtomicInteger ;
5051import java .util .regex .Matcher ;
5152import java .util .regex .Pattern ;
5253import java .util .stream .Collectors ;
@@ -72,18 +73,17 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
7273 private static final int RECEIVE_DATA_PRINT_FREQUENCY = 1000 ;
7374
7475 private static final String KUDU_TYPE = "kudu" ;
75- private static final String DYNAMIC_MODE = "dynamic" ;
76- private static final String STATIC_MODE = "static" ;
7776 private static final String UPDATE_MODE = "update" ;
7877 private static final String PARTITION_CONSTANT = "PARTITION" ;
7978 private static final String STRING_TYPE = "STRING" ;
79+ private static final String PARTITION_CONDITION = "$partitionCondition" ;
8080 private static final String DRIVER_NAME = "com.cloudera.impala.jdbc41.Driver" ;
8181
8282 protected transient Connection connection ;
8383 protected transient PreparedStatement statement ;
8484
8585 private transient volatile boolean closed = false ;
86- private transient int batchCount = 0 ;
86+ private final AtomicInteger batchCount = new AtomicInteger ( 0 ) ;
8787
8888 protected String keytabPath ;
8989 protected String krb5confPath ;
@@ -109,14 +109,12 @@ public class ImpalaOutputFormat extends AbstractDtRichOutputFormat<Tuple2<Boolea
109109 // partition field of static partition which matched by ${field}
110110 private List <String > staticPartitionField = new ArrayList <>();
111111
112+ // static partition sql like 'INSERT INTO tableName(field1, field2) PARTITION(pt=xx) VALUES(?, ?)'
113+ private String staticPartitionSql ;
114+
112115 private transient ScheduledExecutorService scheduler ;
113116 private transient ScheduledFuture scheduledFuture ;
114117
115- /**
116- * 具体需要执行的Sql列表
117- */
118- List <String > executeSql = new ArrayList <>();
119-
120118 @ Override
121119 public void configure (Configuration parameters ) {
122120 }
@@ -194,8 +192,7 @@ private void openJdbc() {
194192 }
195193 // static
196194 if (enablePartition && !staticPartitionField .isEmpty ()) {
197-
198- return ;
195+ staticPartitionSql = buildStaticInsertSql (schema , tableName , fieldList , fieldTypeList , partitionFields );
199196 }
200197
201198 } catch (SQLException sqlException ) {
@@ -204,7 +201,10 @@ private void openJdbc() {
204201 }
205202
206203 private void flush () throws SQLException {
207- statement .executeBatch ();
204+ if (Objects .nonNull (statement )) {
205+ statement .executeBatch ();
206+ batchCount .set (0 );
207+ }
208208 }
209209
210210 /**
@@ -234,10 +234,23 @@ public void writeRecord(Tuple2<Boolean, Row> record) throws IOException {
234234 valueMap .put (fieldList .get (i ), copyRow .getField (i ));
235235 }
236236
237- setRowToStatement (statement , fieldTypeList , copyRow );
237+ // build static partition statement from row data
238+ if (!staticPartitionSql .isEmpty ()) {
239+ statement = connection .prepareStatement (
240+ staticPartitionSql .replace (PARTITION_CONDITION ,
241+ buildStaticPartitionCondition (valueMap , staticPartitionField ))
242+ );
243+ }
244+
245+ Row rowValue = new Row (fieldTypeList .size ());
246+ for (int i = 0 ; i < fieldTypeList .size (); i ++) {
247+ rowValue .setField (i , copyRow .getField (i ));
248+ }
249+
250+ setRowToStatement (statement , fieldTypeList , rowValue );
238251 statement .addBatch ();
239252
240- if (batchCount ++ > batchSize ) {
253+ if (batchCount . incrementAndGet () > batchSize ) {
241254 flush ();
242255 }
243256 } catch (Exception e ) {
@@ -260,7 +273,7 @@ public void close() throws IOException {
260273 this .scheduler .shutdown ();
261274 }
262275 // 将还未执行的SQL flush
263- if (! executeSql . isEmpty () ) {
276+ if (batchCount . get () > 0 ) {
264277 try {
265278 flush ();
266279 } catch (Exception e ) {
@@ -310,22 +323,61 @@ private String buildKuduInsertSql(String schema,
310323 "(" + columns + ")" + " VALUES (" + placeholders + ")" ;
311324 }
312325
326+ /**
327+ * get static partition value from rowData and build static partition condition
328+ *
329+ * @param rowData the row data
330+ * @param staticPartitionField static partition field
331+ * @return partition condition like pt1=v1, pt2=v2
332+ */
333+ private String buildStaticPartitionCondition (Map <String , Object > rowData , List <String > staticPartitionField ) {
334+ StringBuilder sb = new StringBuilder ();
335+ for (String key : staticPartitionField ) {
336+ Object value = rowData .get (key );
337+ sb .append (key ).append ("=" ).append (value );
338+ }
339+ return sb .toString ();
340+ }
341+
313342 /**
314343 * impala 通过静态分区的方式写入数据 SQL
315- * 通过解析${pt}占位符的方式来做 pt=%s 替换,具体的分区值通过数据获取
316344 *
317- * @return INSERT INTO tableName(field1, field2) PARTITION(pt=%s ) VALUES (?, ?)
345+ * @return INSERT INTO tableName(field1, field2) PARTITION($partitionCondition ) VALUES (?, ?)
318346 */
319347 private String buildStaticInsertSql (String schema , String tableName , List <String > fieldNames , List <String > fieldTypes , String partitionFields ) {
320- return "insert into table demoTwo(id, name) partition(year=2018) values ('001','张三');" ;
348+ List <String > copyFieldNames = new ArrayList <>(fieldNames );
349+ for (int i = fieldNames .size () - 1 ; i >= 0 ; i --) {
350+ if (partitionFields .contains (fieldNames .get (i ))) {
351+ copyFieldNames .remove (i );
352+ fieldTypes .remove (i );
353+ }
354+ }
355+
356+ String placeholders = fieldTypes .stream ().map (
357+ f -> {
358+ if (STRING_TYPE .equals (f .toUpperCase ())) {
359+ return "cast(? as string)" ;
360+ }
361+ return "?" ;
362+ }).collect (Collectors .joining (", " ));
363+
364+ String columns = copyFieldNames .stream ()
365+ .map (this ::quoteIdentifier )
366+ .collect (Collectors .joining (", " ));
367+
368+ String partitionCondition = PARTITION_CONSTANT + "(" + PARTITION_CONDITION + ")" ;
369+
370+ return "INSERT INTO " + (Objects .isNull (schema ) ? "" : quoteIdentifier (schema ) + "." ) + quoteIdentifier (tableName ) +
371+ " (" + columns + ") " + partitionCondition + " VALUES (" + placeholders + ")" ;
321372 }
322373
323374 /**
324375 * impala 通过动态分区的方式写入数据 SQL
325376 *
326377 * @return INSERT INTO tableName(field1, field2) PARTITION(pt) VALUES (?, ?)
327378 */
328- private String buildDynamicInsertSql (String schema , String tableName , List <String > fieldList , List <String > fieldTypes , String partitionFields ) {
379+ private String buildDynamicInsertSql (String schema , String tableName , List <String > fieldName , List <String > fieldTypes , String partitionFields ) {
380+
329381 String placeholders = fieldTypes .stream ().map (
330382 f -> {
331383 if (STRING_TYPE .equals (f .toUpperCase ())) {
@@ -334,7 +386,7 @@ private String buildDynamicInsertSql(String schema, String tableName, List<Strin
334386 return "?" ;
335387 }).collect (Collectors .joining (", " ));
336388
337- String columns = fieldList .stream ()
389+ String columns = fieldName .stream ()
338390 .filter (f -> !partitionFields .contains (f ))
339391 .map (this ::quoteIdentifier )
340392 .collect (Collectors .joining (", " ));
@@ -350,7 +402,7 @@ private String buildDynamicInsertSql(String schema, String tableName, List<Strin
350402 *
351403 * @return UPDATE tableName SET setCondition WHERE whereCondition
352404 */
353- private String buildUpdateSql (String schema , String tableName , String [] fieldNames , String [] partitionFields ) {
405+ private String buildUpdateSql (String schema , String tableName , String [] fieldNames , String [] partitionFields , Map < String , Object > replaceField ) {
354406 return "update tableName set name='王五' where id='003" ;
355407 }
356408
0 commit comments