Source code for pytesmo.cdf_matching

import numpy as np
import pandas as pd
from scipy import interpolate, optimize, special
from sklearn.base import BaseEstimator, RegressorMixin
from typing import Sequence, Union
import warnings


class CDFMatching(RegressorMixin, BaseEstimator):
    """
    Predicts a variable from a single other variable by CDF matching.

    Parameters
    ----------
    nbins : int, optional
        Number of bins to use for the empirical CDF. Default is 100. If
        `minobs` is set, this might be reduced in case there's not enough
        data in each bin.
    percentiles : sequence, optional
        Percentile values to use. If this is given, `nbins` is ignored. The
        percentiles might still be changed if `minobs` is given and the
        number of data points per bin is lower. Default is ``None``.
    linear_edge_scaling : bool, optional
        Whether to derive the edge parameters via linear regression (more
        robust, see Moesinger et al. (2020) for more info). Default is
        ``False``.
        Note that this way only the outliers in the reference (y) CDF are
        handled. Outliers in the input data (x) will not be removed and will
        still show up in the data.
    minobs : int, optional
        Minimum desired number of observations in a bin. If a bin contains
        less data, the number of bins is reduced. Default is ``None`` (no
        resizing).
    combine_invalid : bool, optional
        Optional feature to combine the masks of invalid data (NaN, Inf) of
        both source (X) and reference (y) data passed to `fit`. This only
        makes sense if X and y are both timeseries data corresponding to the
        same index. In this case, it makes sure that data is only used if
        values for both X and y are available, so that seasonal patterns in
        missing values in one of them do not lead to distortions. (For
        example, if X is available the whole year, but y is only available
        during summer, the distribution of y should not be matched against
        the whole-year CDF of X, because that could introduce systematic
        seasonal biases.) The default is ``False``.

    Attributes
    ----------
    x_perc_ : np.ndarray (nbins + 1,)
        The percentile values derived from the source (X) data. If the
        number of bins was reduced during fitting due to insufficient data,
        it is right-padded with NaNs.
    y_perc_ : np.ndarray (nbins + 1,)
        The percentile values derived from the reference (y) data. If the
        number of bins was reduced during fitting due to insufficient data,
        it is right-padded with NaNs.

    Notes
    -----
    This implementation does not do any temporal matching of the reference
    and source datasets. If this is required, it has to be done beforehand.
    A short usage sketch is shown after the class definition below.
    """

    def __init__(
        self,
        nbins: int = 100,
        percentiles: Sequence = None,
        minobs: int = None,
        linear_edge_scaling: bool = False,
        combine_invalid: bool = False,
    ):
        self.nbins = nbins
        self.minobs = minobs
        self.linear_edge_scaling = linear_edge_scaling
        self.combine_invalid = combine_invalid
        self.percentiles = percentiles
        if self.percentiles is not None:
            self.nbins = len(self.percentiles) - 1

    def fit(
        self,
        X: Union[np.ndarray, pd.Series, pd.DataFrame],
        y: Union[np.ndarray, pd.Series],
    ):
        """
        Derive the CDF matching parameters.

        Parameters
        ----------
        X : array_like
            An array/pd.Series or a matrix/pd.DataFrame with a single column.
        y : array_like
            An array/pd.Series of reference data.
        """
        # make sure that x and y are 1D numpy arrays
        if isinstance(y, pd.Series):
            y = y.values
        x = _make_X_array(X)

        # drop invalid values and pre-sort to avoid multiple resorting later
        isvalid_x = np.isfinite(x)
        isvalid_y = np.isfinite(y)
        if len(x) == len(y) and self.combine_invalid:
            isvalid_x = isvalid_x & isvalid_y
            isvalid_y = isvalid_x
        x_unsorted = x[isvalid_x]
        y_unsorted = y[isvalid_y]
        x = np.sort(x_unsorted)
        y = np.sort(y_unsorted)

        # calculate percentiles, potentially resize them
        nsamples = min(len(x), len(y))
        tmp_percentiles, tmp_nbins = self._calc_percentiles(nsamples)

        if tmp_nbins == 1 and self.linear_edge_scaling:
            # for a single bin just do a linear regression in case linear
            # edge scaling is activated
            x_perc, y_perc = _linreg_percentiles(x_unsorted, y_unsorted)
        else:
            x_perc = _percentile_values_from_sorted(x, tmp_percentiles)
            y_perc = _percentile_values_from_sorted(y, tmp_percentiles)
            if self.linear_edge_scaling:
                y_perc = self._linear_edge_scaling(
                    x, y, x_perc, y_perc, tmp_percentiles)

        # fill self.x_perc_ and self.y_perc_ with NaN on the right in case
        # the bin size was reduced, so we always get arrays of size nbins + 1
        # as self.x_perc_ and self.y_perc_
        self.x_perc_ = np.full(self.nbins + 1, np.nan, dtype=x.dtype)
        self.y_perc_ = np.full(self.nbins + 1, np.nan, dtype=x.dtype)
        self.percentiles_ = np.full(self.nbins + 1, np.nan, dtype=x.dtype)
        self.percentiles_[0:tmp_nbins + 1] = tmp_percentiles
        self.x_perc_[0:tmp_nbins + 1] = x_perc
        self.y_perc_[0:tmp_nbins + 1] = y_perc
        return self

    def predict(self, X):
        x = _make_X_array(X)
        isvalid = np.isfinite(x)
        prediction = np.full_like(x, np.nan)
        if not np.any(isvalid):
            return prediction
        xp = self.x_perc_[~np.isnan(self.x_perc_)]
        yp = self.y_perc_[~np.isnan(self.y_perc_)]
        if len(xp) == 0 or len(yp) == 0:
            return prediction
        else:
            try:
                spline = interpolate.InterpolatedUnivariateSpline(
                    xp, yp, k=1, ext=0)
                prediction[isvalid] = spline(x[isvalid])
                return prediction
            except ValueError:
                # happens if there are non-unique values or not enough values
                warnings.warn("Too few percentiles for chosen k.")
                return prediction

    def _calc_percentiles(self, nsamples):
        # calculate percentiles, potentially resize them
        if self.percentiles is None:
            percentiles = np.arange(
                self.nbins + 1, dtype=np.float64) / self.nbins * 100
        else:
            percentiles = self.percentiles
        minbinsize = np.min(np.diff(percentiles))
        if (self.minobs is not None
                and nsamples * minbinsize / 100 < self.minobs):
            warnings.warn("The bins have been resized")
            nbins = np.int32(np.floor(nsamples / self.minobs))
            if nbins == 0:
                nbins = 1
            elif nbins > len(percentiles) - 1:
                nbins = len(percentiles) - 1
            percentiles = np.arange(
                nbins + 1, dtype=np.float64) / nbins * 100
        else:
            nbins = len(percentiles) - 1
        return percentiles, nbins

    def _linear_edge_scaling(self, x, y, x_perc, y_perc, percentiles):
        # Scale the lower and upper edges of y_perc by replacing them with
        # values inferred from a linear regression between the n
        # lowest/highest values of x and y. n is the number of data points in
        # the lowest/highest y-bin; if the corresponding x-bin contains more
        # or fewer values, x is resampled from the empirical CDF. This can
        # happen if the values lie on only a few discrete levels.
        xlow = x[x <= x_perc[1]] - x_perc[1]
        ylow = y[y <= y_perc[1]] - y_perc[1]
        n = len(ylow)
        if len(xlow) != n:
            xlow = _resample_ecdf(xlow, n)
        a, _, _, _ = np.linalg.lstsq(xlow.reshape(-1, 1), ylow, rcond=None)
        y_perc[0] = y_perc[1] + a[0] * (x_perc[0] - x_perc[1])

        xhigh = x[x >= x_perc[-2]] - x_perc[-2]
        yhigh = y[y >= y_perc[-2]] - y_perc[-2]
        n = len(yhigh)
        if len(xhigh) != n:
            xhigh = _resample_ecdf(xhigh, n)
        a, _, _, _ = np.linalg.lstsq(xhigh.reshape(-1, 1), yhigh, rcond=None)
        y_perc[-1] = y_perc[-2] + a[0] * (x_perc[-1] - x_perc[-2])

        return y_perc
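CDFMatching follows the scikit-learn estimator interface: fit the matcher on a source/reference pair, then rescale the source data with predict. The following is a minimal usage sketch; the synthetic series, index, and parameter choices are purely illustrative and not part of the module.

import numpy as np
import pandas as pd

from pytesmo.cdf_matching import CDFMatching

# synthetic source and reference series with different distributions
rng = np.random.default_rng(42)
index = pd.date_range("2020-01-01", periods=1000, freq="D")
src = pd.Series(rng.normal(0.3, 0.1, size=len(index)), index=index)
ref = pd.Series(rng.normal(0.5, 0.2, size=len(index)), index=index)

matcher = CDFMatching(nbins=100, linear_edge_scaling=True,
                      combine_invalid=True)
matcher.fit(src, ref)

# rescale the source data so that its CDF matches the reference CDF
src_scaled = pd.Series(matcher.predict(src), index=index)

After fitting, the matched percentile values are available as the x_perc_ and y_perc_ attributes.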
def _make_X_array(X):
    if isinstance(X, (pd.Series, pd.DataFrame)):
        X = X.values
    if len(X.shape) > 1:
        assert len(X.shape) == 2
        assert X.shape[1] == 1
        x = X.ravel()
    else:
        x = X
    return x


def _resample_ecdf(x_sorted, n, is_sorted=True):
    """Resample ECDF to n bins."""
    # calculate percentiles for x_sorted
    if not is_sorted:
        x_sorted = np.sort(x_sorted)
    new_percentiles = np.arange(n, dtype=float) / (n - 1) * 100.0
    return _percentile_values_from_sorted(x_sorted, new_percentiles)


def _percentile_values_from_sorted(data, percentiles):
    perc = _matlab_percentile_values_from_sorted(data, percentiles)
    return _unique_percentile_interpolation(perc, percentiles)


def _matlab_percentile_values_from_sorted(data, percentiles):
    """
    Calculate percentiles in the way Matlab and IDL do it.

    This interpolates between the lowest and highest rank and uses the
    minimum and maximum outside of that range.

    Parameters
    ----------
    data : numpy.ndarray
        Sorted input data.
    percentiles : numpy.ndarray
        Percentiles at which to calculate the values.

    Returns
    -------
    perc : numpy.ndarray
        Values of the percentiles.
    """
    p_rank = 100.0 * (np.arange(data.size) + 0.5) / data.size
    perc = np.interp(percentiles, p_rank, data, left=data[0], right=data[-1])
    return perc


def _unique_percentile_interpolation(perc, percentiles):
    # make sure the percentiles are unique
    # assumes `perc` is sorted
    uniq, uniq_ind = np.unique(perc, return_index=True)
    if uniq_ind.size != len(percentiles) and uniq_ind.size > 1:
        # If there are non-unique percentile values in perc, e.g.
        # [1, 1, 1, 2, ...] corresponding to percentiles
        # [0, 5, 10, 20, ...], we will interpolate the values for 5 and 10 to
        # be between 1 and 2. uniq_ind contains the first index of the
        # non-unique values, so selecting them will return [1, 2] and
        # [0, 20].
        new_percentiles = percentiles[uniq_ind]
        new_perc = perc[uniq_ind]
        # However, if we have non-unique indices at the end of the array,
        # e.g. [..., 8, 10, 10, 10] corresponding to percentiles
        # [..., 85, 90, 95, 100], this approach will return [8, 10] and
        # [85, 90], meaning that the last percentile values will be
        # extrapolated. To avoid this, we set the last non-unique percentile
        # to the overall last percentile.
        new_percentiles[-1] = percentiles[-1]
        spline = interpolate.InterpolatedUnivariateSpline(
            new_percentiles, new_perc, k=1, ext=0)
        perc = spline(percentiles)
    return perc


def _linreg_percentiles(x, y):
    """Calculate percentiles via linear regression."""
    A = np.reshape(x - np.mean(x), (-1, 1))
    b = y - np.mean(y)
    slope, _, _, _ = np.linalg.lstsq(A, b, rcond=None)
    intercept = np.mean(y) - np.mean(x) * slope[0]
    x_perc = [0, 1]
    y_perc = [intercept, intercept + slope[0]]
    return x_perc, y_perc
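For reference, the Matlab/IDL-style percentile computation used by the helpers above assigns the i-th of n sorted samples the plotting position 100 * (i + 0.5) / n and interpolates linearly between these positions, clamping to the data minimum and maximum outside that range. A small worked sketch (the input values are made up for illustration):

import numpy as np

data = np.array([1.0, 2.0, 3.0, 4.0])  # already sorted
percentiles = np.array([0.0, 25.0, 50.0, 75.0, 100.0])

# plotting positions for four samples: 12.5, 37.5, 62.5, 87.5
p_rank = 100.0 * (np.arange(data.size) + 0.5) / data.size
perc = np.interp(percentiles, p_rank, data, left=data[0], right=data[-1])
# perc is [1.0, 1.5, 2.5, 3.5, 4.0], i.e. the same result as
# _matlab_percentile_values_from_sorted(data, percentiles)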