-
-
Notifications
You must be signed in to change notification settings - Fork 135
Expand file tree
/
Copy pathpydevd_modify_bytecode.py
More file actions
355 lines (281 loc) · 12.9 KB
/
pydevd_modify_bytecode.py
File metadata and controls
355 lines (281 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
from collections import namedtuple
import contextlib
import dis
from functools import partial
import itertools
import os.path
import sys
from _pydevd_frame_eval.vendored import bytecode
from _pydevd_frame_eval.vendored.bytecode.instr import Instr, Label
from _pydev_bundle import pydev_log
from _pydevd_frame_eval.pydevd_frame_tracing import _pydev_stop_at_break, _pydev_needs_stop_at_break
# Debug switch for this module: when set to True a `DebugHelper` is created at import time and
# `insert_pydevd_breaks` dumps the bytecode/disassembly from before and after the modification
# into text files in the 'debug_info' directory next to this file.
DEBUG = False
class DebugHelper(object):
    '''
    Helper which dumps bytecode/disassembly listings into numbered text files inside a
    'debug_info' directory created next to this file.

    Each dump operation gets a sequential number: the first dump of an operation is written to
    '<prefix>NNN_before.txt' and, when that number is passed back in, the follow-up dump is
    written to '<prefix>NNN_change.txt'.
    '''

    def __init__(self):
        self._debug_dir = os.path.join(os.path.dirname(__file__), 'debug_info')
        try:
            os.makedirs(self._debug_dir, exist_ok=True)
        except OSError:
            # Best effort: if the directory can't be created the error will show up
            # when a dump is actually written.
            pass
        # Callable which returns 0, 1, 2, ... on successive calls.
        self._next = partial(next, itertools.count(0))

    def _get_filename(self, op_number=None, prefix=''):
        '''
        :param op_number: None to allocate a new operation number (a '_before' file) or a
            number previously returned to name the matching '_change' file.
        :param prefix: string prepended to the generated file name.
        :return: tuple(filename, op_number).
        '''
        if op_number is None:
            op_number = self._next()
            name = '%03d_before.txt' % op_number
        else:
            name = '%03d_change.txt' % op_number

        filename = os.path.join(self._debug_dir, prefix + name)
        return filename, op_number

    def write_bytecode(self, b, op_number=None, prefix=''):
        '''
        Writes what `bytecode.dump_bytecode(b, lineno=True)` prints to the target file.

        :return: the operation number used (to be passed back for the '_change' dump).
        '''
        filename, op_number = self._get_filename(op_number, prefix)
        with open(filename, 'w') as stream:
            # dump_bytecode prints to stdout, so, redirect it to the file.
            with contextlib.redirect_stdout(stream):
                bytecode.dump_bytecode(b, lineno=True)
        return op_number

    def write_dis(self, code_to_modify, op_number=None, prefix=''):
        '''
        Writes a header with the id of the code object followed by its `dis.dis` output.

        :return: the operation number used (to be passed back for the '_change' dump).
        '''
        filename, op_number = self._get_filename(op_number, prefix)
        with open(filename, 'w') as stream:
            stream.write('-------- ')
            stream.write('-------- ')
            stream.write('id(code_to_modify): %s' % id(code_to_modify))
            stream.write('\n\n')
            dis.dis(code_to_modify, file=stream)
        return op_number
# line_to_offset: dict mapping line number -> bytecode offset.
# first_line / last_line: smallest / largest line in line_to_offset (None if it's empty).
_CodeLineInfo = namedtuple('_CodeLineInfo', 'line_to_offset, first_line, last_line')


# Note: this method has a version in cython too (that one is usually used, this is just for tests).
def _get_code_line_info(code_obj):
    '''
    Collects the line information reported by `dis.findlinestarts` for the given code object.

    :param code_obj: the code object to inspect.
    :return: a `_CodeLineInfo`. If a line is reported more than once, the last offset
        reported for it is the one kept. `first_line` and `last_line` are None when no
        line is found.
    '''
    line_to_offset = {}

    for offset, line in dis.findlinestarts(code_obj):
        # From Python 3.13 onwards the line may be None for bytecode which doesn't map to a
        # source line: such entries aren't real lines (and would break min()/max() below).
        if line is not None:
            line_to_offset[line] = offset

    if not line_to_offset:
        return _CodeLineInfo(line_to_offset, None, None)

    return _CodeLineInfo(line_to_offset, min(line_to_offset), max(line_to_offset))
# The helper is only created when debugging is enabled, so, `debug_helper` only exists as a
# module global in that case (creating it also tries to create the 'debug_info' directory).
if DEBUG:
    debug_helper = DebugHelper()
def get_instructions_to_add(
    stop_at_line,
    _pydev_stop_at_break=_pydev_stop_at_break,
    _pydev_needs_stop_at_break=_pydev_needs_stop_at_break
):
    '''
    Builds the instructions of a programmatic breakpoint for the given line.

    The generated bytecode is something as:

        if _pydev_needs_stop_at_break(stop_at_line):
            _pydev_stop_at_break(stop_at_line)

    but with some special handling for lines.

    :param stop_at_line: the line where the breakpoint should be hit.
    :return: list with the `Instr` objects to add followed by the `Label` which is the
        target of the jump taken when the check returns a falsy value.
    '''
    # Good reference to how things work regarding line numbers and jumps:
    # https://github.com/python/cpython/blob/3.6/Objects/lnotab_notes.txt
    skip_label = Label()

    # The call which actually stops uses line numbers -1 so that when the NOP added
    # afterwards (which is back at `stop_at_line`) is executed we have a spurious line event.
    stop_call_line = stop_at_line - 1

    def call_with_line(func, lineno):
        # Instructions for `func(stop_at_line)` (the result is left on the stack).
        return [
            Instr("LOAD_CONST", func, lineno=lineno),
            Instr("LOAD_CONST", stop_at_line, lineno=lineno),
            Instr("CALL_FUNCTION", 1, lineno=lineno),
        ]

    # -- if _pydev_needs_stop_at_break():
    instructions = call_with_line(_pydev_needs_stop_at_break, stop_at_line)
    instructions.append(Instr("POP_JUMP_IF_FALSE", skip_label, lineno=stop_at_line))

    # -- _pydev_stop_at_break()
    instructions.extend(call_with_line(_pydev_stop_at_break, stop_call_line))
    instructions.append(Instr("POP_TOP", lineno=stop_call_line))

    # Reason for the NOP: Python will give us a 'line' trace event whenever we forward jump to
    # the first instruction of a line, so, in the case where we haven't added a programmatic
    # breakpoint (either because we didn't hit a breakpoint anymore or because it was already
    # tracing), we don't want the spurious line event due to the line change, so, we make a jump
    # to the instruction right after the NOP so that the spurious line event is NOT generated in
    # this case (otherwise we'd have a line event even if the line didn't change).
    instructions.append(Instr("NOP", lineno=stop_at_line))
    instructions.append(skip_label)
    return instructions
class _Node(object):
    '''
    A node of a double-linked list: holds `data` and the links to the `prev` and `next`
    nodes (None when there's no neighbor in that direction).
    '''

    def __init__(self, data):
        self.prev = None
        self.next = None
        self.data = data

    def append(self, data):
        '''
        Inserts a new node holding `data` right after this node.

        :return: the node created.
        '''
        node = _Node(data)

        curr_next = self.next

        node.next = curr_next
        node.prev = self
        self.next = node

        if curr_next is not None:
            curr_next.prev = node

        return node

    def prepend(self, data):
        '''
        Inserts a new node holding `data` right before this node.

        :return: the node created.
        '''
        node = _Node(data)

        curr_prev = self.prev

        node.prev = curr_prev
        node.next = self
        self.prev = node

        if curr_prev is not None:
            curr_prev.next = node

        return node


class _HelperBytecodeList(object):
    '''
    A helper double-linked list to make the manipulation a bit easier (so that we don't need
    to keep track of indices that change) and performant (because adding multiple items to
    the middle of a regular list isn't ideal).

    Nodes may be manipulated directly (through `_Node.append` / `_Node.prepend`): the `head`
    and `tail` properties resynchronize the cached ends when accessed.
    '''

    def __init__(self, lst=None):
        '''
        :param lst: optional iterable with the initial items (kept in the same order).
        '''
        self._head = None
        self._tail = None
        for item in lst or ():
            self.append(item)

    def append(self, data):
        '''
        Adds `data` to the end of the list.

        :return: the node created.
        '''
        if self._tail is None:
            node = _Node(data)
            self._head = self._tail = node
            return node
        else:
            node = self._tail = self.tail.append(data)
            return node

    @property
    def head(self):
        '''
        :return: the first node of the list or None if the list is empty.
        '''
        node = self._head
        if node is None:
            return None
        # Manipulating the node directly may make it unsynchronized.
        while node.prev is not None:
            self._head = node = node.prev
        return node

    @property
    def tail(self):
        '''
        :return: the last node of the list or None if the list is empty.
        '''
        node = self._tail
        if node is None:
            return None
        # Manipulating the node directly may make it unsynchronized.
        while node.next is not None:
            self._tail = node = node.next
        return node

    def __iter__(self):
        # Yields the data of each node from the head to the tail (nothing if it's empty).
        node = self.head
        while node is not None:
            yield node.data
            node = node.next
# Groups of instruction names which may be the predicted follow-up of an instruction.
_PREDICTS_JUMP_ABSOLUTE = ('JUMP_ABSOLUTE',)
_PREDICTS_LOAD_CONST = ('LOAD_CONST',)
_PREDICTS_POP_JUMP = ('POP_JUMP_IF_FALSE', 'POP_JUMP_IF_TRUE')

# Maps an instruction name to the names of the instructions which, when found right after
# it, have their line number removed by `insert_pydevd_breaks` (when FIX_PREDICT is set).
_PREDICT_TABLE = {
    'LIST_APPEND': _PREDICTS_JUMP_ABSOLUTE,
    'SET_ADD': _PREDICTS_JUMP_ABSOLUTE,
    'GET_ANEXT': _PREDICTS_LOAD_CONST,
    'GET_AWAITABLE': _PREDICTS_LOAD_CONST,
    'DICT_MERGE': ('CALL_FUNCTION_EX',),
    'MAP_ADD': _PREDICTS_JUMP_ABSOLUTE,
    'COMPARE_OP': _PREDICTS_POP_JUMP,
    'IS_OP': _PREDICTS_POP_JUMP,
    'CONTAINS_OP': _PREDICTS_POP_JUMP,

    # Note: there are some others with PREDICT on ceval, but they have more logic
    # and it needs more experimentation to know how it behaves in the static generated
    # code (and it's only an issue for us if there's actually a line change between
    # those, so, we don't have to really handle all the cases, only the one where
    # the line number actually changes from one instruction to the predicted one).
}

_PY_MAJOR_MINOR = (sys.version_info.major, sys.version_info.minor)

# 3.10 optimizations include copying code branches multiple times (for instance
# if the body of a finally has a single assign statement it can copy the assign to the case
# where an exception happens and doesn't happen for optimization purposes) and as such
# we need to add the programmatic breakpoint multiple times.
TRACK_MULTIPLE_BRANCHES = _PY_MAJOR_MINOR >= (3, 10)

# When tracking multiple branches, we try to fix the bytecodes which would be PREDICTED in the
# Python eval loop so that we don't have spurious line events that wouldn't usually be issued
# in the tracing as they're ignored due to the eval prediction (even though they're in the bytecode).
FIX_PREDICT = _PY_MAJOR_MINOR >= (3, 10)
def _clear_predicted_lineno(node):
    '''
    Removes the line number from the instruction right after `node` if the instruction in
    `node` has an entry in `_PREDICT_TABLE` listing that following instruction.

    Odd case: the next instruction may have a line number but it doesn't really
    appear in the tracing due to the PREDICT() in ceval, so, fix the bytecode so
    that it does things the way that ceval actually interprets it.
    See: https://mail.python.org/archives/list/python-dev@python.org/thread/CP2PTFCMTK57KM3M3DLJNWGO66R5RVPB/
    '''
    predict_targets = _PREDICT_TABLE.get(getattr(node.data, 'name', None))
    if not predict_targets:
        return

    next_node = node.next
    if next_node is None:
        # The instruction is the last one: there's nothing which could be predicted.
        return

    next_instruction = next_node.data
    if getattr(next_instruction, 'name', None) in predict_targets:
        if getattr(next_instruction, 'lineno', None):
            next_instruction.lineno = None


def insert_pydevd_breaks(
    code_to_modify,
    breakpoint_lines,
    code_line_info=None,
    _pydev_stop_at_break=_pydev_stop_at_break,
    _pydev_needs_stop_at_break=_pydev_needs_stop_at_break,
):
    """
    Inserts pydevd programmatic breaks into the code (at the given lines).

    :param code_to_modify: the code object where the breaks should be added (it's not
        changed: a new code object is generated).
    :param breakpoint_lines: set with the lines where we should add breakpoints.
    :param code_line_info: optional line info for `code_to_modify` (computed with
        `_get_code_line_info` if not given).
    :return: tuple(boolean flag whether insertion was successful, modified code).
        If the code has no line information or if any error happens while changing it
        (the error is logged), `(False, code_to_modify)` is returned.
    """
    if code_line_info is None:
        code_line_info = _get_code_line_info(code_to_modify)

    if not code_line_info.line_to_offset:
        return False, code_to_modify

    # Create a copy (and make sure we're dealing with a set).
    breakpoint_lines = set(breakpoint_lines)

    # Note that we can even generate breakpoints on the first line of code
    # now, since we generate a spurious line event -- it may be a bit pointless
    # as we'll stop in the first line and we don't currently stop the tracing after the
    # user resumes, but in the future, if we do that, this would be a nice
    # improvement.
    # if code_to_modify.co_firstlineno in breakpoint_lines:
    #     return False, code_to_modify

    try:
        b = bytecode.Bytecode.from_code(code_to_modify)

        if DEBUG:
            op_number_bytecode = debug_helper.write_bytecode(b, prefix='bytecode.')

        helper_list = _HelperBytecodeList(b)

        curr_node = helper_list.head
        # Only used when not tracking multiple branches (a break is added once per line).
        added_breaks_in_lines = set()
        # Only used when tracking multiple branches (line of the last instruction with a line).
        last_lineno = None

        while curr_node is not None:
            instruction = curr_node.data
            # Note: entries without a `lineno` attribute (such as labels) are never a
            # place to add a break.
            instruction_lineno = getattr(instruction, 'lineno', None)
            curr_name = getattr(instruction, 'name', None)

            if FIX_PREDICT:
                _clear_predicted_lineno(curr_node)

            if instruction_lineno is not None:
                if TRACK_MULTIPLE_BRANCHES:
                    if last_lineno is None:
                        last_lineno = instruction_lineno
                    else:
                        if last_lineno == instruction_lineno:
                            # If the previous is a label, someone may jump into it, so, we need to add
                            # the break even if it's in the same line.
                            if curr_node.prev.data.__class__ is not Label:
                                # Skip adding this as the line is still the same.
                                curr_node = curr_node.next
                                continue
                        last_lineno = instruction_lineno
                else:
                    if instruction_lineno in added_breaks_in_lines:
                        curr_node = curr_node.next
                        continue

                if instruction_lineno in breakpoint_lines:
                    added_breaks_in_lines.add(instruction_lineno)

                    new_instructions = get_instructions_to_add(
                        instruction_lineno,
                        _pydev_stop_at_break=_pydev_stop_at_break,
                        _pydev_needs_stop_at_break=_pydev_needs_stop_at_break,
                    )

                    prev_node = curr_node.prev
                    if prev_node is not None and prev_node.data.__class__ is Label \
                            and curr_name == 'POP_TOP':
                        # If we have a SETUP_FINALLY where the target is a POP_TOP, we can't change
                        # the target to be the breakpoint instruction (this can crash the interpreter).
                        # So, add the break after the POP_TOP (curr_node advances along with
                        # the added instructions so that they're not visited afterwards).
                        for new_instruction in new_instructions:
                            curr_node = curr_node.append(new_instruction)
                    else:
                        # Each prepend adds right before the current instruction, so, the
                        # new instructions end up in order before it.
                        for new_instruction in new_instructions:
                            curr_node.prepend(new_instruction)

            curr_node = curr_node.next

        b[:] = helper_list

        if DEBUG:
            debug_helper.write_bytecode(b, op_number_bytecode, prefix='bytecode.')

        new_code = b.to_code()

    except Exception:
        # Any failure to change the bytecode means we fall back to the original code
        # (exceptions such as KeyboardInterrupt/SystemExit are not swallowed).
        pydev_log.exception('Error inserting pydevd breaks.')
        return False, code_to_modify

    if DEBUG:
        op_number = debug_helper.write_dis(code_to_modify)
        debug_helper.write_dis(new_code, op_number)

    return True, new_code