Passenger-car speed assumptions embedded in a default OSM routing graph produce ETAs that are systematically wrong for Class 6–8 trucks: mass-dependent acceleration lag, regulatory speed governors, and grade-induced velocity decay all drive real traversal times well above what generic profiles predict. Speed profile calibration for heavy vehicles is the process of replacing those assumptions with physics-grounded, telematics-validated velocity surfaces that reflect how freight actually moves — and it sits at the heart of the OSM Graph Architecture & Network Modeling pipeline, between raw topology construction and the cost function layer that feeds the routing solver.
Prerequisites
Python version: 3.9 or later. The vectorized patterns below use numpy structured dtypes and pandas nullable integer types that are unstable on earlier releases.
Libraries:
# Install all required dependencies (quotes stop the shell treating ">=" as an output redirect)
pip install "osmnx>=1.9" "networkx>=3.2" "numpy>=1.26" "pandas>=2.1" \
    "geopandas>=0.14" "pyarrow>=14" "scipy>=1.11" "rasterio>=1.3"
Data sources:
- OSM PBF extract for your region — download from Geofabrik or use osmnx.graph_from_place. The process of building directed graphs from OSM PBF files should be completed first so that hgv, hgv:conditional, maxweight, and maxheight tags are preserved in edge attributes.
- Digital Elevation Model at ≤10 m resolution. SRTM 1-Arc-Second (30 m) is the practical minimum; Copernicus DEM GLO-30 offers better accuracy on European networks.
- Telematics corpus — GPS traces or CAN-bus logs from your fleet, exported as timestamped GeoJSON or CSV. Minimum 30 days of data covering the target network provides statistical confidence; sparser coverage still improves calibration on high-volume corridors.
System resources: Continental-scale graphs (>50 M edges) require 32 GB RAM for in-memory pandas operations. For larger networks, the Production Optimization and Scaling section covers Parquet serialization with pyarrow and processing in spatial tiles.
Conceptual Architecture
Heavy vehicle speed calibration inserts a transform layer between raw OSM topology and the cost function described in configuring edge weights for freight logistics. Three physical effects compound to reduce traversal speed below the posted limit:
- Regulatory caps — speed governors limit Class 8 trucks to 80–90 km/h in both North America and the EU, regardless of posted motorway limits.
- Grade-induced velocity decay — sustained inclines above 3 % force power-limited deceleration; declines require braking-limited speed caps to prevent runaway.
- Acceleration lag — on short edges (ramps, urban intersections), laden trucks cannot reach cruising speed within the available distance, so the time-weighted average speed is materially lower than posted.
The calibration output is a per-edge calibrated_speed_kmh attribute. Downstream, t = d / v converts this to traversal time, which becomes the edge weight fed to A* or Contraction Hierarchies.
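To make the acceleration-lag effect and the t = d / v conversion concrete, the sketch below estimates traversal time on a single edge under a constant-acceleration assumption. The function names and the 0.3 m/s² acceleration are illustrative assumptions, not values taken from the calibration pipeline.

```python
import math


def traversal_time_s(length_m: float, cruise_kmh: float,
                     entry_kmh: float = 0.0, accel_ms2: float = 0.3) -> float:
    """Seconds to cross an edge while accelerating at a constant rate
    from the entry speed towards the cruise speed."""
    v0 = entry_kmh / 3.6
    vc = cruise_kmh / 3.6
    if v0 >= vc:
        # Already at (or above) cruise speed: plain t = d / v
        return length_m / vc
    # Distance needed to reach cruise speed: (vc^2 - v0^2) / (2a)
    d_accel = (vc ** 2 - v0 ** 2) / (2 * accel_ms2)
    if d_accel >= length_m:
        # Edge ends before cruise speed is reached
        v_end = math.sqrt(v0 ** 2 + 2 * accel_ms2 * length_m)
        return (v_end - v0) / accel_ms2
    t_accel = (vc - v0) / accel_ms2
    return t_accel + (length_m - d_accel) / vc


def effective_speed_kmh(length_m: float, cruise_kmh: float, **kwargs) -> float:
    """Time-weighted average speed implied by traversal_time_s."""
    return length_m / traversal_time_s(length_m, cruise_kmh, **kwargs) * 3.6
```

Under these assumptions a 300 m ramp entered from standstill with an 80 km/h cruise speed averages roughly 24 km/h, while a 100 km motorway edge averages just under 80 km/h, which is why the lag matters mainly on short edges.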
Step-by-Step Implementation
Step 1: Network Extraction and HGV Access Filtering
Load the directed graph and strip edges that are legally or physically inaccessible to heavy vehicles. This step must preserve graph connectivity to avoid routing dead-ends in industrial corridors; see graph fragmentation prevention in OSM data for the connectivity-repair patterns that should run immediately after filtering.
# Requires: osmnx>=1.9, networkx>=3.2, pandas>=2.1
import osmnx as ox
import networkx as nx
import pandas as pd
import numpy as np

VEHICLE_WEIGHT_T = 26.0  # gross vehicle weight in tonnes (Class 8 typical)
VEHICLE_HEIGHT_M = 4.0   # metres


def load_hgv_graph(place_name: str) -> nx.MultiDiGraph:
    """Load OSM graph with HGV-relevant tags retained."""
    useful_tags = ox.settings.useful_tags_way + [
        "hgv", "hgv:conditional", "maxweight", "maxheight",
        "maxspeed", "maxspeed:forward", "maxspeed:backward",
        "access", "motor_vehicle",
    ]
    ox.settings.useful_tags_way = list(set(useful_tags))
    G = ox.graph_from_place(place_name, network_type="drive", retain_all=False)
    return G


def filter_hgv_accessible(G: nx.MultiDiGraph, weight_t: float, height_m: float) -> nx.MultiDiGraph:
    """Remove edges with weight or height restrictions that exclude this vehicle."""
    edges_to_remove = []
    for u, v, k, data in G.edges(keys=True, data=True):
        # Explicit HGV prohibition
        hgv_val = str(data.get("hgv", "")).lower()
        if hgv_val in {"no", "prohibited", "private"}:
            edges_to_remove.append((u, v, k))
            continue
        # Weight limit
        max_w = data.get("maxweight")
        if max_w is not None:
            try:
                if float(str(max_w).split()[0]) < weight_t:
                    edges_to_remove.append((u, v, k))
                    continue
            except (ValueError, IndexError):
                pass
        # Height limit
        max_h = data.get("maxheight")
        if max_h is not None:
            try:
                if float(str(max_h).split()[0]) < height_m:
                    edges_to_remove.append((u, v, k))
            except (ValueError, IndexError):
                pass
    G.remove_edges_from(edges_to_remove)
    return G
Step 2: Baseline Speed Assignment with Jurisdictional Fallbacks
Parse maxspeed tags to numeric km/h values, convert mph where necessary, and fill gaps with road-class defaults. This step establishes the upper bound that kinematic corrections reduce in Step 3.
# Requires: pandas>=2.1, numpy>=1.26
from typing import Optional

# Jurisdictional HGV caps (km/h) indexed by OSM highway class (North America defaults)
HGV_SPEED_CAPS: dict[str, float] = {
    "motorway": 88.5,       # 55 mph (conservative; many states allow 65 mph)
    "trunk": 88.5,
    "primary": 72.4,        # 45 mph
    "secondary": 64.4,
    "tertiary": 56.3,
    "residential": 40.0,
    "unclassified": 48.0,
    "service": 24.0,
    "motorway_link": 56.0,  # ramps; OSM tags these as highway=motorway_link
}


def parse_maxspeed(raw: object) -> Optional[float]:
    """Parse OSM maxspeed value to km/h float; return None if unparseable."""
    if raw is None or (isinstance(raw, float) and np.isnan(raw)):
        return None
    s = str(raw).strip().lower()
    if s in {"none", "walk", "signals", "variable", ""}:
        return None
    try:
        if "mph" in s:
            return round(float(s.replace("mph", "").strip()) * 1.60934, 1)
        if "knots" in s:
            return round(float(s.replace("knots", "").strip()) * 1.852, 1)
        return round(float(s), 1)
    except ValueError:
        return None


def assign_baseline_speeds(edges_df: pd.DataFrame) -> pd.DataFrame:
    """
    Assign baseline speed (km/h) per edge, respecting directional tags and HGV caps.
    edges_df must have a highway column; maxspeed, maxspeed:forward and
    maxspeed:backward are treated as missing when the column is absent.
    """
    df = edges_df.copy()

    def parsed(col: str) -> pd.Series:
        # A missing column becomes all-NaN instead of raising on .map
        if col not in df.columns:
            return pd.Series(np.nan, index=df.index, dtype=float)
        return pd.to_numeric(df[col].map(parse_maxspeed), errors="coerce")

    # Parse all speed columns
    df["speed_fwd"] = parsed("maxspeed:forward")
    df["speed_bwd"] = parsed("maxspeed:backward")
    df["speed_tag"] = parsed("maxspeed")
    # Resolve: forward tag > generic tag > highway-class cap
    hw_caps = df["highway"].map(HGV_SPEED_CAPS).fillna(48.0)
    df["baseline_speed_kmh"] = (
        df["speed_fwd"]
        .combine_first(df["speed_tag"])
        .combine_first(hw_caps)
        .clip(upper=90.0)  # global HGV governor cap
    )
    return df
Step 3: Grade-Based Kinematic Correction
Sample the DEM along each edge’s geometry and compute a length-weighted mean grade. Apply the tractive effort reduction to baseline speeds. Declines use a separate braking-limited cap to prevent runaway speed assignments on long descents.
# Requires: rasterio>=1.3, numpy>=1.26, geopandas>=0.14
import rasterio
import geopandas as gpd
from shapely.geometry import LineString

ALPHA_CLASS8 = 0.10      # grade sensitivity coefficient for Class 8 trucks (laden)
ALPHA_CLASS6 = 0.06      # lighter rigid trucks
MIN_SPEED_KMH = 12.0     # absolute floor (steep crawl speed)
DESCENT_CAP_KMH = 70.0   # braking-limited cap on negative grade


def sample_mean_grade(geom: LineString, dem_path: str, n_samples: int = 20) -> float:
    """
    Compute length-weighted mean grade (signed decimal) along a LineString.
    Positive = uphill in direction of travel.
    The geometry and the DEM must share one projected (metric) CRS.
    """
    if geom is None or geom.length < 1e-9:
        return 0.0
    coords = [geom.interpolate(t, normalized=True).coords[0]
              for t in np.linspace(0, 1, n_samples)]
    xy = [(c[0], c[1]) for c in coords]
    with rasterio.open(dem_path) as src:
        elev = np.array([v[0] for v in src.sample(xy)], dtype=float)
    elev = np.nan_to_num(elev, nan=0.0)
    # Edge length in projected metres; reproject geometry and DEM before calling
    horiz_m = geom.length
    # Net rise over length equals the length-weighted mean of the signed segment grades
    rise_m = elev[-1] - elev[0]
    return rise_m / horiz_m if horiz_m > 0 else 0.0


def apply_grade_correction(
    baseline_speeds: pd.Series,
    grades: pd.Series,
    vehicle_class: str = "class_8",
    governor_limit_kmh: float = 90.0,
) -> pd.Series:
    """
    Vectorized grade correction for heavy vehicle speed profiles.
    Enforces physical bounds and governor cap.
    """
    if not baseline_speeds.index.equals(grades.index):
        raise ValueError("baseline_speeds and grades must share identical indices.")
    alpha = ALPHA_CLASS8 if vehicle_class == "class_8" else ALPHA_CLASS6
    # Uphill: linear reduction; downhill: braking cap applies
    uphill_mask = grades > 0
    adjusted = baseline_speeds.copy().astype(float)
    # Uphill reduction
    adjusted[uphill_mask] = (
        baseline_speeds[uphill_mask] * (1.0 - alpha * grades[uphill_mask])
    )
    # Steep descent: cap at braking limit regardless of posted speed
    steep_descent = grades < -0.04
    adjusted[steep_descent] = adjusted[steep_descent].clip(upper=DESCENT_CAP_KMH)
    # Global bounds
    adjusted = adjusted.clip(lower=MIN_SPEED_KMH, upper=governor_limit_kmh)
    # Safe NaN fallback: retain baseline where correction produced NaN
    return adjusted.fillna(baseline_speeds).round(2)
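As a scalar cross-check of the grade rules described in this step, the following sketch applies the same three rules (linear uphill reduction, steep-descent cap, global bounds) to a single edge. The function name is hypothetical and the constants simply repeat the values used above; it is meant for spot-checking individual edges, not as a replacement for the vectorized version.

```python
ALPHA = 0.10             # mirrors ALPHA_CLASS8
MIN_SPEED_KMH = 12.0
DESCENT_CAP_KMH = 70.0
GOVERNOR_KMH = 90.0


def corrected_speed(baseline_kmh: float, grade: float) -> float:
    """Grade-corrected speed for one edge; grade is a signed decimal."""
    speed = baseline_kmh
    if grade > 0:
        # Uphill: linear reduction proportional to grade
        speed = baseline_kmh * (1.0 - ALPHA * grade)
    elif grade < -0.04:
        # Steep descent: braking-limited cap
        speed = min(speed, DESCENT_CAP_KMH)
    # Global floor and governor cap
    return round(min(max(speed, MIN_SPEED_KMH), GOVERNOR_KMH), 2)
```

With decimal grades, a 6 % climb takes an 88.5 km/h baseline to 87.97 km/h, so the strength of the uphill term is worth validating against the telematics comparison in Step 4. Passing the same grade as a percentage (6.0 instead of 0.06) cuts the speed to 35.4 km/h, and values above 10 hit the 12 km/h floor.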
Step 4: Telematics Ground-Truthing
Match fleet GPS traces to graph edges via spatial-temporal snapping, compute observed speed distributions, and derive per-edge correction factors. This step transforms the physics model into an empirically calibrated surface.
# Requires: geopandas>=0.14, pandas>=2.1, scipy>=1.11
from scipy.stats import trim_mean


def snap_traces_to_edges(
    traces_gdf: gpd.GeoDataFrame,  # columns: geometry (Point), timestamp, speed_kmh, vehicle_id
    edges_gdf: gpd.GeoDataFrame,   # columns: edge_id, geometry (LineString)
    snap_distance_m: float = 20.0,
) -> pd.DataFrame:
    """
    Spatial join of GPS pings to nearest edge within snap_distance_m.
    Returns DataFrame with columns: edge_id, observed_speed_kmh, vehicle_id, timestamp.
    """
    # Reproject both layers to a local UTM zone so snap distances are true metres
    # (EPSG:3857 stretches distances away from the equator). For regions spanning
    # several UTM zones, run this per tile.
    metric_crs = traces_gdf.estimate_utm_crs()
    traces_proj = traces_gdf.to_crs(metric_crs)
    edges_proj = edges_gdf.to_crs(metric_crs)
    joined = gpd.sjoin_nearest(
        traces_proj[["geometry", "speed_kmh", "vehicle_id", "timestamp"]],
        edges_proj[["edge_id", "geometry"]],
        how="left",
        max_distance=snap_distance_m,
        distance_col="snap_dist_m",
    )
    return joined[["edge_id", "speed_kmh", "vehicle_id", "timestamp"]].rename(
        columns={"speed_kmh": "observed_speed_kmh"}
    )
def compute_edge_correction_factors(
    snapped: pd.DataFrame,
    calibrated_df: pd.DataFrame,  # columns: edge_id, calibrated_speed_kmh
    min_observations: int = 30,
) -> pd.DataFrame:
    """
    Compare observed vs. calibrated speeds per edge.
    Returns correction factor = median_observed / calibrated_speed.
    Only edges with >= min_observations passes are adjusted.
    """
    # Filter out idle (< 10 km/h) and GPS-drift outliers (> 110 km/h)
    valid = snapped.query("10 <= observed_speed_kmh <= 110").copy()
    agg = (
        valid.groupby("edge_id")["observed_speed_kmh"]
        .agg(
            obs_count="count",
            obs_median="median",
            obs_p25=lambda x: x.quantile(0.25),
            obs_p75=lambda x: x.quantile(0.75),
        )
        .reset_index()
    )
    merged = agg.merge(calibrated_df[["edge_id", "calibrated_speed_kmh"]], on="edge_id")
    merged["mape"] = (
        (merged["obs_median"] - merged["calibrated_speed_kmh"]).abs()
        / merged["calibrated_speed_kmh"]
    ) * 100
    # Only correct edges with statistical confidence
    high_confidence = merged["obs_count"] >= min_observations
    merged["correction_factor"] = np.where(
        high_confidence,
        merged["obs_median"] / merged["calibrated_speed_kmh"].clip(lower=1.0),
        1.0,
    )
    return merged
Step 5: Edge Weight Injection
Apply correction factors and write traversal times back to the graph. The output feeds directly into the edge weight schema that configuring edge weights for freight logistics describes.
# Requires: networkx>=3.2, numpy>=1.26, pandas>=2.1
def inject_traversal_times(
    G: nx.MultiDiGraph,
    correction_df: pd.DataFrame,  # columns: edge_id, correction_factor
    calibrated_df: pd.DataFrame,  # columns: edge_id, calibrated_speed_kmh, length_m
) -> nx.MultiDiGraph:
    """
    Compute final traversal times (seconds) per edge and write as 'travel_time' attribute.
    edge_id convention: (u, v, k) tuples stored as string 'u-v-k'.
    """
    final = calibrated_df.merge(correction_df[["edge_id", "correction_factor"]], on="edge_id", how="left")
    final["correction_factor"] = final["correction_factor"].fillna(1.0)
    final["final_speed_kmh"] = (
        final["calibrated_speed_kmh"] * final["correction_factor"]
    ).clip(lower=MIN_SPEED_KMH, upper=90.0)
    final["travel_time_s"] = (final["length_m"] / (final["final_speed_kmh"] / 3.6)).round(2)
    time_map = dict(zip(final["edge_id"], final["travel_time_s"]))
    for u, v, k, data in G.edges(keys=True, data=True):
        eid = f"{u}-{v}-{k}"
        if eid in time_map:
            data["travel_time"] = time_map[eid]
        else:
            # Fallback: use length and free-flow speed
            length = data.get("length", 100.0)
            data["travel_time"] = round(length / (48.0 / 3.6), 2)
    return G
Configuration Reference
| Parameter | Recommended range | Effect |
|---|---|---|
| ALPHA_CLASS8 | 0.08 – 0.12 | Grade sensitivity for fully laden Class 8 (40 t GVW). Higher values steepen descent into low-speed territory on sustained grades. |
| ALPHA_CLASS6 | 0.04 – 0.08 | Lighter rigid trucks (12–18 t); less power-limited on grades. |
| MIN_SPEED_KMH | 10 – 15 | Floor below which calibration will not push edge speed; prevents division errors in traversal time. |
| DESCENT_CAP_KMH | 60 – 75 | Braking-limited cap on downhill edges; consult vehicle’s rated descent speed for the payload. |
| governor_limit_kmh | 80 – 90 | Legal governor cap; varies by jurisdiction and vehicle registration class. |
| snap_distance_m | 15 – 25 | GPS ping snap radius; tighter values reduce cross-road misassignment on dense urban grids. |
| min_observations | 30 – 100 | Minimum GPS pings before a telematics correction factor is applied; lower values risk noise on sparse rural edges. |
| n_samples (DEM) | 10 – 30 | DEM elevation samples per edge; more samples improve grade accuracy on curved alignments. |
Key OSM tag mappings:
| OSM tag | Pipeline role |
|---|---|
| hgv=no / hgv=private | Edge removed in Step 1 |
| maxweight (tonnes) | Edge removed if below VEHICLE_WEIGHT_T |
| maxheight (metres) | Edge removed if below VEHICLE_HEIGHT_M |
| maxspeed / maxspeed:forward | Baseline speed source in Step 2 |
| hgv:conditional | Time-dependent boolean mask (Step 1, extend for time-aware routing) |
| highway class | Fallback speed cap lookup key |
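As a starting point for the time-dependent mask mentioned for hgv:conditional, here is a deliberately narrow sketch. It assumes values of the simple form `no @ (HH:MM-HH:MM)` and ignores weekday selectors, multiple clauses, and the rest of the OSM conditional-restriction syntax; the function name is hypothetical, and anything it cannot parse is treated as not blocked.

```python
import re
from datetime import time

# Matches only "<value> @ (HH:MM-HH:MM)"; richer syntax is out of scope here
_SIMPLE_CONDITION = re.compile(
    r"^\s*(\w+)\s*@\s*\(\s*(\d{1,2}):(\d{2})\s*-\s*(\d{1,2}):(\d{2})\s*\)\s*$"
)


def hgv_blocked_at(conditional: str, at: time) -> bool:
    """True if a simple 'no @ (HH:MM-HH:MM)' restriction is active at `at`."""
    match = _SIMPLE_CONDITION.match(conditional)
    if not match or match.group(1).lower() != "no":
        return False
    # Work in minutes since midnight so that "24:00" is accepted as an end time
    start = int(match.group(2)) * 60 + int(match.group(3))
    end = int(match.group(4)) * 60 + int(match.group(5))
    now = at.hour * 60 + at.minute
    if start <= end:
        return start <= now < end
    # Window wraps past midnight, e.g. 22:00-06:00
    return now >= start or now < end
```

A production mask would more likely rely on a full opening-hours parser; this version only shows how the tag could feed a boolean edge filter at a given departure time.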
Production Optimization and Scaling
Chunked processing for large networks. For graphs exceeding 10 M edges, avoid loading all edge attributes into a single DataFrame. Serialize the graph to Parquet with pyarrow and process in spatial tiles:
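# Requires: pyarrow>=14, pandas>=2.1, geopandas>=0.14
import pyarrow.parquet as pq
import pyarrow as pa


def edges_to_parquet(G: nx.MultiDiGraph, output_path: str) -> None:
    """Serialize edge attributes to Parquet for chunked downstream processing."""

    def to_storable(value: object) -> object:
        # Arrow cannot infer a type for shapely geometries or list-valued tags,
        # so store geometries as WKT and join merged-tag lists with ";"
        if hasattr(value, "wkt"):
            return value.wkt
        if isinstance(value, (list, tuple)):
            return ";".join(str(item) for item in value)
        return value

    records = [
        {"edge_id": f"{u}-{v}-{k}", **{kk: to_storable(vv) for kk, vv in data.items()}}
        for u, v, k, data in G.edges(keys=True, data=True)
    ]
    df = pd.DataFrame(records)
    # Columns that still mix types (e.g. int and "a;b" strings) are stored as text
    for col in df.select_dtypes(include="object").columns:
        df[col] = df[col].map(lambda v: v if pd.isna(v) else str(v))
    table = pa.Table.from_pandas(df)
    pq.write_table(table, output_path, compression="zstd")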
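One simple way to form the spatial tiles mentioned above is a fixed square grid over projected coordinates. The sketch below is an assumption about how tiling could be done, not part of the pipeline: the 50 km tile size, the function names, and the use of edge midpoints as the tiling key are all illustrative.

```python
import math
from collections import defaultdict


def tile_key(x_m: float, y_m: float, tile_size_m: float = 50_000.0) -> tuple[int, int]:
    """Integer (column, row) of the grid tile containing a projected point."""
    return (math.floor(x_m / tile_size_m), math.floor(y_m / tile_size_m))


def group_edges_by_tile(
    edge_midpoints: dict[str, tuple[float, float]],
    tile_size_m: float = 50_000.0,
) -> dict[tuple[int, int], list[str]]:
    """Map each tile key to the edge_ids whose midpoint falls inside it."""
    tiles: dict[tuple[int, int], list[str]] = defaultdict(list)
    for edge_id, (x_m, y_m) in edge_midpoints.items():
        tiles[tile_key(x_m, y_m, tile_size_m)].append(edge_id)
    return dict(tiles)
```

Each tile's edge_id list can then be used to filter the Parquet file so only one tile's attributes are in memory at a time. Keying on midpoints assigns every edge to exactly one tile, which avoids double-processing edges that cross a tile boundary.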
Spatial indexing for telematics snapping. gpd.sjoin_nearest builds an STRtree internally, but for very large trace corpuses (>100 M pings) pre-build the index explicitly and batch queries in 500 k chunks to avoid peak memory spikes.
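The batching itself needs nothing beyond the standard library. A minimal sketch, assuming pings arrive as any iterable (for example rows streamed from a file); the helper name is hypothetical and the default mirrors the 500 k chunk size suggested above:

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], batch_size: int = 500_000) -> Iterator[list[T]]:
    """Yield consecutive lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(items)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk
```

Each yielded chunk can be turned into a GeoDataFrame and passed to the snapping step, with the per-chunk results appended to disk so peak memory stays bounded by one batch.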
Compiled-tool alternatives. Where Python throughput is a bottleneck, deploying OSRM with Docker for local routing provides a compiled C++ alternative that accepts custom speed profiles via Lua profile scripts. The calibrated final_speed_kmh values from this pipeline can be exported as a CSV speed table and applied with the --segment-speed-file option of osrm-customize (or osrm-contract), which is how OSRM ingests per-segment speed updates; OSRM does not expose a separate traffic endpoint for this.
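A CSV speed table for OSRM could be written along these lines. This sketch assumes the three-column `from_osm_id,to_osm_id,speed_kmh` layout with no header row that OSRM's traffic-update documentation describes for segment speed files; verify the format against the OSRM version in use. It also assumes speeds are already keyed by pairs of OSM node IDs, which is not automatically true for simplified OSMnx edges that merge several OSM segments. The function name is hypothetical.

```python
import csv
from typing import TextIO


def write_segment_speed_csv(
    speeds_kmh: dict[tuple[int, int], float],
    out: TextIO,
) -> int:
    """Write one 'from_node,to_node,speed' row per directed segment.

    Returns the number of rows written. Speeds are rounded to whole km/h
    (an assumption made here for simplicity).
    """
    writer = csv.writer(out, lineterminator="\n")
    rows = 0
    for (from_node, to_node), speed in sorted(speeds_kmh.items()):
        writer.writerow([from_node, to_node, int(round(speed))])
        rows += 1
    return rows
```

Sorting the keys keeps the file byte-identical between runs, which helps when the export is checked into a versioned manifest.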
Deterministic execution. Fix numpy random seeds (np.random.seed(42)) in any Monte Carlo fallback routines. Store calibration coefficients and DEM checksums in a versioned manifest so CI/CD re-runs reproduce identical edge weights.
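A versioned manifest can be as small as a JSON document holding the coefficients and a DEM checksum. The sketch below uses only the standard library; the manifest field names and function names are assumptions chosen for illustration.

```python
import hashlib
import json


def file_sha256(path: str, chunk_bytes: int = 1 << 20) -> str:
    """SHA-256 hex digest of a file, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_bytes), b""):
            digest.update(chunk)
    return digest.hexdigest()


def build_manifest(coefficients: dict, dem_path: str) -> str:
    """Serialize calibration inputs as JSON with a stable key order."""
    manifest = {
        "coefficients": coefficients,
        "dem_sha256": file_sha256(dem_path),
    }
    # sort_keys makes the output independent of dict insertion order
    return json.dumps(manifest, sort_keys=True, indent=2)
```

Committing the resulting string alongside the code lets a CI job compare manifests before and after a run and fail fast when the DEM file or a coefficient has changed unexpectedly.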
Validation and Testing
After running the full pipeline, apply the following sanity checks before promoting calibrated weights to production:
- Speed distribution histogram — plot final_speed_kmh per highway class. Motorway edges should cluster between 80–89 km/h; residential edges should cluster between 30–50 km/h. Any bimodal distributions indicate a parsing error in Step 2.
- Grade-speed correlation — compute np.corrcoef(grades, final_speeds) over the edge set. Expect a negative correlation (r ≈ −0.35 to −0.55 for hilly networks). A near-zero correlation suggests grade data was not applied.
- MAPE distribution — from compute_edge_correction_factors, the median MAPE across high-confidence edges should fall below 8 %. Edges with MAPE > 20 % warrant manual inspection for tag parsing anomalies or GPS trace quality issues.
- Connectivity check — after HGV filtering, verify that all major industrial zones and distribution centres remain reachable:
# Requires: networkx>=3.2
def assert_key_nodes_reachable(G: nx.MultiDiGraph, key_node_ids: list[int]) -> None:
    """Raise AssertionError if any key node is disconnected from the main component."""
    largest_cc = max(nx.weakly_connected_components(G), key=len)
    unreachable = [n for n in key_node_ids if n not in largest_cc]
    assert not unreachable, f"Key nodes disconnected after HGV filtering: {unreachable}"
- ETA regression — route a held-out set of historical shipments using legacy and calibrated profiles. The calibrated profile should reduce mean absolute ETA error; a regression (error increases) indicates a coefficient tuning problem.
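The ETA regression check reduces to comparing two mean absolute errors over the same held-out shipments. A minimal sketch, assuming ETAs and actual durations are available as equal-length lists of seconds; the function names and the result keys are illustrative.

```python
from statistics import fmean
from typing import Sequence


def mean_abs_error(predicted_s: Sequence[float], actual_s: Sequence[float]) -> float:
    """Mean absolute difference between predicted and actual durations."""
    if len(predicted_s) != len(actual_s) or not actual_s:
        raise ValueError("inputs must be non-empty and of equal length")
    return fmean(abs(p - a) for p, a in zip(predicted_s, actual_s))


def eta_regression_check(
    actual_s: Sequence[float],
    legacy_s: Sequence[float],
    calibrated_s: Sequence[float],
) -> dict:
    """Compare legacy and calibrated ETA error on the same shipments."""
    mae_legacy = mean_abs_error(legacy_s, actual_s)
    mae_calibrated = mean_abs_error(calibrated_s, actual_s)
    return {
        "mae_legacy_s": mae_legacy,
        "mae_calibrated_s": mae_calibrated,
        "improved": mae_calibrated < mae_legacy,
    }
```

Gating a deployment on `improved` being true is the simplest policy; a stricter gate could require a minimum relative improvement so that noise in a small held-out set does not pass the check by chance.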
Troubleshooting
Calibrated speeds drop to the minimum floor on flat urban edges
This usually means grade values are being supplied as percentages (e.g. 5.0) rather than decimals (0.05). The tractive effort formula 1 - alpha * grade becomes negative when grade > 1/alpha, clipping everything to MIN_SPEED_KMH. Normalize grades before calling apply_grade_correction: grades = grades / 100 if your DEM pipeline returns percent values.
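A defensive guard can catch the unit mix-up before it reaches the correction step. The heuristic below is an assumption, not part of the pipeline: it treats any absolute value above 0.30 (a 30 % grade, steeper than almost any public road) as evidence that the whole series is in percent. A percent-valued network with no grade steeper than 0.3 % would slip through, so an explicit unit flag from the DEM pipeline remains the more reliable option.

```python
def normalize_grades(grades: list[float], max_plausible_decimal: float = 0.30) -> list[float]:
    """Return grades as signed decimals, dividing by 100 if they look like percent."""
    if not grades:
        return []
    looks_like_percent = max(abs(g) for g in grades) > max_plausible_decimal
    if looks_like_percent:
        return [g / 100.0 for g in grades]
    return list(grades)
```

The check is applied to the whole series rather than per value, because mixing units within one edge set would silently corrupt only the steepest edges.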
Missing maxspeed tags on 40–60% of edges in rural areas
Rural OSM coverage for maxspeed is sparse in many regions. Add a second fallback tier: after the highway-class cap, apply a country-level default table keyed on ISO 3166-1 alpha-2 codes, resolved via a spatial join with a country-boundary GeoJSON. For North American rural primary roads this typically fills gaps at 72–88 km/h.
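The tiered lookup can be sketched as a plain function once each edge already carries a country code (the spatial join against country boundaries is left out here). The table values below are hypothetical placeholders for illustration only and should be replaced with figures verified per jurisdiction; the function and constant names are likewise assumptions.

```python
from typing import Optional

# Hypothetical placeholder defaults (km/h); NOT verified legal limits
COUNTRY_DEFAULTS_KMH: dict[str, dict[str, float]] = {
    "US": {"primary": 88.0, "secondary": 72.0},
    "CA": {"primary": 80.0, "secondary": 72.0},
}
GLOBAL_DEFAULT_KMH = 48.0  # same last-resort value used in Step 2


def resolve_baseline_kmh(
    tag_speed: Optional[float],
    highway: str,
    country_code: Optional[str],
    class_caps: dict[str, float],
) -> float:
    """Resolve speed as: parsed tag > highway-class cap > country default > global default."""
    if tag_speed is not None:
        return tag_speed
    if highway in class_caps:
        return class_caps[highway]
    country_table = COUNTRY_DEFAULTS_KMH.get((country_code or "").upper(), {})
    return country_table.get(highway, GLOBAL_DEFAULT_KMH)
```

Because the country tier sits after the highway-class cap, it only fires for classes missing from the cap table; swapping the two tiers is a reasonable alternative if national defaults should take precedence.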
High MAPE on motorway on-ramps and off-ramps
Ramp edges (highway=motorway_link) are short and heavily speed-limited by geometry: a laden truck cannot accelerate to posted speed within 200–400 m of ramp length. Set a dedicated ramp speed factor of 0.55–0.70 × posted limit in HGV_SPEED_CAPS under the motorway_link key, which is the value OSM uses for ramps; a link_motorway key never matches the highway column, so a cap stored under it is silently ignored. This alone typically reduces ramp MAPE by 15–25 percentage points.
Graph becomes fragmented after HGV access filtering
Industrial corridors sometimes connect via bridge or tunnel edges that exceed weight limits. After filtering, run a weakly-connected-components check and compare against the pre-filter component count. If fragmentation is significant, review the weight and height tag parsing logic — maxweight values in OSM can appear as "7.5 t", "7500 kg", or bare "7.5" (implying tonnes). Normalize to tonnes before comparing against VEHICLE_WEIGHT_T. See graph fragmentation prevention in OSM data for systematic repair strategies.
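The three maxweight spellings listed above can be normalized with a small parser. This sketch handles only tonnes and kilograms, treats a bare number as tonnes, and accepts a decimal comma (an assumption added here); other units return None so the caller can decide how to treat them. The function name is hypothetical.

```python
import re
from typing import Optional

_WEIGHT_PATTERN = re.compile(
    r"^\s*([0-9]+(?:[.,][0-9]+)?)\s*(t|kg)?\s*$", re.IGNORECASE
)


def maxweight_tonnes(raw: object) -> Optional[float]:
    """Parse an OSM maxweight value to tonnes; None if it cannot be parsed."""
    if raw is None:
        return None
    match = _WEIGHT_PATTERN.match(str(raw))
    if not match:
        return None
    value = float(match.group(1).replace(",", "."))
    unit = (match.group(2) or "t").lower()
    # Kilograms are converted; a missing unit is read as tonnes
    return value / 1000.0 if unit == "kg" else value
```

Returning None instead of raising keeps the Step 1 filter conservative: an unparseable limit leaves the edge in place rather than removing it on a guess.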
Telematics snapping assigns pings to the wrong parallel road
In urban grids with parallel arterials spaced 25–40 m apart, sjoin_nearest can mis-assign pings from a high-speed motorway service road to an adjacent residential street. Reduce snap_distance_m to 12–15 m and add a bearing-filter post-step: discard snapped pings where the GPS heading differs from the edge bearing by more than 30°. This requires a bearing column derived from consecutive timestamp pairs in the trace.
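The bearing filter needs two pieces: a heading derived from consecutive GPS fixes and an angular difference that handles the 0°/360° wrap. A minimal sketch using the standard initial great-circle bearing formula; the function names are illustrative, and the 30° default follows the threshold suggested above.

```python
import math


def bearing_deg(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Initial great-circle bearing from point 1 to point 2, in [0, 360)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_lon = math.radians(lon2 - lon1)
    x = math.sin(d_lon) * math.cos(phi2)
    y = math.cos(phi1) * math.sin(phi2) - math.sin(phi1) * math.cos(phi2) * math.cos(d_lon)
    return (math.degrees(math.atan2(x, y)) + 360.0) % 360.0


def heading_diff_deg(a: float, b: float) -> float:
    """Smallest absolute angle between two headings, in [0, 180]."""
    return abs(((a - b + 180.0) % 360.0) - 180.0)


def keep_ping(gps_heading: float, edge_bearing: float, max_diff_deg: float = 30.0) -> bool:
    """True if the ping's heading is within max_diff_deg of the edge bearing."""
    return heading_diff_deg(gps_heading, edge_bearing) <= max_diff_deg
```

Headings computed from two fixes that are only a few metres apart are dominated by GPS noise, so it is usually worth skipping pings below a minimum speed or displacement before applying the filter.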
Related
- Building directed graphs from OSM PBF files — foundational graph construction that must preserve hgv, maxweight, and maxspeed tags for this calibration pipeline to work correctly
- Calibrating speed profiles for electric delivery fleets — extends this pipeline with state-of-charge degradation and regenerative braking efficiency on grade edges
- Configuring edge weights for freight logistics — consumes the travel_time edge attribute produced here and assembles the full cost function for A* and Contraction Hierarchies
- Graph fragmentation prevention in OSM data — connectivity repair strategies needed after heavy HGV access filtering
- Handling turn restrictions in routing graphs — complements speed calibration with penalty-based or hard-constraint turn costs for vehicles subject to HGV-specific turn bans