2020
2121import com .dtstack .flink .sql .dirtyManager .manager .DirtyDataManager ;
2222import com .dtstack .flink .sql .format .DeserializationMetricWrapper ;
23+ import com .dtstack .flink .sql .util .ReflectionUtils ;
2324import org .apache .flink .api .common .serialization .DeserializationSchema ;
2425import org .apache .flink .api .common .typeinfo .TypeInformation ;
2526import org .apache .flink .metrics .Gauge ;
2627import org .apache .flink .metrics .MetricGroup ;
2728import org .apache .flink .streaming .connectors .kafka .internal .KafkaConsumerThread ;
2829import org .apache .flink .streaming .connectors .kafka .internals .AbstractFetcher ;
30+ import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartition ;
31+ import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartitionState ;
2932import org .apache .flink .types .Row ;
3033import org .apache .kafka .clients .consumer .KafkaConsumer ;
3134import org .apache .kafka .clients .consumer .internals .SubscriptionState ;
3235import org .apache .kafka .common .TopicPartition ;
3336import org .slf4j .Logger ;
3437import org .slf4j .LoggerFactory ;
3538
36- import java .io .IOException ;
3739import java .lang .reflect .Field ;
38- import java .util .Set ;
39- import java .util .concurrent .atomic .AtomicBoolean ;
40+ import java .util .List ;
4041
4142import static com .dtstack .flink .sql .metric .MetricConstant .DT_PARTITION_GROUP ;
4243import static com .dtstack .flink .sql .metric .MetricConstant .DT_TOPIC_GROUP ;
@@ -53,10 +54,6 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrap
5354
5455 private static final Logger LOG = LoggerFactory .getLogger (KafkaDeserializationMetricWrapper .class );
5556
56- private AbstractFetcher <Row , ?> fetcher ;
57-
58- private AtomicBoolean firstMsg = new AtomicBoolean (true );
59-
6057 private Calculate calculate ;
6158
6259 public KafkaDeserializationMetricWrapper (
@@ -68,66 +65,82 @@ public KafkaDeserializationMetricWrapper(
6865 this .calculate = calculate ;
6966 }
7067
71- @ Override
72- protected void beforeDeserialize () throws IOException {
73- super .beforeDeserialize ();
74- if (firstMsg .compareAndSet (true , false )) {
75- try {
76- registerPtMetric (fetcher );
77- } catch (Exception e ) {
78- LOG .error ("register topic partition metric error." , e );
79- }
80- }
81- }
82-
8368 protected void registerPtMetric (AbstractFetcher <Row , ?> fetcher ) throws Exception {
84- Field consumerThreadField = getConsumerThreadField (fetcher );
69+ Field consumerThreadField = ReflectionUtils . getDeclaredField (fetcher , "consumerThread" );
8570 consumerThreadField .setAccessible (true );
8671 KafkaConsumerThread consumerThread = (KafkaConsumerThread ) consumerThreadField .get (fetcher );
8772
8873 Field hasAssignedPartitionsField = consumerThread .getClass ().getDeclaredField ("hasAssignedPartitions" );
8974 hasAssignedPartitionsField .setAccessible (true );
9075
91- //wait until assignedPartitions
92-
93- boolean hasAssignedPartitions = (boolean ) hasAssignedPartitionsField .get (consumerThread );
76+ // get subtask unassigned kafka topic partition
77+ Field subscribedPartitionStatesField = ReflectionUtils .getDeclaredField (fetcher , "subscribedPartitionStates" );
78+ subscribedPartitionStatesField .setAccessible (true );
79+ List <KafkaTopicPartitionState <KafkaTopicPartition >> subscribedPartitionStates = (List <KafkaTopicPartitionState <KafkaTopicPartition >>) subscribedPartitionStatesField .get (fetcher );
9480
95- if (!hasAssignedPartitions ) {
96- throw new RuntimeException ("wait 50 secs, but not assignedPartitions" );
97- }
81+ // init partition lag metric
82+ for (KafkaTopicPartitionState <KafkaTopicPartition > kafkaTopicPartitionState : subscribedPartitionStates ) {
83+ KafkaTopicPartition kafkaTopicPartition = kafkaTopicPartitionState .getKafkaTopicPartition ();
84+ MetricGroup topicMetricGroup = getRuntimeContext ().getMetricGroup ().addGroup (DT_TOPIC_GROUP , kafkaTopicPartition .getTopic ());
9885
99- Field consumerField = consumerThread .getClass ().getDeclaredField ("consumer" );
100- consumerField .setAccessible (true );
101-
102- KafkaConsumer kafkaConsumer = (KafkaConsumer ) consumerField .get (consumerThread );
103- Field subscriptionStateField = kafkaConsumer .getClass ().getDeclaredField ("subscriptions" );
104- subscriptionStateField .setAccessible (true );
105-
106- //topic partitions lag
107- SubscriptionState subscriptionState = (SubscriptionState ) subscriptionStateField .get (kafkaConsumer );
108- Set <TopicPartition > assignedPartitions = subscriptionState .assignedPartitions ();
109-
110- for (TopicPartition topicPartition : assignedPartitions ) {
111- MetricGroup metricGroup = getRuntimeContext ().getMetricGroup ().addGroup (DT_TOPIC_GROUP , topicPartition .topic ())
112- .addGroup (DT_PARTITION_GROUP , topicPartition .partition () + "" );
86+ MetricGroup metricGroup = topicMetricGroup .addGroup (DT_PARTITION_GROUP , kafkaTopicPartition .getPartition () + "" );
11387 metricGroup .gauge (DT_TOPIC_PARTITION_LAG_GAUGE , new Gauge <Long >() {
88+ // tmp variable
89+ boolean initLag = true ;
90+ int partitionIndex ;
91+ SubscriptionState subscriptionState ;
92+ TopicPartition topicPartition ;
93+
11494 @ Override
11595 public Long getValue () {
116- return calculate .calc (subscriptionState , topicPartition );
96+ // first time register metrics
97+ if (initLag ) {
98+ partitionIndex = kafkaTopicPartition .getPartition ();
99+ initLag = false ;
100+ return -1L ;
101+ }
102+ // when kafka topic partition assigned calc metrics
103+ if (subscriptionState == null ) {
104+ try {
105+ Field consumerField = consumerThread .getClass ().getDeclaredField ("consumer" );
106+ consumerField .setAccessible (true );
107+
108+ KafkaConsumer kafkaConsumer = (KafkaConsumer ) consumerField .get (consumerThread );
109+ Field subscriptionStateField = kafkaConsumer .getClass ().getDeclaredField ("subscriptions" );
110+ subscriptionStateField .setAccessible (true );
111+
112+ boolean hasAssignedPartitions = (boolean ) hasAssignedPartitionsField .get (consumerThread );
113+
114+ if (!hasAssignedPartitions ) {
115+ LOG .error ("wait 50 secs, but not assignedPartitions" );
116+ }
117+
118+ subscriptionState = (SubscriptionState ) subscriptionStateField .get (kafkaConsumer );
119+
120+ topicPartition = subscriptionState
121+ .assignedPartitions ()
122+ .stream ()
123+ .filter (x -> x .partition () == partitionIndex )
124+ .findFirst ()
125+ .get ();
126+
127+ } catch (Exception e ) {
128+ LOG .error (e .getMessage ());
129+ }
130+ return -1L ;
131+ } else {
132+ return calculate .calc (subscriptionState , topicPartition );
133+ }
117134 }
118135 });
119136 }
120137 }
121138
122139 public void setFetcher (AbstractFetcher <Row , ?> fetcher ) {
123- this .fetcher = fetcher ;
124- }
125-
126- private Field getConsumerThreadField (AbstractFetcher fetcher ) throws NoSuchFieldException {
127140 try {
128- return fetcher . getClass (). getDeclaredField ( "consumerThread" );
141+ registerPtMetric ( fetcher );
129142 } catch (Exception e ) {
130- return fetcher . getClass (). getSuperclass (). getDeclaredField ( "consumerThread" );
143+ LOG . error ( "register topic partition metric error." , e );
131144 }
132145 }
133146}
0 commit comments