1- import gc
2- import json
3- import os
4- import subprocess
5- import sys
61import textwrap
2+ import time
73import unittest
84
95from test .support import (
10- import_helper ,
11- SHORT_TIMEOUT ,
126 requires_gil_enabled ,
137 requires_remote_subprocess_debugging ,
148)
15-
16- PROCESS_VM_READV_SUPPORTED = False
9+ from test .test_profiling .test_sampling_profiler .helpers import test_subprocess
1710
1811try :
19- from _remote_debugging import PROCESS_VM_READV_SUPPORTED
12+ import _remote_debugging # noqa: F401
2013except ImportError :
2114 raise unittest .SkipTest (
2215 "Test only runs when _remote_debugging is available"
2316 )
2417
18+
2519def get_interpreter_identifiers (gc_stats : tuple [dict [str , str | int | float ]]) -> tuple [str ,...]:
2620 return tuple (sorted ({s ["iid" ] for s in gc_stats }))
2721
@@ -45,72 +39,90 @@ def get_last_item(gc_stats: tuple[dict[str, str|int|float]],
4539
4640 return item
4741
48- skip_if_not_supported = unittest .skipIf (
49- (
50- sys .platform != "darwin"
51- and sys .platform != "linux"
52- and sys .platform != "win32"
53- ),
54- "Test only runs on Linux, Windows and MacOS" ,
55- )
56-
5742
5843@requires_gil_enabled ()
5944@requires_remote_subprocess_debugging ()
6045class TestGetGCStats (unittest .TestCase ):
6146
62- def _run_child_process (self , all_interpreters ):
63- # Run the test in a subprocess to avoid side effects
64- script = textwrap .dedent (f"""\
65- import json
66- import os
67- import sys
68- import _remote_debugging
69- try:
70- from _remote_debugging import PROCESS_VM_READV_SUPPORTED
71- supported = True
72- except ImportError:
73- supported = False
74-
75- if supported:
76- pid = int(sys.argv[1])
77- gc_stats = _remote_debugging.get_gc_stats(pid, all_interpreters={ all_interpreters } )
78- print(json.dumps(gc_stats, indent=1))
79- else:
80- print(json.dumps(dict([("error", "not supported")])))
81- """ )
82-
83- gc .collect (0 )
84- gc .collect (1 )
85- gc .collect (2 )
86-
87- result = subprocess .run (
88- [sys .executable , "-c" , script , str (os .getpid ())],
89- capture_output = True ,
90- text = True ,
91- timeout = SHORT_TIMEOUT ,
92- )
93- self .assertEqual (
94- result .returncode , 0 ,
95- f"stdout: { result .stdout } \n stderr: { result .stderr } "
96- )
97- data = json .loads (result .stdout )
98- if isinstance (data , dict ) and "error" in data :
99- if sys .platform == "linux" :
100- self .skipTest ("Testing on Linux requires process_vm_readv support" )
101- else :
102- self .assertTrue (False , f"Unexpected error: { data } " )
103- return data
104-
105- def _run_in_interpreter (self , interp ):
106- source = f"""if True:
107- import gc
108-
109- gc.collect(0)
110- gc.collect(1)
111- gc.collect(2)
112- """
113- interp .exec (source )
47+ @classmethod
48+ def setUpClass (cls ):
49+ cls ._main_iid = 0 # main interpreter ID
50+ cls ._only_main_interpreter_script = '''
51+ import gc
52+ import time
53+
54+ gc.collect(0)
55+ gc.collect(1)
56+ gc.collect(2)
57+
58+ _test_sock.sendall(b"working")
59+
60+ objects = []
61+ while True:
62+ if len(objects) > 100:
63+ objects = []
64+
65+ # objects that GC will visit should increase
66+ objects.append(object())
67+
68+ time.sleep(0.1)
69+ gc.collect(0)
70+ gc.collect(1)
71+ gc.collect(2)
72+ '''
73+ cls ._subinterpreters_script = '''
74+ import concurrent.interpreters as interpreters
75+ import gc
76+ import time
77+
78+ source = """if True:
79+ import gc
80+
81+ gc.collect(0)
82+ gc.collect(1)
83+ gc.collect(2)
84+ """
85+
86+ interp = interpreters.create()
87+ interp.exec(source)
88+
89+ gc.collect(0)
90+ gc.collect(1)
91+ gc.collect(2)
92+
93+ _test_sock.sendall(b"working")
94+ objects = []
95+ while True:
96+ if len(objects) > 100:
97+ objects = []
98+
99+ # objects that GC will visit should increase
100+ objects.append(object())
101+
102+ time.sleep(0.1)
103+ interp.exec(source)
104+ gc.collect(0)
105+ gc.collect(1)
106+ gc.collect(2)
107+ '''
108+
109+ def _collect_gc_stats (self , script :str , all_interpreters :bool ):
110+ get_gc_stats = _remote_debugging .get_gc_stats
111+ with (
112+ test_subprocess (script , wait_for_working = True ) as subproc
113+ ):
114+ before_stats = get_gc_stats (subproc .process .pid ,
115+ all_interpreters = all_interpreters )
116+ before = get_last_item (before_stats , 2 , self ._main_iid )
117+ for _ in range (10 ):
118+ time .sleep (0.5 )
119+ after_stats = get_gc_stats (subproc .process .pid ,
120+ all_interpreters = all_interpreters )
121+ after = get_last_item (after_stats , 2 , self ._main_iid )
122+ if after ["ts_stop" ] > before ["ts_stop" ]:
123+ break
124+
125+ return before_stats , after_stats
114126
115127 def _check_gc_state (self , before , after ):
116128 self .assertIsNotNone (before )
@@ -136,54 +148,49 @@ def _check_gc_state(self, before, after):
136148 before ["objects_not_transitively_reachable" ],
137149 (before , after ))
138150
139- @skip_if_not_supported
140- @unittest .skipIf (
141- sys .platform == "linux" and not PROCESS_VM_READV_SUPPORTED ,
142- "Test only runs on Linux with process_vm_readv support" ,
143- )
144- def test_get_gc_stats_for_main_interpreter (self ):
145- before_stats = self ._run_child_process (False )
146- after_stats = self ._run_child_process (False )
147-
151+ def _check_main_interpreter_stats (self , before_stats , after_stats ):
148152 before_iids = get_interpreter_identifiers (before_stats )
149153 after_iids = get_interpreter_identifiers (after_stats )
150154
151155 self .assertEqual (before_iids , (0 ,))
152156 self .assertEqual (after_iids , (0 ,))
153157
154- before_gens = get_generations (before_stats )
155- after_gens = get_generations (after_stats )
158+ self . assertEqual ( get_generations (before_stats ), ( 0 , 1 , 2 ) )
159+ self . assertEqual ( get_generations (after_stats ), ( 0 , 1 , 2 ) )
156160
157- self .assertEqual (before_gens , (0 , 1 , 2 ))
158- self .assertEqual (after_gens , (0 , 1 , 2 ))
159-
160- iid = 0 # main interpreter ID
161- before_last_items = (get_last_item (before_stats , 0 , iid ),
162- get_last_item (before_stats , 1 , iid ),
163- get_last_item (before_stats , 2 , iid ))
161+ before_last_items = (get_last_item (before_stats , 0 , self ._main_iid ),
162+ get_last_item (before_stats , 1 , self ._main_iid ),
163+ get_last_item (before_stats , 2 , self ._main_iid ))
164164
165- after_last_items = (get_last_item (after_stats , 0 , iid ),
166- get_last_item (after_stats , 1 , iid ),
167- get_last_item (after_stats , 2 , iid ))
165+ after_last_items = (get_last_item (after_stats , 0 , self . _main_iid ),
166+ get_last_item (after_stats , 1 , self . _main_iid ),
167+ get_last_item (after_stats , 2 , self . _main_iid ))
168168
169169 for before , after in zip (before_last_items , after_last_items ):
170170 self ._check_gc_state (before , after )
171171
172- def test_get_gc_stats_for_all_interpreters (self ):
173- interpreters = import_helper .import_module ("concurrent.interpreters" )
174- interp = interpreters .create ()
172+ def test_get_gc_stats_for_main_interpreter (self ):
173+ script = textwrap .dedent (self ._only_main_interpreter_script )
174+ before_stats , after_stats = self ._collect_gc_stats (script , False )
175+
176+ self ._check_main_interpreter_stats (before_stats ,after_stats )
175177
176- self ._run_in_interpreter (interp ) # ensure that subinterpeter have GC stats
177- before_stats = self ._run_child_process (True )
178- self ._run_in_interpreter (interp ) # ensure that GC stats in subinterpreter changed
179- after_stats = self ._run_child_process (True )
180- interp .close ()
178+ def test_get_gc_stats_for_main_interpreter_if_subinterpreter_exists (self ):
179+ script = textwrap .dedent (self ._subinterpreters_script )
180+ before_stats , after_stats = self ._collect_gc_stats (script , False )
181+
182+ self ._check_main_interpreter_stats (before_stats ,after_stats )
183+
184+ def test_get_gc_stats_for_all_interpreters (self ):
185+ script = textwrap .dedent (self ._subinterpreters_script )
186+ before_stats , after_stats = self ._collect_gc_stats (script , True )
181187
182188 before_iids = get_interpreter_identifiers (before_stats )
183189 after_iids = get_interpreter_identifiers (after_stats )
184190
185- self .assertEqual (before_iids , (0 , interp .id ))
186- self .assertEqual (after_iids , (0 , interp .id ))
191+ self .assertGreater (len (before_iids ), 1 )
192+ self .assertGreater (len (after_iids ), 1 )
193+ self .assertEqual (before_iids , after_iids )
187194
188195 before_gens = get_generations (before_stats )
189196 after_gens = get_generations (after_stats )
@@ -203,37 +210,3 @@ def test_get_gc_stats_for_all_interpreters(self):
203210
204211 for before , after in zip (before_last_items , after_last_items ):
205212 self ._check_gc_state (before , after )
206-
207- def test_get_gc_stats_for_main_interpreter_if_subinterpreter_exists (self ):
208- interpreters = import_helper .import_module ("concurrent.interpreters" )
209- interp = interpreters .create ()
210-
211- self ._run_in_interpreter (interp ) # ensure that subinterpeter have GC stats
212- before_stats = self ._run_child_process (False )
213- self ._run_in_interpreter (interp ) # ensure that GC stats in subinterpreter changed
214- after_stats = self ._run_child_process (False )
215- interp .close ()
216-
217- before_iids = get_interpreter_identifiers (before_stats )
218- after_iids = get_interpreter_identifiers (after_stats )
219-
220- self .assertEqual (before_iids , (0 , ))
221- self .assertEqual (after_iids , (0 , ))
222-
223- before_gens = get_generations (before_stats )
224- after_gens = get_generations (after_stats )
225-
226- self .assertEqual (before_gens , (0 , 1 , 2 ))
227- self .assertEqual (after_gens , (0 , 1 , 2 ))
228-
229- iid = 0 # main interpreter ID
230- before_last_items = (get_last_item (before_stats , 0 , iid ),
231- get_last_item (before_stats , 1 , iid ),
232- get_last_item (before_stats , 2 , iid ))
233-
234- after_last_items = (get_last_item (after_stats , 0 , iid ),
235- get_last_item (after_stats , 1 , iid ),
236- get_last_item (after_stats , 2 , iid ))
237-
238- for before , after in zip (before_last_items , after_last_items ):
239- self ._check_gc_state (before , after )
0 commit comments