# pytesmo.validation_framework.data_manager

# Copyright (c) 2015, Vienna University of Technology (TU Wien), Department
# of Geodesy and Geoinformation (GEO).
# All rights reserved.

# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#   * Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#   * Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the distribution.
#   * Neither the name of the Vienna University of Technology, Department
#     of Geodesy and Geoinformation nor the names of its contributors may
#     be used to endorse or promote products derived from this software
#     without specific prior written permission.

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL VIENNA UNIVERSITY OF TECHNOLOGY,
# DEPARTMENT OF GEODESY AND GEOINFORMATION BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import itertools
import warnings

import pandas as pd

from pygeobase.object_base import TS


class DataManager(object):

    """
    Class to handle the data management.

    Parameters
    ----------
    datasets : dict of dicts
        :Keys: string, dataset names
        :Values: dict, containing the following fields

            'class': object
                Class containing the method read_ts for reading the data.
            'columns': list
                List of columns which will be used in the validation process.
            'args': list, optional
                Args for reading the data.
            'kwargs': dict, optional
                Kwargs for reading the data.
            'grids_compatible': boolean, optional
                If set to True the grid point index is used directly when
                reading the other dataset; if False then lon, lat are used
                and a nearest neighbour search is necessary.
                default: False
            'use_lut': boolean, optional
                If set to True the grid point index (obtained from a
                calculated lut between reference and other) is used when
                reading the other dataset; if False then lon, lat are used
                and a nearest neighbour search is necessary.
                default: False
            'lut_max_dist': float, optional
                Maximum allowed distance in meters for the lut calculation.
                default: None
    ref_name : string
        Name of the reference dataset.
    period : list, optional
        Of type [datetime start, datetime end]. If given then all input
        datasets will be truncated to start <= dates <= end.
    read_ts_names : string or dict of strings, optional
        If a method name other than 'read_ts' should be used for reading
        the data then it can be specified here. If it is a dict then
        specify a method name for each dataset.

    Methods
    -------
    get_luts()
        Returns luts between reference and other datasets for which
        use_lut was set to True.
    get_results_names(n=2)
        Return result names based on reference and other names.
    read_reference(*args)
        Function to read and prepare the reference dataset.
    read_other(other_name, *args)
        Function to read and prepare one of the other datasets.
    """

    def __init__(self, datasets, ref_name, period=None,
                 read_ts_names='read_ts'):
        """
        Initialize parameters.
        """
        self.datasets = datasets
        self._add_default_values()
        self.reference_name = ref_name

        self.other_name = []
        for dataset in datasets.keys():
            if dataset != ref_name:
                self.other_name.append(dataset)
                if 'use_lut' not in self.datasets[dataset]:
                    self.datasets[dataset]['use_lut'] = False

        try:
            self.reference_grid = self.datasets[
                self.reference_name]['class'].grid
        except AttributeError:
            self.reference_grid = None

        self.period = period
        self.luts = self.get_luts()

        if type(read_ts_names) is dict:
            self.read_ts_names = read_ts_names
        else:
            d = {}
            for dataset in datasets:
                d[dataset] = read_ts_names
            self.read_ts_names = d

    def _add_default_values(self):
        """
        Add defaults for args, kwargs, grids_compatible, use_lut and
        lut_max_dist to the dataset dictionary.
        """
        defaults = {'use_lut': False,
                    'args': [],
                    'kwargs': {},
                    'grids_compatible': False,
                    'lut_max_dist': None}
        for dataset in self.datasets.keys():
            new_defaults = dict(defaults)
            new_defaults.update(self.datasets[dataset])
            self.datasets[dataset] = new_defaults

    def get_luts(self):
        """
        Returns luts between the reference and the other datasets for which
        use_lut was set to True.

        Returns
        -------
        luts : dict
            Keys: other dataset names
            Values: lut between reference and other, or None
        """
        luts = {}
        for other_name in self.other_name:
            if self.datasets[other_name]['use_lut']:
                luts[other_name] = self.reference_grid.calc_lut(
                    self.datasets[other_name]['class'].grid,
                    max_dist=self.datasets[other_name]['lut_max_dist'])
            else:
                luts[other_name] = None

        return luts

    @property
    def ds_dict(self):
        # map each dataset name to the list of columns used for validation
        ds_dict = {}
        for dataset in self.datasets.keys():
            ds_dict[dataset] = self.datasets[dataset]['columns']
        return ds_dict

    def get_results_names(self, n=2):
        return get_result_names(self.ds_dict, self.reference_name, n=n)

    def read_reference(self, *args):
        """
        Function to read and prepare the reference dataset.

        Calls the configured read method of the dataset.

        Takes either 1 (gpi) or 2 (lon, lat) arguments.

        Parameters
        ----------
        gpi : int
            Grid point index
        lon : float
            Longitude of point
        lat : float
            Latitude of point

        Returns
        -------
        ref_df : pandas.DataFrame or None
            Reference dataframe.
        """
        return self.read_ds(self.reference_name, *args)

    def read_other(self, name, *args):
        """
        Function to read and prepare one of the other datasets.

        Calls the configured read method of the dataset.

        Takes either 1 (gpi) or 2 (lon, lat) arguments.

        Parameters
        ----------
        name : string
            Name of the other dataset.
        gpi : int
            Grid point index
        lon : float
            Longitude of point
        lat : float
            Latitude of point

        Returns
        -------
        data_df : pandas.DataFrame or None
            Data DataFrame.
        """
        return self.read_ds(name, *args)

    def read_ds(self, name, *args):
        """
        Function to read and prepare a dataset.

        Calls the configured read method of the dataset.

        Takes either 1 (gpi) or 2 (lon, lat) arguments.

        Parameters
        ----------
        name : string
            Name of the dataset.
        gpi : int
            Grid point index
        lon : float
            Longitude of point
        lat : float
            Latitude of point

        Returns
        -------
        data_df : pandas.DataFrame or None
            Data DataFrame.
        """
        ds = self.datasets[name]
        args = list(args)
        args.extend(ds['args'])

        try:
            func = getattr(ds['class'], self.read_ts_names[name])
            data_df = func(*args, **ds['kwargs'])
            if isinstance(data_df, TS):
                data_df = data_df.data
        except IOError:
            warnings.warn(
                "IOError while reading dataset {} with args {}".format(
                    name, args))
            return None
        except RuntimeError as e:
            if e.args[0] == "No such file or directory":
                warnings.warn(
                    "IOError while reading dataset {} with args {}".format(
                        name, args))
                return None
            else:
                raise e

        if len(data_df) == 0:
            warnings.warn("No data for dataset {}".format(name))
            return None

        if not isinstance(data_df, pd.DataFrame):
            warnings.warn("Data is not a DataFrame {}".format(args))
            return None

        if self.period is not None:
            # slice with isoformat strings since pandas slicing behavior
            # differs when using datetime objects directly
            data_df = data_df[
                self.period[0].isoformat():self.period[1].isoformat()]

        if len(data_df) == 0:
            warnings.warn(
                "No data for dataset {} with arguments {}".format(
                    name, args))
            return None
        else:
            return data_df

    def get_data(self, gpi, lon, lat):
        """
        Get all the data from this manager for a certain grid point,
        longitude, latitude combination.

        Parameters
        ----------
        gpi : int
            Grid point index
        lon : float
            Grid point longitude
        lat : float
            Grid point latitude

        Returns
        -------
        df_dict : dict of pandas.DataFrames
            Dictionary with dataset names as the keys and pandas.DataFrames
            containing the data for the point as values.
            The dict will be empty if no data is available.
        """
        df_dict = {}

        ref_dataframe = self.read_reference(gpi)
        # if no reference data is available, return an empty dict so the
        # caller can continue with the next gpi
        if ref_dataframe is None:
            return df_dict

        other_dataframes = self.get_other_data(gpi, lon, lat)
        # if no other data is available, return an empty dict so the
        # caller can continue with the next gpi
        if len(other_dataframes) == 0:
            return df_dict

        df_dict = other_dataframes
        df_dict.update({self.reference_name: ref_dataframe})

        return df_dict

    def get_other_data(self, gpi, lon, lat):
        """
        Get the data for all non-reference datasets from this manager for a
        certain grid point, longitude, latitude combination.

        Parameters
        ----------
        gpi : int
            Grid point index
        lon : float
            Grid point longitude
        lat : float
            Grid point latitude

        Returns
        -------
        other_dataframes : dict of pandas.DataFrames
            Dictionary with dataset names as the keys and pandas.DataFrames
            containing the data for the point as values.
            The dict will be empty if no data is available.
        """
        other_dataframes = {}
        for other_name in self.other_name:
            grids_compatible = self.datasets[other_name]['grids_compatible']
            if grids_compatible:
                # compatible grids share gpis, so read directly
                other_dataframe = self.read_other(other_name, gpi)
            elif self.luts[other_name] is not None:
                # look up the matching gpi in the precomputed lut;
                # -1 means no neighbour within lut_max_dist
                other_gpi = self.luts[other_name][gpi]
                if other_gpi == -1:
                    continue
                other_dataframe = self.read_other(other_name, other_gpi)
            else:
                # fall back to a nearest neighbour search via lon, lat
                other_dataframe = self.read_other(other_name, lon, lat)

            if other_dataframe is not None:
                other_dataframes[other_name] = other_dataframe

        return other_dataframes
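

# The sketch below is illustrative and not part of the original module: it
# shows how a ``datasets`` dict is typically wired into a DataManager. The
# ``DummyReader`` class and all column names are hypothetical stand-ins for
# real reader classes, which expose a ``read_ts`` method (returning a
# pandas.DataFrame) and usually a ``grid`` attribute.


def _example_data_manager():
    import numpy as np

    class DummyReader(object):
        # hypothetical reader: ignores the location arguments and returns a
        # fixed time series so that the example is self-contained
        def __init__(self, column):
            self.column = column

        def read_ts(self, *args):
            index = pd.date_range('2015-01-01', periods=10, freq='D')
            return pd.DataFrame({self.column: np.arange(10.0)}, index=index)

    datasets = {
        'reference': {'class': DummyReader('sm_ref'),
                      'columns': ['sm_ref'],
                      'grids_compatible': True},
        'other': {'class': DummyReader('sm_other'),
                  'columns': ['sm_other'],
                  'grids_compatible': True},
    }
    dm = DataManager(datasets, ref_name='reference')
    # returns {'reference': <DataFrame>, 'other': <DataFrame>} for the point
    return dm.get_data(1, 16.37, 48.21)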


def flatten(seq):
    """
    Flatten an arbitrarily nested sequence of lists and tuples into a
    flat list.
    """
    flat = []
    for elt in seq:
        if type(elt) in (tuple, list):
            flat.extend(flatten(elt))
        else:
            flat.append(elt)
    return flat
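
# For example, flatten([('a', 'b'), [('c', 'd')]]) returns
# ['a', 'b', 'c', 'd']. get_result_names below relies on this to reduce the
# nested combination tuples to one flat, alternating list of dataset names
# and column names before re-chunking them.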


def get_result_names(ds_dict, refkey, n=2):
    """
    Return result names based on all possible combinations with a
    reference dataset.

    Parameters
    ----------
    ds_dict : dict
        Dict of lists containing the dataset names as keys and a list of the
        columns to read from the dataset as values.
    refkey : string
        Dataset name to use as the reference.
    n : int
        Number of datasets to combine with each other. If n=2 then always
        two datasets will be combined into one result, if n=3 then always
        three datasets will be combined into one result, and so on.
        n has to be <= the total number of datasets.

    Returns
    -------
    result_combos : list of tuples
        Containing all combinations of
        (referenceDataset.column, otherDataset.column)
    """
    results_names = []

    ref_columns = []
    for column in ds_dict[refkey]:
        ref_columns.append((refkey, column))

    other_columns = []
    other_names = list(ds_dict)
    del other_names[other_names.index(refkey)]
    for other in sorted(other_names):
        for column in ds_dict[other]:
            other_columns.append((other, column))

    for comb in itertools.product(ref_columns,
                                  itertools.combinations(other_columns,
                                                         n - 1)):
        results_names.append(comb)

    # flatten to one level and remove those that do not have n unique
    # datasets
    results_names = flatten(results_names)

    # iterate in chunks of n*2 over the flat list of alternating
    # (dataset, column) entries
    result_combos = []
    for chunk in [results_names[pos:pos + n * 2]
                  for pos in range(0, len(results_names), n * 2)]:
        combo = []
        datasets = chunk[::2]
        columns = chunk[1::2]

        # if a dataset would be compared to itself then don't include the
        # combination
        if len(set(datasets)) != n:
            continue

        for dataset, column in zip(datasets, columns):
            combo.append((dataset, column))
        result_combos.append(tuple(combo))

    return result_combos
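

if __name__ == "__main__":
    # Quick illustration (not part of the original module): all pairwise
    # combinations of a single-column reference dataset with two other
    # single-column datasets. The dataset and column names are hypothetical.
    combos = get_result_names(
        {'ref': ['sm'], 'ds1': ['sm'], 'ds2': ['sm']}, 'ref', n=2)
    # prints:
    # [(('ref', 'sm'), ('ds1', 'sm')), (('ref', 'sm'), ('ds2', 'sm'))]
    print(combos)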