-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
245 lines (204 loc) · 8.2 KB
/
utils.py
File metadata and controls
245 lines (204 loc) · 8.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import os
import json
import math
import cv2
import numpy as np
from shapely.geometry import mapping, MultiLineString, LineString, Point
from shapely.affinity import translate
def load_linestring_from_geojson_for_pretrain(file_path):
"""
Load trajectories from a GeoJSON file containing MultiLineString geometries.
Args:
file_path (str): Path to the GeoJSON file.
Returns:
List[List[List[str]]]: Multi-trajectory samples. Each sample contains multiple trajectories.
"""
with open(file_path, 'r') as f:
geojson_data = json.load(f)
trajectories = []
for feature in geojson_data['features']:
linestring = feature['geometry']['coordinates']
# Convert coordinates to "x,y" format
trajectory = [[int(x),int(y)] for x, y in linestring]
trajectories.append(trajectory)
return trajectories
def load_linestring_from_geojson_for_finetune(file_path):
"""
Load trajectories from a GeoJSON file containing MultiLineString geometries.
Args:
file_path (str): Path to the GeoJSON file.
Returns:
List[List[List[str]]]: Multi-trajectory samples. Each sample contains multiple trajectories.
"""
with open(file_path, 'r') as f:
geojson_data = json.load(f)
trajectories = []
shifts = []
patch_xys = []
reference_lines = []
for feature in geojson_data['features']:
multilinestring = feature['geometry']
sample = []
for linestring in multilinestring:
# Convert coordinates to "x,y" format
trajectory = [[int(x),int(y)] for x, y in linestring]
sample.append(trajectory)
trajectories.append(sample)
shifts.append(
(int(feature['properties'].get('tr_x', 0)), int(feature['properties'].get('tr_y', 0)))
if feature.get('properties') else (0, 0)
)
patch_xys.append(
(int(feature['properties'].get('patch_x', 0)), int(feature['properties'].get('patch_y', 0)))
if feature.get('properties') else (0, 0)
)
reference_lines.append(
[(int(x), int(y)) for x, y in feature['properties'].get('reference_line', [])]
)
return trajectories, shifts, patch_xys, reference_lines
def normalize_linestring_orientation(line):
"""
Normalize the orientation of a LineString such that the node with the smaller
x-coordinate comes first. If the x-coordinates are the same, use the y-coordinate
to decide the order.
Parameters:
- line: A Shapely LineString object.
Returns:
- A LineString with normalized orientation.
"""
if not isinstance(line, LineString) and not isinstance(line, list):
raise ValueError("Input must be a LineString object or a list.")
if isinstance(line, LineString):
line = list(line.coords)
if len(line) < 2:
raise ValueError("LineString must have at least two points.")
start = line[0]
end = line[-1]
# Reverse the line if the x of the first point is greater than the x of the last point
# Or if x is the same, use y to determine order
if start[0] > end[0] or (start[0] == end[0] and start[1] > end[1]):
return LineString(line[::-1])
return LineString(line)
def draw_multilinestring_on_image(multi_lines, image_size, output_image_path, base_image=None, \
line_color=(255, 255, 255), line_thickness=4, switch_xy=False, random_color=True):
"""
Draw a MultiLineString on an image, shifting it to fit within [0,500] if needed.
Parameters:
- multi_line: A Shapely MultiLineString object.
- image_size: Tuple (height, width) for the output image.
- output_image_path: Path to save the output image.
- line_color: Tuple (B, G, R) for the line color (default: green).
- line_thickness: Thickness of the lines (default: 2).
"""
# print(shifted_multi_line)
# Create a blank image (black background)
height, width = image_size
if base_image is None:
image = np.ones((height, width, 3), dtype=np.uint8) * 255
else:
image = base_image
if multi_lines == []:
cv2.imwrite(output_image_path, image)
return
# Iterate over each LineString in the MultiLineString
for line in multi_lines:
# Generate random color for the line
# r, g, b = np.random.randint(0, 256, size=3)
if random_color:
r = np.random.randint(150, 256) # High red (200-255)
g = np.random.randint(0, 101) # Low green (0-100)
b = np.random.randint(0, 101) # Low blue (0-100)
else:
r, g, b = line_color
# Convert line coordinates to OpenCV format
if isinstance(line, LineString):
coords = np.array(list(line.coords), dtype=np.int32)
elif isinstance(line, list):
coords = np.array(line, dtype=np.int32)
else:
raise("line must be LineString or List")
# If switch_xy is True, swap X and Y coordinates
if switch_xy:
coords = coords[:, [1, 0]] # Swap columns X ↔ Y
coords = coords.reshape((-1, 1, 2)) # Reshape for OpenCV
# Draw the line on the image
cv2.polylines(image, [coords], isClosed=False, color=(int(b),int(g),int(r)), \
thickness=line_thickness)
# Save the image
cv2.imwrite(output_image_path, image)
return image
def convert_prediction_to_multilinestring(coord_list, max_id=500):
"""
Convert a list of coordinates into a MultiLineString, handling specific rules:
- Remove coordinates (1501, 1501), (1502, 1502), and (1503, 1503).
- Start a new LineString whenever encountering (1504, 1504).
Parameters:
- coord_list: List of coordinate tuples.
Returns:
- MultiLineString object.
"""
lines = []
segment = []
for coord in coord_list:
# Skip specific coordinates
if coord in [(max_id+1, max_id+1), (max_id+2, max_id+2), (max_id+3, max_id+3)]:
continue
# Start a new LineString when encountering (1504, 1504)
if coord == (max_id+4, max_id+4):
if len(segment) > 1: # Add the current segment if it has enough points
lines.append(segment)
segment = [] # Start a new segment
continue
# Add the current coordinate to the segment
segment.append(coord)
# Add the last segment to the lines
if len(segment) > 1:
lines.append(segment)
return lines
def convert_tensor_input_to_multilinestring(tensor_input, max_id=500):
"""
Convert tensor input to multilinestring format.
Args:
tensor_input: List of tensors like [tensor([0]), tensor([136])]
max_id: Maximum ID value for coordinate mapping
Returns:
List of multilinestring coordinates
"""
# Convert tensors to regular integers
sequence = []
for tensor_pair in tensor_input:
if len(tensor_pair) >= 2:
x = tensor_pair[0].item() if hasattr(tensor_pair[0], 'item') else int(tensor_pair[0])
y = tensor_pair[1].item() if hasattr(tensor_pair[1], 'item') else int(tensor_pair[1])
sequence.append((x, y))
if len(sequence) < 2:
# Return empty list if sequence has less than 2 points
return []
return [sequence]
def save_line_groups_to_geojson(line_patch_groups, output_path):
"""
Saves a list of list of LineStrings to a GeoJSON file.
Each inner list becomes a MultiLineString feature.
Args:
line_groups: List of lists of LineString objects.
output_path: File path to save GeoJSON.
"""
features = []
for key, value in line_patch_groups.items():
line_list = [list(line.coords) for line in value]
feature = {
"type": "Feature",
"geometry": line_list,
"properties": {
"patch_x": key[0],
"patch_y": key[1]
}
}
features.append(feature)
geojson = {
"type": "FeatureCollection",
"features": features
}
with open(output_path, "w") as f:
json.dump(geojson, f, indent=2)
print(f"Saved {len(features)} MultiLineString features to {output_path}")