Skip to content

Engine

This module manages interactions between other raster modules.

It serves as the core dispatch mechanism for raster processing.

AggregationType

Bases: Enum

Strategies for combining results from chunked processing.

Options

- STITCH: Reassemble processed tiles into a single raster file.
- COLLECT: Return a list of results for each tile (no stitching).
- REDUCE: Accumulate results using a reducer function (e.g. sum, max).
- NONE: Discard individual results (useful for side-effect functions).

Source code in src/phytospatial/raster/engine.py
class AggregationType(Enum):
    """Strategies for combining the per-chunk results of chunked processing.

    The selected member decides what the engine returns once every
    tile/block has been processed.

    Options:
        STITCH: Reassemble processed tiles into a single raster file.
        COLLECT: Return a list with one result per tile (no stitching).
        REDUCE: Accumulate results pairwise using a reducer function
            (e.g. sum, max).
        NONE: Discard individual results (useful for functions that are
            run only for their side effects).
    """
    # Member values are the lowercase string forms of the member names.
    STITCH = "stitch"
    COLLECT = "collect"
    REDUCE = "reduce"
    NONE = "none"

DispatchConfig

Configuration object for the execution engine.

Parameters:

Name Type Description Default
mode Union[ProcessingMode, str]

ProcessingMode to enforce ('in_memory', 'tiled', 'blocked', 'auto').

'auto'
tile_size int

Size of tiles/windows for streaming (in pixels). Default=512.

512
overlap int

Overlap between tiles/windows (in pixels). Default=0.

0
output_path Optional[Union[str, Path]]

Optional path to save output raster (required for STITCH).

None
aggregation AggregationType

AggregationType for combining results. Default=STITCH.

STITCH
reducer Optional[Callable[[Any, Any], Any]]

Optional function for REDUCE aggregation.

None
Source code in src/phytospatial/raster/engine.py
class DispatchConfig:
    """Configuration object for the execution engine.

    Args:
        mode: ProcessingMode to enforce ('in_memory', 'tiled', 'blocked', 'auto').
        tile_size: Size of tiles/windows for streaming (in pixels). Default=512.
        overlap: Overlap between tiles/windows (in pixels). Default=0.
        output_path: Optional path to save output raster (required for STITCH).
            Stored as a ``Path``; a falsy value (``None`` or ``""``) is
            stored as ``None``.
        aggregation: AggregationType for combining results, or the string
            value of one of its members (case-insensitive, e.g. "collect").
            Default=STITCH.
        reducer: Optional function for REDUCE aggregation.

    Raises:
        ValueError: If ``aggregation`` is a string that does not match any
            AggregationType value.
    """
    def __init__(
        self,
        mode: Union[ProcessingMode, str] = "auto",
        tile_size: int = 512,
        overlap: int = 0,
        output_path: Optional[Union[str, Path]] = None,
        aggregation: Union[AggregationType, str] = AggregationType.STITCH,
        reducer: Optional[Callable[[Any, Any], Any]] = None
    ):
        # A plain string never compares equal to an AggregationType member, so
        # leaving it unconverted would make the engine fall through to its
        # "discard results" branch without any warning. Convert it here and
        # fail fast (ValueError from the Enum lookup) on unknown names.
        if isinstance(aggregation, str):
            aggregation = AggregationType(aggregation.lower())

        self.mode = mode
        self.tile_size = tile_size
        self.overlap = overlap
        self.output_path = Path(output_path) if output_path else None
        self.aggregation = aggregation
        self.reducer = reducer

    def __repr__(self) -> str:
        """Return a constructor-like summary of the configuration for debugging."""
        return (
            f"{type(self).__name__}("
            f"mode={self.mode!r}, "
            f"tile_size={self.tile_size!r}, "
            f"overlap={self.overlap!r}, "
            f"output_path={self.output_path!r}, "
            f"aggregation={self.aggregation!r}, "
            f"reducer={self.reducer!r})"
        )

dispatch(func, input_map, static_args=(), static_kwargs=None, config=None)

Execute a function over raster inputs using the optimal strategy.

Parameters:

Name Type Description Default
func Callable

The function to execute. Must accept raster objects as arguments.

required
input_map Dict[str, Union[str, Path, Raster]]

Dictionary mapping argument names to raster sources, for example {'raster_a': 'file1.tif', 'raster_b': 'file2.tif'}.

required
static_args Tuple

Positional arguments to pass to func (passed through).

()
static_kwargs Dict

Keyword arguments to pass to func (passed through).

None
config DispatchConfig

Execution configuration (Mode, Tiling, Aggregation).

None

Returns:

Type Description
Any

The result of the processing (Path, List, or Value).

Source code in src/phytospatial/raster/engine.py
def dispatch(
    func: Callable,
    input_map: Dict[str, Union[str, Path, Raster]],
    static_args: Tuple = (),
    static_kwargs: Optional[Dict] = None,
    config: Optional[DispatchConfig] = None
) -> Any:
    """
    Execute a function over raster inputs using the optimal strategy.

    The first entry of ``input_map`` is the primary input and decides the
    processing mode: a ``Raster`` object forces in-memory execution, while
    a path is handed to ``determine_strategy`` together with ``config.mode``.

    Args:
        func: The function to execute. Must accept raster objects as arguments.
              The rasters are passed as keyword arguments named after the keys
              of ``input_map``; on a name collision they override
              ``static_kwargs``.
        input_map: Dictionary mapping argument names to raster sources.
                   {'raster_a': 'file1.tif', 'raster_b': 'file2.tif'}
        static_args: Positional arguments to pass to func (passed through).
        static_kwargs: Keyword arguments to pass to func (passed through).
        config: Execution configuration (Mode, Tiling, Aggregation).
                Defaults to ``DispatchConfig()``.

    Returns:
        The result of the processing (Path, List, or Value):
        in-memory mode returns whatever ``func`` returns; otherwise STITCH
        returns the result of ``_aggregate_stitch``, COLLECT a list with one
        result per tile, REDUCE the accumulated value (``None`` if no tile
        was produced), and any other aggregation ``None``.

    Raises:
        ValueError: If ``input_map`` is empty, if STITCH aggregation is
            requested without ``config.output_path``, or if REDUCE
            aggregation is requested without ``config.reducer`` (the latter
            two only apply outside in-memory mode).
    """
    if not input_map:
        raise ValueError("Cannot dispatch engine without at least one raster input.")

    static_kwargs = static_kwargs or {}
    config = config or DispatchConfig()

    # functools.partial objects and callable instances have no __name__;
    # fall back to repr() so that logging can never abort the dispatch.
    func_name = getattr(func, "__name__", None) or repr(func)

    primary_input = next(iter(input_map.values()))

    if isinstance(primary_input, Raster):
        mode = ProcessingMode.IN_MEMORY
        log.info("Engine dispatching %s in IN_MEMORY mode (Object Input)", func_name)
    else:
        # Anything comparing equal to "auto" is forwarded as the literal
        # string; every other value is forwarded untouched.
        user_mode = "auto" if config.mode == "auto" else config.mode
        report = determine_strategy(Path(primary_input), user_mode=user_mode)
        mode = report.mode

        log.info("Engine dispatching %s in %s mode", func_name, mode.value)
        log.debug("Strategy Report: %s", report.reason)

    if mode == ProcessingMode.IN_MEMORY:
        # Aggregation settings are not consulted here: func runs exactly once
        # on fully loaded rasters and its result is returned as-is.
        loaded_inputs = {
            name: source if isinstance(source, Raster) else load(source)
            for name, source in input_map.items()
        }
        return func(*static_args, **{**static_kwargs, **loaded_inputs})

    aggregation = config.aggregation

    # Validate the configuration before any tile is processed.
    if aggregation == AggregationType.STITCH and not config.output_path:
        raise ValueError("AggregationType.STITCH requires 'output_path' in config.")
    if aggregation == AggregationType.REDUCE and not config.reducer:
        raise ValueError("REDUCE requires 'reducer' function.")

    def execution_stream():
        # Lazily process tiles/blocks and yield (window, result) pairs.
        for window, tiles in _synchronize_inputs(input_map, mode, config):
            yield window, func(*static_args, **{**static_kwargs, **tiles})

    if aggregation == AggregationType.STITCH:
        return _aggregate_stitch(execution_stream(), primary_input, config.output_path)

    if aggregation == AggregationType.COLLECT:
        return [res for _, res in execution_stream()]

    if aggregation == AggregationType.REDUCE:
        # None doubles as the "no accumulator yet" marker, so the first
        # non-None result seeds the accumulation.
        acc = None
        for _, res in execution_stream():
            acc = res if acc is None else config.reducer(acc, res)
        return acc

    # Remaining case (AggregationType.NONE or anything unrecognised): drain
    # the stream purely for func's side effects.
    for _ in execution_stream():
        pass
    return None