diff --git a/tests/preprocessing/test_positionfixes.py b/tests/preprocessing/test_positionfixes.py index 1417ac1b..fffef61f 100644 --- a/tests/preprocessing/test_positionfixes.py +++ b/tests/preprocessing/test_positionfixes.py @@ -280,6 +280,40 @@ def test_gap_threshold(self): _, sp = pfs.generate_staypoints(gap_threshold=1e8, include_last=True) assert len(sp) == 1 + def test_timezone_aware_non_nanosecond_timestamps(self): + """Staypoint generation should handle timezone-aware datetime dtypes with non-nanosecond units. + + Pandas can store timezone-aware timestamps as e.g. ``datetime64[us, UTC]``. The optimized staypoint path + converts timestamps to integer arrays and must derive the unit from pandas' timezone-aware dtype instead of + passing that dtype to ``np.datetime_data``, which only understands plain numpy datetime64 dtypes. + """ + tracked_at = pd.Series( + pd.array( + [ + pd.Timestamp("2024-01-01 00:00:00", tz="UTC"), + pd.Timestamp("2024-01-01 00:10:00", tz="UTC"), + pd.Timestamp("2024-01-01 00:20:00", tz="UTC"), + ], + dtype=pd.DatetimeTZDtype(unit="us", tz="UTC"), + ) + ) + df = pd.DataFrame( + { + "user_id": [0, 0, 0], + "tracked_at": tracked_at, + "longitude": [8.0, 8.0001, 8.01], + "latitude": [47.0, 47.0001, 47.01], + } + ) + pfs = ti.Positionfixes( + gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["longitude"], df["latitude"]), crs="EPSG:4326") + ) + + _, sp = pfs.generate_staypoints(dist_threshold=100, time_threshold=5, gap_threshold=15) + + assert str(pfs["tracked_at"].dtype) == "datetime64[us, UTC]" + assert len(sp) == 1 + def test_str_userid(self, example_positionfixes): """Staypoint generation should also run without error if the user_id is a string""" # the pfs would not generate staypoints with the default parameters @@ -324,6 +358,58 @@ def test_planar_crs(self, geolife_pfs_sp_long): # planar and non-planar differ only if we experience a wrap in coords like [+180, -180] assert_geodataframe_equal(sp_wgs84, sp_lv95, check_less_precise=True) + def test_centroid_uses_unique_positionfix_geometries(self): + """Staypoint centroids should match the previous point-union behavior for repeated identical fixes. + + The optimized staypoint path computes centroids from coordinate arrays instead of building a Shapely + MultiPoint. Shapely's point union collapses identical point geometries, so repeated pings at the same + coordinate must not give that coordinate extra weight in the staypoint centroid. + """ + base_time = pd.Timestamp("2024-01-01 00:00:00", tz="UTC") + df = pd.DataFrame( + { + "user_id": [0, 0, 0, 0], + "tracked_at": [base_time + pd.Timedelta(minutes=minutes) for minutes in [0, 10, 20, 30]], + "longitude": [0, 0, 2, 100], + "latitude": [0, 0, 0, 0], + } + ) + pfs = ti.Positionfixes( + gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["longitude"], df["latitude"]), crs="EPSG:4326") + ) + + _, sp = pfs.generate_staypoints(dist_threshold=500000, time_threshold=5, gap_threshold=60) + + assert len(sp) == 1 + assert sp.geometry.iloc[0].x == pytest.approx(1) + assert sp.geometry.iloc[0].y == pytest.approx(0) + + def test_centroid_wraps_antimeridian(self): + """Staypoint centroids should preserve circular longitude averaging around the antimeridian. + + Geographic CRS centroids cannot use a plain arithmetic longitude mean: points at +179 and -179 degrees + represent nearby positions, not a centroid near 0 degrees. This guards the optimized array centroid against + regressing from Trackintel's previous angle-based centroid behavior. + """ + base_time = pd.Timestamp("2024-01-01 00:00:00", tz="UTC") + df = pd.DataFrame( + { + "user_id": [0, 0, 0], + "tracked_at": [base_time + pd.Timedelta(minutes=minutes) for minutes in [0, 10, 20]], + "longitude": [179, -179, 0], + "latitude": [10, 20, 0], + } + ) + pfs = ti.Positionfixes( + gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df["longitude"], df["latitude"]), crs="EPSG:4326") + ) + + _, sp = pfs.generate_staypoints(dist_threshold=2000000, time_threshold=5, gap_threshold=60) + + assert len(sp) == 1 + assert abs(sp.geometry.iloc[0].x) == pytest.approx(180) + assert sp.geometry.iloc[0].y == pytest.approx(15) + class TestGenerate_triplegs_between_staypoints: """Tests for generate_triplegs() with 'between_staypoints' method.""" diff --git a/trackintel/preprocessing/positionfixes.py b/trackintel/preprocessing/positionfixes.py index ebe0d51e..8ca39127 100644 --- a/trackintel/preprocessing/positionfixes.py +++ b/trackintel/preprocessing/positionfixes.py @@ -1,14 +1,16 @@ -import datetime -import warnings - -import geopandas as gpd -import numpy as np -import pandas as pd -from shapely.geometry import LineString - -from trackintel import Positionfixes, Staypoints, Triplegs -from trackintel.geogr import check_gdf_planar, point_haversine_dist -from trackintel.preprocessing.util import _explode_agg, angle_centroid_multipoints, applyParallel +import datetime +import math +import warnings + +import geopandas as gpd +import numpy as np +import pandas as pd +from shapely.geometry import LineString, Point +from tqdm import tqdm + +from trackintel import Positionfixes, Staypoints, Triplegs +from trackintel.geogr import check_gdf_planar +from trackintel.preprocessing.util import _explode_agg, applyParallel def generate_staypoints( @@ -123,22 +125,39 @@ def generate_staypoints( else: sp_column = ["user_id", "started_at", "finished_at", geo_col] - # TODO: tests using a different distance function, e.g., L2 distance - if method == "sliding": - # Algorithm from Li et al. (2008). For details, please refer to the paper. - sp = applyParallel( - pfs.groupby("user_id", as_index=False), - _generate_staypoints_sliding_user, - n_jobs=n_jobs, - print_progress=print_progress, - geo_col=geo_col, - elevation_flag=elevation_flag, - dist_threshold=dist_threshold, - time_threshold=time_threshold, - gap_threshold=gap_threshold, - distance_metric=distance_metric, - include_last=include_last, - ).reset_index(drop=True) + # TODO: tests using a different distance function, e.g., L2 distance + if method == "sliding": + # Algorithm from Li et al. (2008). For details, please refer to the paper. + grouped = pfs.groupby("user_id", as_index=False) + if n_jobs == 1: + result_list = [ + _generate_staypoints_sliding_user( + group, + geo_col=geo_col, + elevation_flag=elevation_flag, + dist_threshold=dist_threshold, + time_threshold=time_threshold, + gap_threshold=gap_threshold, + distance_metric=distance_metric, + include_last=include_last, + ) + for _, group in tqdm(grouped, disable=not print_progress) + ] + sp = pd.concat(result_list).reset_index(drop=True) + else: + sp = applyParallel( + grouped, + _generate_staypoints_sliding_user, + n_jobs=n_jobs, + print_progress=print_progress, + geo_col=geo_col, + elevation_flag=elevation_flag, + dist_threshold=dist_threshold, + time_threshold=time_threshold, + gap_threshold=gap_threshold, + distance_metric=distance_metric, + include_last=include_last, + ).reset_index(drop=True) # index management sp["staypoint_id"] = sp.index @@ -465,70 +484,93 @@ def _generate_staypoints_sliding_user( gap_threshold, distance_metric, include_last=False, -): - """User level staypoint generation using sliding method, see generate_staypoints() function for parameter meaning.""" - if distance_metric == "haversine": - dist_func = point_haversine_dist - else: - raise ValueError("distance_metric unknown. We only support ['haversine']. " f"You passed {distance_metric}") - - df = df.sort_index(kind="stable").sort_values(by=["tracked_at"], kind="stable") - - # transform times to pandas Timedelta to simplify comparisons - gap_threshold = pd.Timedelta(gap_threshold, unit="minutes") - time_threshold = pd.Timedelta(time_threshold, unit="minutes") - # to numpy as access time of numpy array is faster than pandas Series - gap_times = ((df.tracked_at - df.tracked_at.shift(1)) > gap_threshold).to_numpy() - - # put x and y into numpy arrays to speed up the access in the for loop (shapely is slow) - x = df[geo_col].x.to_numpy() - y = df[geo_col].y.to_numpy() - - ret_sp = [] - curr = start = 0 - for curr in range(1, len(df)): - # the gap of two consecutive positionfixes should not be too long - if gap_times[curr]: - start = curr - continue - - delta_dist = dist_func(x[start], y[start], x[curr], y[curr], float_flag=True) - if delta_dist >= dist_threshold: - # we want the staypoint to have long enough duration - if (df["tracked_at"].iloc[curr] - df["tracked_at"].iloc[start]) >= time_threshold: - ret_sp.append(__create_new_staypoints(start, curr, df, elevation_flag, geo_col)) - # distance large enough but time is too short -> not a staypoint - # also initializer when new sp is added - start = curr - - if include_last: # aggregate remaining positionfixes - # additional control: we aggregate only if duration longer than time_threshold - if (df["tracked_at"].iloc[curr] - df["tracked_at"].iloc[start]) >= time_threshold: - new_sp = __create_new_staypoints(start, curr, df, elevation_flag, geo_col, last_flag=True) - ret_sp.append(new_sp) +): + """User level staypoint generation using sliding method, see generate_staypoints() function for parameter meaning.""" + if distance_metric != "haversine": + raise ValueError("distance_metric unknown. We only support ['haversine']. " f"You passed {distance_metric}") + + df = df.sort_index(kind="stable").sort_values(by=["tracked_at"], kind="stable") + + gap_threshold = pd.Timedelta(gap_threshold, unit="minutes") + time_threshold = pd.Timedelta(time_threshold, unit="minutes") + tracked_at = df["tracked_at"] + tracked_unit = tracked_at.dtype.unit + tracked_values = tracked_at.astype("int64").to_numpy(copy=False) + gap_threshold_value = gap_threshold / pd.Timedelta(1, unit=tracked_unit) + time_threshold_value = time_threshold / pd.Timedelta(1, unit=tracked_unit) + + # put x and y into numpy arrays to speed up the access in the for loop (shapely is slow) + x = df[geo_col].x.to_numpy() + y = df[geo_col].y.to_numpy() + lon_rad = np.deg2rad(x) + lat_rad = np.deg2rad(y) + cos_lat = np.cos(lat_rad) + planar = check_gdf_planar(df) + + ret_sp = [] + curr = start = 0 + for curr in range(1, len(df)): + # the gap of two consecutive positionfixes should not be too long + if tracked_values[curr] - tracked_values[curr - 1] > gap_threshold_value: + start = curr + continue + + delta_dist = _haversine_dist_from_precomputed(lon_rad, lat_rad, cos_lat, start, curr) + if delta_dist >= dist_threshold: + # we want the staypoint to have long enough duration + if tracked_values[curr] - tracked_values[start] >= time_threshold_value: + ret_sp.append(__create_new_staypoints(start, curr, df, elevation_flag, geo_col, x, y, planar)) + # distance large enough but time is too short -> not a staypoint + # also initializer when new sp is added + start = curr + + if include_last: # aggregate remaining positionfixes + # additional control: we aggregate only if duration longer than time_threshold + if tracked_values[curr] - tracked_values[start] >= time_threshold_value: + new_sp = __create_new_staypoints(start, curr, df, elevation_flag, geo_col, x, y, planar, last_flag=True) + ret_sp.append(new_sp) ret_sp = pd.DataFrame(ret_sp) ret_sp["user_id"] = df["user_id"].unique()[0] - return ret_sp - - -def __create_new_staypoints(start, end, pfs, elevation_flag, geo_col, last_flag=False): - """Create a staypoint with relevant information from start to end pfs.""" - new_sp = {} + return ret_sp + + +def _haversine_dist_from_precomputed(lon_rad, lat_rad, cos_lat, start, end): + """Calculate haversine distance using per-user precomputed trigonometric arrays.""" + return 6371000 * math.acos( + math.cos(lat_rad[start] - lat_rad[end]) + - cos_lat[start] * cos_lat[end] * (1 - math.cos(lon_rad[start] - lon_rad[end])) + ) + + +def _centroid_from_coordinates(x, y, start, end, planar): + """Return the centroid of unique coordinates in the same way as a point union.""" + coordinates = np.column_stack((x[start:end], y[start:end])) + coordinates = np.unique(coordinates, axis=0) + + if planar: + return Point(coordinates[:, 0].mean(), coordinates[:, 1].mean()) + + x_rad = np.deg2rad(coordinates[:, 0]) + x_sin = np.sin(x_rad).mean() + x_cos = np.cos(x_rad).mean() + return Point(np.rad2deg(np.arctan2(x_sin, x_cos)), coordinates[:, 1].mean()) + + +def __create_new_staypoints(start, end, pfs, elevation_flag, geo_col, x, y, planar, last_flag=False): + """Create a staypoint with relevant information from start to end pfs.""" + new_sp = {} # Here we consider pfs[end] time for stp 'finished_at', but only include # pfs[end - 1] for stp geometry and pfs linkage. new_sp["started_at"] = pfs["tracked_at"].iloc[start] new_sp["finished_at"] = pfs["tracked_at"].iloc[end] - # if end is the last pfs, we want to include the info from it as well - if last_flag: - end = len(pfs) - points = pfs[geo_col].iloc[start:end].union_all() - if check_gdf_planar(pfs): - new_sp[geo_col] = points.centroid - else: - new_sp[geo_col] = angle_centroid_multipoints(points)[0] + # if end is the last pfs, we want to include the info from it as well + if last_flag: + end = len(pfs) + + new_sp[geo_col] = _centroid_from_coordinates(x, y, start, end, planar) if elevation_flag: new_sp["elevation"] = pfs["elevation"].iloc[start:end].median()