2323import org .apache .flink .api .java .tuple .Tuple2 ;
2424import org .apache .flink .configuration .Configuration ;
2525import org .apache .flink .types .Row ;
26- import org .apache .kudu .client .AsyncKuduClient ;
27- import org .apache .kudu .client .AsyncKuduSession ;
28- import org .apache .kudu .client .KuduClient ;
29- import org .apache .kudu .client .KuduException ;
30- import org .apache .kudu .client .KuduTable ;
31- import org .apache .kudu .client .Operation ;
32- import org .apache .kudu .client .PartialRow ;
26+ import org .apache .kudu .client .*;
3327import org .slf4j .Logger ;
3428import org .slf4j .LoggerFactory ;
3529
@@ -123,7 +117,7 @@ public void writeRecord(Tuple2 record) throws IOException {
123117 Row row = tupleTrans .getField (1 );
124118 if (row .getArity () != fieldNames .length ) {
125119 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
126- LOG .error ("record insert failed .. {}" , row .toString ());
120+ LOG .error ("record insert failed: {}" , row .toString ());
127121 LOG .error ("cause by row.getArity() != fieldNames.length" );
128122 }
129123 outDirtyRecords .inc ();
@@ -142,7 +136,7 @@ public void writeRecord(Tuple2 record) throws IOException {
142136 outRecords .inc ();
143137 } catch (KuduException e ) {
144138 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ){
145- LOG .error ("record insert failed ..{} " , row . toString (). substring ( 0 , 100 ));
139+ LOG .error ("record insert failed, total dirty record:{} current row:{} " , outDirtyRecords . getCount (), row . toString ( ));
146140 LOG .error ("" , e );
147141 }
148142 outDirtyRecords .inc ();
0 commit comments