Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions tests/preprocessing/test_positionfixes.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,40 @@ def test_gap_threshold(self):
_, sp = pfs.generate_staypoints(gap_threshold=1e8, include_last=True)
assert len(sp) == 1

def test_timezone_aware_non_nanosecond_timestamps(self):
    """Staypoint generation must cope with timezone-aware, non-nanosecond datetime dtypes.

    Pandas may hold timezone-aware timestamps as e.g. ``datetime64[us, UTC]``. The optimized
    staypoint path turns timestamps into integer arrays, so it has to read the unit from the
    pandas timezone-aware dtype rather than handing that dtype to ``np.datetime_data``, which
    only accepts plain numpy datetime64 dtypes.
    """
    micro_dtype = pd.DatetimeTZDtype(unit="us", tz="UTC")
    stamps = [pd.Timestamp(f"2024-01-01 00:{minute:02d}:00", tz="UTC") for minute in (0, 10, 20)]
    frame = pd.DataFrame(
        {
            "user_id": [0, 0, 0],
            "tracked_at": pd.Series(pd.array(stamps, dtype=micro_dtype)),
            "longitude": [8.0, 8.0001, 8.01],
            "latitude": [47.0, 47.0001, 47.01],
        }
    )
    geometry = gpd.points_from_xy(frame["longitude"], frame["latitude"])
    pfs = ti.Positionfixes(gpd.GeoDataFrame(frame, geometry=geometry, crs="EPSG:4326"))

    _, sp = pfs.generate_staypoints(dist_threshold=100, time_threshold=5, gap_threshold=15)

    assert str(pfs["tracked_at"].dtype) == "datetime64[us, UTC]"
    assert len(sp) == 1

def test_str_userid(self, example_positionfixes):
"""Staypoint generation should also run without error if the user_id is a string"""
# the pfs would not generate staypoints with the default parameters
Expand Down Expand Up @@ -324,6 +358,58 @@ def test_planar_crs(self, geolife_pfs_sp_long):
# planar and non-planar differ only if we experience a wrap in coords like [+180, -180]
assert_geodataframe_equal(sp_wgs84, sp_lv95, check_less_precise=True)

def test_centroid_uses_unique_positionfix_geometries(self):
    """Staypoint centroids should match the previous point-union behavior for repeated identical fixes.

    The optimized staypoint path derives centroids from raw coordinate arrays rather than a
    Shapely MultiPoint. Because Shapely's point union collapses identical point geometries,
    repeated pings at the same coordinate must not give that coordinate extra weight in the
    staypoint centroid.
    """
    start = pd.Timestamp("2024-01-01 00:00:00", tz="UTC")
    frame = pd.DataFrame(
        {
            "user_id": [0, 0, 0, 0],
            "tracked_at": [start + pd.Timedelta(minutes=m) for m in (0, 10, 20, 30)],
            "longitude": [0, 0, 2, 100],
            "latitude": [0, 0, 0, 0],
        }
    )
    geometry = gpd.points_from_xy(frame["longitude"], frame["latitude"])
    pfs = ti.Positionfixes(gpd.GeoDataFrame(frame, geometry=geometry, crs="EPSG:4326"))

    _, sp = pfs.generate_staypoints(dist_threshold=500000, time_threshold=5, gap_threshold=60)

    assert len(sp) == 1
    centroid = sp.geometry.iloc[0]
    assert centroid.x == pytest.approx(1)
    assert centroid.y == pytest.approx(0)

def test_centroid_wraps_antimeridian(self):
    """Staypoint centroids should preserve circular longitude averaging around the antimeridian.

    A plain arithmetic longitude mean is wrong in a geographic CRS: points at +179 and -179
    degrees are close neighbors, not positions averaging to a centroid near 0 degrees. This
    guards the optimized array centroid against regressing from Trackintel's previous
    angle-based centroid behavior.
    """
    start = pd.Timestamp("2024-01-01 00:00:00", tz="UTC")
    frame = pd.DataFrame(
        {
            "user_id": [0, 0, 0],
            "tracked_at": [start + pd.Timedelta(minutes=m) for m in (0, 10, 20)],
            "longitude": [179, -179, 0],
            "latitude": [10, 20, 0],
        }
    )
    geometry = gpd.points_from_xy(frame["longitude"], frame["latitude"])
    pfs = ti.Positionfixes(gpd.GeoDataFrame(frame, geometry=geometry, crs="EPSG:4326"))

    _, sp = pfs.generate_staypoints(dist_threshold=2000000, time_threshold=5, gap_threshold=60)

    assert len(sp) == 1
    centroid = sp.geometry.iloc[0]
    assert abs(centroid.x) == pytest.approx(180)
    assert centroid.y == pytest.approx(15)


class TestGenerate_triplegs_between_staypoints:
"""Tests for generate_triplegs() with 'between_staypoints' method."""
Expand Down
206 changes: 124 additions & 82 deletions trackintel/preprocessing/positionfixes.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import datetime
import warnings

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString

from trackintel import Positionfixes, Staypoints, Triplegs
from trackintel.geogr import check_gdf_planar, point_haversine_dist
from trackintel.preprocessing.util import _explode_agg, angle_centroid_multipoints, applyParallel
import datetime
import math
import warnings

import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import LineString, Point
from tqdm import tqdm

from trackintel import Positionfixes, Staypoints, Triplegs
from trackintel.geogr import check_gdf_planar
from trackintel.preprocessing.util import _explode_agg, applyParallel


def generate_staypoints(
Expand Down Expand Up @@ -123,22 +125,39 @@ def generate_staypoints(
else:
sp_column = ["user_id", "started_at", "finished_at", geo_col]

# TODO: tests using a different distance function, e.g., L2 distance
if method == "sliding":
# Algorithm from Li et al. (2008). For details, please refer to the paper.
sp = applyParallel(
pfs.groupby("user_id", as_index=False),
_generate_staypoints_sliding_user,
n_jobs=n_jobs,
print_progress=print_progress,
geo_col=geo_col,
elevation_flag=elevation_flag,
dist_threshold=dist_threshold,
time_threshold=time_threshold,
gap_threshold=gap_threshold,
distance_metric=distance_metric,
include_last=include_last,
).reset_index(drop=True)
# TODO: tests using a different distance function, e.g., L2 distance
if method == "sliding":
# Algorithm from Li et al. (2008). For details, please refer to the paper.
grouped = pfs.groupby("user_id", as_index=False)
if n_jobs == 1:
result_list = [
_generate_staypoints_sliding_user(
group,
geo_col=geo_col,
elevation_flag=elevation_flag,
dist_threshold=dist_threshold,
time_threshold=time_threshold,
gap_threshold=gap_threshold,
distance_metric=distance_metric,
include_last=include_last,
)
for _, group in tqdm(grouped, disable=not print_progress)
]
sp = pd.concat(result_list).reset_index(drop=True)
else:
sp = applyParallel(
grouped,
_generate_staypoints_sliding_user,
n_jobs=n_jobs,
print_progress=print_progress,
geo_col=geo_col,
elevation_flag=elevation_flag,
dist_threshold=dist_threshold,
time_threshold=time_threshold,
gap_threshold=gap_threshold,
distance_metric=distance_metric,
include_last=include_last,
).reset_index(drop=True)

# index management
sp["staypoint_id"] = sp.index
Expand Down Expand Up @@ -465,70 +484,93 @@ def _generate_staypoints_sliding_user(
gap_threshold,
distance_metric,
include_last=False,
):
"""User level staypoint generation using sliding method, see generate_staypoints() function for parameter meaning."""
if distance_metric == "haversine":
dist_func = point_haversine_dist
else:
raise ValueError("distance_metric unknown. We only support ['haversine']. " f"You passed {distance_metric}")

df = df.sort_index(kind="stable").sort_values(by=["tracked_at"], kind="stable")

# transform times to pandas Timedelta to simplify comparisons
gap_threshold = pd.Timedelta(gap_threshold, unit="minutes")
time_threshold = pd.Timedelta(time_threshold, unit="minutes")
# to numpy as access time of numpy array is faster than pandas Series
gap_times = ((df.tracked_at - df.tracked_at.shift(1)) > gap_threshold).to_numpy()

# put x and y into numpy arrays to speed up the access in the for loop (shapely is slow)
x = df[geo_col].x.to_numpy()
y = df[geo_col].y.to_numpy()

ret_sp = []
curr = start = 0
for curr in range(1, len(df)):
# the gap of two consecutive positionfixes should not be too long
if gap_times[curr]:
start = curr
continue

delta_dist = dist_func(x[start], y[start], x[curr], y[curr], float_flag=True)
if delta_dist >= dist_threshold:
# we want the staypoint to have long enough duration
if (df["tracked_at"].iloc[curr] - df["tracked_at"].iloc[start]) >= time_threshold:
ret_sp.append(__create_new_staypoints(start, curr, df, elevation_flag, geo_col))
# distance large enough but time is too short -> not a staypoint
# also initializer when new sp is added
start = curr

if include_last: # aggregate remaining positionfixes
# additional control: we aggregate only if duration longer than time_threshold
if (df["tracked_at"].iloc[curr] - df["tracked_at"].iloc[start]) >= time_threshold:
new_sp = __create_new_staypoints(start, curr, df, elevation_flag, geo_col, last_flag=True)
ret_sp.append(new_sp)
):
"""User level staypoint generation using sliding method, see generate_staypoints() function for parameter meaning."""
if distance_metric != "haversine":
raise ValueError("distance_metric unknown. We only support ['haversine']. " f"You passed {distance_metric}")

df = df.sort_index(kind="stable").sort_values(by=["tracked_at"], kind="stable")

gap_threshold = pd.Timedelta(gap_threshold, unit="minutes")
time_threshold = pd.Timedelta(time_threshold, unit="minutes")
tracked_at = df["tracked_at"]
tracked_unit = tracked_at.dtype.unit
tracked_values = tracked_at.astype("int64").to_numpy(copy=False)
gap_threshold_value = gap_threshold / pd.Timedelta(1, unit=tracked_unit)
time_threshold_value = time_threshold / pd.Timedelta(1, unit=tracked_unit)

# put x and y into numpy arrays to speed up the access in the for loop (shapely is slow)
x = df[geo_col].x.to_numpy()
y = df[geo_col].y.to_numpy()
lon_rad = np.deg2rad(x)
lat_rad = np.deg2rad(y)
cos_lat = np.cos(lat_rad)
planar = check_gdf_planar(df)

ret_sp = []
curr = start = 0
for curr in range(1, len(df)):
# the gap of two consecutive positionfixes should not be too long
if tracked_values[curr] - tracked_values[curr - 1] > gap_threshold_value:
start = curr
continue

delta_dist = _haversine_dist_from_precomputed(lon_rad, lat_rad, cos_lat, start, curr)
if delta_dist >= dist_threshold:
# we want the staypoint to have long enough duration
if tracked_values[curr] - tracked_values[start] >= time_threshold_value:
ret_sp.append(__create_new_staypoints(start, curr, df, elevation_flag, geo_col, x, y, planar))
# distance large enough but time is too short -> not a staypoint
# also initializer when new sp is added
start = curr

if include_last: # aggregate remaining positionfixes
# additional control: we aggregate only if duration longer than time_threshold
if tracked_values[curr] - tracked_values[start] >= time_threshold_value:
new_sp = __create_new_staypoints(start, curr, df, elevation_flag, geo_col, x, y, planar, last_flag=True)
ret_sp.append(new_sp)

ret_sp = pd.DataFrame(ret_sp)
ret_sp["user_id"] = df["user_id"].unique()[0]
return ret_sp


def __create_new_staypoints(start, end, pfs, elevation_flag, geo_col, last_flag=False):
"""Create a staypoint with relevant information from start to end pfs."""
new_sp = {}
return ret_sp


def _haversine_dist_from_precomputed(lon_rad, lat_rad, cos_lat, start, end):
"""Calculate haversine distance using per-user precomputed trigonometric arrays."""
return 6371000 * math.acos(
math.cos(lat_rad[start] - lat_rad[end])
- cos_lat[start] * cos_lat[end] * (1 - math.cos(lon_rad[start] - lon_rad[end]))
)


def _centroid_from_coordinates(x, y, start, end, planar):
    """Return the centroid of unique coordinates in the same way as a point union."""
    # Deduplicate (x, y) pairs so repeated identical fixes carry no extra weight,
    # mirroring how a Shapely point union collapses identical geometries.
    stacked = np.column_stack((x[start:end], y[start:end]))
    unique_coords = np.unique(stacked, axis=0)
    xs = unique_coords[:, 0]
    mean_y = unique_coords[:, 1].mean()

    if planar:
        # Projected CRS: a plain arithmetic mean is the correct centroid.
        return Point(xs.mean(), mean_y)

    # Geographic CRS: average longitudes on the circle so values near the
    # antimeridian (e.g. +179 and -179) do not collapse toward 0 degrees.
    angles = np.deg2rad(xs)
    mean_angle = np.arctan2(np.sin(angles).mean(), np.cos(angles).mean())
    return Point(np.rad2deg(mean_angle), mean_y)


def __create_new_staypoints(start, end, pfs, elevation_flag, geo_col, x, y, planar, last_flag=False):
"""Create a staypoint with relevant information from start to end pfs."""
new_sp = {}

# Here we consider pfs[end] time for stp 'finished_at', but only include
# pfs[end - 1] for stp geometry and pfs linkage.
new_sp["started_at"] = pfs["tracked_at"].iloc[start]
new_sp["finished_at"] = pfs["tracked_at"].iloc[end]

# if end is the last pfs, we want to include the info from it as well
if last_flag:
end = len(pfs)
points = pfs[geo_col].iloc[start:end].union_all()
if check_gdf_planar(pfs):
new_sp[geo_col] = points.centroid
else:
new_sp[geo_col] = angle_centroid_multipoints(points)[0]
# if end is the last pfs, we want to include the info from it as well
if last_flag:
end = len(pfs)

new_sp[geo_col] = _centroid_from_coordinates(x, y, start, end, planar)

if elevation_flag:
new_sp["elevation"] = pfs["elevation"].iloc[start:end].median()
Expand Down
Loading