Source code for pytesmo.time_series.anomaly

'''
Created on June 20, 2013
'''
import warnings

import pandas as pd
import numpy as np
from cadati.conv_doy import doy
from cadati.jd_date import julian2date
from pytesmo.time_series.filtering import moving_average


[docs]def calc_anomaly(Ser, window_size=35, climatology=None, respect_leap_years=True, return_clim=False): ''' Calculates the anomaly of a time series (Pandas series). Both, climatology based, or moving-average based anomalies can be calculated Parameters ---------- Ser : pandas.Series Input data (index must be a DateTimeIndex) window_size : float, optional (default: 35) The window-size [days] of the moving-average window to calculate the anomaly reference (only used if climatology is not provided) climatology : pandas.Series (index: 1-366), optional (default: None) if provided, anomalies will be based on the climatology timespan : [timespan_from, timespan_to], datetime.datetime(y,m,d), optional If set, only a subset respect_leap_years : boolean, optional (default: True) If set then leap years will be respected during matching of the climatology to the time series return_clim : boolean, optional (default: False) if set to true the return argument will be a DataFrame which also contains the climatology time series. Only has an effect if climatology is used. Returns ------- anomaly : pandas.Series or pandas.DataFrame Series containing the calculated anomalies. If `return_clim` is set to true, a DataFrame will be returned, where one column contains the anomalies and another the climatology broadcasted over the whole index. If a climatology with a 'std' column was passed initially, this column will also be returned in the DataFrame if `return_clim` is chosen. ''' if climatology is not None: if type(Ser.index) == pd.DatetimeIndex: year, month, day = (np.asarray(Ser.index.year), np.asarray(Ser.index.month), np.asarray(Ser.index.day)) else: year, month, day = julian2date(Ser.index.values)[0:3] if respect_leap_years: doys = doy(month, day, year) else: doys = doy(month, day) df = pd.DataFrame() df['absolute'] = Ser df['doy'] = doys if isinstance(climatology, pd.Series): clim = pd.DataFrame({'climatology': climatology}) else: clim = climatology df = df.join(clim, on='doy', how='left') anomaly = df['absolute'] - df['climatology'] anomaly.index = df.index if return_clim: anomaly = pd.DataFrame({'anomaly': anomaly}) anomaly['climatology'] = df['climatology'] if 'std' in df.columns: anomaly['climatology_std'] = df['std'] else: reference = moving_average(Ser, window_size=window_size) anomaly = Ser - reference return anomaly
def _index_units(year, month, day, unit="day", respect_leap_years=True) -> (np.array, int): """Provide the indices for the specified unit type and the index lenth""" if respect_leap_years: args = month, day, year else: args = month, day if unit == "day": return doy(*args), 366 elif unit == "month": return month, 12 else: raise ValueError(f"Invalid unit: {unit}")
[docs]def calc_climatology(Ser, moving_avg_orig=5, moving_avg_clim=None, median=False, std=False, timespan=None, fill=np.nan, wraparound=True, respect_leap_years=False, interpolate_leapday=False, fillna=True, min_obs_orig=1, min_obs_clim=1, output_freq="day"): """ Calculates the climatology of a data set. Parameters ---------- Ser : pandas.Series Time series to compute climatology for (index must be a DateTimeIndex or julian date) moving_avg_orig : float, optional (default: 5) The size of the moving_average window [days] that will be applied on the input Series (gap filling, short-term rainfall correction) moving_avg_clim : float, optional (default: None) The size of the moving_average window in days that will be applied on the calculated climatology (long-term event correction). If None is passed, it will be calculated from the 'output_freq' value: - 'day': 35 - 'month': 3 median : boolean, optional (default: False) if set to True, the climatology will be based on the median conditions std: boolean, optional (default: False) if set to True, there will be 2 columns, one for the median or mean and one of the standard deviation of the aggregated data points. timespan : [timespan_from, timespan_to], datetime.datetime(y,m,d), optional Set this to calculate the climatology based on a subset of the input Series fill : float or int, optional (default: np.nan) Fill value to use for days on which no climatology exists wraparound : boolean, optional (default: True) If set then the climatology is wrapped around at the edges before doing the second running average (long-term event correction) respect_leap_years : boolean, optional (default: False) If set then leap years will be respected during the calculation of the climatology. Only valid with 'output_freq' value set to 'day'. Default: False interpolate_leapday: boolean, optional (default: False) <description>. Only valid with 'output_freq' value set to 'day'. Default: False fillna: boolean, optional (default: True) If set, then the moving average used for the calculation of the climatology will be filled at the nan-values min_obs_orig: int (default: 1) Minimum observations required to give a valid output in the first moving average applied on the input series min_obs_clim: int (default: 1) Minimum observations required to give a valid output in the second moving average applied on the calculated climatology output_freq: str, optional (default: 'day') Determines the output frequency (time unit) of the climatology calculation (independently of the 'Ser' input frequency). Currently, supported options are 'day', 'month'. Returns ------- climatology : pandas.Series or pandas.DataFrame Containing the calculated climatology. The size of the series depends on the type of climatology being calculated, based on the value of 'output_freq': - 366 values for a daily climatology, behaving as a leap year - 12 values for a monthly climatology If 'std' is set to True, the output will be a DataFrame with 2 columns: 'climatology' and 'std'. """ # establish the moving window size default_moving_avg_clim = {"day": 35, "month": 3} if moving_avg_clim is None: moving_avg_clim = default_moving_avg_clim[output_freq] if output_freq == "month" and moving_avg_clim > 5: # in case someone changes unit but not moving_avg_clim a warning # might be useful warnings.warn(f"Window for moving average of climatology set to " f"{moving_avg_clim} months, is this intentional?") if output_freq != "day": # irrelevant at lower frequencies than daily respect_leap_years, interpolate_leapday = False, False if timespan is not None: Ser = Ser.truncate(before=timespan[0], after=timespan[1]) Ser = moving_average( Ser, window_size=moving_avg_orig, fillna=fillna, min_obs=min_obs_orig) Ser = pd.DataFrame(Ser) if type(Ser.index) == pd.DatetimeIndex: year, month, day = (np.asarray(Ser.index.year), np.asarray(Ser.index.month), np.asarray(Ser.index.day)) else: year, month, day = julian2date(Ser.index.values)[0:3] # provide indices for the selected unit indices, n_idx = _index_units( year, month, day, unit=output_freq, respect_leap_years=respect_leap_years) Ser['unit'] = indices if median: clim = Ser.groupby('unit').median() clim.name = 'climatology' else: clim = Ser.groupby('unit').mean() clim.name = 'climatology' if std: std_ser = Ser.groupby('unit').std() clim_ser = pd.concat([ clim.loc[:, 0].rename(clim.name), std_ser.loc[:, 0].rename('std')], axis=1) # yapf: disable else: clim_ser = pd.DataFrame( data={'climatology': clim.values.flatten()}, index=clim.index.values) clim_ser = clim_ser.reindex(np.arange(n_idx) + 1) if wraparound: index_old = clim_ser.index.copy() left_mirror = clim_ser.iloc[-moving_avg_clim:] right_mirror = clim_ser.iloc[:moving_avg_clim] # Shift index to start at n_idx - index at -moving_avg_clim # to run over a whole year while keeping gaps the same size right_mirror.index = right_mirror.index + n_idx * 2 clim_ser.index = clim_ser.index + n_idx clim_ser = pd.concat([left_mirror, clim_ser, right_mirror]) clim_ser['climatology'] = moving_average( clim_ser['climatology'], window_size=moving_avg_clim, fillna=fillna, min_obs=min_obs_clim) if wraparound: clim_ser = clim_ser.iloc[moving_avg_clim:-moving_avg_clim] clim_ser.index = index_old # keep hardcoding as it's only for doys if interpolate_leapday and not respect_leap_years: clim_ser.loc[60, :] = np.mean( (clim_ser.loc[59, :], clim_ser.loc[61, :])) elif interpolate_leapday and respect_leap_years: clim_ser.loc[366, :] = np.mean( (clim_ser.loc[365, :], clim_ser.loc[1, :])) clim_ser = clim_ser.fillna(fill) if len(clim_ser.columns) == 1: return clim_ser.iloc[:, 0] else: return clim_ser