# !usr/bin/env python2
# -*- coding: utf-8 -*-
#
# Licensed under a 3-clause BSD license.
#
# @Author: Brian Cherinka
# @Date: 2016-10-11 13:24:56
# @Last modified by: Brian Cherinka
# @Last Modified time: 2018-06-30 14:01:05
from __future__ import absolute_import, division, print_function
import os
import sys
import glob
import re
from collections import OrderedDict
import six
import json
import datetime
from tree import log, config as cfg_params
try:
import rapidfuzz
except ImportError:
rapidfuzz = None
if ((sys.version_info.major == 3 and sys.version_info.minor > 2) or
(sys.version_info.major == 2 and sys.version_info.minor >= 7)):
from configparser import ConfigParser as SafeConfigParser
else:
from configparser import SafeConfigParser
orig_environ = os.environ.copy()
[docs]
class Tree(object):
''' Initialize the sdss tree object
This class provides Python programmatic access to the SDSS tree envionment structure
Parameters
----------
key : str | list
A section or list of sections of the tree to add into the local environment
uproot_with : str
A new TREE_DIR path used to override an existing TREE_DIR environment variable
config : str
Name of manual config file to load. Default is sdsswork.
update : bool
If True, overwrites existing tree environment variables in your
local environment. Default is False.
exclude : list
A list of environment variables to exclude
from forced updates
root : str
An absolute directory path to override as the software product root
git : bool
If True, looks for SDSS_GIT_ROOT environment variable as product root instead
of SDSS_SVN_ROOT
Attributes
----------
treedir : str
The directory of the tree
sasbasedir : str
The root directory of SAS
productroot_dir : str
The root directory of installed software products from svn or github
environ : dict
All of the environment paths defnined in the currntly loaded SDSS configuration file
paths : dict
All of the sdss_access paths defined in the currently loaded configuration
phase : int
Which SDSS phase the currently loaded config belongs to
'''
# possible software product roots
_product_roots = ['PRODUCT_ROOT', 'SDSS_GIT_ROOT', 'SDSS_SVN_ROOT', 'SDSS_INSTALL_PRODUCT_ROOT',
'SDSS_PRODUCT_ROOT', 'SDSS4_PRODUCT_ROOT']
def __init__(self, config=None, key=None, uproot_with=None, update=None, exclude=None,
product_root=None, git=None):
self.config_name = config or os.getenv('TREE_VER', 'sdsswork')
self.release = self.get_release_from_config()
self.exclude = exclude or []
update = update or False
self._keys = key
self._file_replace = '@FILESYSTEM@'
# set the roots
self.set_roots(uproot_with=uproot_with)
# load the configuraiton file
self.load_config()
# create the environment
self.branch_out(limb=key)
# add the paths to the os.environ
if key is not None:
self.add_paths_to_os(key='general', update=update)
self.add_paths_to_os(key=key, update=update)
# set the software product root, $PRODUCT_ROOT envvar
self.productroot_dir = None
self.set_product_root(root=product_root, git=git)
def __repr__(self):
return ('Tree(sas_base_dir={0}, config={1}, release={2})'.format(self.sasbasedir, self.config_name, self.release))
@property
def phase(self):
''' Return the phase of the survey from the loaded "work" environment '''
phase = self.environ['default'].get('phase', None)
if phase and phase.isdigit():
phase = int(phase)
return phase
@property
def release_date(self):
''' Return the release date of the tree configuration '''
rd = self.environ['default'].get('release_date', 'None')
if rd != 'None':
return datetime.date.fromisoformat(rd)
[docs]
def list_keys(self):
''' List the available keys you can load '''
return [k for k in self.environ.keys() if k not in ['general', 'default']]
[docs]
def set_roots(self, uproot_with=None):
''' Set the roots of the tree in the os environment
Parameters
----------
uproot_with : str
A new TREE_DIR path used to override an existing TREE_DIR environment variable
'''
# Check for TREE_DIR
self.treedir = get_tree_dir(uproot_with=uproot_with)
# Check sas_base_dir
if 'SAS_BASE_DIR' in os.environ:
self.sasbasedir = os.environ["SAS_BASE_DIR"]
else:
self.sasbasedir = os.path.expanduser('~/sas')
# make the directories
if not os.path.isdir(self.sasbasedir):
os.makedirs(self.sasbasedir)
def _read_config(self, config=None, bases=None):
''' Read a config file
Uses ConfigParser to read in a cfg file. If a base
is identified, then recursively reads in all cfg files and builds
a master config dictionary object
Parameters
----------
config : str
Optional name of config file to load
bases : list
A list of parent config files
Returns
-------
dict
A configParser dictionary object
'''
# format name and file
config_name = config if config.endswith('.cfg') else '{0}.cfg'.format(config)
config_file = os.path.join(self.treedir, 'data', config_name)
assert os.path.isfile(
config_file) is True, 'config file {0} must exist in the data directory'.format(config_file)
# read initial config file
cfg = SafeConfigParser()
cfg.optionxform = lambda option: option
cfg.read(config_file)
# check for any bases
bases = bases if bases else []
bases.insert(0, os.path.join(self.treedir, 'data', config_name))
hasbase = 'base' in cfg.defaults()
# read base config file
if hasbase:
return self._read_config(cfg.defaults()['base'], bases=bases)
else:
# read in the full list of config files
cfg = SafeConfigParser()
cfg.optionxform = lambda option: option
cfg.read(bases)
# if both eboss and boss, then remove boss
if 'EBOSS' in cfg.sections() and 'BOSS' in cfg.sections():
cfg.remove_section('BOSS')
return cfg
def _check_config(self, config=None):
''' check the config
checks the config for syntax and existence. Defaults to sdsswork
or a DR if it doesn't exist.
Parameters
----------
config : str
The name of the config to check
Returns
-------
str
The (updated) name of the config
'''
# check initial argument
cfgname = (config or self.config_name)
cfgname = 'sdsswork' if cfgname is None else cfgname
assert isinstance(cfgname, six.string_types), 'config name must be a string'
cfgname = cfgname.lower()
config_name = cfgname if cfgname.endswith('.cfg') else '{0}.cfg'.format(cfgname)
# default to another config if not available
file_base = [os.path.basename(f) for f in self._cfg_files]
if config_name not in file_base:
if 'dr' in config_name:
drint = max(map(int, [re.findall(r'\d{1,2}', f)[0] for f in file_base if 'dr' in f]))
log.warning('{0} not found. Defaulting to dr{1}.cfg'.format(config_name, drint))
self.config_name = 'dr{0}'.format(drint)
else:
log.warning('{0} not found. Defaulting to sdsswork'.format(config_name))
self.config_name = 'sdsswork'
config_name = self.config_name + '.cfg'
return config_name
[docs]
def load_config(self, config=None):
''' Load a config file
Parameters
----------
config : str
Optional name of manual config file to load
'''
# get a list of all config files
self._cfg_files = glob.glob(os.path.join(self.treedir, 'data', '*.cfg'))
self._cfg_files.sort()
# check the config file
config_name = self._check_config(config)
# set the confifile
configfile = os.path.join(self.treedir, 'data', config_name)
assert os.path.isfile(configfile) is True, ('configfile {0} must exist in the '
'data directory'.format(configfile))
self.config_file = configfile
# read the config
self._cfg = self._read_config(config=self.config_name)
def _create_environment(self, cfg=None, sections=None):
''' create the environment from the config
Creates a dictionary with environment definitions
expanded out
Parameters
----------
config : str
Optional name of manual config file to load
sections : list
A list of config sections to load
Returns
-------
dict
An ordered dictionary of envvar definitions
'''
# pass in a cfg dict or use the one attached
cfg = cfg or self._cfg
# create the local tree environment
environ = OrderedDict()
environ['default'] = cfg.defaults()
# set the filesystem envvar to sas_base_dir
filesystem = 'FILESYSTEM' if 'FILESYSTEM' in environ[
'default'] else 'filesystem' if 'filesystem' in environ['default'] else None
if environ['default'][filesystem] == self._file_replace:
environ['default'][filesystem] = self.sasbasedir
# add all sections into the tree environ
sections = sections if sections else cfg.sections()
for section in sections:
# skip if PATHS
if section == 'PATHS':
continue
section = section if section in cfg.sections() else section.upper()
environ[section] = OrderedDict()
options = cfg.options(section)
for opt in options:
if opt in environ['default']:
continue
val = cfg.get(section, opt)
if val.find(self._file_replace) == 0:
val = val.replace(self._file_replace, self.sasbasedir)
environ[section][opt] = val
return environ
def _create_paths(self, cfg=None):
''' create a dictionary of path definitions
Extracts the PATHS section from a ConfigParser object
and builds a dictionary of paths for sdss_access
Parameters
----------
cfg : object
A configParser object
Returns
-------
dict
An ordered dictionary of sdss_access path definitions
'''
# pass in a cfg dict or use the one attached
cfg = cfg or self._cfg
# return if no PATHS found
if not cfg.has_section('PATHS'):
return None
paths = OrderedDict()
for opt in cfg.options('PATHS'):
if opt in cfg.defaults():
continue
paths[opt] = cfg.get('PATHS', opt)
# sort the paths by name
paths = OrderedDict({k: paths[k] for k in sorted(paths.keys())})
return paths
[docs]
def branch_out(self, limb=None):
''' Set the individual section branches
This adds the various sections of the config file into the
tree environment for access later. Optionally can specify a specific
branch. This does not yet load them into the os environment.
Parameters
----------
limb : str | list
A section or lists of sections of the config to add into the environ
'''
# Filter on sections
if not limb:
limbs = self._cfg.sections()
else:
# we must have the general always + secton
limb = limb if isinstance(limb, list) else [limb]
limbs = ['general']
limbs.extend(limb)
# add all limbs into the tree environ
self.environ = self._create_environment(sections=limbs)
# add all paths into the tree paths dictionary
self.paths = self._create_paths()
[docs]
def add_limbs(self, key=None):
''' Add a new section from the tree into the existing os environment
Parameters
----------
key : str
The section name to grab from the environment
'''
self.branch_out(limb=key)
self.add_paths_to_os(key=key)
[docs]
def get_paths(self, key):
''' Retrieve a set of environment paths from the config
Parameters
----------
key : str
The section name to grab from the environment
Returns
-------
dict
An ordered dict containing all of the paths from the
specified section, as key:val = name:path
'''
newkey = key if key in self.environ else key.upper() if key.upper() \
in self.environ else None
if newkey:
return self.environ[newkey]
else:
raise KeyError('Key {0} not found in tree environment'.format(key))
[docs]
def add_paths_to_os(self, key=None, update=None):
''' Add the paths in tree environ into the os environ
This code goes through the tree environ and checks
for existence in the os environ, then adds them
Parameters
----------
key : str
The section name to check against / add
update : bool
If True, overwrites existing tree environment variables in your
local environment. Default is False.
'''
if key is not None:
allpaths = key if isinstance(key, list) else [key]
else:
allpaths = [k for k in self.environ.keys() if 'default' not in k]
for key in allpaths:
paths = self.get_paths(key)
self._check_paths(paths, update=update)
def _check_paths(self, paths, update=None):
''' Check if the path is in the os environ, and if not add it
Paramters
---------
paths : dict
An ordered dict containing all of the paths from the
a given section, as key:val = name:path
update : bool
If True, overwrites existing tree environment variables in your
local environment. Default is False.
'''
# set up the exclusion list
exclude = [] if not self.exclude else self.exclude \
if isinstance(self.exclude, list) else [self.exclude]
# check the path names
for pathname, path in paths.items():
if update and pathname.upper() not in exclude:
os.environ[pathname.upper()] = os.path.normpath(path)
elif pathname.upper() not in os.environ:
os.environ[pathname.upper()] = os.path.normpath(path)
[docs]
def replant_tree(self, config=None, exclude=None, preserve_envvars=None):
""" Replant the tree with a different config setup
Resets the python tree with the new config. Automatically updates the session
os.environ with the new tree config environment variables. If ``preserve_envvars``
is set to True, preserves the original os environ during tree update. If
``preserve_envvars`` is set to a list of environment variables, preserves only that
subset.
Parameters
----------
config : str
The config name to reload
exclude : list
A list of environment variables to exclude from forced updates
preserve_envvars : bool | list
Flag to indicate some or all original environment variables to preserve
"""
# reinitialize a new Tree with a new config and
# automatically overwrite any existing envvars
config = 'sdsswork' if not config else config
self.__init__(key=self._keys, config=config, update=True, exclude=exclude)
# look for any preserved envvars from the configuration file
if not preserve_envvars:
preserve_envvars = self._get_preserved_envvars()
orig = self.get_orig_os_environ()
if preserve_envvars is True:
# preserve the entire original os environ
os.environ.update(orig)
elif isinstance(preserve_envvars, (list, tuple)):
# preserve just a subset of envvars
for envvar in preserve_envvars:
if envvar in orig:
os.environ[envvar] = orig[envvar]
def _get_preserved_envvars(self):
""" Retrieve any list of environment variables to preserve from the tree.yml config file """
preserved = cfg_params.get('preserve_envvars', None)
return preserved
[docs]
def list_configs(self):
''' List available configs to load '''
return self.list_available_configs()
[docs]
def show_forest(self, config=None):
''' Show the environment for a specified config
Creates a dictionary environment for each config in the list of
available configurations.
Parameters
----------
config : str
The config to show
Returns
-------
dict
A dictionary of config environment(s)
'''
configs = [config] if config else self.list_configs()
cfgs = {}
for cfg in configs:
cfg_name = cfg.split('.cfg')[0]
cfg = self._read_config(cfg_name)
cfgs[cfg_name] = self._create_environment(cfg)
if len(configs) == 1:
return cfgs[cfg_name]
return cfgs
@staticmethod
def _sort_configs(release: str = 'dr', cfgs: list = None) -> list:
""" Sort the list of configs by a release group
Sorts the list of configs by a release group, e.g.
'DR', 'MPL', or "IPL". Within each group sorts by the integer
number of the release.
Parameters
----------
release : str, optional
The release group name, by default 'dr'
cfgs : list, optional
The input list of configs, by default None
Returns
-------
list
A sorted list of configs
Raises
------
ValueError
when the release group is not recognized
"""
release = release.lower()
if release not in ['dr', 'mpl', 'ipl']:
raise ValueError(f'{release} is not a valid release group')
# sort the release subset of config files
relsort = sorted([i for i in cfgs if release in i],
key=lambda t: int(re.findall(f'{release}(.*?).cfg', t)[0]))
rest = [[i] for i in cfgs if release not in i]
rest.insert(1, relsort)
return sum(rest, [])
[docs]
@staticmethod
def list_available_configs():
''' List the available config files able to be loaded '''
# get tree dir
treedir = get_tree_dir()
# look up the config files from the data directory
data_path = os.path.join(treedir, 'data')
cfgs = [i for i in os.listdir(data_path) if i.endswith('.cfg') and 'basework' not in i]
cfgs.sort()
# sort each release subset of config files
sorted_cfgs = Tree._sort_configs(release='dr', cfgs=cfgs)
sorted_cfgs = Tree._sort_configs(release='mpl', cfgs=sorted_cfgs)
sorted_cfgs = Tree._sort_configs(release='ipl', cfgs=sorted_cfgs)
return sorted_cfgs
[docs]
@classmethod
def get_available_releases(cls, public=None):
''' Get the available releases
Parameters
----------
public : bool
If True, only return public data releases
'''
# get the configs
cfgs = cls.list_available_configs()
# parse the data releases
releases = []
for i in cfgs:
if public and not i.startswith('dr'):
continue
b = i.split('.cfg', 1)[0]
if 'dr' in b or 'mpl' in b or 'ipl' in b:
# uppercase any DR or MPLs
releases.append(b.upper())
elif 'work' in b and 'WORK' not in releases:
# reduce alll xxxxwork cfgs to a single "work" release
releases.append('WORK')
return releases
[docs]
def get_release_from_config(self) -> str:
""" Get a release name from a config
Convert a config name into its valid release name. All
"work" config, i.e. "sdsswork", belong to the "WORK" release.
Returns
-------
str
the release name
"""
release = self.config_name.upper()
if 'work' in self.config_name or self.config_name == 'sdss5':
release = 'WORK'
return release
[docs]
@staticmethod
def reset_os_environ():
''' Resets os.environ with the orignal cache before tree mods '''
os.environ = orig_environ
[docs]
@staticmethod
def get_orig_os_environ():
''' Returns the original os.environ '''
return orig_environ.copy()
[docs]
def to_dict(self, collapse=True):
''' Convert tree environment to standard dicts
Converts the nested ``tree.environ`` into a series of
ordinary dicts.
Parameters
----------
collapse : bool
If True, collapses nested dicts into a single dict. Default is True.
'''
if collapse is False:
dd = json.loads(json.dumps(self.environ))
dd.pop('default')
return dd
dd = {}
for k, v in self.environ.items():
if k == 'default':
continue
for kk, vv in v.items():
dd[kk] = vv
return dd
[docs]
@staticmethod
def get_product_root(root=None, git=None):
''' Get the sdss product root used for svn/git products
Attempts to extract the root directory for SDSS-installed git/svn products.
Uses the following environment variables in order of precendence:
PRODUCT_ROOT, SDSS_SVN_ROOT, SDSS_INSTALL_PRODUCT_ROOT, SDSS_PRODUCT_ROOT,
SDSS4_PRODUCT_ROOT. If no root is found uses one directory up from SAS_BASE_DIR.
Parameters
----------
root : str
An absolute directory path to override as the product root
git : bool
If True, looks for SDSS_GIT_ROOT environment variable as product root.
Returns
-------
dict
The directory path to sdss-installed svn/git products
'''
# override with an input root directory
if root:
return root
# use an existing $PRODUCT_ROOT envvar
product_root = os.getenv("PRODUCT_ROOT", None)
if product_root:
return product_root
# check in the config for a product root definition
product_root = cfg_params.get("PRODUCT_ROOT", None)
if product_root:
return product_root
# attempt to extract a product root from a variety of environment variables
repo_root = 'SDSS_GIT_ROOT' if git else 'SDSS_SVN_ROOT'
product_root = os.getenv(repo_root, os.getenv(
"SDSS_INSTALL_PRODUCT_ROOT", os.getenv("SDSS4_PRODUCT_ROOT", None)))
if not product_root:
product_root = os.getenv("SAS_BASE_DIR").rsplit('/', 1)[0]
return product_root
[docs]
def set_product_root(self, root=None, git=None):
''' Sets the sdss product root used for svn/git products
Sets the root directory for SDSS-installed git/svn products as
the $PRODUCT_ROOT environment variable. Attempts to find a viable $PRODUCT_ROOT
using ``get_product_method``. Viable product roots in order of precendence:
SDSS_SVN_ROOT, SDSS_INSTALL_PRODUCT_ROOT, SDSS_PRODUCT_ROOT, SDSS4_PRODUCT_ROOT.
If no root is found uses one directory up from SAS_BASE_DIR.
Parameters
----------
root : str
An absolute directory path to override as the product root
git : bool
If True, looks for SDSS_GIT_ROOT environment variable as product root.
'''
product_root = self.get_product_root(root=root, git=git)
os.environ['PRODUCT_ROOT'] = product_root
self.productroot_dir = product_root
[docs]
def write_old_paths_inifile(self, no_pipe=None):
''' Write out an old sdss_paths ini file
New syntax for special functions is "@[name]|" compared
to old syntax of "%[name]". With no_pipe set to True,
converts the new syntax to exactly the old syntax. If set to
False, will write special functions as "%[name]|".
Parameters
----------
no_pipe : bool
If True, removes the special function | character
'''
paths_dir = os.path.join(self.treedir, 'data/sdss_paths.ini')
with open(paths_dir, 'w') as f:
f.write('# Paths for SDSS files. Each file type is given a template full path.\n')
f.write('# \n')
f.write('# This file has been deprecated as of tree/3.0.0 and sdss_access/1.0.0 \n')
f.write('# It should not be updated manually. Use the tree.write_olds_paths method in \n')
f.write('# the python tree code to generate a new version of this file as needed.\n')
f.write('# \n')
f.write('\n')
f.write("[paths]\n")
for name, template in self.paths.items():
# switch special functions back to %
if no_pipe:
template = template.replace('@', '%').replace('|', '')
else:
template = template.replace('@', '%')
# write out path, template
f.write('{0} = {1}\n'.format(name, template))
[docs]
def check_missing_path_envvars(self):
""" Checks paths envars against main envvar list
Extracts the used environment variables from all the
sdss_access path definitions in tree.paths, and checks them
against the list of defined environment variables in tree.environ.
Returns a list of any path environment variables that are
missing from the valid definitions in the environment.
Returns
-------
list
A list of missing environment variables
"""
# get the list of environment variables
envvars = self.to_dict().keys()
# get the list of access paths and extract envvars
paths = self.paths.values()
path_envvars = set(re.findall(r'\$(.*?)\/', '\t'.join(paths)))
return [i for i in path_envvars if i not in envvars]
[docs]
def identify_envvar(self, file):
""" Identifies the environment variable used in a file path """
env = None
for env, val in reversed(list(self.to_dict().items())):
if val in file:
return env
[docs]
def identify_section(self, envvar, guess=False):
""" Identifies the tree ini section from an environment variable """
sec = None
# looks if envvar exactly in list of sections
for tsec, envs in reversed(sorted(self.environ.items())):
if envvar in envs:
sec = tsec
break
# guess based on if envvar starts with section name
if guess and envvar.startswith(tsec):
sec = tsec
break
# return section name if found
if sec:
return sec
# if still no identified or guessed section, guess with rapidfuzz
if guess:
if rapidfuzz:
envlist = sorted(self.to_dict().keys())
sec_guess = rapidfuzz.process.extractOne(envvar, envlist, score_cutoff=70)
if sec_guess:
return self.identify_section(sec_guess[0])
else:
log.warning('rapidfuzz package is not installed. Cannot make a guess.')
return sec
[docs]
def get_tree_dir(uproot_with=None):
''' Return the path to the tree product directory
Parameters
----------
uproot_with : str
A new TREE_DIR path used to override an existing TREE_DIR environment variable
Returns
-------
str
The path to the tree python product directory
'''
treedir = os.environ.get('TREE_DIR', None) if not uproot_with else uproot_with
if not treedir:
treefilepath = os.path.dirname(os.path.abspath(__file__))
if 'python/' in treefilepath:
treedir = treefilepath.rsplit('/', 2)[0]
else:
treedir = treefilepath
treedir = treefilepath
os.environ['TREE_DIR'] = treedir
return treedir
def _get_history(name: str, cfg_type: str) -> dict:
""" Get the history of a path or environment variable
Searches all Tree config files for the definition of a specified
environment variable or access path name.
Parameters
----------
name : str
The name of the path or environment variable
cfg_type : str
The type of config variable to access, either 'path' or 'envvar'
Returns
-------
dict
The definitions of the requested parameter in the tree config files
Raises
------
ValueError
when cfg_type is not 'path' or 'envvar'
"""
if cfg_type not in ['envvar', 'path']:
raise ValueError('cfg_type can only be "envvar" or "path"')
output = {}
for cfg in Tree.list_available_configs():
if 'basework' in cfg:
continue
t=Tree(cfg)
if cfg_type == 'envvar':
data = t.to_dict()
elif cfg_type == 'path':
data = t.paths
output[cfg.split('.')[0]] = data.get(name, None)
return output
[docs]
def get_envvar_history(name: str) -> dict:
""" Get the history of a given environment variable
Returns a dictionary of the given environment variable definition in all
available tree config files.
Parameters
----------
name : str
The name of the environment variable
Returns
-------
dict
The environment variable definitions in each tree config file
"""
return _get_history(name, cfg_type='envvar')
[docs]
def get_path_history(name: str) -> dict:
""" Get the history of a given access path name
Returns a dictionary of the given acess path definition in all
available tree config files.
Parameters
----------
name : str
The name of the access path
Returns
-------
dict
The path definitions in each tree config file
"""
return _get_history(name, cfg_type='path')