Source code for pyam.timeseries

# -*- coding: utf-8 -*-

import numpy as np
from pyam.logger import logger
from pyam.utils import isstr, to_int

# %%


[docs]def fill_series(x, year): """Returns the value of a timeseries (indexed over years) for a year by linear interpolation. Parameters ---------- x: pandas.Series a timeseries to be interpolated year: int year of interpolation """ x = x.dropna() if year in x.index and not np.isnan(x[year]): return x[year] else: prev = [i for i in x.index if i < year] nxt = [i for i in x.index if i > year] if prev and nxt: p = max(prev) n = min(nxt) return ((n - year) * x[p] + (year - p) * x[n]) / (n - p) else: return np.nan
[docs]def cumulative(x, first_year, last_year): """Returns the cumulative sum of a timeseries (indexed over years), implements linear interpolation between years, ignores nan's in the range. The function includes the last-year value of the series, and raises a warning if start_year or last_year is outside of the timeseries range and returns nan Parameters ---------- x: pandas.Series a timeseries to be summed over time first_year: int first year of the sum last_year: int last year of the sum (inclusive) """ # if the timeseries does not cover the range `[first_year, last_year]`, # return nan to avoid erroneous aggregation if min(x.index) > first_year: logger().warning('the timeseries `{}` does not start by {}'.format( x.name or x, first_year)) return np.nan if max(x.index) < last_year: logger().warning('the timeseries `{}` does not extend until {}' .format(x.name or x, last_year)) return np.nan # make sure we're using integers to_int(x, index=True) x[first_year] = fill_series(x, first_year) x[last_year] = fill_series(x, last_year) years = [i for i in x.index if i >= first_year and i <= last_year and ~np.isnan(x[i])] years.sort() # loop over years if not np.isnan(x[first_year]) and not np.isnan(x[last_year]): value = 0 for (i, yr) in enumerate(years[:-1]): next_yr = years[i + 1] # the summation is shifted to include the first year fully in sum, # otherwise, would return a weighted average of `yr` and `next_yr` value += ((next_yr - yr - 1) * x[next_yr] + (next_yr - yr + 1) * x[yr]) / 2 # the loop above does not include the last element in range # (`last_year`), therefore added explicitly value += x[last_year] return value
def cross_threshold(x, threshold=0, direction=['from above', 'from below']): """Returns a list of the years in which a timeseries (indexed over years) crosses a given threshold Parameters ---------- x: pandas.Series a timeseries indexed over years threshold: float, default 0 the threshold that the timeseries is checked against direction: str, optional, default `['from above', 'from below']` whether to return all years where the threshold is crossed or only where threshold is crossed in a specific direction """ prev_yr, prev_val = None, None years = [] direction = [direction] if isstr(direction) else list(direction) if not set(direction).issubset(set(['from above', 'from below'])): raise ValueError('invalid direction `{}`'.format(direction)) for yr, val in zip(x.index, x.values): if np.isnan(val): # ignore nans in the timeseries continue if prev_val is None: prev_yr, prev_val = yr, val continue if not np.sign(prev_val - threshold) == np.sign(val - threshold): if ('from above' in direction and prev_val > val) \ or ('from below' in direction and prev_val < val): change = (val - prev_val) / (yr - prev_yr) # add one because int() rounds down cross_yr = prev_yr + int((threshold - prev_val) / change) + 1 years.append(cross_yr) prev_yr, prev_val = yr, val return years