import json
import logging
import os.path
from functools import lru_cache
from io import StringIO
from pathlib import Path
import httpx
import ixmp4
import jwt
import numpy as np
import pandas as pd
import requests
import yaml
from ixmp4.cli.platforms import tabulate_manager_platforms
from ixmp4.conf import settings
from ixmp4.conf.auth import ManagerAuth
from requests.auth import AuthBase
from pyam.core import IamDataFrame
from pyam.ixmp4 import read_ixmp4
from pyam.logging import deprecation_warning
from pyam.str import is_str
from pyam.utils import (
IAMC_IDX,
META_IDX,
is_list_like,
pattern_match,
)
logger = logging.getLogger(__name__)
# set requests-logger to WARNING only
logging.getLogger("requests").setLevel(logging.WARNING)
_AUTH_URL = "https://api.manager.ece.iiasa.ac.at"
_CITE_MSG = """
You are connected to the {} scenario explorer hosted by IIASA.
If you use this data in any published format, please cite the
data as provided in the explorer guidelines: {}
""".replace("\n", "")
IXMP4_LOGIN = "Please run `ixmp4 login <username>` in a console"
# path to local configuration settings
DEFAULT_IIASA_CREDS = Path("~").expanduser() / ".local" / "pyam" / "iiasa.yaml"
def set_config(*args, **kwargs):
raise DeprecationWarning(f"This method is deprecated. {IXMP4_LOGIN}.")
def _read_config(file):
"""Read username and password for IIASA API connection from file"""
with open(file, "r") as stream:
creds = yaml.safe_load(stream)
return ManagerAuth(**creds, url=str(settings.manager_url))
def _check_response(r, msg="Error connecting to IIASA database", error=RuntimeError):
if not r.ok:
raise error(f"{msg}: {r.text}")
class SceSeAuth(AuthBase):
def __init__(self, creds: str = None, auth_url: str = _AUTH_URL):
"""Connection to the Scenario Services manager service for authentication.
Parameters
----------
creds : pathlib.Path or str, optional
Path to a file with authentication credentials. This feature is deprecated,
please run `ixmp4 login <username>` in a console instead.
auth_url : str, optional
Url of the authentication service
"""
self.client = httpx.Client(base_url=auth_url, timeout=10.0, http2=True)
if creds is None:
if DEFAULT_IIASA_CREDS.exists():
deprecation_warning(
f"{IXMP4_LOGIN} and manually delete the file "
f"'{DEFAULT_IIASA_CREDS}'. Using a pyam-credentials file",
)
self.auth = _read_config(DEFAULT_IIASA_CREDS)
else:
self.auth = ixmp4.conf.settings.default_auth
elif isinstance(creds, Path) or is_str(creds):
deprecation_warning(f"{IXMP4_LOGIN}.", "Using a pyam-credentials file")
self.auth = _read_config(creds)
else:
raise DeprecationWarning(
"Passing credentials as clear-text is not allowed. "
f"{IXMP4_LOGIN} instead."
)
# self.auth is None if connection to manager service cannot be established
if self.auth is None:
raise httpx.ConnectError("No connection to IIASA manager service.")
# explicit token for anonymous login is not necessary for ixmp4 platforms
# but is required for legacy Scenario Explorer databases
if self.auth.user.username == "@anonymous":
self._get_anonymous_token()
else:
self.user = self.auth.user.username
self.access_token = self.auth.access_token
def _get_anonymous_token(self):
r = self.client.get("/legacy/anonym/")
if r.status_code >= 400:
raise ValueError("Unknown API error: " + r.text)
self.user, self.access_token = None, r.json()
def __call__(self):
try:
# raises jwt.ExpiredSignatureError if token is expired
jwt.decode(
self.access_token,
options={"verify_signature": False, "verify_exp": True},
)
except jwt.ExpiredSignatureError:
if self.auth.user.username == "@anonymous":
self._get_anonymous_token()
else:
self.auth.refresh_or_reobtain_jwt()
self.access_token = self.auth.access_token
return {"Authorization": "Bearer " + self.access_token}
[docs]
class Connection(object):
"""A class to facilitate querying an IIASA Scenario Explorer database API
Parameters
----------
name : str, optional
The name of a database API.
Use :attr:`valid_connections <pyam.iiasa.Connection.valid_connections>`
for a list of available APIs.
creds : pathlib.Path or str, optional
Path to a file with authentication credentials. This feature is deprecated,
please run `ixmp4 login <username>` in a console instead.
auth_url : str, optional
custom authentication server URL
Notes
-----
Credentials (username & password) are not required to access public |ixmp4|
or Scenario Explorer databases (i.e., with Guest login).
"""
def __init__(self, name=None, creds=None, auth_url=_AUTH_URL):
self._auth_url = auth_url # scenario services manager API
self._base_url = None # database connection API
self.auth = SceSeAuth(creds=creds, auth_url=self._auth_url)
# connect if provided a name
self._connected = None
if name:
self.connect(name)
if self.auth.user is not None:
logger.info(f"You are connected as user `{self.auth.user}`")
else:
logger.info("You are connected as an anonymous user")
@property
@lru_cache()
def _connection_map(self):
# TODO: application-list will be reimplemented in conjunction with ixmp-server
r = self.auth.client.get("legacy/applications", headers=self.auth())
if r.status_code >= 400:
raise ValueError("Unknown API error: " + r.text)
aliases = set()
conn_map = {}
for x in r.json():
if "config" in x:
env = next(
(r["value"] for r in x["config"] if r["path"] == "env"), None
)
name = x["name"]
if env is not None:
if env in aliases:
logger.warning("Duplicate instance alias {}".format(env))
conn_map[name] = name
first_duplicate = conn_map.pop(env)
conn_map[first_duplicate] = first_duplicate
else:
conn_map[env] = name
aliases.add(env)
else:
conn_map[name] = name
return conn_map
@property
@lru_cache()
def valid_connections(self):
"""Return available resources (database API connections)"""
logger.warning(
"IIASA is migrating to a database infrastructure using the ixmp4 package."
"Use `pyam.iiasa.platforms()` to list available ixmp4 databases."
)
return list(self._connection_map.keys())
[docs]
def connect(self, name):
"""Connect to a specific resource (database API)"""
if name in self._connection_map:
name = self._connection_map[name]
valid = self._connection_map.values()
if name not in valid:
raise ValueError(
f"You do not have access to instance '{name}' or it does not exist. "
"Use `pyam.iiasa.Connection().valid_connections` for a list "
"of accessible services."
)
# TODO: config will be reimplemented in conjunction with ixmp-server
r = self.auth.client.get(
f"legacy/applications/{name}/config", headers=self.auth()
)
if r.status_code >= 400:
raise ValueError("Unknown API error: " + r.text)
response = r.json()
idxs = {x["path"]: i for i, x in enumerate(response)}
self._base_url = response[idxs["baseUrl"]]["value"]
# TODO: proper citation (as metadata) instead of link to the about page
if "uiUrl" in idxs:
about = "/".join([response[idxs["uiUrl"]]["value"], "#", "about"])
logger.info(_CITE_MSG.format(name, about))
# TODO: use API "nice-name"
self._connected = name
@property
def current_connection(self):
"""Currently connected resource (database API connection)"""
return self._connected
@property
def meta_columns(self):
"""Return the list of meta indicators in the connected resource"""
url = "/".join([self._base_url, "metadata/types"])
r = requests.get(url, headers=self.auth())
_check_response(r)
return pd.read_json(StringIO(r.text), orient="records")["name"]
def _query_index(self, default_only=True, meta=False, cols=[], **kwargs):
# TODO: at present this reads in all data for all scenarios,
# it could be sped up in the future to try to query a subset
_default = "true" if default_only else "false"
_meta = "true" if meta else "false"
add_url = f"runs?getOnlyDefaultRuns={_default}&includeMetadata={_meta}"
url = "/".join([self._base_url, add_url])
r = requests.get(url, headers=self.auth())
_check_response(r)
# cast response to dataframe, apply filter by kwargs, and return
runs = pd.read_json(StringIO(r.text), orient="records")
if runs.empty:
logger.warning("No permission to view model(s) or no scenarios exist.")
return pd.DataFrame([], columns=META_IDX + ["version", "run_id"] + cols)
if kwargs:
keep = np.ones(len(runs), dtype=bool)
for key, values in kwargs.items():
if key not in META_IDX + ["version"]:
raise ValueError(f"Invalid filter: '{key}'")
keep_col = pattern_match(pd.Series(runs[key].values), values)
keep = np.logical_and(keep, keep_col)
return runs[keep]
else:
return runs
[docs]
def index(self, default_only=True, **kwargs):
"""Return the index of models and scenarios
Parameters
----------
default_only : bool, optional
If `True`, return *only* the default version of a model/scenario.
If `False`, return all versions.
**kwargs
Arguments to filter by *model* and *scenario*, `*` can be used as wildcard.
"""
if "default" in kwargs:
default_only = _new_default_api(kwargs)
cols = ["version"] if default_only else ["version", "is_default"]
return self._query_index(default_only, **kwargs)[META_IDX + cols].set_index(
META_IDX
)
[docs]
def properties(self, default_only=True, **kwargs):
"""Return the audit properties of scenarios
Parameters
----------
default_only : bool, optional
If `True`, return *only* the default version of a model/scenario.
If `False`, return all versions.
**kwargs
Arguments to filer by *model* and *scenario*, `*` can be used as wildcard
"""
if "default" in kwargs:
default_only = _new_default_api(kwargs)
audit_cols = ["cre_user", "cre_date", "upd_user", "upd_date"]
other_cols = ["version"] if default_only else ["version", "is_default"]
cols = audit_cols + other_cols
_df = self._query_index(default_only, meta=True, cols=cols, **kwargs)
audit_mapping = dict([(i, i.replace("_", "ate_")) for i in audit_cols])
return _df.set_index(META_IDX).rename(columns=audit_mapping)
[docs]
def models(self):
"""List all models in the connected resource"""
return pd.Series(self._query_index()["model"].unique(), name="model")
[docs]
def scenarios(self):
"""List all scenarios in the connected resource"""
return pd.Series(self._query_index()["scenario"].unique(), name="scenario")
[docs]
@lru_cache()
def variables(self):
"""List all variables in the connected resource"""
url = "/".join([self._base_url, "ts"])
r = requests.get(url, headers=self.auth())
_check_response(r)
df = pd.read_json(StringIO(r.text), orient="records")
return pd.Series(df["variable"].unique(), name="variable")
[docs]
@lru_cache()
def regions(self, include_synonyms=False):
"""List all regions in the connected resource
Parameters
----------
include_synonyms : bool
whether to include synonyms
(possibly leading to duplicate region names for
regions with more than one synonym)
"""
url = "/".join([self._base_url, "nodes?hierarchy=%2A"])
params = {"includeSynonyms": include_synonyms}
r = requests.get(url, headers=self.auth(), params=params)
_check_response(r)
return self.convert_regions_payload(r.text, include_synonyms)
@staticmethod
def convert_regions_payload(response, include_synonyms):
df = pd.read_json(StringIO(response), orient="records")
if df.empty:
return df
if "synonyms" not in df.columns:
df["synonyms"] = [list()] * len(df)
df = df.astype(
{
"id": str,
"name": str,
"hierarchy": str,
"parent": str,
"synonyms": object,
}
)
if include_synonyms:
df = df[["name", "synonyms"]].explode("synonyms")
return df.rename(columns={"name": "region", "synonyms": "synonym"})
return pd.Series(df["name"].unique(), name="region")
def _query_post(self, meta, default_only=True, **kwargs): # noqa: C901
def _get_kwarg(k):
# TODO refactor API to return all models if model-list is empty
x = kwargs.pop(k, "*" if k == "model" else [])
return [x] if is_str(x) else x
m_pattern = _get_kwarg("model")
s_pattern = _get_kwarg("scenario")
v_pattern = _get_kwarg("variable")
r_pattern = _get_kwarg("region")
def _match(data, patterns):
# this is empty, return empty list which means "everything"
if not patterns:
return []
# otherwise match everything
matches = np.array([False] * len(data))
for p in patterns:
matches |= pattern_match(data, p)
return data[matches].unique()
# drop non-default runs if only default is requested
if default_only and hasattr(meta, "is_default"):
meta = meta[meta.is_default]
# determine relevant run id's
meta = meta.reset_index()
models = _match(meta["model"], m_pattern)
scenarios = _match(meta["scenario"], s_pattern)
if len(models) == 0 and len(scenarios) == 0:
runs = []
else:
where = np.array([True] * len(meta))
if len(models) > 0:
where &= meta.model.isin(models)
if len(scenarios) > 0:
where &= meta.scenario.isin(scenarios)
runs = meta.run_id[where].unique().tolist()
# get unique other values
variables = _match(self.variables(), v_pattern)
# pass empty list to API if all variables selected
if len(variables) == len(self.variables()):
variables = []
regions = _match(self.regions(), r_pattern)
# pass empty list to API if all regions selected
if len(regions) == len(self.regions()):
regions = []
data = {
"filters": {
"regions": list(regions),
"variables": list(variables),
"runs": list(runs),
"years": [],
"units": [],
"timeslices": [],
}
}
return data
[docs]
def query(self, default_only=True, meta=True, **kwargs):
"""Query the connected resource for timeseries data (with filters)
Parameters
----------
default_only : bool, optional
If `True`, return *only* the default version of a model/scenario.
If `False`, return all versions.
meta : bool or list, optional
If :obj:`True`, merge all meta columns indicators
(or subset if list is given).
**kwargs
Available keyword arguments include
- model
- scenario
- region
- variable
Returns
-------
IamDataFrame
Examples
--------
You can read from a :class:`pyam.iiasa.Connection` instance using
keyword arguments similar to filtering an :class:`IamDataFrame`:
.. code-block:: python
Connection.query(
model="MESSAGE*",
scenario="SSP2*",
variable=["Emissions|CO2", "Primary Energy"],
)
"""
if "default" in kwargs:
default_only = _new_default_api(kwargs)
headers = self.auth().copy()
headers["Content-Type"] = "application/json"
# retrieve meta (with run ids) or only index
if meta:
_meta = self.meta(default_only=default_only, run_id=True)
# downselect to subset of meta columns if given as list
if is_list_like(meta):
# always merge 'version' (even if not requested explicitly)
# 'run_id' is required to determine `_args`, dropped later
_meta = _meta[list(set(meta).union(["version", "run_id"]))]
else:
_meta = self._query_index(default_only=default_only).set_index(META_IDX)
# return nothing if no data exists at all
if _meta.empty:
return
# retrieve data
_args = json.dumps(self._query_post(_meta, default_only=default_only, **kwargs))
url = "/".join([self._base_url, "runs/bulk/ts"])
r = requests.post(url, headers=headers, data=_args)
_check_response(r)
# refactor returned json object to be castable to an IamDataFrame
dtype = dict(
model=str,
scenario=str,
variable=str,
unit=str,
region=str,
year=int,
value=float,
version=int,
)
data = pd.read_json(StringIO(r.text), orient="records", dtype=dtype)
cols = IAMC_IDX + ["year", "value", "subannual", "version"]
# keep only known columns or init empty df
data = pd.DataFrame(data=data, columns=cols)
# check if timeseries data has subannual disaggregation, drop if not
if "subannual" in data:
timeslices = data.subannual.dropna().unique()
if all([i in [-1, "Year"] for i in timeslices]):
data.drop(columns="subannual", inplace=True)
# define the index for the IamDataFrame
if default_only:
index = META_IDX
data.drop(columns="version", inplace=True)
else:
index = META_IDX + ["version"]
logger.info(f"Initializing `IamDataFrame` with non-default index {index}")
# merge meta indicators (if requested) and cast to IamDataFrame
if meta:
# 'run_id' is necessary to retrieve data, not returned by default
if not (is_list_like(meta) and "run_id" in meta):
_meta.drop(columns="run_id", inplace=True)
return IamDataFrame(data, meta=_meta, index=index)
else:
return IamDataFrame(data, index=index)
def _new_default_api(kwargs):
# TODO: argument `default` is deprecated, remove this warning for release >= 3.0
raise DeprecationWarning(
"The argument `default` is deprecated, use `default_only` instead."
)
[docs]
def read_iiasa(name, default_only=True, meta=True, creds=None, **kwargs):
"""Read data from an |ixmp4| platform or an IIASA Scenario Explorer database.
Parameters
----------
name : str
| Name of an |ixmp4| platform or an IIASA Scenario Explorer database instance.
| Use :attr:`platforms <pyam.iiasa.platforms>` for a list of |ixmp4| platforms
hosted by IIASA.
| Use :attr:`valid_connections <pyam.iiasa.Connection.valid_connections>`
for a list of available Scenario Explorer database instances.
default_only : bool, optional
If `True`, return *only* the default version of a model/scenario.
If `False`, return all versions.
meta : bool or list of strings, optional
If `True`, include all meta categories & quantitative indicators
(or subset if list is given).
creds : str or :class:`pathlib.Path`, optional
Path to a file with authentication credentials. This feature is deprecated,
please run ``ixmp4 login <username>`` in a console instead.
**kwargs
Arguments for :meth:`pyam.read_ixmp4` or :meth:`pyam.iiasa.Connection.query`.
Notes
-----
Credentials (username & password) are not required to access any public |ixmp4|
or Scenario Explorer database (i.e., with Guest login).
"""
if name in [i.name for i in ixmp4.conf.settings.manager.list_platforms()]:
if meta is not True:
raise NotImplementedError(
"Reading from ixmp4 platforms requires `meta=True`"
)
return read_ixmp4(name, default_only=default_only, **kwargs)
return Connection(name, creds).query(default_only=default_only, meta=meta, **kwargs)
[docs]
def lazy_read_iiasa(file, name, default_only=True, meta=True, creds=None, **kwargs):
"""
Try to load data from a local cache, failing that, loads it from an IIASA database.
Check if the file in a given location is an up-to-date version of an IIASA
database. If so, load it. If not, load data from the IIASA scenario explorer
database API and save to that location. Does not check that the previously read
version is a complete instance of the database, so if the initial load applies a
filter, you will read only data that passes the same filter as well as any
additional filter you apply.
Parameters
----------
file : str or :class:`pathlib.Path`
The location to test for valid data and save the data if not up-to-date. Must be
either xlsx or csv.
name : str
| Name of an IIASA Scenario Explorer database instance.
| Use :attr:`valid_connections <pyam.iiasa.Connection.valid_connections>`
for a list of available instances.
default_only : bool, optional
If `True`, return *only* the default version of a model/scenario.
If `False`, return all versions.
meta : bool or list of strings, optional
If `True`, include all meta categories & quantitative indicators
(or subset if list is given).
creds : str or :class:`pathlib.Path`, optional
Path to a file with authentication credentials. This feature is deprecated,
please run ``ixmp4 login <username>`` in a console instead.
**kwargs
Arguments for :meth:`pyam.read_ixmp4` or :meth:`pyam.iiasa.Connection.query`.
Notes
-----
This feature does currently not support reading data from |ixmp4| platforms.
Credentials (username & password) are not required to access any public |ixmp4|
or Scenario Explorer database (i.e., with Guest login).
"""
if name in [
platform.name for platform in ixmp4.conf.settings.manager.list_platforms()
]:
raise NotImplementedError(
"The function `lazy_read_iiasa()` does not support ixmp4 platforms."
)
file = Path(file)
assert file.suffix in [
".csv",
".xlsx",
], "We will only read and write to csv and xlsx format."
if os.path.exists(file):
date_set = pd.to_datetime(os.path.getmtime(file), unit="s")
version_info = Connection(name, creds).properties()
latest_new = np.nanmax(pd.to_datetime(version_info["create_date"]))
latest_update = np.nanmax(pd.to_datetime(version_info["update_date"]))
latest = pd.Series([latest_new, latest_update]).max()
if latest < date_set:
old_read = IamDataFrame(file)
if kwargs:
old_read = old_read.filter(**kwargs)
logger.info("Database read from file")
return old_read
else:
logger.info("Database out of date and will be re-downloaded")
# If we get here, we need to redownload the database
new_read = read_iiasa(
name,
meta=meta,
default_only=default_only,
creds=creds,
**kwargs,
)
Path(file).parent.mkdir(parents=True, exist_ok=True)
if file.suffix == ".csv":
new_read.to_csv(file)
else:
new_read.to_excel(file)
return new_read