"""Spatial signature analysis."""
import warnings
import clustergram
import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from pylandstats import settings
from pylandstats.multilandscape import MultiLandscape
__all__ = ["SpatialSignatureAnalysis"]
def _compute_metrics_df(
landscapes,
class_metrics,
classes,
class_metrics_kwargs,
class_metrics_fillna,
landscape_metrics,
landscape_metrics_kwargs,
):
flat_metrics_dfs = []
if len(class_metrics) != 0:
class_metrics_df = landscapes.compute_class_metrics_df(
metrics=class_metrics,
classes=classes,
metrics_kwargs=class_metrics_kwargs,
fillna=class_metrics_fillna,
)
flat_class_metrics_df = class_metrics_df.unstack(level="class_val")
flat_class_metrics_df.columns = [
f"{metric}_{class_val}"
for metric, class_val in flat_class_metrics_df.columns.values
]
flat_metrics_dfs.append(flat_class_metrics_df)
if len(landscape_metrics) != 0:
flat_metrics_dfs.append(
landscapes.compute_landscape_metrics_df(
metrics=landscape_metrics, metrics_kwargs=landscape_metrics_kwargs
)
)
return pd.concat(flat_metrics_dfs, axis=1)
def _fit_transform(X, transformer, **transformer_kwargs):
# ACHTUNG: do not modify X in place to avoid side effects
_X = transformer(**transformer_kwargs).fit_transform(X)
if isinstance(X, pd.DataFrame):
_X = pd.DataFrame(_X, index=X.index, columns=X.columns)
return _X
class SpatialSignatureAnalysis:
"""Pattern-based analysis of landscapes based on spatial signatures."""
def __init__(
    self,
    landscapes,
    *,
    class_metrics=None,
    landscape_metrics=None,
    classes=None,
    class_metrics_fillna=None,
    class_metrics_kwargs=None,
    landscape_metrics_kwargs=None,
):
    """Initialize the spatial signature analysis.

    Parameters
    ----------
    landscapes : pylandstats.MultiLandscape or list-like of pylandstats.Landscape
        A MultiLandscape object or list of the landscapes to be analyzed.
    class_metrics : list-like of str, optional
        A list-like of strings with the names of the class-level metrics that
        should be computed. If `None`, no class-level metric will be computed.
    landscape_metrics : list-like of str, optional
        A list-like of strings with the names of the landscape-level metrics that
        should be computed. If `None`, no landscape-level metric will be computed.
    classes : list-like, optional
        A list-like of ints or strings with the class values that should be
        considered in the context of this analysis case. If `None` and class-level
        metrics are computed, all unique class values will be considered. Ignored if
        no class-level metrics are computed.
    class_metrics_fillna : bool, optional
        Whether `NaN` values representing landscapes with no occurrences of patches
        of the provided class should be replaced by zero when appropriate, e.g.,
        area and edge metrics (no occurrences mean zero area/edge). If the provided
        value is `None` (default), the value will be taken from
        `settings.CLASS_METRICS_DF_FILLNA`.
    class_metrics_kwargs, landscape_metrics_kwargs : dict, optional
        Dictionary mapping the keyword arguments (values) that should be passed to
        each metric method (key) for the class and landscape-level metrics
        respectively. For instance, to exclude the boundary from the computation of
        `total_edge`, metric_kwargs should map the string 'total_edge' (method name)
        to {'count_boundary': False}. If `None`, each metric will be computed
        according to FRAGSTATS defaults.
    """
    # Only the landscapes (plus `zone_gser`/`dates` when the provided object has
    # them) and the resulting metrics data frame are stored on the instance.
    if not isinstance(landscapes, MultiLandscape):
        # wrap a plain list-like of landscapes, identified by their position
        self.landscapes = MultiLandscape(
            landscapes, "landscape_id", np.arange(len(landscapes))
        )
    else:
        self.landscapes = landscapes

    # propagate the optional zonal/temporal attributes of the *provided* object
    for attr_name in ("zone_gser", "dates"):
        if hasattr(landscapes, attr_name):
            setattr(self, attr_name, getattr(landscapes, attr_name))

    # `None` means "compute no metric of this level"
    self.metrics_df = _compute_metrics_df(
        self.landscapes,
        [] if class_metrics is None else class_metrics,
        classes,
        class_metrics_kwargs,
        class_metrics_fillna,
        [] if landscape_metrics is None else landscape_metrics,
        landscape_metrics_kwargs,
    )
def decompose(
    self,
    *,
    decomposer=None,
    preprocessor=None,
    preprocessor_kwargs=None,
    imputer=None,
    imputer_kwargs=None,
    **decomposer_kwargs,
):
    """Factorize the spatial signature matrix into components.

    Parameters
    ----------
    decomposer : class, optional
        A class that implements the decomposition algorithm. It can be any
        scikit-learn like transformer that implements the `fit` and `transform`
        methods and with the `components_` attribute. If no value is provided,
        the default value set in `settings.DEFAULT_DECOMPOSER` will be taken.
    preprocessor : class, optional
        A class that implements the preprocessing algorithm. It can be any
        scikit-learn like transformer that implements the `fit_transform` method.
        If no value is provided, the default value set in
        `settings.DEFAULT_PREPROCESSOR` will be taken. Provide `False` to skip
        the preprocessing step.
    preprocessor_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `preprocessor`.
    imputer : class, optional
        A class that implements the imputation algorithm. It can be any scikit-learn
        like transformer that implements the `fit_transform` method. If no value is
        provided, no imputation will be performed.
    imputer_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `imputer`. Ignored
        if `imputer` is `None`.
    **decomposer_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `decomposer`.

    Returns
    -------
    components_df : pandas.DataFrame
        A DataFrame with the components of the decomposition. Each row corresponds
        to a landscape and each column to a component.
    decomposer_model : object
        The fitted decomposer model.

    Raises
    ------
    ValueError
        If fitting the decomposer fails for a reason other than the presence of
        NaN values (the original error is propagated unchanged).
    """
    # work on a copy so that the stored `metrics_df` is never modified
    X = self.metrics_df.copy()

    if preprocessor is None:
        preprocessor = settings.DEFAULT_PREPROCESSOR
    if preprocessor:  # user can provide `preprocessor=False` to skip this step
        if preprocessor_kwargs is None:
            preprocessor_kwargs = {}
        X = _fit_transform(X, preprocessor, **preprocessor_kwargs)

    if imputer is not None:
        if imputer_kwargs is None:
            imputer_kwargs = {}
        X = _fit_transform(X, imputer, **imputer_kwargs)

    if decomposer is None:
        decomposer = settings.DEFAULT_DECOMPOSER

    try:
        # try if the model accepts nan values
        decompose_model = decomposer(**decomposer_kwargs).fit(X)
    except ValueError:
        # only fall back to dropping rows when NaN values are actually present;
        # any other `ValueError` (e.g., invalid decomposer arguments) is re-raised
        # as is instead of being masked by a misleading warning
        if not np.asarray(pd.isna(X)).any():
            raise
        warnings.warn(
            "The provided spatial signatures contain NaN values which are not "
            "supported by the decomposition model. In order to proceed, the NaN "
            "values will be dropped. However, you may consider either (i) changing "
            "the chosen metrics or (ii) imputing the NaN values by providing the "
            "`imputer` and `imputer_kwargs` arguments.",
            RuntimeWarning,
        )
        X = X.dropna()
        decompose_model = decomposer(**decomposer_kwargs).fit(X)

    # return the reduced matrix as a data frame with the same index as the
    # (possibly row-filtered) matrix that the model was fitted to
    components_df = pd.DataFrame(decompose_model.transform(X), index=X.index)
    return components_df, decompose_model
def get_loading_df(self, decompose_model, *, columns=None, index=None, **df_kwargs):
    """Get components loadings for each metric.

    Parameters
    ----------
    decompose_model : object
        The decomposition model fitted to the spatial signature matrix. It must
        expose a two-dimensional `components_` attribute whose rows are the
        components and whose columns are the metrics.
    columns : list-like, optional
        Column names for the components. If no value is provided, an integer range
        from 0 to the number of components minus one will be used.
    index : list-like, optional
        Index names for the metrics. If no value is provided, the column names of
        `metrics_df` will be used.
    **df_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `pandas.DataFrame`

    Returns
    -------
    loading_df : pandas.DataFrame
        A DataFrame with the loadings of the components. Each row corresponds to a
        metric and each column to a component.
    """
    components = decompose_model.components_
    if columns is None:
        # derive the number of components from `components_` itself rather than
        # from `n_components_`, which not every decomposer exposes
        columns = range(components.shape[0])
    if index is None:
        index = self.metrics_df.columns
    # `columns` and `index` are named keyword-only parameters, so they can never
    # appear in `df_kwargs`
    return pd.DataFrame(components.T, columns=columns, index=index, **df_kwargs)
def get_cgram(
    self,
    *,
    k_range=None,
    decomposer=None,
    decomposer_kwargs=None,
    preprocessor=None,
    preprocessor_kwargs=None,
    imputer=None,
    imputer_kwargs=None,
    **clustergram_kwargs,
):
    """Get the clustergram of the spatial signature matrix.

    Parameters
    ----------
    k_range : list-like, optional
        A list-like of integer values with the number of clusters to be tested. If
        no value is provided, the default value set in `settings.DEFAULT_K_RANGE`
        will be taken.
    decomposer : class, optional
        A class that implements the decomposition algorithm. It can be any
        scikit-learn like transformer that implements the `fit` and `transform`
        methods and with the `components_` attribute. If no value is provided,
        the clustering algorithm will be applied to the (preprocessed and
        imputed) spatial signature matrix.
    decomposer_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `decomposer`.
        Ignored if `decomposer` is `None`.
    preprocessor : class, optional
        A class that implements the preprocessing algorithm. It can be any
        scikit-learn like transformer that implements the `fit_transform` method.
        If no value is provided, the default value set in
        `settings.DEFAULT_PREPROCESSOR` will be taken. Provide `False` to skip
        the preprocessing step.
    preprocessor_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `preprocessor`.
    imputer : class, optional
        A class that implements the imputation algorithm. It can be any scikit-learn
        like transformer that implements the `fit_transform` method. If no value is
        provided, no imputation will be performed.
    imputer_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `imputer`. Ignored
        if `imputer` is `None`.
    **clustergram_kwargs : dict, optional
        Keyword arguments to be passed to the initialization of `Clustergram`.

    Returns
    -------
    cgram : clustergram.Clustergram
        The fitted clustergram object. Its `data` attribute is replaced by a data
        frame sharing index and columns with the matrix that was clustered.

    Raises
    ------
    ValueError
        If fitting the clustergram fails for a reason other than the presence of
        NaN values (the original error is propagated unchanged).
    """
    # TODO: support clustering trajectories, i.e., unstacking the "dates" level
    # into "{metric}_{year}" columns before clustering
    if decomposer is not None:
        if decomposer_kwargs is None:
            decomposer_kwargs = {}
        # `decompose` already takes care of preprocessing and imputation
        X, _ = self.decompose(
            decomposer=decomposer,
            preprocessor=preprocessor,
            preprocessor_kwargs=preprocessor_kwargs,
            imputer=imputer,
            imputer_kwargs=imputer_kwargs,
            **decomposer_kwargs,
        )
    else:
        # if no decomposer is provided, we can still preprocess and impute the data
        X = self.metrics_df.copy()
        if preprocessor is None:
            preprocessor = settings.DEFAULT_PREPROCESSOR
        # honor `preprocessor=False` (skip the step), consistently with `decompose`
        if preprocessor:
            if preprocessor_kwargs is None:
                preprocessor_kwargs = {}
            X = _fit_transform(X, preprocessor, **preprocessor_kwargs)
        if imputer is not None:
            if imputer_kwargs is None:
                imputer_kwargs = {}
            X = _fit_transform(X, imputer, **imputer_kwargs)

    if k_range is None:
        # `k_range` is a keyword-only argument of this method so it cannot be part
        # of `clustergram_kwargs`
        k_range = settings.DEFAULT_K_RANGE

    def _fit(data):
        # a fresh `Clustergram` is built for each attempt
        return clustergram.Clustergram(k_range=k_range, **clustergram_kwargs).fit(
            data
        )

    try:
        cgram = _fit(X)
    except ValueError:
        # only fall back to dropping rows when NaN values are actually present;
        # any other `ValueError` is re-raised unchanged
        if not np.asarray(pd.isna(X)).any():
            raise
        warnings.warn(
            "The provided spatial signatures contain NaN values which are not "
            "supported by the clustering model. In order to proceed, the NaN values"
            " will be dropped. However, you may consider either (i) changing the "
            "chosen metrics or (ii) imputing the NaN values by providing the "
            "`imputer` and `imputer_kwargs` arguments.",
            RuntimeWarning,
        )
        X = X.dropna()
        cgram = _fit(X)

    # replace `cgram.data` by a data frame with the index and columns of the
    # clustered matrix. This ensures that we properly match a cluster label to a
    # landscape (crucial if we have dropped the rows of landscapes with NaN values)
    cgram.data = pd.DataFrame(cgram.data, index=X.index, columns=X.columns)
    return cgram
def _scatterplot_clusters(
    self,
    cgram,
    x,
    y,
    data,
    cluster_centers,
    *,
    ax=None,
    palette_name=None,
    center_marker="x",
    center_plot_kwargs=None,
    **scatterplot_kwargs,
):
    """Scatterplot `data` colored by cluster and overlay the cluster centers.

    Parameters
    ----------
    cgram : clustergram.Clustergram
        Fitted clustergram object; its `labels_` provide the hue of each sample.
    x, y : str
        Names of the columns of `data` plotted on the x and y axes respectively.
    data : pandas.DataFrame
        Samples to plot.
    cluster_centers : numpy.ndarray
        Array with one row per cluster; its first two columns are used as the x
        and y coordinates of each center.
    ax : matplotlib.axes.Axes, optional
        Axes object to draw the plot onto, otherwise create a new figure.
    palette_name : str, optional
        Name passed to `seaborn.color_palette` to build the default palette.
    center_marker : str, optional, default "x"
        Marker for the cluster centers, unless `center_plot_kwargs` already
        provides a "marker" item.
    center_plot_kwargs : dict, optional
        Keyword arguments passed to `matplotlib.axes.Axes.plot` for the centers.
    **scatterplot_kwargs : dict, optional
        Keyword arguments passed to `seaborn.scatterplot`.

    Returns
    -------
    ax : matplotlib.axes.Axes
        The axes with the plot drawn onto it.
    """
    # work on copies so that the caller's dictionaries are left untouched
    scatter_opts = dict(scatterplot_kwargs)
    center_opts = {} if center_plot_kwargs is None else center_plot_kwargs.copy()
    # a "marker" given in `center_plot_kwargs` wins over `center_marker`
    center_opts.setdefault("marker", center_marker)

    # `ax` is a named keyword-only parameter, hence it can never be found within
    # `scatterplot_kwargs`
    if ax is None:
        _, ax = plt.subplots()

    n_clusters = cluster_centers.shape[0]
    default_palette = sns.color_palette(palette_name, n_colors=n_clusters)
    palette = scatter_opts.pop("palette", default_palette)
    cluster_hue = pd.Series(cgram.labels_[n_clusters], name="cluster").values

    sns.scatterplot(
        x=x,
        y=y,
        hue=cluster_hue,
        data=data,
        palette=palette,
        ax=ax,
        **scatter_opts,
    )
    # draw each center with the color of its cluster
    for center_color, center in zip(palette, cluster_centers):
        ax.plot(center[0], center[1], color=center_color, **center_opts)
    return ax
def scatterplot_cluster_metrics(
    self,
    cgram,
    n_clusters,
    metric_x,
    metric_y,
    *,
    ax=None,
    palette_name=None,
    center_plot_kwargs=None,
    **scatterplot_kwargs,
):
    """Scatterplot the landscape samples colored by their cluster.

    Parameters
    ----------
    cgram : clustergram.Clustergram
        Fitted clustergram object.
    n_clusters : int
        Number of clusters to use.
    metric_x, metric_y : str
        Strings with the names of the metrics to be plotted on the x and y axes
        respectively.
    ax : matplotlib.axes.Axes, optional
        Axes object to draw the plot onto, otherwise create a new figure.
    palette_name : str, optional
        Name of palette or None to return current palette
    center_plot_kwargs : dict, optional
        Keyword arguments to plot the cluster centers, which will be passed to
        `matplotlib.axes.Axes.plot`.
    scatterplot_kwargs : dict, optional
        Keyword arguments to be passed to `seaborn.scatterplot`.

    Returns
    -------
    ax : matplotlib.axes.Axes
        Returns the `Axes` object with the plot drawn onto it.
    """
    # TODO: manage nan metrics better in case or reduction or not
    # only the landscapes that were actually clustered are plotted
    clustered_metrics_df = self.metrics_df.loc[cgram.data.index]

    # NOTE(review): the centers are read from `cgram.cluster_centers`, i.e., they
    # live in the space the clustergram was fitted in, whereas the samples are the
    # raw metrics; the column positions below are those of `metrics_df` - confirm
    # that both match when a preprocessor or decomposer was used
    all_centers = cgram.cluster_centers[n_clusters]
    metric_positions = [
        self.metrics_df.columns.get_loc(metric) for metric in [metric_x, metric_y]
    ]
    xy_centers = all_centers[:, metric_positions]

    return self._scatterplot_clusters(
        cgram,
        metric_x,
        metric_y,
        clustered_metrics_df,
        xy_centers,
        ax=ax,
        palette_name=palette_name,
        center_plot_kwargs=center_plot_kwargs,
        **scatterplot_kwargs,
    )
def plot_cluster_landscapes(
    self,
    cgram,
    n_clusters,
    *,
    n_cluster_landscapes=4,
    n_cols=4,
    sample_kwargs=None,
    figure_kwargs=None,
    subfigures_kwargs=None,
    subplots_kwargs=None,
    supylabel_kwargs=None,
    **plot_landscape_kwargs,
):
    """Plot a sample of the landscapes of each cluster.

    The figure contains one sub-figure per cluster (stacked vertically), each
    with a grid of `n_cols` columns where the landscapes are plotted.

    Parameters
    ----------
    cgram : Clustergram
        Fitted clustergram object.
    n_clusters : int
        Number of clusters to use.
    n_cluster_landscapes : int, optional, default 4
        Number of landscapes to plot for each cluster. Providing a value of None
        will plot all landscapes.
    n_cols : int, optional, default 4
        Number of columns for the figure.
    sample_kwargs, figure_kwargs, subfigures_kwargs, subplots_kwargs, \
    supylabel_kwargs, plot_landscape_kwargs : dict, optional
        Keyword arguments to be passed to `pandas.DataFrame.sample`,
        `matplotlib.pyplot.figure`,
        `matplotlib.figure.Figure.subfigures`,
        `matplotlib.figure.SubFigure.subplots`,
        `matplotlib.figure.SubFigure.supylabel` and
        `pylandstats.Landscape.plot_landscape` respectively. Any "frac" item of
        `sample_kwargs` and any "ax" item of `plot_landscape_kwargs` is ignored.

    Returns
    -------
    fig : matplotlib.figure.Figure
        The figure with its corresponding plots drawn into its axes.
    """
    _figure_kwargs = {} if figure_kwargs is None else figure_kwargs.copy()
    if subfigures_kwargs is None:
        subfigures_kwargs = {}
    if subplots_kwargs is None:
        subplots_kwargs = {}
    if supylabel_kwargs is None:
        supylabel_kwargs = {}
    # the target axes are managed by this method, so drop any user-provided `ax`
    _plot_landscape_kwargs = dict(plot_landscape_kwargs)
    _plot_landscape_kwargs.pop("ax", None)

    # attach the (positional) cluster labels to the index of the clustered
    # landscapes so that they are matched correctly even when that index is not a
    # plain 0..n-1 range (e.g., after dropping landscapes with NaN values)
    labels = pd.Series(
        np.asarray(cgram.labels_[n_clusters]), index=cgram.data.index
    )
    metrics_df = self.metrics_df.loc[cgram.data.index]

    if n_cluster_landscapes is None:
        # plot all landscapes
        n_rows_per_cluster = np.ceil(
            np.unique(labels, return_counts=True)[1] / n_cols
        ).astype(int)
    else:
        n_rows_per_cluster = np.full(
            n_clusters, np.ceil(n_cluster_landscapes / n_cols), dtype=int
        )
        _sample_kwargs = {} if sample_kwargs is None else sample_kwargs.copy()
        _sample_kwargs.pop("frac", None)
        # use sample to shuffle then groupby and head (because head does not raise
        # an error if the group has less elements than n_cluster_landscapes)
        shuffled_df = metrics_df.sample(frac=1, **_sample_kwargs)
        metrics_df = shuffled_df.groupby(
            labels.loc[shuffled_df.index].to_numpy()
        ).head(n_cluster_landscapes)
        # update labels to match the new metrics_df
        labels = labels.loc[metrics_df.index].astype(int)

    n_rows = n_rows_per_cluster.sum()
    figsize = _figure_kwargs.pop("figsize", None)
    if figsize is None:
        figwidth, figheight = plt.rcParams["figure.figsize"]
        figsize = (figwidth * n_cols, figheight * n_rows)
    fig = plt.figure(figsize=figsize, **_figure_kwargs)

    subfigs = fig.subfigures(
        n_clusters,
        1,
        height_ratios=n_rows_per_cluster,
        **subfigures_kwargs,
    )
    # `subfigures` returns a bare `SubFigure` (instead of an array) when there is
    # a single cluster
    subfigs = subfigs.ravel() if isinstance(subfigs, np.ndarray) else [subfigs]

    # TODO: manage nan metrics better in case or reduction or not
    for subfig, n_cluster_rows, (cluster_label, cluster_df) in zip(
        subfigs,
        n_rows_per_cluster,
        metrics_df.groupby(labels.to_numpy()),
    ):
        axes = subfig.subplots(n_cluster_rows, n_cols, **subplots_kwargs)
        try:
            axes = list(axes.flat)
        except AttributeError:
            # a single `Axes` is returned for a 1x1 grid
            axes = [axes]
        subfig.supylabel(f"Cluster {cluster_label}", **supylabel_kwargs)
        for ls_i, ax in zip(cluster_df.index, axes):
            self.landscapes.landscape_ser[ls_i].plot_landscape(
                ax=ax, **_plot_landscape_kwargs
            )
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
        # if the cluster does not fill the whole grid, set the remaining (empty)
        # axes off
        for ax in axes[len(cluster_df):]:
            ax.set_axis_off()
    return fig
def plot_cluster_zones(
    self,
    cgram,
    n_clusters,
    *,
    legend=True,
    categorical=True,
    ax=None,
    subplots_kwargs=None,
    **plot_kwargs,
):
    """Plot the landscape zones colored by their cluster.

    Only available if the `SpatialSignatureAnalysis` object has been initialized
    with a zonal analysis class (or subclass). If the `SpatialSignatureAnalysis`
    object has been initialized with a spatio-temporal zonal analysis class (or
    subclass), each date snapshot will be plotted separately.

    Parameters
    ----------
    cgram : Clustergram
        Fitted clustergram object.
    n_clusters : int
        Number of clusters to use.
    legend : bool, optional, default True
        Whether to show the legend.
    categorical : bool, optional, default True
        Whether the cluster color map should be categorical.
    ax : matplotlib.axes.Axes, optional
        Axes object to draw the plot onto, otherwise create a new figure. Ignored
        if the `SpatialSignatureAnalysis` object has been initialized with a
        spatio-temporal zonal analysis class (or subclass).
    subplots_kwargs, plot_kwargs : dict, default None
        Keyword arguments to be passed to `matplotlib.pyplot.subplots` and
        `geopandas.GeoDataFrame.plot` respectively.

    Returns
    -------
    matplotlib.figure.Figure or matplotlib.axes.Axes or None
        If the `SpatialSignatureAnalysis` object has been initialized with a
        spatio-temporal zonal analysis class (or subclass), returns the figure with
        its corresponding plots drawn into its axes. Otherwise, returns the axes
        object with the plot drawn onto it. If the object has no `zone_gser`
        attribute, a `RuntimeWarning` is issued and `None` is returned.
    """
    if not hasattr(self, "zone_gser"):
        warnings.warn(
            "The `plot_cluster_zones` method can only be called when"
            " `SpatialSignatureAnalysis` has been initialized with a zonal analysis"
            " class (or sub-class) as the `landscapes` argument. Returning None",
            RuntimeWarning,
        )
        return None

    def _plot(cluster_labels, cgram_data, ax):
        # map each clustered landscape (index of `cgram_data`) to its zone
        # geometry and color the zones by cluster label
        return gpd.GeoDataFrame(
            {"cluster": cluster_labels},
            geometry=cgram_data.index.map(self.zone_gser).values,
        ).plot(
            "cluster", legend=legend, categorical=categorical, ax=ax, **plot_kwargs
        )

    has_dates = hasattr(self, "dates")
    if has_dates or ax is None:
        # a new figure is needed: work on a copy of the user-provided arguments
        _subplots_kwargs = {} if subplots_kwargs is None else subplots_kwargs.copy()
        if has_dates:
            # one panel per date, laid out in a single row
            figsize = _subplots_kwargs.pop(
                "figsize",
                (
                    plt.rcParams["figure.figsize"][0] * len(self.dates),
                    plt.rcParams["figure.figsize"][1],
                ),
            )
            fig, axes = plt.subplots(
                1, len(self.dates), figsize=figsize, **_subplots_kwargs
            )
            # `plt.subplots` returns a bare `Axes` (instead of an array) when
            # there is a single date
            axes = axes.ravel() if isinstance(axes, np.ndarray) else [axes]
            for date, date_ax in zip(self.dates, axes):
                date_query = cgram.data.index.get_level_values("date") == date
                _plot(
                    cgram.labels_[n_clusters][date_query],
                    cgram.data[date_query].droplevel("date"),
                    date_ax,
                )
                date_ax.set_title(date)
            return fig
        figsize = _subplots_kwargs.pop("figsize", plt.rcParams["figure.figsize"])
        _, ax = plt.subplots(figsize=figsize, **_subplots_kwargs)
    return _plot(cgram.labels_[n_clusters], cgram.data, ax)