Source code for pylandstats.zonal

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import rasterio
from numpy.lib import stride_tricks
from rasterio import features

from . import landscape as pls_landscape
from . import multilandscape

try:
    import geopandas as gpd
    from shapely.geometry import Point
    from shapely.geometry.base import BaseGeometry
    geo_imports = True
except ImportError:
    geo_imports = False

__all__ = ['ZonalAnalysis', 'BufferAnalysis', 'ZonalGridAnalysis']


class ZonalAnalysis(multilandscape.MultiLandscape):
    def __init__(self, landscape, masks_arr, landscape_crs=None,
                 landscape_transform=None, attribute_name=None,
                 attribute_values=None, **kwargs):
        """
        Parameters
        ----------
        landscape : `Landscape` or str, file object or pathlib.Path object
            A `Landscape` object or a string/file object/pathlib.Path object
            that will be passed as the `landscape` argument of
            `Landscape.__init__`
        masks_arr : list-like or np.ndarray
            A list-like of numpy arrays of shape (height, width), i.e., of
            the same shape as the landscape raster image. Each array will
            serve to mask the base landscape and define a region of study
            for which the metrics will be computed separately. The same
            information can also be provided as a single array of shape
            (num_masks, height, width).
        landscape_crs : dict, optional
            The coordinate reference system of the landscapes. Used to dump
            rasters in the `compute_zonal_statistics_arr` method. Ignored if
            the passed-in `landscape` is a path to a GeoTiff raster that
            already contains such information.
        landscape_transform : affine.Affine, optional
            Transformation from pixel coordinates to coordinate reference
            system. Used to dump rasters in the
            `compute_zonal_statistics_arr` method. Ignored if the passed-in
            `landscape` is a path to a GeoTiff raster that already contains
            such information.
        attribute_name : str, optional
            Name of the attribute that will distinguish each landscape
        attribute_values : list-like, optional
            Values of the attribute that correspond to each of the
            landscapes
        """
        # read input data/metadata
        if not isinstance(landscape, pls_landscape.Landscape):
            with rasterio.open(landscape) as src:
                landscape_crs = src.crs
            landscape = pls_landscape.Landscape(landscape)
        landscape_arr = landscape.landscape_arr
        height, width = landscape_arr.shape
        if landscape.transform is not None:
            landscape_transform = landscape.transform

        landscapes = [
            pls_landscape.Landscape(
                np.where(mask_arr, landscape_arr, landscape.nodata).astype(
                    landscape.landscape_arr.dtype),
                res=(landscape.cell_width, landscape.cell_height),
                nodata=landscape.nodata, transform=landscape.transform)
            for mask_arr in masks_arr
        ]

        # store `landscape_meta`/`masks_arr` as instance attributes so that
        # we can compute zonal statistics
        self.landscape_meta = dict(
            driver='GTiff',
            width=width,
            height=height,
            count=1,
            transform=landscape_transform,
            crs=landscape_crs,
        )
        self.masks_arr = masks_arr
        # useful in `compute_zonal_statistics_arr` below
        self.filter_landscape_nodata = True

        # The attribute name will be `buffer_dists` for `BufferAnalysis` or
        # `transect_dist` for `TransectAnalysis`, but for any other custom
        # use of `ZonalAnalysis`, the user might (or might not) provide a
        # custom name
        if attribute_name is None:
            attribute_name = 'attribute_values'
        # If the values for the distinguishing attribute are not provided, a
        # basic enumeration will be automatically generated
        if attribute_values is None:
            attribute_values = [i for i in range(len(masks_arr))]

        # now call the parent's init
        super(ZonalAnalysis, self).__init__(landscapes, attribute_name,
                                            attribute_values, **kwargs)
    def compute_zonal_statistics_arr(self, metric, class_val=None,
                                     metric_kws=None, dst_filepath=None,
                                     custom_meta=None):
        """
        Compute the zonal statistics of a metric over an array with the form
        of the landscape

        Parameters
        ----------
        metric : str
            A string indicating the name of the metric for which the zonal
            statistics will be computed
        class_val : int, optional
            If provided, the zonal statistics will be computed for the
            metric computed at the level of the corresponding class,
            otherwise they will be computed at the landscape level
        metric_kws : dict, optional
            Keyword arguments to be passed to the method that computes the
            metric (specified in the `metric` argument) for each landscape
        dst_filepath : str, file object or pathlib.Path object, optional
            Path to dump the zonal statistics raster. If not provided, no
            raster will be dumped.
        custom_meta : dict, optional
            Custom metadata for the output raster, consistent with the
            rasterio library.

        Returns
        -------
        zonal_statistics_arr : ndarray
            Two-dimensional array with the computed zonal statistics
        """
        # ACHTUNG: do not confuse `metric_kws` and `metrics_kws`. The former
        # are the keyword arguments for the method that computes the metric.
        # The latter is a dict mapping the metric to such keyword arguments
        # (such dict will be passed to the `compute_class_metrics_df`/
        # `compute_landscape_metrics_df` method)
        if metric_kws is None:
            metrics_kws = None
        else:
            metrics_kws = {metric: metric_kws}

        if class_val is None:
            zonal_metrics_df = self.compute_landscape_metrics_df(
                metrics=[metric], metrics_kws=metrics_kws)
            metric_ser = zonal_metrics_df[metric]
        else:
            zonal_metrics_df = self.compute_class_metrics_df(
                metrics=[metric], classes=[class_val],
                metrics_kws=metrics_kws)
            metric_ser = zonal_metrics_df.loc[class_val, metric]

        # ensure that we have numeric types (not strings)
        metric_ser = pd.to_numeric(metric_ser)

        # reconstruct the zonal statistics array
        zonal_statistics_arr = np.full(
            (self.landscape_meta['height'], self.landscape_meta['width']),
            np.nan, dtype=metric_ser.dtype)
        if self.filter_landscape_nodata:
            for metric_val, landscape, mask_arr in zip(
                    metric_ser, self.landscapes, self.masks_arr):
                zonal_statistics_arr[
                    (landscape.landscape_arr != landscape.nodata)
                    & mask_arr] = metric_val
        else:
            for metric_val, mask_arr in zip(metric_ser, self.masks_arr):
                zonal_statistics_arr[mask_arr] = metric_val

        # dump a raster if requested
        if dst_filepath:
            dst_meta = self.landscape_meta.copy()
            dst_meta.update(dtype=zonal_statistics_arr.dtype)
            if custom_meta is None:
                dst_meta.update(nodata=np.nan)
            else:
                if 'nodata' in custom_meta:
                    zonal_statistics_arr[np.isnan(
                        zonal_statistics_arr)] = custom_meta['nodata']
                dst_meta.update(**custom_meta)
            with rasterio.open(dst_filepath, 'w', **dst_meta) as dst:
                dst.write(zonal_statistics_arr, 1)

        return zonal_statistics_arr
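
# --- Illustrative sketch (not part of the module): a minimal use of
# `ZonalAnalysis` with two hand-made boolean masks, assuming a hypothetical
# 100x100-pixel raster file "landscape.tif". The masks split the landscape
# into a western and an eastern half; metric names follow the `Landscape`
# metric methods (e.g., "patch_density").
#
# import numpy as np
# import pylandstats as pls
#
# west_mask = np.zeros((100, 100), dtype=bool)
# west_mask[:, :50] = True
# east_mask = ~west_mask
#
# za = pls.ZonalAnalysis("landscape.tif",
#                        masks_arr=[west_mask, east_mask],
#                        attribute_name="half", attribute_values=["W", "E"])
# # data frame of landscape-level metrics, indexed by the "half" attribute
# metrics_df = za.compute_landscape_metrics_df(metrics=["patch_density"])
# # 2D array where each masked region takes its zonal metric value
# pd_arr = za.compute_zonal_statistics_arr("patch_density")
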
class BufferAnalysis(ZonalAnalysis):
    def __init__(self, landscape, base_mask, buffer_dists,
                 buffer_rings=False, base_mask_crs=None, landscape_crs=None,
                 landscape_transform=None):
        """
        Parameters
        ----------
        landscape : `Landscape` or str, file object or pathlib.Path object
            A `Landscape` object or a string/file object/pathlib.Path object
            that will be passed as the `landscape` argument of
            `Landscape.__init__`
        base_mask : shapely geometry or geopandas GeoSeries
            Geometry that will serve as a base mask to buffer around
        buffer_dists : list-like
            Buffer distances
        buffer_rings : bool, default False
            If `False`, each buffer zone will consist of the whole region
            that lies within the respective buffer distance around the base
            mask. If `True`, buffer zones will take the form of rings around
            the base mask.
        base_mask_crs : dict, optional
            The coordinate reference system of the base mask. Required if
            the base mask is a shapely geometry or a geopandas GeoSeries
            without the `crs` attribute set
        landscape_crs : dict, optional
            The coordinate reference system of the landscapes. Required if
            the passed-in landscapes are `Landscape` objects, ignored if
            they are paths to GeoTiff rasters that already contain such
            information.
        landscape_transform : affine.Affine, optional
            Transformation from pixel coordinates to coordinate reference
            system. Required if the passed-in landscapes are `Landscape`
            objects, ignored if they are paths to GeoTiff rasters that
            already contain such information.
        """
        # first check that we meet the package dependencies
        if not geo_imports:
            raise ImportError(
                "The `BufferAnalysis` class requires the geopandas package. "
                "For better performance, we strongly suggest that you "
                "install its cythonized version via conda-forge as in:\n"
                "conda install -c conda-forge/label/dev geopandas\n"
                "See https://github.com/geopandas/geopandas for more "
                "information about installing geopandas")

        # get `buffer_masks_arr` from a base geometry and a list of buffer
        # distances

        # 1. get a GeoSeries with the base mask geometry
        if isinstance(base_mask, BaseGeometry):
            if base_mask_crs is None:
                raise ValueError(
                    "If `base_mask` is a shapely geometry, `base_mask_crs` "
                    "must be provided")
            base_mask_gser = gpd.GeoSeries(base_mask, crs=base_mask_crs)
        else:
            # we assume that `base_mask` is a geopandas GeoSeries
            if base_mask.crs is None:
                if base_mask_crs is None:
                    raise ValueError(
                        "If `base_mask` is a naive geopandas GeoSeries "
                        "(with no crs set), `base_mask_crs` must be "
                        "provided")
                base_mask_gser = base_mask.copy()  # avoid alias/ref problems
                base_mask_gser.crs = base_mask_crs
            else:
                base_mask_gser = base_mask

        # 2. get the crs, transform and shape of the landscapes
        if isinstance(landscape, pls_landscape.Landscape):
            if landscape_crs is None:
                raise ValueError(
                    "If passing `Landscape` objects (instead of geotiff "
                    "filepaths), `landscape_crs` must be provided")
            if landscape_transform is None:
                raise ValueError(
                    "If passing `Landscape` objects (instead of geotiff "
                    "filepaths), `landscape_transform` must be provided")
            landscape_shape = landscape.landscape_arr.shape
        else:
            with rasterio.open(landscape) as src:
                landscape_crs = src.crs
                landscape_transform = src.transform
                landscape_shape = src.height, src.width

        # 3. buffer around the base mask
        avg_longitude = base_mask_gser.to_crs(
            '+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs'
        ).unary_union.centroid.x
        # trick from OSMnx to be able to buffer in meters
        utm_zone = int(np.floor((avg_longitude + 180) / 6.) + 1)
        utm_crs = {
            'datum': 'WGS84',
            'ellps': 'WGS84',
            'proj': 'utm',
            'zone': utm_zone,
            'units': 'm'
        }
        base_mask_geom = base_mask_gser.to_crs(utm_crs).iloc[0]
        if buffer_rings:
            if not isinstance(base_mask_geom, Point):
                raise ValueError(
                    "Buffer rings can only work when `base_mask_geom` is a "
                    "`Point`")
            _buffer_dists = np.concatenate([[0], buffer_dists])
            buffer_dists = list(
                map(lambda d: '{}-{}'.format(d[0], d[1]),
                    zip(_buffer_dists[:-1], _buffer_dists[1:])))
            masks_gser = gpd.GeoSeries([
                base_mask_geom.buffer(_buffer_dists[i + 1]) -
                base_mask_geom.buffer(_buffer_dists[i])
                for i in range(len(_buffer_dists) - 1)
            ], index=buffer_dists, crs=utm_crs).to_crs(landscape_crs)
        else:
            masks_gser = gpd.GeoSeries([
                base_mask_geom.buffer(buffer_dist)
                for buffer_dist in buffer_dists
            ], index=buffer_dists, crs=utm_crs).to_crs(landscape_crs)

        # 4. rasterize each mask
        num_rows, num_cols = landscape_shape
        buffer_masks_arr = np.zeros((len(buffer_dists), num_rows, num_cols),
                                    dtype=np.uint8)
        for i in range(len(masks_gser)):
            buffer_masks_arr[i] = features.rasterize(
                [masks_gser.iloc[i]], out_shape=landscape_shape,
                transform=landscape_transform, dtype=np.uint8)
        buffer_masks_arr = buffer_masks_arr.astype(bool)

        # now we can call the parent's init with the landscape and the
        # constructed buffer_masks_arr
        super(BufferAnalysis, self).__init__(
            landscape, buffer_masks_arr, landscape_crs=landscape_crs,
            landscape_transform=landscape_transform,
            attribute_name='buffer_dists', attribute_values=buffer_dists)
    # override docs
    def compute_class_metrics_df(self, metrics=None, classes=None,
                                 metrics_kws=None):
        return super(BufferAnalysis, self).compute_class_metrics_df(
            metrics=metrics, classes=classes, metrics_kws=metrics_kws)
    compute_class_metrics_df.__doc__ = \
        multilandscape._compute_class_metrics_df_doc.format(
            index_descr='multi-indexed by the class and buffer distance',
            index_return='class, buffer distance (multi-index)')
    def compute_landscape_metrics_df(self, metrics=None, metrics_kws=None):
        return super(BufferAnalysis, self).compute_landscape_metrics_df(
            metrics=metrics, metrics_kws=metrics_kws)
    compute_landscape_metrics_df.__doc__ = \
        multilandscape._compute_landscape_metrics_df_doc.format(
            index_descr='indexed by the buffer distance',
            index_return='buffer distance (index)')
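
# --- Illustrative sketch (not part of the module): buffers of 1000 and 2000
# meters around a point, assuming a hypothetical raster "landscape.tif". The
# point is given in geographic (lon/lat) coordinates via a standard WGS84
# proj4 string for `base_mask_crs`; buffer distances are interpreted in
# meters thanks to the UTM reprojection performed in `__init__`.
#
# import pylandstats as pls
# from shapely.geometry import Point
#
# base_mask = Point(2.15, 41.39)  # lon, lat of the point of interest
# ba = pls.BufferAnalysis(
#     "landscape.tif", base_mask, buffer_dists=[1000, 2000],
#     base_mask_crs="+proj=longlat +ellps=WGS84 +datum=WGS84 +no_defs")
# # class-level metrics, multi-indexed by class value and buffer distance
# class_df = ba.compute_class_metrics_df(
#     metrics=["proportion_of_landscape"])
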
class ZonalGridAnalysis(ZonalAnalysis):
    def __init__(self, landscape, num_zone_rows=None, num_zone_cols=None,
                 zone_pixel_width=None, zone_pixel_height=None,
                 landscape_crs=None, landscape_transform=None):
        """
        Parameters
        ----------
        landscape : `Landscape` or str, file object or pathlib.Path object
            A `Landscape` object or a string/file object/pathlib.Path object
            that will be passed as the `landscape` argument of
            `Landscape.__init__`
        num_zone_rows, num_zone_cols : int, optional
            The number of zone rows/columns into which the landscape will be
            separated. If the landscape dimensions and the desired zones do
            not divide evenly, the zones will be defined for the maximum
            subset (starting from the top, left corner) for which there is
            an even division. If not provided, then `zone_pixel_width`/
            `zone_pixel_height` must be provided.
        zone_pixel_width, zone_pixel_height : int, optional
            The width/height of each zone (in pixels). If the landscape
            dimensions and the desired zones do not divide evenly, the zones
            will be defined for the maximum subset (starting from the top,
            left corner) for which there is an even division. If not
            provided, then `num_zone_rows`/`num_zone_cols` must be provided.
        landscape_crs : dict, optional
            The coordinate reference system of the landscapes. Required to
            reconstruct the zonal statistics rasters if the passed-in
            landscapes are `Landscape` objects, ignored if they are paths to
            GeoTiff rasters that already contain such information.
        landscape_transform : affine.Affine, optional
            Transformation from pixel coordinates to coordinate reference
            system. Required if the passed-in landscapes are `Landscape`
            objects, ignored if they are paths to GeoTiff rasters that
            already contain such information.
        """
        if not isinstance(landscape, pls_landscape.Landscape):
            with rasterio.open(landscape) as src:
                landscape_crs = src.crs
            landscape = pls_landscape.Landscape(landscape)
        landscape_arr = landscape.landscape_arr
        height, width = landscape_arr.shape

        if zone_pixel_height is None:
            if num_zone_rows is None:
                raise ValueError(
                    "Either `num_zone_rows` or `zone_pixel_height` must be "
                    "provided")
            zone_pixel_height = height // num_zone_rows
        if zone_pixel_width is None:
            if num_zone_cols is None:
                raise ValueError(
                    "Either `num_zone_cols` or `zone_pixel_width` must be "
                    "provided")
            zone_pixel_width = width // num_zone_cols
        if num_zone_rows is None:
            num_zone_rows = height // zone_pixel_height
        if num_zone_cols is None:
            num_zone_cols = width // zone_pixel_width

        # raster meta
        # transform.from_origin(landscape_transform.c, landscape_transform.f)
        if landscape.transform is not None:
            landscape_transform = landscape.transform
        self.landscape_meta = dict(
            driver='GTiff',
            width=num_zone_cols,
            height=num_zone_rows,
            count=1,
            transform=landscape_transform * landscape_transform.scale(
                zone_pixel_width, zone_pixel_height),
            crs=landscape_crs,
        )

        # Based on `skimage.util.shape.view_as_blocks`
        # arr_shape = np.array([height, width])
        zone_shape = np.array([zone_pixel_height, zone_pixel_width])
        # num_even_rows, num_even_cols = arr_shape - arr_shape % zone_shape
        # landscape_arr[:num_even_rows, :num_even_cols]
        landscape_arrs = stride_tricks.as_strided(
            landscape_arr,
            # shape=tuple(arr_shape // zone_shape) + tuple(zone_shape),
            shape=(num_zone_rows, num_zone_cols) + tuple(zone_shape),
            strides=tuple(landscape_arr.strides * zone_shape) +
            landscape_arr.strides)
        # the reshape could probably be done directly in the `as_strided`
        # call
        # tuple(landscape_arrs.shape[0] * landscape_arrs.shape[1])
        landscape_arrs = landscape_arrs.reshape(
            (num_zone_cols * num_zone_rows, ) + tuple(zone_shape))

        # identify zones by their (row, col) position
        zone_ids = np.array([(row, col) for row in range(num_zone_rows)
                             for col in range(num_zone_cols)])

        # check which zones actually contain only nans
        # nan_zones = np.full(len(masks), False)
        # for i, mask_arr in enumerate(masks):
        #     if np.any(
        #             landscape.landscape_arr[mask_arr] != landscape.nodata):
        #         nan_zones[i] = True
        # save this as an instance attribute since we will need it to
        # reconstruct the zonal statistics raster
        self.data_zones = np.array([
            np.any(landscape_arr != landscape.nodata)
            for landscape_arr in landscape_arrs
        ])

        # we only need to consider zones that actually contain non-nan
        # pixels
        landscapes = [
            pls_landscape.Landscape(
                landscape_arr,
                res=(landscape.cell_width, landscape.cell_height),
                nodata=landscape.nodata)
            for landscape_arr in landscape_arrs[self.data_zones]
        ]
        zone_ids = list(map(tuple, zone_ids[self.data_zones]))

        # TODO: find a better way to DRY this (see comment just below)
        # build a list of numpy masks, each representing a grid cell of our
        # zonal analysis. Doing this here is rather silly, but it allows us
        # to re-use the `compute_zonal_statistics_arr` method of the
        # `ZonalAnalysis` class (at the expense of some performance loss,
        # though most likely not too critical)
        # masks = []
        # # base_mask_arr = np.full((height, width), False)
        # for zone_row_start in range(0, height, zone_pixel_height):
        #     for zone_col_start in range(0, width, zone_pixel_width):
        #         # mask_arr = np.copy(base_mask_arr)
        #         mask_arr = np.full((height, width), False)
        #         mask_arr[zone_row_start:zone_row_start + zone_pixel_height,
        #                  zone_col_start:zone_col_start +
        #                  zone_pixel_width] = True
        #         masks.append(mask_arr)
        # # make it a numpy array, filter out the nan zones and store it as
        # # a class attribute
        # self.masks_arr = np.array(masks)[self.data_zones]
        masks = []
        for zone_rowcol in zone_ids:
            mask_arr = np.full(
                (self.landscape_meta['height'],
                 self.landscape_meta['width']), False)
            mask_arr[zone_rowcol] = True
            masks.append(mask_arr)
        self.masks_arr = np.array(masks)
        # to reuse the `compute_zonal_statistics_arr` from `ZonalAnalysis`
        self.filter_landscape_nodata = False

        # # now we can call the parent's init with the landscape and the
        # # constructed masks. We only need to consider zones that actually
        # # contain non-nan pixels
        # zones = list(map(tuple, np.compress(nan_zones, zones, axis=0)))
        # super(ZonalGridAnalysis, self).__init__(
        #     landscape, np.compress(nan_zones, masks, axis=0), 'zones',
        #     zones, crop_landscapes=False)
        # ACHTUNG: since we have built the landscapes here, we bypass the
        # parent's init (i.e., `ZonalAnalysis`), and call the grandparent's
        # init instead
        super(ZonalAnalysis, self).__init__(landscapes, 'zones', zone_ids)
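
    # --- Illustrative note (not part of the module): the `as_strided` call
    # above creates a zero-copy "block view" of the landscape array, i.e.,
    # an array of shape (num_zone_rows, num_zone_cols, zone_pixel_height,
    # zone_pixel_width) in which each block is one grid zone. A minimal,
    # self-contained sketch of the same technique on a 4x4 array split into
    # 2x2 zones:
    #
    # >>> import numpy as np
    # >>> from numpy.lib import stride_tricks
    # >>> arr = np.arange(16).reshape(4, 4)
    # >>> zone_shape = np.array([2, 2])
    # >>> blocks = stride_tricks.as_strided(
    # ...     arr, shape=(2, 2) + tuple(zone_shape),
    # ...     strides=tuple(arr.strides * zone_shape) + arr.strides)
    # >>> blocks[0, 1]
    # array([[2, 3],
    #        [6, 7]])
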
    def plot_landscapes(self, cmap=None, ax=None, figsize=None, **show_kws):
        """
        Plots the spatial distribution of the landscape zones

        Parameters
        ----------
        cmap : str or `~matplotlib.colors.Colormap`, optional
            A Colormap instance
        ax : axis object, optional
            Plot in given axis; if None creates a new figure
        figsize : tuple of two numeric types, optional
            Size of the figure to create. Ignored if axis `ax` is provided
        **show_kws : optional
            Keyword arguments to be passed to `rasterio.plot.show`

        Returns
        -------
        ax : matplotlib axis
            axis with plot data
        """
        if cmap is None:
            cmap = plt.rcParams['image.cmap']
        if isinstance(cmap, str):
            cmap = plt.get_cmap(cmap)
        if ax is None:
            fig, ax = plt.subplots(figsize=figsize)
        ax.set_aspect("equal")

        if show_kws is None:
            show_kws = {}

        # assign a random value to each data zone so that zones are rendered
        # in (most likely) distinct colors; zones without data remain nan
        zone_arr = np.full_like(self.data_zones, np.nan, dtype=np.float32)
        zone_arr[self.data_zones] = np.random.random(np.sum(self.data_zones))
        ax.imshow(
            zone_arr.reshape(self.landscape_meta['height'],
                             self.landscape_meta['width']),
            cmap=cmap, **show_kws)

        return ax
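
# --- Illustrative sketch (not part of the module): dividing a hypothetical
# raster "landscape.tif" into a 10x10 grid of zones, plotting the zone
# layout, and mapping a landscape-level metric over the coarser zone raster
# (one cell per zone).
#
# import pylandstats as pls
#
# zga = pls.ZonalGridAnalysis("landscape.tif", num_zone_rows=10,
#                             num_zone_cols=10)
# ax = zga.plot_landscapes()  # show the spatial layout of the zones
# # array with one cell per zone holding the zonal value of the metric
# edge_density_arr = zga.compute_zonal_statistics_arr("edge_density")
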