# Source code for pytesmo.io.ismn.readers
# Copyright (c) 2013,Vienna University of Technology, Department of Geodesy and Geoinformation
# All rights reserved.
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the <organization> nor the
# names of its contributors may be used to endorse or promote products
# derived from this software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Created on Jul 31, 2013
@author: Christoph Paulik christoph.paulik@geo.tuwien.ac.at
'''
import os
import pandas as pd
from datetime import datetime
import numpy as np
# Maps the short variable codes found in ISMN file names to the
# human-readable variable names used in metadata and column labels.
variable_lookup = {
    'sm': 'soil moisture',
    'ts': 'soil temperature',
    'su': 'soil suction',
    'p': 'precipitation',
    'ta': 'air temperature',
    'fc': 'field capacity',
    'wp': 'permanent wilting point',
    'paw': 'plant available water',
    'ppaw': 'potential plant available water',
    'sat': 'saturation',
    'si_h': 'silt fraction',
    'sd': 'snow depth',
    'sa_h': 'sand fraction',
    'cl_h': 'clay fraction',
    'oc_h': 'organic carbon',
    'sweq': 'snow water equivalent',
    'tsf': 'surface temperature',
    'tsfq': 'surface temperature quality flag original',
}
class ISMNTSError(Exception):
    """
    Raised when an ISMNTimeSeries cannot perform the requested operation,
    e.g. plotting when its data attribute is not a pandas.DataFrame.
    """
    # NOTE(review): this name was raised by ISMNTimeSeries.plot but is not
    # defined or imported anywhere in the visible module, so it is defined here.


class ISMNTimeSeries(object):
    """
    class that contains a time series of ISMN data read from one text file

    Every key of the dictionary passed to the constructor becomes an
    attribute of the instance.

    Attributes
    ----------
    network : string
        network the time series belongs to
    station : string
        station name the time series belongs to
    latitude : float
        latitude of station
    longitude : float
        longitude of station
    elevation : float
        elevation of station
    variable : list
        variable measured
    depth_from : list
        shallower depth of layer the variable was measured at
    depth_to : list
        deeper depth of layer the variable was measured at
    sensor : string
        sensor name
    data : pandas.DataFrame
        data of the time series
    """

    def __init__(self, data):
        """
        Parameters
        ----------
        data : dict
            metadata and data; each key is set as an attribute
        """
        for key in data:
            setattr(self, key, data[key])

    def __repr__(self):
        # Only the first entry of depth_from, depth_to and variable is shown.
        return '%s %s %.2f m - %.2f m %s measured with %s ' % (
            self.network,
            self.station,
            self.depth_from[0],
            self.depth_to[0],
            self.variable[0],
            self.sensor)

    def plot(self, *args, **kwargs):
        """
        wrapper for pandas plotting which adds title to plot
        and drops NaN values for plotting

        Rows containing NaN in any column are dropped, then the first
        column of the remaining data is plotted. All arguments are passed
        on to the pandas plot call; ``figsize`` defaults to ``(15, 5)``
        but can be overridden by the caller.

        Returns
        -------
        ax : axes
            matplotlib axes of the plot

        Raises
        ------
        ISMNTSError
            if data attribute is not a pandas.DataFrame
        """
        data = getattr(self, 'data', None)
        if not isinstance(data, pd.DataFrame):
            raise ISMNTSError("data attribute is not a pandas.DataFrame")

        # Use setdefault so a caller-supplied figsize does not clash with
        # the default (passing it twice would raise a TypeError).
        kwargs.setdefault('figsize', (15, 5))

        tempdata = data.dropna()
        tempdata = tempdata[tempdata.columns[0]]
        ax = tempdata.plot(*args, **kwargs)
        ax.set_title(self.__repr__())
        return ax
def get_info_from_file(filename):
    """
    reads first line of file and splits filename
    this can be used to construct necessary metadata information
    for all ISMN formats

    Parameters
    ----------
    filename : string
        filename including path

    Returns
    -------
    header_elements : list
        first line of file split on whitespace
    filename_elements : list
        filename without path split by _
    """
    # The former 'U' mode flag is rejected by Python 3.11+; plain text
    # mode already translates newlines on Python 3.
    with open(filename, 'r') as f:
        header = f.readline()
    header_elements = header.split()

    filename_elements = os.path.basename(filename).split('_')
    return header_elements, filename_elements
def get_metadata_header_values(filename):
    """
    get metadata from ISMN textfiles in the format called
    Variables stored in separate files (Header + values)

    Network, station, coordinates and depths are taken from the first
    line of the file; variable and sensor are taken from the filename.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    metadata : dict
        dictionary of metadata information
    """
    header, name_parts = get_info_from_file(filename)

    # A sensor name may itself contain underscores; in that case it spans
    # everything between the sixth element and the last two elements.
    sensor = ('_'.join(name_parts[6:-2]) if len(name_parts) > 9
              else name_parts[6])

    # Translate the short variable code when known, otherwise keep it as is.
    short_name = name_parts[3]
    variable = [variable_lookup.get(short_name, short_name)]

    return {
        'network': header[1],
        'station': header[2],
        'latitude': float(header[3]),
        'longitude': float(header[4]),
        'elevation': float(header[5]),
        'depth_from': [float(header[6])],
        'depth_to': [float(header[7])],
        'variable': variable,
        'sensor': sensor,
    }
def read_format_header_values(filename):
    """
    Reads ISMN textfiles in the format called
    Variables stored in separate files (Header + values)

    The first line (header) is skipped. The remaining lines are read as
    whitespace separated columns: date, time, value, flag, original flag.
    Date and time are combined into the datetime index 'date_time'.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    time_series : ISMNTimeSeries
        ISMNTimeSeries object initialized with metadata and data from file
    """
    metadata = get_metadata_header_values(filename)
    variable = metadata['variable'][0]

    # sep=r'\s+' replaces the deprecated delim_whitespace=True. Date and
    # time are read as strings and combined explicitly because nested
    # parse_dates lists are no longer supported by current pandas.
    data = pd.read_csv(filename, skiprows=1, sep=r'\s+',
                       names=['date', 'time', variable,
                              variable + '_flag',
                              variable + '_orig_flag'],
                       dtype={'date': str, 'time': str})
    data['date_time'] = pd.to_datetime(
        data.pop('date') + ' ' + data.pop('time'))
    data.set_index('date_time', inplace=True)

    metadata['data'] = data
    return ISMNTimeSeries(metadata)
def get_metadata_ceop_sep(filename):
    """
    get metadata from ISMN textfiles in the format called
    Variables stored in separate files (CEOP formatted)

    Network, station, variable, depths and sensor are taken from the
    filename; coordinates are taken from the first line of the file.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    metadata : dict
        dictionary of metadata information
    """
    header, name_parts = get_info_from_file(filename)

    # A sensor name may itself contain underscores; in that case it spans
    # everything between the sixth element and the last two elements.
    sensor = ('_'.join(name_parts[6:-2]) if len(name_parts) > 9
              else name_parts[6])

    # Translate the short variable code when known, otherwise keep it as is.
    short_name = name_parts[3]
    variable = [variable_lookup.get(short_name, short_name)]

    return {
        'network': name_parts[1],
        'station': name_parts[2],
        'variable': variable,
        'depth_from': [float(name_parts[4])],
        'depth_to': [float(name_parts[5])],
        'sensor': sensor,
        'latitude': float(header[7]),
        'longitude': float(header[8]),
        'elevation': float(header[9]),
    }
def read_format_ceop_sep(filename):
    """
    Reads ISMN textfiles in the format called
    Variables stored in separate files (CEOP formatted)

    Columns 0 and 1 (date, time) and 12 to 14 (value, flag, original
    flag) of the whitespace separated file are read. Date and time are
    combined into the datetime index 'date_time'.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    time_series : ISMNTimeSeries
        ISMNTimeSeries object initialized with metadata and data from file
    """
    metadata = get_metadata_ceop_sep(filename)
    variable = metadata['variable'][0]

    # sep=r'\s+' replaces the deprecated delim_whitespace=True. Date and
    # time are read as strings and combined explicitly because nested
    # parse_dates lists are no longer supported by current pandas.
    data = pd.read_csv(filename, sep=r'\s+',
                       usecols=[0, 1, 12, 13, 14],
                       names=['date', 'time', variable,
                              variable + '_flag',
                              variable + '_orig_flag'],
                       dtype={'date': str, 'time': str})
    data['date_time'] = pd.to_datetime(
        data.pop('date') + ' ' + data.pop('time'))
    data.set_index('date_time', inplace=True)

    metadata['data'] = data
    return ISMNTimeSeries(metadata)
def get_metadata_ceop(filename):
    """
    get metadata from ISMN textfiles in the format called
    CEOP Reference Data Format

    The network is taken from the filename; station and coordinates are
    taken from the first line of the file. Variable, sensor and depths
    are fixed placeholder values.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    metadata : dict
        dictionary of metadata information
    """
    header, name_parts = get_info_from_file(filename)

    network = name_parts[1]
    station = header[6]
    latitude, longitude, elevation = (float(header[pos])
                                      for pos in (7, 8, 9))

    return {
        'network': network,
        'station': station,
        'variable': ['ts', 'sm'],
        'sensor': 'n.s',
        'depth_from': ['multiple'],
        'depth_to': ['multiple'],
        'latitude': latitude,
        'longitude': longitude,
        'elevation': elevation,
    }
def read_format_ceop(filename):
    """
    Reads ISMN textfiles in the format called
    CEOP Reference Data Format

    Columns 0 and 1 (date, time) and 11 to 15 (depth, first variable and
    its flag, second variable and its flag) of the whitespace separated
    file are read; '-999.99' is treated as missing. The result is indexed
    by (depth_from, depth_to, date), where both depth levels hold the
    same depth column, and sorted by that index.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    time_series : ISMNTimeSeries
        ISMNTimeSeries object initialized with metadata and data from file
    """
    metadata = get_metadata_ceop(filename)
    first_var = metadata['variable'][0]
    second_var = metadata['variable'][1]

    # sep=r'\s+' replaces the deprecated delim_whitespace=True. Date and
    # time are read as strings and combined explicitly because nested
    # parse_dates lists are no longer supported by current pandas.
    data = pd.read_csv(filename, sep=r'\s+',
                       usecols=[0, 1, 11, 12, 13, 14, 15],
                       names=['date', 'time', 'depth_from',
                              first_var,
                              first_var + '_flag',
                              second_var,
                              second_var + '_flag'],
                       na_values=['-999.99'],
                       dtype={'date': str, 'time': str})

    date_index = pd.to_datetime(data.pop('date') + ' ' + data.pop('time'))
    depth_index = data.pop('depth_from')

    data.index = pd.MultiIndex.from_arrays([depth_index,
                                            depth_index,
                                            date_index])
    data.index.names = ['depth_from', 'depth_to', 'date']

    # DataFrame.sortlevel no longer exists in current pandas; sort_index
    # sorts by the first index level followed by the remaining levels.
    data = data.sort_index()

    # Replace the placeholder depths with the depths found in the file.
    metadata['depth_from'] = np.unique(
        data.index.get_level_values(0).values).tolist()
    metadata['depth_to'] = np.unique(
        data.index.get_level_values(1).values).tolist()

    metadata['data'] = data
    return ISMNTimeSeries(metadata)
def tail(f, lines=1, _buffer=4098):
    """Tail a file and get X lines from the end

    The file is read in growing blocks measured from its end. If an
    end-relative seek fails (the block is larger than the file, or the
    file object does not support such seeks) the whole file is read.

    Parameters
    ----------
    f: file like object
    lines: int
        lines from the end of the file to read
    _buffer: int
        buffer to use to step backwards in the file.

    Returns
    -------
    lines_found : list
        up to ``lines`` last lines of the file, fewer if the file is
        shorter; empty if ``lines`` is not positive

    References
    ----------
    Found at http://stackoverflow.com/a/13790289/1314882
    """
    if lines <= 0:
        return []

    # place holder for the lines found
    lines_found = []

    # block counter will be multiplied by buffer
    # to get the block size from the end
    block_counter = -1

    # The first line of a block may be cut in the middle, so one line more
    # than requested is needed before the last `lines` lines are known to
    # be complete.
    while len(lines_found) <= lines:
        try:
            f.seek(block_counter * _buffer, os.SEEK_END)
        except IOError:  # either file is too small, or too many lines requested
            f.seek(0)
            lines_found = f.readlines()
            break

        # Some file-like objects clamp the position to the start instead
        # of raising; then the whole file is read and nothing is left.
        reached_start = f.tell() == 0
        lines_found = f.readlines()
        if reached_start:
            break

        # decrement the block counter to get the
        # next X bytes
        block_counter -= 1

    return lines_found[-lines:]
def get_min_max_timestamp_header_values(filename):
    """
    Get minimum and maximum observation timestamp from header values format.

    The first line of the file is skipped as header. The timestamps are
    parsed from the first 16 characters of the following line and of the
    last line of the file.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    min_date : datetime
        timestamp of the first data line
    max_date : datetime
        timestamp of the last line
    """
    # The former 'rU' mode is rejected by Python 3.11+; plain text mode
    # already translates newlines on Python 3.
    with open(filename, mode='r') as fid:
        _ = fid.readline()
        first = fid.readline()
        last = tail(fid)[0]

    min_date = datetime.strptime(first[:16], '%Y/%m/%d %H:%M')
    max_date = datetime.strptime(last[:16], '%Y/%m/%d %H:%M')
    return min_date, max_date
def get_min_max_timestamp_ceop_sep(filename):
    """
    Get minimum and maximum observation timestamp from ceop_sep format.

    The timestamps are parsed from the first 16 characters of the first
    and of the last line of the file.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    min_date : datetime
        timestamp of the first line
    max_date : datetime
        timestamp of the last line
    """
    # The former 'rU' mode is rejected by Python 3.11+; plain text mode
    # already translates newlines on Python 3.
    with open(filename, mode='r') as fid:
        first = fid.readline()
        last = tail(fid)[0]

    min_date = datetime.strptime(first[:16], '%Y/%m/%d %H:%M')
    max_date = datetime.strptime(last[:16], '%Y/%m/%d %H:%M')
    return min_date, max_date
def get_min_max_timestamp_ceop(filename):
    """
    Get minimum and maximum observation timestamp from ceop format.

    The timestamps are parsed from the first 16 characters of the first
    and of the last line of the file.

    Parameters
    ----------
    filename : string
        path and name of file

    Returns
    -------
    min_date : datetime
        timestamp of the first line
    max_date : datetime
        timestamp of the last line
    """
    # The former 'rU' mode is rejected by Python 3.11+; plain text mode
    # already translates newlines on Python 3.
    with open(filename, mode='r') as fid:
        first = fid.readline()
        last = tail(fid)[0]

    min_date = datetime.strptime(first[:16], '%Y/%m/%d %H:%M')
    max_date = datetime.strptime(last[:16], '%Y/%m/%d %H:%M')
    return min_date, max_date
def get_min_max_timestamp(filename):
    """
    Determine the file type and get the minimum and maximum observation
    timestamp

    The module level function named 'get_min_max_timestamp_' followed by
    the detected format is looked up and called with the filename.

    Parameters
    ----------
    filename : string

    Returns
    -------
    result of the format specific get_min_max_timestamp_* function
    """
    module_names = globals()
    handler_name = 'get_min_max_timestamp_' + get_format(filename)
    return module_names[handler_name](filename)
class ReaderException(Exception):
    """
    Raised when a file does not match any of the known ISMN file formats.
    """
    # NOTE(review): this name was raised by get_format but is not defined
    # or imported anywhere in the visible module, so it is defined here.


def get_format(filename):
    """
    gets the file format from the length of
    the header and filename information

    Parameters
    ----------
    filename : string

    Returns
    -------
    format_name : string
        one of 'ceop', 'ceop_sep' or 'header_values'; used as the suffix
        of the format specific functions in this module

    Raises
    ------
    ReaderException
        if filename or header parts do not fit one of the formats
    """
    header_elements, filename_elements = get_info_from_file(filename)
    n_header = len(header_elements)
    n_name = len(filename_elements)

    if n_name == 5 and n_header == 16:
        return 'ceop'
    if n_header == 15 and n_name >= 9:
        return 'ceop_sep'
    if n_header < 14 and n_name >= 9:
        return 'header_values'

    raise ReaderException(
        "This does not seem to be a valid ISMN filetype %s" % filename)
def read_data(filename):
    """
    reads ISMN data in any format

    The module level function named 'read_format_' followed by the
    detected format is looked up and called with the filename.

    Parameters
    ----------
    filename: string

    Returns
    -------
    timeseries: ISMNTimeSeries
    """
    module_names = globals()
    reader_name = 'read_format_' + get_format(filename)
    return module_names[reader_name](filename)
def get_metadata(filename):
    """
    reads ISMN metadata from any format

    The module level function named 'get_metadata_' followed by the
    detected format is looked up and called with the filename.

    Parameters
    ----------
    filename: string

    Returns
    -------
    metadata: dict
    """
    module_names = globals()
    getter_name = 'get_metadata_' + get_format(filename)
    return module_names[getter_name](filename)