1818
1919package com .dtstack .flink .sql .sink .kudu ;
2020
21+ import com .dtstack .flink .sql .factory .DTThreadFactory ;
2122import com .dtstack .flink .sql .outputformat .AbstractDtRichOutputFormat ;
23+ import com .dtstack .flink .sql .sink .kudu .table .KuduTableInfo ;
2224import com .dtstack .flink .sql .util .KrbUtils ;
2325import org .apache .flink .api .common .typeinfo .TypeInformation ;
2426import org .apache .flink .api .java .tuple .Tuple2 ;
3032import org .apache .kudu .client .KuduSession ;
3133import org .apache .kudu .client .KuduTable ;
3234import org .apache .kudu .client .Operation ;
35+ import org .apache .kudu .client .OperationResponse ;
3336import org .apache .kudu .client .PartialRow ;
37+ import org .apache .kudu .client .RowError ;
38+ import org .apache .kudu .client .SessionConfiguration ;
3439import org .slf4j .Logger ;
3540import org .slf4j .LoggerFactory ;
3641
4045import java .sql .Timestamp ;
4146import java .util .Date ;
4247import java .util .Objects ;
48+ import java .util .concurrent .ScheduledExecutorService ;
49+ import java .util .concurrent .ScheduledFuture ;
50+ import java .util .concurrent .ScheduledThreadPoolExecutor ;
51+ import java .util .concurrent .TimeUnit ;
52+ import java .util .concurrent .atomic .AtomicInteger ;
4353
4454/**
4555 * @author gituser
4656 * @modify xiuzhu
4757 */
48- public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 > {
58+ public class KuduOutputFormat extends AbstractDtRichOutputFormat <Tuple2 < Boolean , Row > > {
4959
5060 private static final long serialVersionUID = 1L ;
5161
@@ -66,15 +76,31 @@ public class KuduOutputFormat extends AbstractDtRichOutputFormat<Tuple2> {
6676
6777 private Integer defaultOperationTimeoutMs ;
6878
69- private Integer defaultSocketReadTimeoutMs ;
70-
7179 /**
7280 * kerberos
7381 */
7482 private String principal ;
7583 private String keytab ;
7684 private String krb5conf ;
7785
86+ /**
87+ * batch size
88+ */
89+ private Integer batchSize ;
90+ private Integer batchWaitInterval ;
91+ /**
92+ * kudu session flush mode
93+ */
94+ private String flushMode ;
95+
96+ private transient AtomicInteger rowCount ;
97+
98+ /**
99+ * 定时任务
100+ */
101+ private transient ScheduledExecutorService scheduler ;
102+ private transient ScheduledFuture <?> scheduledFuture ;
103+
78104 private KuduOutputFormat () {
79105 }
80106
@@ -91,16 +117,35 @@ public void configure(Configuration parameters) {
91117 public void open (int taskNumber , int numTasks ) throws IOException {
92118 establishConnection ();
93119 initMetric ();
120+ initSchedulerTask ();
121+ rowCount = new AtomicInteger (0 );
122+ }
123+
124+ /**
125+ * init the scheduler task of {@link KuduOutputFormat#flush()}
126+ */
127+ private void initSchedulerTask () {
128+ try {
129+ if (batchWaitInterval > 0 ) {
130+ this .scheduler = new ScheduledThreadPoolExecutor (
131+ 1 ,
132+ new DTThreadFactory ("kudu-batch-flusher" )
133+ );
134+
135+ this .scheduledFuture = this .scheduler .scheduleWithFixedDelay (
136+ this ::flush , batchWaitInterval , batchWaitInterval , TimeUnit .MILLISECONDS );
137+ }
138+ } catch (Exception e ) {
139+ LOG .error ("init schedule task failed !" );
140+ throw new RuntimeException (e );
141+ }
94142 }
95143
96144 private void establishConnection () throws IOException {
97145 KuduClient .KuduClientBuilder kuduClientBuilder = new KuduClient .KuduClientBuilder (kuduMasters );
98146 if (null != workerCount ) {
99147 kuduClientBuilder .workerCount (workerCount );
100148 }
101- if (null != defaultSocketReadTimeoutMs ) {
102- kuduClientBuilder .defaultSocketReadTimeoutMs (defaultSocketReadTimeoutMs );
103- }
104149
105150 if (null != defaultOperationTimeoutMs ) {
106151 kuduClientBuilder .defaultOperationTimeoutMs (defaultOperationTimeoutMs );
@@ -127,38 +172,113 @@ private void establishConnection() throws IOException {
127172 }
128173 LOG .info ("connect kudu is succeed!" );
129174
130- session = client .newSession ();
175+ session = buildSessionWithFlushMode (flushMode , client );
176+ }
177+
178+ /**
179+ * According to the different flush mode, build different session. Detail see {@link SessionConfiguration.FlushMode}
180+ *
181+ * @param flushMode flush mode
182+ * @param kuduClient kudu client
183+ * @return KuduSession with flush mode
184+ */
185+ private KuduSession buildSessionWithFlushMode (String flushMode , KuduClient kuduClient ) {
186+ KuduSession kuduSession = kuduClient .newSession ();
187+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .MANUAL_FLUSH .name ())) {
188+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .MANUAL_FLUSH );
189+ kuduSession .setMutationBufferSpace (
190+ Integer .parseInt (String .valueOf (Math .round (batchSize * 1.2 )))
191+ );
192+ }
193+
194+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_SYNC .name ())) {
195+ LOG .warn ("Parameter [batchSize] will not take effect at AUTO_FLUSH_SYNC mode." );
196+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC );
197+ }
198+
199+ if (flushMode .equalsIgnoreCase (KuduTableInfo .KuduFlushMode .AUTO_FLUSH_BACKGROUND .name ())) {
200+ LOG .warn ("Unable to determine the order of data at AUTO_FLUSH_BACKGROUND mode." );
201+ kuduSession .setFlushMode (SessionConfiguration .FlushMode .AUTO_FLUSH_BACKGROUND );
202+ }
203+
204+ return kuduSession ;
131205 }
132206
133207 @ Override
134- public void writeRecord (Tuple2 record ) throws IOException {
135- Tuple2 <Boolean , Row > tupleTrans = record ;
136- Boolean retract = tupleTrans .getField (0 );
208+ public void writeRecord (Tuple2 <Boolean , Row > record ) throws IOException {
209+ Boolean retract = record .getField (0 );
137210 if (!retract ) {
138211 return ;
139212 }
140- Row row = tupleTrans .getField (1 );
141- if (row .getArity () != fieldNames .length ) {
142- if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
143- LOG .error ("record insert failed ..{}" , row .toString ());
144- LOG .error ("cause by row.getArity() != fieldNames.length" );
145- }
146- outDirtyRecords .inc ();
147- return ;
148- }
213+ Row row = record .getField (1 );
149214
150215 try {
151216 if (outRecords .getCount () % ROW_PRINT_FREQUENCY == 0 ) {
152217 LOG .info ("Receive data : {}" , row );
153218 }
219+ if (rowCount .getAndIncrement () >= batchSize ) {
220+ flush ();
221+ }
222+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation, then get the response from kudu server.
223+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
224+ dealResponse (session .apply (toOperation (writeMode , row )));
225+ }
226+
154227 session .apply (toOperation (writeMode , row ));
155228 outRecords .inc ();
156229 } catch (KuduException e ) {
230+ throw new RuntimeException (e );
231+ }
232+ }
233+
234+ /**
235+ * Flush data with session, then deal the responses of operations and reset rowCount.
236+ * Detail of flush see {@link KuduSession#flush()}
237+ */
238+ private synchronized void flush () {
239+ try {
240+ if (session .isClosed ()) {
241+ throw new IllegalStateException ("Session is closed! Flush data error!" );
242+ }
243+
244+ // At AUTO_FLUSH_SYNC mode, kudu automatically flush once session apply operation
245+ if (flushMode .equalsIgnoreCase (SessionConfiguration .FlushMode .AUTO_FLUSH_SYNC .name ())) {
246+ return ;
247+ }
248+ session .flush ().forEach (this ::dealResponse );
249+ // clear
250+ rowCount .set (0 );
251+ } catch (KuduException kuduException ) {
252+ LOG .error ("flush data error!" , kuduException );
253+ throw new RuntimeException (kuduException );
254+ }
255+ }
256+
257+ /**
258+ * Deal response when operation apply.
259+ * At MANUAL_FLUSH mode, response returns after {@link KuduSession#flush()}.
260+ * But at AUTO_FLUSH_SYNC mode, response returns after {@link KuduSession#apply(Operation)}
261+ *
262+ * @param response {@link OperationResponse} response after operation done.
263+ */
264+ private void dealResponse (OperationResponse response ) {
265+ if (response .hasRowError ()) {
266+ RowError error = response .getRowError ();
267+ String errorMsg = error .getErrorStatus ().toString ();
157268 if (outDirtyRecords .getCount () % DIRTY_PRINT_FREQUENCY == 0 ) {
158- LOG .error ("record insert failed, total dirty record:{} current row:{}" , outDirtyRecords .getCount (), row .toString ());
159- LOG .error ("" , e );
269+ LOG .error (errorMsg );
270+ LOG .error (String .format ("Dirty data count: [%s]. Row data: [%s]" ,
271+ outDirtyRecords .getCount () + 1 , error .getOperation ().getRow ().toString ()));
160272 }
161273 outDirtyRecords .inc ();
274+
275+ if (error .getErrorStatus ().isNotFound ()
276+ || error .getErrorStatus ().isIOError ()
277+ || error .getErrorStatus ().isRuntimeError ()
278+ || error .getErrorStatus ().isServiceUnavailable ()
279+ || error .getErrorStatus ().isIllegalState ()) {
280+ throw new RuntimeException (errorMsg );
281+ }
162282 }
163283 }
164284
@@ -179,6 +299,14 @@ public void close() {
179299 throw new IllegalArgumentException ("[closeKuduClient]:" + e .getMessage ());
180300 }
181301 }
302+
303+ if (scheduledFuture != null ) {
304+ scheduledFuture .cancel (false );
305+ }
306+
307+ if (scheduler != null ) {
308+ scheduler .shutdownNow ();
309+ }
182310 }
183311
184312 private Operation toOperation (WriteMode writeMode , Row row ) {
@@ -320,11 +448,6 @@ public KuduOutputFormatBuilder setDefaultOperationTimeoutMs(Integer defaultOpera
320448 return this ;
321449 }
322450
323- public KuduOutputFormatBuilder setDefaultSocketReadTimeoutMs (Integer defaultSocketReadTimeoutMs ) {
324- kuduOutputFormat .defaultSocketReadTimeoutMs = defaultSocketReadTimeoutMs ;
325- return this ;
326- }
327-
328451 public KuduOutputFormatBuilder setPrincipal (String principal ) {
329452 kuduOutputFormat .principal = principal ;
330453 return this ;
@@ -345,6 +468,21 @@ public KuduOutputFormatBuilder setEnableKrb(boolean enableKrb) {
345468 return this ;
346469 }
347470
471+ public KuduOutputFormatBuilder setBatchSize (Integer batchSize ) {
472+ kuduOutputFormat .batchSize = batchSize ;
473+ return this ;
474+ }
475+
476+ public KuduOutputFormatBuilder setBatchWaitInterval (Integer batchWaitInterval ) {
477+ kuduOutputFormat .batchWaitInterval = batchWaitInterval ;
478+ return this ;
479+ }
480+
481+ public KuduOutputFormatBuilder setFlushMode (String flushMode ) {
482+ kuduOutputFormat .flushMode = flushMode ;
483+ return this ;
484+ }
485+
348486 public KuduOutputFormat finish () {
349487 if (kuduOutputFormat .kuduMasters == null ) {
350488 throw new IllegalArgumentException ("No kuduMasters supplied." );
0 commit comments