from pathlib import Path
import json
import logging
import requests
import yaml
from functools import lru_cache
import numpy as np
import pandas as pd
from collections.abc import Mapping
from pyam.core import IamDataFrame
from pyam.utils import META_IDX, IAMC_IDX, isstr, pattern_match
from pyam.logging import deprecation_warning
logger = logging.getLogger(__name__)
# set requests-logger to WARNING only
logging.getLogger('requests').setLevel(logging.WARNING)
_AUTH_URL = 'https://db1.ene.iiasa.ac.at/EneAuth/config/v1'
_CITE_MSG = """
You are connected to the {} scenario explorer hosted by IIASA.
If you use this data in any published format, please cite the
data as provided in the explorer guidelines: {}
""".replace('\n', '')
# path to local configuration settings
DEFAULT_IIASA_CREDS = Path('~').expanduser() / '.local' / 'pyam' / 'iiasa.yaml'
[docs]def set_config(user, password, file=None):
"""Save username and password for the IIASA API connection to a file"""
file = Path(file) if file is not None else DEFAULT_IIASA_CREDS
if not file.parent.exists():
file.parent.mkdir(parents=True)
with open(file, mode='w') as f:
logger.info(f'Setting IIASA-connection configuration file: {file}')
yaml.dump(dict(username=user, password=password), f)
def _get_config(file=None):
"""Read username and password for IIASA API connection from file"""
file = Path(file) if file is not None else DEFAULT_IIASA_CREDS
if file.exists():
with open(file, 'r') as stream:
return yaml.safe_load(stream)
def _check_response(r, msg='Trouble with request', error=RuntimeError):
if not r.ok:
raise error('{}: {}'.format(msg, str(r.text)))
def _get_token(creds, base_url):
"""Parse credentials and get token from IIASA authentication service"""
plaintextcreds = True
# try reading default config or parse file
if creds is None:
creds = _get_config()
plaintextcreds = False
elif isinstance(creds, Path) or isstr(creds):
_creds = _get_config(creds)
if _creds is None:
logger.error(f'Could not read credentials from `{creds}`')
creds = _creds
plaintextcreds = False
# if (still) no creds, get anonymous auth and return
if creds is None:
url = '/'.join([base_url, 'anonym'])
r = requests.get(url)
_check_response(r, 'Could not get anonymous token')
return r.json(), None
# parse creds, write warning
if isinstance(creds, Mapping):
user, pw = creds['username'], creds['password']
else:
user, pw = creds
if plaintextcreds:
logger.warning('You provided credentials in plain text. DO NOT save '
'these in a repository or otherwise post them online')
deprecation_warning('Please use `pyam.iiasa.set_config(<user>, <pwd>)`'
' to store your credentials in a file!',
'Providing credentials in plain text')
# get user token
headers = {'Accept': 'application/json',
'Content-Type': 'application/json'}
data = {'username': user, 'password': pw}
url = '/'.join([base_url, 'login'])
r = requests.post(url, headers=headers, data=json.dumps(data))
_check_response(r, 'Login failed for user: {}'.format(user))
return r.json(), user
[docs]class Connection(object):
"""A class to facilitate querying an IIASA Scenario Explorer database API
Parameters
----------
name : str, optional
The name of a database API.
See :attr:`pyam.iiasa.Connection.valid_connections` for a list
of available APIs.
creds : str, :class:`pathlib.Path`, list-like, or dict, optional
By default, the class will search for user credentials which
were set using :meth:`pyam.iiasa.set_config`.
Alternatively, you can provide a path to a yaml file
with entries of 'username' and 'password'.
base_url : str, optional
custom authentication server URL
Notes
-----
Credentials (username & password) are not required to access any public
Scenario Explorer instances (i.e., with Guest login).
Providing credentials as an ordered container (tuple, list, etc.)
or as a dictionary with keys `user` and `password` is (still) supported
for backwards compatibility. However, this option is NOT RECOMMENDED
and will be deprecated in future releases of pyam.
"""
def __init__(self, name=None, creds=None, auth_url=_AUTH_URL):
self._auth_url = auth_url
self._token, self._user = _get_token(creds, base_url=self._auth_url)
# connect if provided a name
self._connected = None
if name:
self.connect(name)
if self._user:
logger.info(f'You are connected as user `{self._user}`')
else:
logger.info('You are connected as an anonymous user')
@property
@lru_cache()
def _connection_map(self):
url = '/'.join([self._auth_url, 'applications'])
headers = {'Authorization': 'Bearer {}'.format(self._token)}
r = requests.get(url, headers=headers)
_check_response(r, 'Could not get valid connection list')
aliases = set()
conn_map = {}
for x in r.json():
if 'config' in x:
env = next((r['value'] for r in x['config']
if r['path'] == 'env'), None)
name = x['name']
if env is not None:
if env in aliases:
logger.warning('Duplicate instance alias {}'
.format(env))
conn_map[name] = name
first_duplicate = conn_map.pop(env)
conn_map[first_duplicate] = first_duplicate
else:
conn_map[env] = name
aliases.add(env)
else:
conn_map[name] = name
return conn_map
@property
@lru_cache()
def valid_connections(self):
"""Return available resources (database API connections)"""
return list(self._connection_map.keys())
[docs] def connect(self, name):
"""Connect to a specific resource (database API)"""
if name in self._connection_map:
name = self._connection_map[name]
valid = self._connection_map.values()
if len(valid) == 0:
raise RuntimeError(
'No valid connections found for the provided credentials.'
)
if name not in valid:
msg = """
{} not recognized as a valid connection name.
Choose from one of the supported connections for your user: {}.
"""
raise ValueError(msg.format(name, self._connection_map.keys()))
url = '/'.join([self._auth_url, 'applications', name, 'config'])
headers = {'Authorization': 'Bearer {}'.format(self._token)}
r = requests.get(url, headers=headers)
_check_response(r, 'Could not get application information')
response = r.json()
idxs = {x['path']: i for i, x in enumerate(response)}
self._auth_url = response[idxs['baseUrl']]['value']
# TODO: proper citation (as metadata) instead of link to the about page
if 'uiUrl' in idxs:
about = '/'.join([response[idxs['uiUrl']]['value'], '#', 'about'])
logger.info(_CITE_MSG.format(name, about))
# TODO: use API "nice-name"
self._connected = name
@property
def current_connection(self):
"""Currently connected resource (database API connection)"""
return self._connected
[docs] def index(self, default=True):
"""Return the index of models and scenarios in the connected resource
Parameters
----------
default : bool, optional
If `True`, return *only* the default version of a model/scenario.
Any model/scenario without a default version is omitted.
If `False`, returns all versions.
"""
cols = ['version'] if default else ['version', 'is_default']
return self._query_index(default)[META_IDX + cols].set_index(META_IDX)
@lru_cache()
def _query_index(self, default=True):
# TODO merge this function with `meta()`
default = 'true' if default else 'false'
add_url = 'runs?getOnlyDefaultRuns={}'
url = '/'.join([self._auth_url, add_url.format(default)])
headers = {'Authorization': 'Bearer {}'.format(self._token)}
r = requests.get(url, headers=headers)
_check_response(r, 'Could not retrieve the resource index')
return pd.read_json(r.content, orient='records')
@property
@lru_cache()
def meta_columns(self):
"""Return the list of meta indicators in the connected resource"""
url = '/'.join([self._auth_url, 'metadata/types'])
headers = {'Authorization': 'Bearer {}'.format(self._token)}
r = requests.get(url, headers=headers)
_check_response(r)
return pd.read_json(r.content, orient='records')['name']
[docs] def models(self):
"""List all models in the connected resource"""
return pd.Series(self._query_index()['model'].unique(),
name='model')
[docs] def scenarios(self):
"""List all scenarios in the connected resource"""
return pd.Series(self._query_index()['scenario'].unique(),
name='scenario')
[docs] @lru_cache()
def variables(self):
"""List all variables in the connected resource"""
url = '/'.join([self._auth_url, 'ts'])
headers = {'Authorization': 'Bearer {}'.format(self._token)}
r = requests.get(url, headers=headers)
_check_response(r)
df = pd.read_json(r.content, orient='records')
return pd.Series(df['variable'].unique(), name='variable')
[docs] @lru_cache()
def regions(self, include_synonyms=False):
"""List all regions in the connected resource
Parameters
----------
include_synonyms : bool
whether to include synonyms
(possibly leading to duplicate region names for
regions with more than one synonym)
"""
url = '/'.join([self._auth_url, 'nodes?hierarchy=%2A'])
headers = {'Authorization': 'Bearer {}'.format(self._token)}
params = {'includeSynonyms': include_synonyms}
r = requests.get(url, headers=headers, params=params)
_check_response(r)
return self.convert_regions_payload(r.content, include_synonyms)
@staticmethod
def convert_regions_payload(response, include_synonyms):
df = pd.read_json(response, orient='records')
if df.empty:
return df
if 'synonyms' not in df.columns:
df['synonyms'] = [list()] * len(df)
df = df.astype({
'id': str,
'name': str,
'hierarchy': str,
'parent': str,
'synonyms': object
})
if include_synonyms:
df = df[['name', 'synonyms']].explode('synonyms')
return df.rename(columns={'name': 'region', 'synonyms': 'synonym'})
return pd.Series(df['name'].unique(), name='region')
def _query_post_data(self, **kwargs):
def _get_kwarg(k):
# TODO refactor API to return all models if model-list is empty
x = kwargs.pop(k, '*' if k == 'model' else [])
return [x] if isstr(x) else x
m_pattern = _get_kwarg('model')
s_pattern = _get_kwarg('scenario')
v_pattern = _get_kwarg('variable')
r_pattern = _get_kwarg('region')
def _match(data, patterns):
# this is empty, return empty list which means "everything"
if not patterns:
return []
# otherwise match everything
matches = np.array([False] * len(data))
for p in patterns:
matches |= pattern_match(data, p)
return data[matches].unique()
# get unique run ids
meta = self._query_index()
meta = meta[meta.is_default]
models = _match(meta['model'], m_pattern)
scenarios = _match(meta['scenario'], s_pattern)
if len(models) == 0 and len(scenarios) == 0:
runs = []
else:
where = np.array([True] * len(meta))
if len(models) > 0:
where &= meta.model.isin(models)
if len(scenarios) > 0:
where &= meta.scenario.isin(scenarios)
runs = meta.run_id[where].unique().tolist()
# get unique other values
variables = _match(self.variables(), v_pattern)
# pass empty list to API if all variables selected
if len(variables) == len(self.variables()):
variables = []
regions = _match(self.regions(), r_pattern)
# pass empty list to API if all regions selected
if len(regions) == len(self.regions()):
regions = []
logger.debug(f"Prepared filter for {len(regions)} region(s), "
f"{len(variables)} variables and {len(runs)} runs")
data = {
"filters": {
"regions": list(regions),
"variables": list(variables),
"runs": list(runs),
"years": [],
"units": [],
"timeslices": []
}
}
return data
[docs] def query(self, default=True, meta=True, **kwargs):
"""Query the connected resource for timeseries data (with filters)
Parameters
----------
default : bool, optional
Return *only* the default version of each scenario.
Any (`model`, `scenario`) without a default version is omitted.
If :obj:`False`, return all versions.
meta : bool or list, optional
If :obj:`True`, merge all meta columns indicators
(or subset if list is given).
kwargs
Available keyword arguments include
- model
- scenario
- region
- variable
Returns
-------
IamDataFrame
Examples
--------
You can read from a :class:`pyam.iiasa.Connection` instance using
keyword arguments similar to filtering an :class:`IamDataFrame`:
.. code-block:: python
Connection.query(model='MESSAGE*', scenario='SSP2*',
variable=['Emissions|CO2', 'Primary Energy'])
"""
# TODO: API returns timeseries data for non-default versions
if default is not True:
msg = 'Querying for non-default scenarios is not (yet) supported'
raise ValueError(msg)
# retrieve data
headers = {
'Authorization': 'Bearer {}'.format(self._token),
'Content-Type': 'application/json',
}
_args = json.dumps(self._query_post_data(**kwargs))
url = '/'.join([self._auth_url, 'runs/bulk/ts'])
logger.debug(f'Query timeseries data from {url} with data {_args}')
r = requests.post(url, headers=headers, data=_args)
_check_response(r)
# refactor returned json object to be castable to an IamDataFrame
dtype = dict(model=str, scenario=str, variable=str, unit=str,
region=str, year=int, value=float, version=int)
data = pd.read_json(r.content, orient='records', dtype=dtype)
logger.debug(f'Response: {len(r.content)} bytes, {len(data)} records')
cols = IAMC_IDX + ['year', 'value', 'subannual', 'version']
# keep only known columns or init empty df
data = pd.DataFrame(data=data, columns=cols)
# check if timeseries data has subannual disaggregation, drop if not
if 'subannual' in data:
timeslices = data.subannual.dropna().unique()
if all([i in [-1, 'Year'] for i in timeslices]):
data.drop(columns='subannual', inplace=True)
# check if there are multiple version for any model/scenario
lst = (
data[META_IDX + ['version']].drop_duplicates()
.groupby(META_IDX).count().version
)
# checking if there are multiple versions
# for every model/scenario combination
# TODO this is probably not necessary
if len(lst) > 1 and max(lst) > 1:
raise ValueError('multiple versions for {}'.format(
lst[lst > 1].index.to_list()))
data.drop(columns='version', inplace=True)
# cast to IamDataFrame
df = IamDataFrame(data)
# merge meta categorization and quantitative indications
if meta:
_meta = self.meta().loc[df.meta.index]
for i in _meta.columns if meta is True else meta + ['version']:
df.set_meta(_meta[i])
return IamDataFrame(df)
[docs]def read_iiasa(name, default=True, meta=True, creds=None, base_url=_AUTH_URL,
**kwargs):
"""Query an IIASA Scenario Explorer database API and return as IamDataFrame
Parameters
----------
name : str
A valid name of an IIASA scenario explorer instance,
see :attr:`pyam.iiasa.Connection.valid_connections`
default : bool, optional
Return *only* the default version of each scenario.
Any (`model`, `scenario`) without a default version is omitted.
If :obj:`False`, return all versions.
meta : bool or list of strings, optional
If `True`, include all meta categories & quantitative indicators
(or subset if list is given).
creds : str, :class:`pathlib.Path`, list-like, or dict, optional
| Credentials (username & password) are not required to access
any public Scenario Explorer instances (i.e., with Guest login).
| See :class:`pyam.iiasa.Connection` for details.
| Use :meth:`pyam.iiasa.set_config` to set credentials
for accessing private/restricted Scenario Explorer instances.
base_url : str
Authentication server URL
kwargs
Arguments for :meth:`pyam.iiasa.Connection.query`
"""
return Connection(name, creds, base_url)\
.query(default=default, meta=meta, **kwargs)