@@ -62,63 +62,7 @@ public void open(Configuration configuration) {
6262
6363 schema .setCounter (counter );
6464
65- if (null != flinkKafkaPartitioner ) {
66- if (flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner ) {
67- ((FlinkKafkaDelegatePartitioner ) flinkKafkaPartitioner ).setPartitions (
68- getPartitionsByTopic (this .defaultTopicId , this .producer ));
69- }
70- flinkKafkaPartitioner .open (ctx .getIndexOfThisSubtask (), ctx .getNumberOfParallelSubtasks ());
71- }
72-
73-
74- // register Kafka metrics to Flink accumulators
75- if (!Boolean .parseBoolean (producerConfig .getProperty (KEY_DISABLE_METRICS , "false" ))) {
76- Map <MetricName , ? extends Metric > metrics = this .producer .metrics ();
77-
78- if (metrics == null ) {
79- // MapR's Kafka implementation returns null here.
80- } else {
81- final MetricGroup kafkaMetricGroup = getRuntimeContext ().getMetricGroup ().addGroup ("KafkaProducer" );
82- for (Map .Entry <MetricName , ? extends Metric > metric : metrics .entrySet ()) {
83- kafkaMetricGroup .gauge (metric .getKey ().name (), new KafkaMetricWrapper (metric .getValue ()));
84- }
85- }
86- }
87-
88- if (flushOnCheckpoint && !((StreamingRuntimeContext ) this .getRuntimeContext ()).isCheckpointingEnabled ()) {
89- flushOnCheckpoint = false ;
90- }
91-
92- if (logFailuresOnly ) {
93- callback = new Callback () {
94- @ Override
95- public void onCompletion (RecordMetadata metadata , Exception e ) {
96- if (e != null ) {
97- }
98- acknowledgeMessage ();
99- }
100- };
101- } else {
102- callback = new Callback () {
103- @ Override
104- public void onCompletion (RecordMetadata metadata , Exception exception ) {
105- if (exception != null && asyncException == null ) {
106- asyncException = exception ;
107- }
108- acknowledgeMessage ();
109- }
110- };
111- }
65+ super .open (configuration );
11266 }
11367
114- private void acknowledgeMessage () {
115- if (flushOnCheckpoint ) {
116- synchronized (pendingRecordsLock ) {
117- pendingRecords --;
118- if (pendingRecords == 0 ) {
119- pendingRecordsLock .notifyAll ();
120- }
121- }
122- }
123- }
12468}
0 commit comments