Source code for pyam.utils

from pathlib import Path
import itertools
import logging
import string
import six
import re
import dateutil.parser

from pyam.index import get_index_levels, replace_index_labels
from pyam.logging import raise_data_error
import numpy as np
import pandas as pd
from collections.abc import Iterable

try:
    import seaborn as sns
except ImportError:
    # set to None so that `requires_package` can raise an informative error
    sns = None

logger = logging.getLogger(__name__)

# common indices
DEFAULT_META_INDEX = ["model", "scenario"]
META_IDX = ["model", "scenario"]
YEAR_IDX = ["model", "scenario", "region", "year"]
IAMC_IDX = ["model", "scenario", "region", "variable", "unit"]
SORT_IDX = ["model", "scenario", "variable", "year", "region"]
LONG_IDX = IAMC_IDX + ["year"]

# required columns
REQUIRED_COLS = ["region", "variable", "unit"]

# illegal terms for data/meta column names to prevent attribute conflicts
ILLEGAL_COLS = ["data", "meta"]

# dictionary to translate column index (0-701) to Excel column names (A-ZZ)
NUMERIC_TO_STR = dict(
    zip(
        range(0, 702),
        list(string.ascii_uppercase)
        + [
            "{}{}".format(i, j)
            for i, j in itertools.product(
                string.ascii_uppercase, string.ascii_uppercase
            )
        ],
    )
)
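
# e.g., NUMERIC_TO_STR[0] == "A", NUMERIC_TO_STR[25] == "Z",
# NUMERIC_TO_STR[26] == "AA" (illustrative editor's note)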

KNOWN_FUNCS = {
    "min": np.min,
    "max": np.max,
    "avg": np.mean,
    "mean": np.mean,
    "sum": np.sum,
}
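
# e.g., KNOWN_FUNCS["avg"](pd.Series([1, 2, 3])) returns 2.0
# (illustrative editor's note)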


def requires_package(pkg, msg, error_type=ImportError):
    """Decorator when a function requires an optional dependency

    Parameters
    ----------
    pkg : imported package object
    msg : string
        Message to show to user with error_type
    error_type : python error class
    """

    def _requires_package(func):
        def wrapper(*args, **kwargs):
            if pkg is None:
                raise error_type(msg)
            return func(*args, **kwargs)

        return wrapper

    return _requires_package
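
# Illustrative usage (editor's sketch; `plot_with_seaborn` is hypothetical):
#
#     @requires_package(sns, "Please install seaborn to use this function")
#     def plot_with_seaborn(df):
#         ...
#
# calling the decorated function raises an ImportError with the given message
# if seaborn could not be imported above (i.e., `sns` is None)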


def isstr(x):
    """Returns True if x is a string"""
    return isinstance(x, six.string_types)


def isscalar(x):
    """Returns True if x is a scalar"""
    return not isinstance(x, Iterable) or isstr(x)


def islistable(x):
    """Returns True if x is a list but not a string"""
    return isinstance(x, Iterable) and not isstr(x)


def to_list(x):
    """Return x as a list"""
    return x if islistable(x) else [x]


def remove_from_list(x, items):
    """Remove `items` from list `x`"""
    items = to_list(items)
    return [i for i in x if i not in items]
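
# Illustrative usage (editor's note):
#
#     remove_from_list(["model", "scenario", "region"], "region")
#     # -> ['model', 'scenario']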


def write_sheet(writer, name, df, index=False):
    """Write a pandas.DataFrame to an ExcelWriter

    The function sets the column width depending on the maximum width
    of the values and the column header

    Parameters
    ----------
    writer: pandas.ExcelWriter
        an instance of a :class:`pandas.ExcelWriter`
    name: string
        name of the sheet to be written
    df: pandas.DataFrame
        a :class:`pandas.DataFrame` to be written to the sheet
    index: boolean, default False
        should the index be written to the sheet
    """
    if index:
        df = df.reset_index()
    df.to_excel(writer, name, index=False)
    worksheet = writer.sheets[name]
    for i, col in enumerate(df.columns):
        if df.dtypes[col].name.startswith(("float", "int")):
            width = len(str(col)) + 2
        else:
            width = (
                max([df[col].map(lambda x: len(str(x or "None"))).max(), len(col)]) + 2
            )
        # this line fails if using an xlsx-engine other than openpyxl
        try:
            worksheet.column_dimensions[NUMERIC_TO_STR[i]].width = width
        except AttributeError:
            pass
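
# Illustrative usage (editor's sketch; the file name is hypothetical):
#
#     with pd.ExcelWriter("output.xlsx", engine="openpyxl") as writer:
#         write_sheet(writer, "data", df)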


def read_pandas(path, sheet_name="data*", *args, **kwargs):
    """Read a file and return a pandas.DataFrame"""
    if isinstance(path, Path) and path.suffix == ".csv":
        return pd.read_csv(path, *args, **kwargs)
    else:
        if isinstance(path, pd.ExcelFile):
            xl = path
        else:
            xl = pd.ExcelFile(
                path, engine="xlrd" if path.suffix == ".xls" else "openpyxl"
            )

        sheet_names = pd.Series(xl.sheet_names)

        # reading multiple sheets
        if len(sheet_names) > 1:
            sheets = kwargs.pop("sheet_name", sheet_name)
            # apply pattern-matching for sheet names (use * as wildcard)
            matches = sheet_names[pattern_match(sheet_names, values=sheets)]
            if matches.empty:
                raise ValueError(f"No sheets {sheets} in file {path}!")

            df = pd.concat([xl.parse(s, *args, **kwargs) for s in matches])

        # read single sheet (if only one exists in file) ignoring sheet name
        else:
            df = pd.read_excel(xl, *args, **kwargs)

        # remove unnamed and empty columns, and rows where all values are nan
        def is_empty(name, s):
            if str(name).startswith("Unnamed: "):
                try:
                    if len(s) == 0 or all(np.isnan(s)):
                        return True
                except TypeError:
                    pass
            return False

        empty_cols = [c for c in df.columns if is_empty(c, df[c])]
        return df.drop(columns=empty_cols).dropna(axis=0, how="all")
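
# Illustrative usage (editor's sketch; the file name is hypothetical): read
# all sheets matching the pattern `data*` from an Excel file
#
#     df = read_pandas(Path("scenario_data.xlsx"), sheet_name="data*")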


def read_file(path, *args, **kwargs):
    """Read data from a file"""
    # extract kwargs that are intended for `format_data`
    format_kwargs = dict(index=kwargs.pop("index"))
    for c in [i for i in IAMC_IDX + ["year", "time", "value"] if i in kwargs]:
        format_kwargs[c] = kwargs.pop(c)
    return format_data(read_pandas(path, *args, **kwargs), **format_kwargs)


def format_data(df, index, **kwargs):
    """Convert a pandas.Dataframe or pandas.Series to the required format"""
    if isinstance(df, pd.Series):
        df.name = df.name or "value"
        df = df.to_frame()

    # check for R-style year columns, converting where necessary
    def convert_r_columns(c):
        try:
            first = c[0]
            second = c[1:]
            if first == "X":
                try:
                    # bingo! was R-style (e.g., `X2015`), return the integer
                    return int(second)
                except (ValueError, TypeError):
                    # nope, not an int, fall through to the final return
                    pass
        except (TypeError, IndexError):
            # not a string/iterable/etc, fall through to the final return
            pass
        return c
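
    # e.g., an R-style header `X2015` is converted to the integer 2015, while
    # a header like `Xylophone` is returned unchanged (editor's note)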

    df.columns = df.columns.map(convert_r_columns)

    # if `value` is given but not `variable`,
    # melt value columns and use column name as `variable`
    if "value" in kwargs and "variable" not in kwargs:
        value = kwargs.pop("value")
        value = value if islistable(value) else [value]
        _df = df.set_index(list(set(df.columns) - set(value)))
        dfs = []
        for v in value:
            if v not in df.columns:
                raise ValueError("column `{}` does not exist!".format(v))
            vdf = _df[v].to_frame().rename(columns={v: "value"})
            vdf["variable"] = v
            dfs.append(vdf.reset_index())
        df = pd.concat(dfs).reset_index(drop=True)

    # otherwise, rename columns or concat to IAMC-style or do a fill-by-value
    for col, value in kwargs.items():
        if col in df:
            raise ValueError(f"Conflict of kwarg with column `{col}` in dataframe!")

        if isstr(value) and value in df:
            df.rename(columns={value: col}, inplace=True)
        elif islistable(value) and all([c in df.columns for c in value]):
            df[col] = df.apply(lambda x: concat_with_pipe(x, value), axis=1)
            df.drop(value, axis=1, inplace=True)
        elif isstr(value):
            df[col] = value
        else:
            raise ValueError(f"Invalid argument for casting `{col}: {value}`")

    # all lower case
    str_cols = [c for c in df.columns if isstr(c)]
    df.rename(columns={c: str(c).lower() for c in str_cols}, inplace=True)

    if "notes" in df.columns:  # this came from the database
        logger.info("Ignoring notes column in dataframe")
        df.drop(columns="notes", inplace=True)
        col = df.columns[0]  # first column has database copyright notice
        df = df[~df[col].str.contains("database", case=False)]
        if "scenario" in df.columns and "model" not in df.columns:
            # model and scenario are jammed together in RCP data
            scen = df["scenario"]
            df.loc[:, "model"] = scen.apply(lambda s: s.split("-")[0].strip())
            df.loc[:, "scenario"] = scen.apply(
                lambda s: "-".join(s.split("-")[1:]).strip()
            )

    # reset the index if meaningful entries are included there
    if not list(df.index.names) == [None]:
        df.reset_index(inplace=True)

    # check that there is no column in the timeseries data with reserved names
    conflict_cols = [i for i in df.columns if i in ILLEGAL_COLS]
    if conflict_cols:
        msg = f"Column name {conflict_cols} is illegal for timeseries data.\n"
        _args = ", ".join([f"{i}_1='{i}'" for i in conflict_cols])
        msg += f"Use `IamDataFrame(..., {_args})` to rename at initialization."
        raise ValueError(msg)

    # check that index and required columns exist
    missing_index = [c for c in index if c not in df.columns]
    if missing_index:
        raise ValueError(f"Missing index columns: {missing_index}")

    missing_required_col = [c for c in REQUIRED_COLS if c not in df.columns]
    if missing_required_col:
        raise ValueError(f"Missing required columns: {missing_required_col}")

    # check whether data in wide format (IAMC) or long format (`value` column)
    if "value" in df.columns:
        # check if time column is given as `year` (int) or `time` (datetime)
        if "year" in df.columns and "time" not in df.columns:
            time_col = "year"
        elif "time" in df.columns and "year" not in df.columns:
            time_col = "time"
        else:
            raise ValueError("Invalid time domain, must have either `year` or `time`!")
        extra_cols = [
            c
            for c in df.columns
            if c not in index + REQUIRED_COLS + [time_col, "value"]
        ]
    else:
        # if in wide format, check if columns are years (int) or datetime
        cols = [c for c in df.columns if c not in index + REQUIRED_COLS]
        year_cols, time_cols, extra_cols = [], [], []
        for i in cols:
            # if the column name can be cast to integer, assume it's a year column
            try:
                int(i)
                year_cols.append(i)

            # otherwise, try casting to datetime
            except (ValueError, TypeError):
                try:
                    dateutil.parser.parse(str(i))
                    time_cols.append(i)

                # neither year nor datetime, so it is an extra-column
                except ValueError:
                    extra_cols.append(i)

        if year_cols and not time_cols:
            time_col = "year"
            melt_cols = sorted(year_cols)
        else:
            time_col = "time"
            melt_cols = sorted(year_cols) + sorted(time_cols)
        if not melt_cols:
            raise ValueError("Missing time domain")

        # melt the dataframe
        df = pd.melt(
            df,
            id_vars=index + REQUIRED_COLS + extra_cols,
            var_name=time_col,
            value_vars=melt_cols,
            value_name="value",
        )

    # cast value column to numeric and drop nan
    try:
        df["value"] = pd.to_numeric(df["value"])
    except ValueError as e:
        # get the row number where the error happened
        row_nr_regex = re.compile(r"(?<=at position )\d+")
        row_nr = int(row_nr_regex.search(str(e)).group())
        short_error_regex = re.compile(r".*(?= at position \d*)")
        short_error = short_error_regex.search(str(e)).group()
        raise_data_error(f"{short_error} in `data`", df.iloc[[row_nr]])

    df.dropna(inplace=True, subset=["value"])

    # replace missing units by an empty string for user-friendly filtering
    df.loc[df.unit.isnull(), "unit"] = ""

    # verify that there are no nans left (in columns)
    null_rows = df.isnull().any(axis=1)
    if null_rows.any():
        cols = ", ".join(df.columns[df.isnull().any().values])
        raise_data_error(
            f"Empty cells in `data` (columns: '{cols}')", df.loc[null_rows]
        )
    del null_rows

    # cast to pd.Series, check for duplicates
    idx_cols = index + REQUIRED_COLS + [time_col] + extra_cols
    df = df.set_index(idx_cols).value

    # format the time-column
    _time = [to_time(i) for i in get_index_levels(df.index, time_col)]
    df.index = replace_index_labels(df.index, time_col, _time)

    rows = df.index.duplicated()
    if any(rows):
        raise_data_error(
            "Duplicate rows in `data`", df[rows].index.to_frame(index=False)
        )
    del rows
    if df.empty:
        logger.warning("Formatted data is empty!")

    return df.sort_index(), index, time_col, extra_cols
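
# Illustrative usage (editor's sketch): `format_data` returns the timeseries
# data as a multi-indexed pandas.Series plus the layout of its index
#
#     series, index, time_col, extra_cols = format_data(
#         df, index=DEFAULT_META_INDEX
#     )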


def sort_data(data, cols):
    """Sort data rows and order columns by cols"""
    return data.sort_values(cols)[cols + ["value"]].reset_index(drop=True)


def merge_meta(left, right, ignore_conflict=False):
    """Merge two `meta` tables; raise if values are in conflict (optional)

    If conflicts are ignored, values in `left` take precedence over `right`.
    """
    left = left.copy()  # make a copy to not change the original object
    diff = right.index.difference(left.index)
    sect = right.index.intersection(left.index)

    # merge `right` into `left` for overlapping scenarios (`sect`)
    if not sect.empty:
        # if not ignored, check that overlapping `meta` columns are equal
        if not ignore_conflict:
            cols = [i for i in right.columns if i in left.columns]
            if not left.loc[sect, cols].equals(right.loc[sect, cols]):
                conflict_idx = (
                    pd.concat([right.loc[sect, cols], left.loc[sect, cols]])
                    .drop_duplicates()
                    .index.drop_duplicates()
                )
                msg = "conflict in `meta` for scenarios {}".format(
                    [i for i in pd.DataFrame(index=conflict_idx).index]
                )
                raise ValueError(msg)
        # merge new columns
        cols = [i for i in right.columns if i not in left.columns]
        left = left.merge(
            right.loc[sect, cols], how="outer", left_index=True, right_index=True
        )

    # append `right` for new scenarios (`diff`)
    if not diff.empty:
        left = pd.concat([left, right.loc[diff, :]], sort=False)

    # remove any columns that are all-nan
    return left.dropna(axis=1, how="all")
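
# Illustrative usage (editor's sketch, assuming two `meta` tables indexed by
# model/scenario): values of `left` take precedence over `right`
#
#     meta = merge_meta(left_meta, right_meta, ignore_conflict=True)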


def find_depth(data, s="", level=None):
    """Return or assert the depth (number of ``|``) of variables

    Parameters
    ----------
    data : str or list of strings
        IAMC-style variables
    s : str, default ''
        remove leading `s` from any variable in `data`
    level : int or str, default None
        if None, return depth (number of ``|``); else, return list of booleans
        whether depth satisfies the condition (equality if level is int,
        >= if ``.+``, <= if ``.-``)
    """
    if isstr(data):
        return _find_depth([data], s, level)[0]

    return _find_depth(data, s, level)
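
# Illustrative usage (editor's note):
#
#     find_depth("Emissions|CO2|Energy")                      # 2
#     find_depth(["Emissions", "Emissions|CO2"], level="1-")  # [True, True]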


def _find_depth(data, s="", level=None):
    """Internal implementation of `find_depth()`"""
    # remove wildcard as last character from string, escape regex characters
    _s = re.compile("^" + _escape_regexp(s.rstrip("*")))
    _p = re.compile("\\|")

    # find depth
    def _count_pipes(val):
        return len(_p.findall(re.sub(_s, "", val))) if _s.match(val) else None

    n_pipes = map(_count_pipes, to_list(data))

    # if no level test is specified, return the depth as (list of) int
    if level is None:
        return list(n_pipes)

    # if `level` is given, set function for finding depth level =, >=, <= |s
    if not isstr(level):
        test = lambda x: level == x if x is not None else False
    elif level[-1] == "-":
        level = int(level[:-1])
        test = lambda x: level >= x if x is not None else False
    elif level[-1] == "+":
        level = int(level[:-1])
        test = lambda x: level <= x if x is not None else False
    else:
        raise ValueError("Unknown level type: `{}`".format(level))

    return list(map(test, n_pipes))


def pattern_match(
    data, values, level=None, regexp=False, has_nan=False, return_codes=False
):
    """Return list where data matches values

    The function matches model/scenario names, variables, regions and meta
    columns to pseudo-regex (if `regexp == False`) for filtering
    (str, int, bool)
    """
    codes = []
    matches = np.zeros(len(data), dtype=bool)
    values = values if islistable(values) else [values]

    # issue (#40) with string-to-nan comparison, replace nan by empty string
    _data = data.fillna("") if has_nan else data

    for s in values:
        if return_codes and isinstance(data, pd.Index):
            try:
                codes.append(data.get_loc(s))
                continue
            except KeyError:
                pass

        if isstr(s):
            pattern = re.compile(_escape_regexp(s) + "$" if not regexp else s)
            depth = True if level is None else find_depth(_data, s, level)
            matches |= data.str.match(pattern) & depth
        else:
            matches = np.logical_or(matches, data == s)

    if return_codes:
        codes.extend(np.where(matches)[0])
        return codes

    return matches


def _escape_regexp(s):
    """Escape characters with specific regexp use"""
    return (
        str(s)
        .replace("|", "\\|")
        .replace(".", "\\.")  # `.` has to be replaced before `*`
        .replace("*", ".*")
        .replace("+", "\\+")
        .replace("(", "\\(")
        .replace(")", "\\)")
        .replace("$", "\\$")
    )


def print_list(x, n):
    """Return a printable string of a list shortened to n characters"""
    # if list is empty, only write count
    if len(x) == 0:
        return "(0)"

    # write number of elements, subtract count added at end from line width
    x = [i if i != "" else "''" for i in map(str, x)]
    count = f" ({len(x)})"
    n -= len(count)

    # if not enough space to write first item, write shortest sensible line
    if len(x[0]) > n - 5:
        return "..." + count

    # if only one item in list
    if len(x) == 1:
        return f"{x[0]} (1)"

    # add first item
    lst = f"{x[0]}, "
    n -= len(lst)

    # if possible, add last item before number of elements
    if len(x[-1]) + 4 > n:
        return lst + "..." + count
    else:
        count = f"{x[-1]}{count}"
        n -= len(x[-1]) + 3

    # iterate over remaining entries until line is full
    for i in x[1:-1]:
        if len(i) + 6 <= n:
            lst += f"{i}, "
            n -= len(i) + 2
        else:
            lst += "... "
" break return lst + count def to_time(x): """Cast a value to either year (int) or datetime""" # if the column name can be cast to integer, assume it's a year column try: j = int(x) is_year = True # otherwise, try casting to Timestamp (pandas-equivalent of datetime) except (ValueError, TypeError): try: j = pd.Timestamp(x) is_year = False except ValueError: raise ValueError(f"Invalid time domain: {x}") # This is to guard against "years" with decimals (e.g., '2010.5') if is_year and float(x) != j: raise ValueError(f"Invalid time domain: {x}") return j def to_int(x, index=False): """Formatting series or timeseries columns to int and checking validity If `index=False`, the function works on the :class:`pandas.Series` x; else, the function casts the index of x to int and returns x with new index """ _x = x.index if index else x cols = list(map(int, _x)) error = _x[cols != _x] if not error.empty: raise ValueError(f"Invalid values: {error}") if index: x.index = cols return x else: return _x


def concat_with_pipe(x, cols=None):
    """Concatenate a pandas.Series x using ``|``, drop None or numpy.nan"""
    cols = cols or x.index
    return "|".join([x[i] for i in cols if x[i] not in [None, np.nan]])
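
# Illustrative usage (editor's note):
#
#     concat_with_pipe(pd.Series(["Primary Energy", "Coal"]))
#     # -> 'Primary Energy|Coal'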


def reduce_hierarchy(x, depth):
    """Reduce the hierarchy (indicated by ``|``) of x to the specified depth"""
    _x = x.split("|")
    depth = len(_x) + depth - 1 if depth < 0 else depth
    return "|".join(_x[0 : (depth + 1)])
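
# Illustrative usage (editor's note): a negative `depth` counts from the end
#
#     reduce_hierarchy("Emissions|CO2|Energy", 1)   # 'Emissions|CO2'
#     reduce_hierarchy("Emissions|CO2|Energy", -1)  # 'Emissions|CO2'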


def get_variable_components(x, level, join=False):
    """Return components for requested level in a list or join these in a str.

    Parameters
    ----------
    x : str
        Uses ``|`` to separate the components of the variable.
    level : int or list of int
        Position of the component.
    join : bool or str, optional
        If True, IAMC-style (``|``) is used as separator for joined components.
    """
    _x = x.split("|")
    if join is False:
        return [_x[i] for i in level] if islistable(level) else _x[level]
    else:
        level = [level] if type(level) == int else level
        join = "|" if join is True else join
        return join.join([_x[i] for i in level])
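
# Illustrative usage (editor's note):
#
#     get_variable_components("Emissions|CO2|Energy", 1)  # 'CO2'
#     get_variable_components("Emissions|CO2|Energy", [0, 2], join=True)
#     # -> 'Emissions|Energy'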


def s(n):
    """Return an 's' if n != 1 for nicer formatting of log messages"""
    return "s" if n != 1 else ""