Source code for pytesmo.temporal_matching

"""
Provides functions for temporally collocating data from multiple dataframes.
"""

from numba import guvectorize, float32, float64
import numpy as np
import pandas as pd
from pykdtree.kdtree import KDTree
import warnings

__all__ = ["temporal_collocation", "combined_temporal_collocation"]


def df_match(reference, *args, **kwds):
    """
    **Deprecated!**

    Finds temporal match between the reference pandas.DataFrame (index has to
    be datetime) and n other pandas.DataFrame (index has to be datetime).

    Parameters
    ----------
    reference : pandas.DataFrame or pandas.TimeSeries
        The index of this dataframe will be the reference.
    *args : pandas.DataFrame or pandas.TimeSeries
        The index of this dataframe(s) will be matched.
        If it is a pandas.Series then it has to have a name. Otherwise
        no column name can be assigned to the matched DataFrame.
    window : float
        Fraction of days of the maximum pos./neg. distance allowed, i.e. the
        value of window represents the half-winow size (e.g. window=0.5, will
        search for matches between -12 and +12 hours) (default: None)
    dropna : boolean
        Drop rows containing only NaNs (default: False)
    dropduplicates : boolean
        Drop duplicated temporal matched (default: False)
    asym_window: string, optional
        ``<=`` stands for using a smaller and equal only for the left/smaller
        side of the window comparison, ``>=`` stands for using a larger and
        equal only for the right/larger side of the window comparison.
        The default is to use <= and >= for both sides of the search window

    Returns
    -------
    temporal_matched_args : pandas.DataFrame or tuple of pandas.DataFrame
        Dataframe with index from matched reference index
    """
    warnings.warn(
        "'pytesmo.temporal_matching.df_match' is deprecated. Use"
        "'pytesmo.temporal_matching.temporal_collocation' instead!",
        DeprecationWarning,
    )

    if "window" in kwds:
        window = kwds["window"]
    else:
        window = None

    if "asym_window" in kwds:
        asym_window = kwds["asym_window"]
    else:
        asym_window = None

    temporal_matched_args = []
    ref_step = reference.index.values - reference.index.values[0]

    for arg in args:

        if type(arg) is pd.Series:
            arg = pd.DataFrame(arg)
        comp_step = arg.index.values - reference.index.values[0]
        values = np.arange(comp_step.size)
        # setup kdtree which must get 2D input
        try:
            tree = KDTree(np.atleast_2d(comp_step).T, balanced_tree=False)
        except TypeError:
            # scipy before version 0.16 does not have the balanced_tree kw
            # but is fast in this case also without it
            tree = KDTree(np.atleast_2d(comp_step).T)

        dist, i = tree.query(np.atleast_2d(ref_step).T)
        matched = values[i]

        distance = np.zeros_like(matched, dtype=float)
        distance.fill(np.nan)
        valid_match = np.invert(np.isnan(matched))

        distance[valid_match] = (
            arg.index.values[np.int32(matched[valid_match])] -
            reference.index.values[valid_match]) / np.timedelta64(1, "D")

        arg = arg.assign(index=arg.index.values, merge_key=np.arange(len(arg)))

        arg_matched = pd.DataFrame({
            "merge_key": matched,
            "distance": distance,
            "ref_index": reference.index.values,
        })
        arg_matched = arg_matched.merge(arg, on="merge_key", how="left")
        arg_matched.index = arg_matched["ref_index"].values
        arg_matched = arg_matched.sort_index()

        if window is not None:
            if asym_window is None:
                invalid_dist = arg_matched["distance"].abs() > window
            if asym_window == "<=":
                # this means that only distance in the interval [distance[ are
                # taken
                valid_dist = ((arg_matched["distance"] >= 0.0)
                              & (arg_matched["distance"] <= window)) | (
                                  (arg_matched["distance"] <= 0.0)
                                  & (arg_matched["distance"] > -window))
                invalid_dist = ~valid_dist
            if asym_window == ">=":
                # this means that only distance in the interval ]distance] are
                # taken
                valid_dist = ((arg_matched["distance"] >= 0.0)
                              & (arg_matched["distance"] < window)) | (
                                  (arg_matched["distance"] <= 0.0)
                                  & (arg_matched["distance"] >= -window))
                invalid_dist = ~valid_dist
            arg_matched.loc[invalid_dist] = np.nan

        if "dropna" in kwds and kwds["dropna"]:
            arg_matched = arg_matched.dropna(how="all")

        if "dropduplicates" in kwds and kwds["dropduplicates"]:
            arg_matched = arg_matched.dropna(how="all")
            g = arg_matched.groupby("merge_key")
            min_dists = g.distance.apply(lambda x: x.abs().idxmin())
            arg_matched = arg_matched.loc[min_dists]

        temporal_matched_args.append(
            arg_matched.drop(["merge_key", "ref_index"], axis=1))

    if len(temporal_matched_args) == 1:
        return temporal_matched_args[0]
    else:
        return tuple(temporal_matched_args)


def matching(reference, *args, **kwargs):
    """
    Finds temporal match between the reference pandas.TimeSeries (index has to
    be datetime) and n other pandas.TimeSeries (index has to be datetime).

    Parameters
    ----------
    reference : pandas.TimeSeries
        The index of this Series will be the reference.
    *args : pandas.TimeSeries
        The index of these Series(s) will be matched.
    window : float
        Fraction of days of the maximum pos./neg. distance allowed, i.e. the
        value of window represents the half-winow size (e.g. window=0.5, will
        search for matches between -12 and +12 hours) (default: None)

    Returns
    -------
    temporal_match : pandas.DataFrame
        containing the index of the reference Series and a column for each of
        the other input Series
    """
    warnings.warn(
        "'pytesmo.temporal_matching.matching' is deprecated. Use"
        "'pytesmo.temporal_matching.temporal_collocation' instead!",
        DeprecationWarning,
    )
    matched_datasets = df_match(
        reference, *args, dropna=True, dropduplicates=True, **kwargs)

    if type(matched_datasets) != tuple:
        matched_datasets = [matched_datasets]

    matched_data = pd.DataFrame(reference)

    for match in matched_datasets:
        match = match.drop(["distance", "index"], axis=1)
        matched_data = matched_data.join(match)

    return matched_data.dropna()


[docs]def temporal_collocation( reference, other, window, method="nearest", return_index=False, return_distance=False, dropduplicates=False, dropna=False, checkna=False, flag=None, use_invalid=False, ): """ Temporally collocates values to reference. Parameters ---------- reference : pd.DataFrame, pd.Series, or pd.DatetimeIndex The reference onto which `other` should be collocated. If this is a DataFrame or a Series, the index must be a DatetimeIndex. If the index is timezone-naive and `other` is not, the timezone of `other` will be assumed. other : pd.DataFrame or pd.Series Data to be collocated. Must have a pd.DatetimeIndex as index. If the index is timezone-naive and `reference` is not, the timezone of the reference data will be assumed. window : pd.Timedelta or float Window around reference timestamps in which to look for data. Floats are interpreted as number of days. method : str, optional Which method to use for the temporal collocation: - "nearest" (default): Uses the nearest valid neighbour. When this method is used, entries with duplicate index values in `other` will be dropped, and only the first of the duplicates is kept. - "mean": Takes the mean over the given window around the reference times. return_index : boolean, optional Include index of `other` in matched dataframe (default: False). Only used with ``method="nearest"``. The index will be added as a separate column with the name "index_other". return_distance : boolean, optional Include distance information between `reference` and `other` in matched dataframe (default: False). This is only used with ``method="nearest"``, and implies ``return_index=True``. The distance will be added as a separate column with the name "distance_other". dropduplicates : bool, optional Whether to drop duplicated timestamps in `other`. Default is ``False``, except when ``method="nearest"``, in which case this is enforced to be ``True``. dropna : bool, optional Whether to drop NaNs from the resulting dataframe (arising for example from duplicates with ``duplicates_nan=True`` or from missing values). This uses ``how="all"``, that is, only rows where all values are NaN are dropped. Default is ``False``. checkna: bool, optional Whether to check if only NaNs are returned (i.e. no match has been found). If set to ``True``, raises a ``UserWarning`` in case no match has been found. Default is ``False``. flag : np.ndarray, str or None, optional Flag column as array or name of the flag column in `other`. If this is given, the column will be interpreted as validity indicator. Any nonzero values mark the row as invalid. Default is ``None``. use_invalid : bool, optional Whether to use invalid values marked by `flag` in case no valid values are available. Default is ``False``. Returns ------- collocated : pd.DataFrame or pd.Series Temporally collocated version of ``other``. """ # input validation # ---------------- if isinstance(reference, (pd.Series, pd.DataFrame)): ref_dr = reference.index elif isinstance(reference, pd.DatetimeIndex): ref_dr = reference else: # pragma: no cover raise ValueError( "'reference' must be pd.DataFrame, pd.Series, or pd.DatetimeIndex." ) if not isinstance(other, (pd.Series, pd.DataFrame)): # pragma: no cover raise ValueError("'other' must be pd.DataFrame or pd.Series.") if not isinstance(window, pd.Timedelta): window = pd.Timedelta(days=window) if flag is not None: if isinstance(flag, str): flag = other[flag].values if len(flag) != len(ref_dr): # pragma: no cover raise ValueError("Flag must have same length as reference") flagged = flag.astype(bool) has_invalid = np.any(flagged) else: has_invalid = False # preprocessing # ------------ if ref_dr.tz is None and other.index.tz is None: # no timezone info provided for any of the inputs, so we will continue # to use timezone naive frames pass else: if ref_dr.tz is None: ref_dr = ref_dr.tz_localize(other.index.tz) warnings.warn( "No timezone given for reference, assuming it's in the same" f" timezone as other, {other.index.tz}.", UserWarning, ) elif other.index.tz is None: other = other.tz_localize(ref_dr.tz) warnings.warn( "No timezone given for other, assuming it's in the same" f" timezone as reference, {other.index.tz}.", UserWarning, ) if other.index.tz != ref_dr.tz: other = other.tz_convert(ref_dr.tz) if dropduplicates or method == "nearest": other = other[~other.index.duplicated(keep="first")] ref_duplicated = ref_dr.duplicated(keep="first") if np.any(ref_duplicated): warnings.warn("Dropping duplicated indices in reference." " This might indicate issues with your data.") ref_dr = ref_dr[~ref_dr.duplicated(keep="first")] # collocation # ----------- if method == "nearest": # Nearest neighbour collocation, uses pandas reindex if return_index or return_distance: new_cols = {} new_cols["index_other"] = other.index if return_distance: new_cols["distance_other"] = np.zeros(len(other)) other = other.assign(**new_cols) def collocate(df): return df.reindex(ref_dr, method="nearest", tolerance=window) if has_invalid: collocated = collocate(other[~flagged]) if use_invalid: invalid = collocate(other[flagged]) collocated = collocated.combine_first(invalid) else: collocated = collocate(other) if return_distance: collocated["distance_other"] = ( collocated["index_other"] - collocated.index) elif method == "mean": window_days = 2 * window / pd.Timedelta(1, "D") other_times = other.index.to_julian_date().values if not has_invalid or use_invalid: mask = np.ones_like(other_times, dtype=bool) else: mask = ~flagged other_is_series = isinstance(other, pd.Series) if other_is_series: other = pd.DataFrame(other, columns=[other.name]) ncols = other.shape[1] data = np.empty((ncols, len(ref_dr)), dtype=other.iloc[:, 0].dtype) ref_dr_jd = ref_dr.to_julian_date().values for i in range(ncols): other_data = other.iloc[:, i].values[mask] data[i, :] = resample_mean( other_times, other_data, ref_dr_jd, window_days ) collocated = pd.DataFrame(data.T, index=ref_dr, columns=other.columns) if other_is_series: collocated = collocated.iloc[:, 0] else: raise NotImplementedError( "Only nearest neighbour collocation is implemented so far") # postprocessing # -------------- if checkna: if np.any(collocated.isnull().apply(np.all)): warnings.warn("No match has been found") if dropna: collocated.dropna(inplace=True, how="all") return collocated
[docs]def combined_temporal_collocation( reference, others, window, method="nearest", dropduplicates=False, dropna=False, combined_dropna=False, flag=None, checkna=False, use_invalid=False, add_ref_data=False, ): """ Temporally collocates multiple dataframes to reference times. Parameters ---------- reference : pd.DataFrame, pd.Series, or pd.DatetimeIndex The reference onto which `other` should be collocated. If this is a DataFrame or a Series, the index must be a DatetimeIndex. If the index is timezone-naive, UTC will be assumed. others : list/tuple of pd.DataFrame or pd.Series DataFrames/Series to be collocated. Each entry must have a pd.DatetimeIndex as index. If the index is timezone-naive, the timezone of the reference data will be assumed. window : pd.Timedelta or float Window around reference timestamps in which to look for data. Floats are interpreted as number of days. method : str, optional Which method to use for the temporal collocation: - "nearest" (default): Uses the nearest valid neighbour. When this method is used, entries with duplicate index values in `other` will be dropped, and only the first of the duplicates is kept. - "mean": Takes the mean over the given window around the reference times. dropduplicates : bool, optional Whether to drop duplicated timestamps in `others`. Default is ``False``, except when ``method="nearest"``, in which case this is enforced to be ``True``. dropna : bool, optional Whether to drop NaNs from the resulting dataframe (arising for example from duplicates with ``duplicates_nan=True`` or from missing values). Default is ``False``. combined_dropna : str or bool, optional Whether and how to drop NaNs from the resulting combined DataFrame. Can be ``"any"``, ``"all"``, ``True`` or ``False``. "any" makes sure that the output dataframe only has values at times where all input frames had values, while "all" only drops lines where all values are NaN. ``True`` is the same as "any", and ``False`` (default) disables dropping NaNs. checkna: bool, optional Whether to check if only NaNs are returned (i.e. no match has been found). If set to ``True``, raises a ``UserWarning`` in case no match has been found. Default is ``False``. flag : np.ndarray or None, optional Flag column as array. If this is given, the column will be interpreted as validity indicator. Any nonzero values mark the row as invalid. Default is ``None``. use_invalid : bool, optional Whether to use invalid values marked by `flag` in case no valid values are available. Default is ``False``. add_ref_data : bool, optional If `reference` is a DataFrame or Series, add the data to the final collocated dataframe. Returns ------- collocated : pd.DataFrame or pd.Series Temporally collocated DataFrame with variables from all input frames merged together. """ dfs = [ temporal_collocation( reference, other, window, method=method, return_index=False, return_distance=False, dropduplicates=dropduplicates, dropna=dropna, checkna=checkna, flag=flag, use_invalid=use_invalid, ) for other in others ] if isinstance(reference, (pd.DataFrame, pd.Series)) and add_ref_data: # first, check if we have to remove duplicates if dropduplicates: duplicated = reference.index.duplicated(keep="first") reference = reference[~duplicated] dfs.insert(0, reference) # Before merging we have to check if the timezones are consistent. timezones = [d.index.tz for d in dfs] uniq_tzs = set(timezones) if len(uniq_tzs) == 1: pass else: actual_tzs = set(tz for tz in timezones if tz is not None) if len(actual_tzs) == 1: # convert all to common tz tz = list(actual_tzs)[0] else: # Multiple different timezones, convert all to UTC and raise a # warning tz = "UTC" warnings.warn( "Input DataFrames have mixed timezones, converting everything" " to UTC.", UserWarning, ) for d in dfs: if d.index.tz is None: d.index = d.index.tz_localize(tz) else: d.index = d.index.tz_convert(tz) merged = pd.concat(dfs, axis=1) if combined_dropna: if combined_dropna is True: combined_dropna = "any" merged = merged.dropna(how=combined_dropna) return merged
@guvectorize( [ (float32[:], float32[:], float32[:], float32, float32[:]), (float64[:], float64[:], float64[:], float64, float64[:]), ], "(n), (n), (m), () -> (m)", nopython=True, ) def resample_mean(times, values, target_times, window, resampled): # pragma: no cover """ Resamples to new times by taking a mean over a given window. Parameters ---------- times : np.ndarray Times at which values are taken as float. values : np.ndarray Array with values. target_times : np.ndarray New times to which to resample. window : float Size of the window as float, in the same units as the times (e.g., if the times are in units of days, this should be in days). Returns ------- resampled : np.ndarray """ n_orig = len(times) n_target = len(target_times) lower = 0 upper = 0 half_window = window / 2.0 for i in range(n_target): for j in range(lower, n_orig + 1): lower = j if ( lower == n_orig or times[j] >= target_times[i] - half_window ): break # check if the current window is still below the last time that we have if times[n_orig - 1] > target_times[i] + half_window: for j in range(max(0, upper), n_orig): upper = j - 1 if times[j] > target_times[i] + half_window: break else: upper = n_orig - 1 nobs = max(upper - lower + 1, 0) if nobs == 0: resampled[i] = np.nan else: resampled[i] = np.nanmean(values[lower:(upper + 1)])