# osmo_gsm_tester: read and manage config files and global config
#
# Copyright (C) 2016-2020 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr
# Author: Pau Espin Pedrol
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# Discussion for choice of config file format:
#
# Python syntax is insane, because it allows the config file to run arbitrary
# python commands.
#
# INI file format is nice and simple, but it doesn't allow having the same
# section numerous times (e.g. to define several modems or BTS models) and does
# not support nesting.
#
# JSON has too many braces and quotes to be easy to type.
#
# YAML formatting is lean, but:
# - too powerful. The normal load() allows arbitrary code execution. There is
#   safe_load().
# - allows several alternative ways of formatting, better to have just one
#   authoritative style.
# - tries to detect types. It would be better to receive every setting as
#   simple string rather than e.g. an IMSI as an integer.
# - e.g. an IMSI starting with a zero is interpreted as octal value, resulting
#   in super confusing error messages if the user merely forgets to quote it.
# - does not tell me which line a config item came from, so no detailed error
#   message is possible.
#
# The Python ConfigParserShootout page has numerous contestants, but many of
# those seem to be not widely used / standardized or even tested.
# https://wiki.python.org/moin/ConfigParserShootout
#
# The optimum would be a stripped down YAML format.
# In the lack of that, we shall go with yaml.safe_load() + a round trip
# (feeding back to itself), converting keys to lowercase and values to string.
# There is no solution for octal interpretations nor config file source lines
# unless, apparently, we implement our own config parser.

import copy
import os
import pprint

import yaml

from . import log, util, template
from . import schema
from .util import is_dict, is_list, Dir, get_tempdir

# When set, this is the only location searched for the main config file
# (presumably assigned from the -c command line argument -- confirm in caller).
override_conf = None

# Keys understood in the main configuration file.
CFG_STATE_DIR = 'state_dir'
CFG_SUITES_DIR = 'suites_dir'
CFG_SCENARIOS_DIR = 'scenarios_dir'
CFG_TRIAL_DIR = 'trial_dir'
CFG_DEFAULT_SUITES_CONF = 'default_suites_conf_path'
CFG_DEFAULTS_CONF = 'defaults_conf_path'
CFG_RESOURCES_CONF = 'resource_conf_path'

# Validation schema of the main configuration file; keys suffixed with '[]'
# are lists of the given type.
MAIN_CONFIG_SCHEMA = {
    CFG_STATE_DIR: schema.STR,
    CFG_SUITES_DIR + '[]': schema.STR,
    CFG_SCENARIOS_DIR + '[]': schema.STR,
    CFG_TRIAL_DIR: schema.STR,
    CFG_DEFAULT_SUITES_CONF: schema.STR,
    CFG_DEFAULTS_CONF: schema.STR,
    CFG_RESOURCES_CONF: schema.STR,
}

# Default values used for keys the main configuration file does not set.
DF_CFG_STATE_DIR = '/var/tmp/osmo-gsm-tester/state/'
DF_CFG_SUITES_DIR = ['./suites']
DF_CFG_SCENARIOS_DIR = ['./scenarios']
DF_CFG_TRIAL_DIR = './trial'
DF_CFG_DEFAULT_SUITES_CONF = './default-suites.conf'
DF_CFG_DEFAULTS_CONF = './defaults.conf'
DF_CFG_RESOURCES_CONF = './resources.conf'

DEFAULT_CONFIG_FILENAME = 'main.conf'
# Locations probed (in order) when no explicit config path is given.
# NOTE(review): the first entry is the directory '.', which can never pass the
# os.path.isfile() test in _find_main_config_path(); it looks like
# os.path.join('.', DEFAULT_CONFIG_FILENAME) was intended -- confirm before
# changing, since that would alter which file gets picked up.
DEFAULT_CONFIG_LOCATIONS = [
    '.',
    # expanduser() instead of os.getenv('HOME'): the latter returns None when
    # HOME is unset, which made os.path.join() raise at import time.
    os.path.join(os.path.expanduser('~'), '.config', 'osmo-gsm-tester',
                 DEFAULT_CONFIG_FILENAME),
    os.path.join('/usr/local/etc/osmo-gsm-tester', DEFAULT_CONFIG_FILENAME),
    os.path.join('/etc/osmo-gsm-tester', DEFAULT_CONFIG_FILENAME),
]

# Lazily filled caches, see _get_main_config() and _get_main_config_path().
MAIN_CONFIG = None
MAIN_CONFIG_PATH = None


def _find_main_config_path():
    '''Return the real path of the first existing main config file.

    Candidates are, by priority: override_conf, the deprecated
    OSMO_GSM_TESTER_CONF environment variable (a directory containing
    paths.conf), and finally DEFAULT_CONFIG_LOCATIONS.

    Raises RuntimeError if none of the candidates is an existing file.
    '''
    if override_conf:
        locations = [override_conf]
    elif os.getenv('OSMO_GSM_TESTER_CONF'):
        ENV_CONF = os.getenv('OSMO_GSM_TESTER_CONF')
        log.err('Using environment variable OSMO_GSM_TESTER_CONF=%s(/paths.conf) is deprecated. Rather use -c command line argument!'
                % ENV_CONF)
        # A directory is expected in OSMO_GSM_TESTER_CONF (backward
        # compatibility). Use os.path.join() so that the directory works with
        # or without a trailing slash.
        locations = [os.path.join(ENV_CONF, 'paths.conf')]
    else:
        locations = DEFAULT_CONFIG_LOCATIONS

    for location in locations:
        real_location = os.path.realpath(location)
        if os.path.isfile(real_location):
            log.dbg('Found main configuration file in ', location, 'which is', real_location, _category=log.C_CNF)
            return real_location
    raise RuntimeError('Main configuration file not found in %r' % (list(locations),))


def _get_main_config_path():
    '''Return the main config file path, looking it up only on first use.'''
    global MAIN_CONFIG_PATH
    if MAIN_CONFIG_PATH is None:
        MAIN_CONFIG_PATH = _find_main_config_path()
    return MAIN_CONFIG_PATH


def main_config_path_to_abspath(val):
    '''Translate a path (or list of paths) from the main config to a real path.

    Relative files in main config are relative towards the config file, not
    towards $CWD. A list is translated element by element, in place, and the
    same list object is returned.
    '''
    # If val is a list of paths, recurse to translate its paths.
    if isinstance(val, list):
        for i, item in enumerate(val):
            val[i] = main_config_path_to_abspath(item)
        return val
    # The previous check compared against os.pathsep, which is the separator of
    # $PATH-like lists (':'), not the directory separator, so absolute paths
    # were never detected and paths starting with ':' were returned unresolved.
    if os.path.isabs(val):
        # Absolute paths need no base directory; they are still canonicalized,
        # which matches what joining them to the config dir used to yield.
        return os.path.realpath(val)
    config_dir = os.path.dirname(_get_main_config_path())
    return os.path.realpath(os.path.join(config_dir, val))


def _get_main_config():
    '''Return the main config dict, reading and caching it on first use.

    The result holds the DF_CFG_* defaults overlaid with the content of the
    main config file, with every path translated by
    main_config_path_to_abspath().
    '''
    global MAIN_CONFIG
    if MAIN_CONFIG is None:
        cfg = read(_get_main_config_path(), MAIN_CONFIG_SCHEMA)
        if cfg is None:
            cfg = {}
        # overlay() and main_config_path_to_abspath() modify lists in place, so
        # hand them copies instead of the module-level default lists.
        MAIN_CONFIG = {
            CFG_STATE_DIR: DF_CFG_STATE_DIR,
            CFG_SUITES_DIR: copy.deepcopy(DF_CFG_SUITES_DIR),
            CFG_SCENARIOS_DIR: copy.deepcopy(DF_CFG_SCENARIOS_DIR),
            CFG_TRIAL_DIR: DF_CFG_TRIAL_DIR,
            CFG_DEFAULT_SUITES_CONF: DF_CFG_DEFAULT_SUITES_CONF,
            CFG_DEFAULTS_CONF: DF_CFG_DEFAULTS_CONF,
            CFG_RESOURCES_CONF: DF_CFG_RESOURCES_CONF,
        }
        overlay(MAIN_CONFIG, cfg)
        for key, path in sorted(MAIN_CONFIG.items()):
            MAIN_CONFIG[key] = main_config_path_to_abspath(path)
        log.dbg('MAIN CONFIG:\n' + pprint.pformat(MAIN_CONFIG), _category=log.C_CNF)
    return MAIN_CONFIG


def get_main_config_value(cfg_name, fail_if_missing=True):
    '''Return the main config value stored under key cfg_name.

    If the key is absent (or None), raise RuntimeError when fail_if_missing is
    True, otherwise return None.
    '''
    value = _get_main_config().get(cfg_name, None)
    if value is None and fail_if_missing:
        raise RuntimeError('Missing configuration %s' % (cfg_name))
    return value


def read_config_file(cfg_name, validation_schema=None, if_missing_return=False):
    '''Read content of config file cfg_name (referring to key in main config).

    If "if_missing_return" is different than False, then instead of failing on
    a missing main config key or a missing file, return whatever is stored in
    that arg.
    '''
    fail_if_missing = if_missing_return is False
    path = get_main_config_value(cfg_name, fail_if_missing=fail_if_missing)
    if path is None:
        return if_missing_return
    return read(path, validation_schema=validation_schema, if_missing_return=if_missing_return)


def get_state_dir():
    '''Return the configured state directory wrapped in a util.Dir.'''
    return Dir(get_main_config_value(CFG_STATE_DIR))


def get_suites_dirs():
    '''Return the configured suites directories as a list of util.Dir.'''
    return [Dir(d) for d in get_main_config_value(CFG_SUITES_DIR)]


def get_scenarios_dirs():
    '''Return the configured scenarios directories as a list of util.Dir.'''
    return [Dir(d) for d in get_main_config_value(CFG_SCENARIOS_DIR)]


# Cached content of the defaults config file, see get_defaults().
DEFAULTS_CONF = None


def get_defaults(for_kind):
    '''Return a deep copy of the defaults configured for for_kind.

    The defaults file is read once and cached. An empty dict is returned if the
    file is missing or has no entry for for_kind.
    '''
    global DEFAULTS_CONF
    if DEFAULTS_CONF is None:
        # An existing but empty file makes read() return None; fall back to an
        # empty dict so that the .get() below works and the result is cached.
        DEFAULTS_CONF = read_config_file(CFG_DEFAULTS_CONF, if_missing_return={}) or {}
    defaults = DEFAULTS_CONF.get(for_kind, {})
    return copy.deepcopy(defaults)


def read(path, validation_schema=None, if_missing_return=False):
    '''Load, standardize and optionally validate the YAML file at path.

    If path is not an existing file and if_missing_return is not False,
    if_missing_return is returned instead; otherwise opening the file fails in
    the usual way. Validation is skipped for an empty config.
    '''
    log.ctx(path)
    if not os.path.isfile(path) and if_missing_return is not False:
        return if_missing_return
    with open(path, 'r') as f:
        config = yaml.safe_load(f)
    config = _standardize(config)
    if config and validation_schema:
        schema.validate(config, validation_schema)
    return config


def write(path, config):
    '''Write config to the file at path in standardized YAML form.'''
    log.ctx(path)
    with open(path, 'w') as f:
        f.write(tostr(config))


def fromstr(config_str, validation_schema=None):
    '''Parse config_str as YAML, standardize it and optionally validate it.'''
    config = yaml.safe_load(config_str)
    config = _standardize(config)
    if validation_schema is not None:
        schema.validate(config, validation_schema)
    return config


def tostr(config):
    '''Return config as a standardized YAML string.'''
    return _tostr(_standardize(config))


def _tostr(config):
    '''Dump config as block-style YAML without standardizing it first.'''
    return yaml.dump(config, default_flow_style=False)


def _standardize_item(item):
    '''Recursively convert item to lowercase string keys and string values.

    None stays None, tuples and lists become lists, dicts keep their structure
    and every other value is converted with str().
    '''
    if item is None:
        return None
    if isinstance(item, (tuple, list)):
        return [_standardize_item(i) for i in item]
    if isinstance(item, dict):
        # str() first: YAML may yield non-string keys (e.g. an unquoted number),
        # which have no lower() method.
        return {str(key).lower(): _standardize_item(val) for key, val in item.items()}
    return str(item)


def _standardize(config):
    '''Standardize config and feed it through a YAML dump/load round trip.'''
    config = yaml.safe_load(_tostr(_standardize_item(config)))
    return config


def overlay(dest, src):
    '''Merge src on top of dest and return the result.

    - If dest is a dict, src must be a dict; each of its keys is overlaid
      recursively onto dest, which is modified in place and returned.
    - If dest is a list, src must be a list; items at the same index are
      overlaid recursively and surplus src items are appended to dest, which is
      modified in place and returned.
    - Otherwise src replaces dest.

    Raises ValueError if a dict or list in dest meets a different kind of
    value in src.
    '''
    if is_dict(dest):
        if not is_dict(src):
            raise ValueError('cannot combine dict with a value of type: %r' % type(src))
        for key, val in src.items():
            log.ctx(key=key)
            dest_val = dest.get(key)
            dest[key] = overlay(dest_val, val)
        return dest
    if is_list(dest):
        if not is_list(src):
            raise ValueError('cannot combine list with a value of type: %r' % type(src))
        copy_len = min(len(src), len(dest))
        for i in range(copy_len):
            log.ctx(idx=i)
            dest[i] = overlay(dest[i], src[i])
        # src is longer than dest: take over the remaining items as they are.
        dest.extend(src[copy_len:])
        return dest
    return src


def replicate_times(d):
    '''
    replicate items that have a "times" > 1

    'd' is a dict matching WANT_SCHEMA, which is the same as
    the RESOURCES_SCHEMA, except each entity that can be reserved has a 'times'
    field added, to indicate how many of those should be reserved.

    Returns a deep copy of 'd' in which the 'times' field is removed from every
    item and each item appears 'times' times in a row; 'd' itself is left
    untouched.

    Raises ValueError if a 'times' value is smaller than 1 (such a value used
    to make this function loop forever or skip backwards).
    '''
    d = copy.deepcopy(d)
    for key, item_list in d.items():
        expanded = []
        for item in item_list:
            times = int(item.pop('times', 1))
            if times < 1:
                raise ValueError('invalid "times" value %d for item in %r' % (times, key))
            expanded.append(item)
            # The copies are taken after 'times' was popped, so they lack it too.
            expanded.extend(copy.deepcopy(item) for _ in range(times - 1))
        # Replace the content in place; the dict itself keeps its size, so
        # this is safe while iterating over d.items().
        item_list[:] = expanded
    return d

# vim: expandtab tabstop=4 shiftwidth=4