1414
1515import json
1616import logging
17- import os
1817import threading
1918import time
2019
2120import requests
2221from azure .core .exceptions import ClientAuthenticationError
2322from azure .identity ._exceptions import CredentialUnavailableError
2423
24+ from opencensus .ext .azure .statsbeat import state
25+
2526try :
2627 from urllib .parse import urlparse
2728except ImportError :
3435_MONITOR_OAUTH_SCOPE = "https://monitor.azure.com//.default"
3536_requests_lock = threading .Lock ()
3637_requests_map = {}
38+ _REACHED_INGESTION_STATUS_CODES = (200 , 206 , 402 , 408 , 429 , 439 , 500 )
3739
3840
3941class TransportMixin (object ):
4042
43+ # check to see if collecting requests information related to statsbeats
4144 def _check_stats_collection (self ):
42- return not os .environ .get ("APPLICATIONINSIGHTS_STATSBEAT_DISABLED_ALL" ) and (not hasattr (self , '_is_stats' ) or not self ._is_stats ) # noqa: E501
45+ return state .is_statsbeat_enabled () and \
46+ not state .get_statsbeat_shutdown () and \
47+ not self ._is_stats_exporter ()
4348
49+ # check if the current exporter is a statsbeat metric exporter
50+ # only applies to metrics exporter
4451 def _is_stats_exporter (self ):
4552 return hasattr (self , '_is_stats' ) and self ._is_stats
4653
@@ -128,7 +135,13 @@ def _transmit(self, envelopes):
128135 _requests_map ['retry' ] = _requests_map .get ('retry' , 0 ) + 1 # noqa: E501
129136 else :
130137 _requests_map ['exception' ] = _requests_map .get ('exception' , 0 ) + 1 # noqa: E501
131-
138+ if self ._is_stats_exporter () and \
139+ not state .get_statsbeat_shutdown () and \
140+ not state .get_statsbeat_initial_success ():
141+ # If ingestion threshold during statsbeat initialization is
142+ # reached, return back code to shut it down
143+ if _statsbeat_failure_reached_threshold ():
144+ return - 2
132145 return exception
133146
134147 text = 'N/A'
@@ -143,6 +156,19 @@ def _transmit(self, envelopes):
143156 data = json .loads (text )
144157 except Exception :
145158 pass
159+
160+ if self ._is_stats_exporter () and \
161+ not state .get_statsbeat_shutdown () and \
162+ not state .get_statsbeat_initial_success ():
163+ # If statsbeat exporter, record initialization as success if
164+ # appropriate status code is returned
165+ if _reached_ingestion_status_code (response .status_code ):
166+ state .set_statsbeat_initial_success (True )
167+ elif _statsbeat_failure_reached_threshold ():
168+ # If ingestion threshold during statsbeat initialization is
169+ # reached, return back code to shut it down
170+ return - 2
171+
146172 if response .status_code == 200 :
147173 self ._consecutive_redirects = 0
148174 if self ._check_stats_collection ():
@@ -271,3 +297,13 @@ def _transmit(self, envelopes):
271297 with _requests_lock :
272298 _requests_map ['throttle' ] = _requests_map .get ('throttle' , 0 ) + 1 # noqa: E501
273299 return - response .status_code
300+
301+
302+ def _reached_ingestion_status_code (status_code ):
303+ return status_code in _REACHED_INGESTION_STATUS_CODES
304+
305+
306+ def _statsbeat_failure_reached_threshold ():
307+ # increment failure counter for sending statsbeat if in initialization
308+ state .increment_statsbeat_initial_failure_count ()
309+ return state .get_statsbeat_initial_failure_count () >= 3
0 commit comments