Skip to content

Csf

This module refactors the Cloth Simulation Filter (CSF) python package, solving two major bottlenecks:

  1. Speed performance

  2. Original C++ CSF: 6.09 seconds

  3. Refactored (In-memory): 3.88 seconds
  4. Refactored (chunked): 5.56 seconds

By eliminating the C++ bindings, avoiding std::vector memory duplication, and using JIT-compiled contiguous memory arrays, the execution time dropped by around 20% for the in-memory implementation. The Python/Numba implementation is faster than the compiled C++ library because it entirely avoids CPU cache thrashing and heavy object instantiation.

  1. The Memory Bottleneck (Disk Streaming Comparison)
  2. Original Peak Memory: 3500.71 MB
  3. Refactored (In-Memory): Peak Memory: 3500.66 MB
  4. Refactored (Chunked) Peak Memory: 549.60 MB

The memory footprint was slashed by 84%, and this could be improved further by decreasing chunk size. Using the reduced memory footprint, the refactored implementation still managed to outpace the original package by a small margin.

Best of all, the refactored implementation is now contained in 1 module only.

simulate_cloth(pc, cell_size, iterations, time_step, rigidness, height_threshold)

Coordinates an in-memory spatial processing pipeline utilizing pure JIT-compiled structures to classify ground points.

Parameters:

Name Type Description Default
pc Union[str, Path, PointCloud]

The memory-resident LiDAR point cloud object or a file path that will be dynamically resolved into memory.

required
cell_size float

The spatial resolution of the 2D simulation grid in coordinate units.

required
iterations int

The total number of consecutive physics engine passes to run.

required
time_step float

The downward gravitational displacement applied to the cloth per iteration.

required
rigidness float

The maximum allowed vertical deviation from the localized cross-neighborhood average.

required
height_threshold float

The maximum absolute vertical distance from the final cloth to classify a point as ground.

required

Returns:

Type Description
ndarray

np.ndarray: A 1D boolean array aligned with the input point cloud where True designates a ground classification.

Source code in src/phytospatial/lidar/csf.py
@resolve_pc
def simulate_cloth(
    pc: Union[str, Path, PointCloud],
    cell_size: float,
    iterations: int,
    time_step: float,
    rigidness: float,
    height_threshold: float
    ) -> np.ndarray:
    """
    Coordinates an in-memory spatial processing pipeline utilizing pure JIT-compiled structures 
    to classify ground points.

    Args:
        pc (Union[str, Path, PointCloud]): The memory-resident LiDAR point cloud object or a file path 
            that will be dynamically resolved into memory.
        cell_size (float): The spatial resolution of the 2D simulation grid in coordinate units.
        iterations (int): The total number of consecutive physics engine passes to run.
        time_step (float): The downward gravitational displacement applied to the cloth per iteration.
        rigidness (float): The maximum allowed vertical deviation from the localized cross-neighborhood average.
        height_threshold (float): The maximum absolute vertical distance from the final cloth to classify a point as ground.

    Returns:
        np.ndarray: A 1D boolean array aligned with the input point cloud where True designates a ground classification.
    """
    cols = int(np.ceil((pc.max_x - pc.min_x) / cell_size)) + 1
    rows = int(np.ceil((pc.max_y - pc.min_y) / cell_size)) + 1

    highest_z_grid = np.full((rows, cols), -np.inf, dtype=np.float64)

    _populate_z_grid(
        x=pc.x, y=pc.y, z=pc.z, 
        min_x=pc.min_x, min_y=pc.min_y, 
        cell_size=cell_size, 
        highest_z_grid=highest_z_grid
    )

    final_cloth = _run_csf_iterations(
        highest_z_grid=highest_z_grid,
        iterations=iterations,
        time_step=time_step,
        rigidness=rigidness
    )

    return _extract_ground_mask(
        x=pc.x, y=pc.y, z=pc.z, 
        min_x=pc.min_x, min_y=pc.min_y, 
        cell_size=cell_size, 
        cloth=final_cloth, 
        height_threshold=height_threshold
    )

simulate_cloth_chunked(source, cell_size, iterations, time_step, rigidness, height_threshold, chunk_size=2000000)

Coordinates a streaming spatial processing pipeline utilizing pure JIT-compiled structures to classify ground points globally while maintaining strict memory safety boundaries.

Parameters:

Name Type Description Default
source Union[str, Path]

Target .las or .laz file.

required
cell_size float

The spatial resolution of the 2D simulation grid in coordinate units.

required
iterations int

The total number of consecutive physics engine passes to run.

required
time_step float

The downward gravitational displacement applied to the cloth per iteration.

required
rigidness float

The maximum allowed vertical deviation from the localized cross-neighborhood average.

required
height_threshold float

The maximum absolute vertical distance from the final cloth to classify a point as ground.

required
chunk_size int

Number of points to stream per chunk to maintain memory safety.

2000000

Yields:

Type Description
ndarray

Generator[np.ndarray, None, None]: Sequential 1D boolean arrays mapping strictly to the yielded chunks, where True designates a ground classification.

Source code in src/phytospatial/lidar/csf.py
def simulate_cloth_chunked(
    source: Union[str, Path],
    cell_size: float,
    iterations: int,
    time_step: float,
    rigidness: float,
    height_threshold: float,
    chunk_size: int = 2_000_000
    ) -> Generator[np.ndarray, None, None]:
    """
    Coordinates a streaming spatial processing pipeline utilizing pure JIT-compiled structures 
    to classify ground points globally while maintaining strict memory safety boundaries.

    Args:
        source (Union[str, Path]): Target .las or .laz file.
        cell_size (float): The spatial resolution of the 2D simulation grid in coordinate units.
        iterations (int): The total number of consecutive physics engine passes to run.
        time_step (float): The downward gravitational displacement applied to the cloth per iteration.
        rigidness (float): The maximum allowed vertical deviation from the localized cross-neighborhood average.
        height_threshold (float): The maximum absolute vertical distance from the final cloth to classify a point as ground.
        chunk_size (int): Number of points to stream per chunk to maintain memory safety.

    Yields:
        Generator[np.ndarray, None, None]: Sequential 1D boolean arrays mapping strictly to the yielded chunks, 
                                           where True designates a ground classification.
    """
    source_path = Path(source)

    with laspy.open(source_path) as fh:
        min_x = fh.header.x_min
        max_x = fh.header.x_max
        min_y = fh.header.y_min
        max_y = fh.header.y_max

    cols = int(np.ceil((max_x - min_x) / cell_size)) + 1
    rows = int(np.ceil((max_y - min_y) / cell_size)) + 1

    highest_z_grid = np.full((rows, cols), -np.inf, dtype=np.float64)

    for pc in iter_pc(source_path, chunk_size=chunk_size):
        _populate_z_grid(
            x=pc.x, y=pc.y, z=pc.z, 
            min_x=min_x, min_y=min_y, 
            cell_size=cell_size, 
            highest_z_grid=highest_z_grid
        )

    final_cloth = _run_csf_iterations(
        highest_z_grid=highest_z_grid,
        iterations=iterations,
        time_step=time_step,
        rigidness=rigidness
    )

    for pc in iter_pc(source_path, chunk_size=chunk_size):
        yield _extract_ground_mask(
            x=pc.x, y=pc.y, z=pc.z, 
            min_x=min_x, min_y=min_y, 
            cell_size=cell_size, 
            cloth=final_cloth, 
            height_threshold=height_threshold
        )