tools/configuration.py

226 lines
6.8 KiB
Python

# -*- coding: UTF-8 -*-
import os
import sys
import getopt
import json
from yaml import load, YAMLError
try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader
from pathlib import Path
def merge_config(src, dest):
    """Recursively merge the mapping ``src`` into the mapping ``dest``.

    Dict values in ``src`` are merged key by key into the corresponding
    dict in ``dest``; every other value overwrites the entry in ``dest``.
    If ``dest`` holds a non-dict value where ``src`` provides a dict, the
    old value is replaced by a new dict instead of raising an error.

    ``dest`` is modified in place and also returned. Nested dicts of
    ``src`` are copied level by level, so ``dest`` never aliases them.
    """
    for key, value in src.items():
        if isinstance(value, dict):
            node = dest.get(key)
            if not isinstance(node, dict):
                # Missing entry or a scalar/list in the way: the more
                # specific mapping from src takes its place.
                node = dest[key] = {}
            merge_config(value, node)
        else:
            dest[key] = value
    return dest
def parse_config_set(params, a):
    """Apply one ``--set`` argument to the ``params`` dict.

    ``a`` is either a JSON object (``{...}``), which is merged into
    ``params`` via ``merge_config``, or ``key=jsonvalue`` where ``key`` is
    a dot-separated path (``a.b.c``) and ``jsonvalue`` is any JSON value.

    Returns ``None`` on success or an error message string on failure;
    ``params`` is modified in place.
    """
    a = a.strip()
    if a.startswith('['):
        return "config parameters are not of list type"
    if a.startswith('{'):
        try:
            value = json.loads(a)
        except json.JSONDecodeError as e:
            return "failed to decode JSON struct: %s" % (e,)
        merge_config(value, params)
        return None

    key, operator, value = (part.strip() for part in a.partition('='))
    if operator != '=':
        return "format required: key=jsonvalue or JSON struct"
    if key == '':
        return "format required: key=jsonvalue; key may not be an empty string"
    try:
        value = json.loads(value)
    except json.JSONDecodeError as e:
        return "failed to decode JSON value: %s" % (e,)

    # Walk down the dotted path, creating intermediate dicts as needed.
    # The walk must continue from the current node (not from the root),
    # otherwise paths deeper than two levels land in the wrong place.
    *parents, leaf = key.split('.')
    node = params
    for name in parents:
        child = node.get(name)
        if not isinstance(child, dict):
            # Missing or non-mapping value: replace it so the nested
            # assignment below cannot fail.
            child = node[name] = {}
        node = child
    node[leaf] = value
    return None
class Config:
    """Command-line options plus YAML configuration of a tool script.

    ``parse()`` evaluates the command line, loads all ``*.yml`` / ``*.yaml``
    files from the config directory and finally applies the ``--set``
    overrides. ``get_config_param()`` then reads single settings by their
    dot-separated path.
    """

    def __init__(self, scriptname, progver, commands):
        """Initialise defaults.

        scriptname -- path of the running script; its resolved directory is
                      the base for a relative config directory
        progver    -- version string printed by ``--version``
        commands   -- iterable of ``(name, description)`` pairs of the
                      commands the script accepts
        """
        scriptname = os.path.realpath(scriptname)
        self.basedir = os.path.dirname(scriptname)
        self.progname = os.path.basename(scriptname)
        self.progver = progver
        self.commands = commands
        # options
        self.help = False
        self.version = False
        self.verbose = False
        self.force = False
        self.config = "config"   # config directory (relative to basedir unless absolute)
        self.params = {}         # overrides collected from -s/--set
        # command
        self.cmd = None
        # config params
        self.config_params = {}  # merged content of the config files (+ overrides)

    def usage(self, exitcode):
        """Write the usage text to stderr; exit with ``exitcode`` unless it is None."""
        lines = [
            "Usage: " + self.progname + " [<options>] <command> [<options>]\n",
            "Commands:\n",
        ]
        for cmd, desc in self.commands:
            # Pad the command name to column 31, but keep at least one blank.
            lines.append(" " + cmd + ' ' * max(1, 31 - len(cmd)) + desc + "\n")
        lines += [
            "\n",
            "Options:\n",
            "-h, --help Print this help\n",
            "-V, --version Print version number\n",
            "-c, --config dir Specify config directory to read [" + self.config + "]\n",
            "-s, --set key=jsonvalue Override config parameter 'key' with JSON format value\n",
            "-s, --set jsonstruct Override config parameters providing JSON struct\n",
            "-f, --force Force recreating result\n",
            "-v, --verbose Print verbose messages\n",
        ]
        sys.stderr.write("".join(lines))
        if exitcode is not None:
            sys.exit(exitcode)

    def parse(self, argv):
        """Parse ``argv`` (including the program name at index 0).

        Sets the option attributes, loads the config directory, determines
        the single command (``self.cmd``) and merges the ``--set`` overrides
        on top of the loaded configuration. Invalid usage prints an error
        plus the usage text and exits with status 2; ``--help`` and
        ``--version`` exit with status 0.
        """
        try:
            opts, args = getopt.gnu_getopt(argv[1:], "hVvc:s:f", [
                "help", "version", "verbose", "config=", "set=", "force"
            ])
        except getopt.GetoptError as e:
            sys.stderr.write("Error: %s\n" % str(e))
            self.usage(2)

        for o, a in opts:
            if o in ('-h', '--help'):
                self.help = True
            elif o in ('-V', '--version'):
                self.version = True
            elif o in ('-v', '--verbose'):
                self.verbose = True
            elif o in ('-f', '--force'):
                self.force = True
            elif o in ('-c', '--config'):
                if a == '':
                    sys.stderr.write("Error: option %s requires a non-empty argument\n" % (o))
                    self.usage(2)
                self.config = a
            elif o in ('-s', '--set'):
                error = parse_config_set(self.params, a)
                if error:
                    sys.stderr.write("Error: option %s argument is invalid: %s\n" % (o, error))
                    self.usage(2)

        if self.version:
            sys.stdout.write("Version: %s\n" % (self.progver))
        if self.help:
            self.usage(None)
        if self.version or self.help:
            sys.exit(0)

        self.load_config()

        for c in args:
            if self.cmd is not None:
                sys.stderr.write("Multiple commands specified: %s vs %s\n" % (self.cmd, c))
                self.usage(2)
            if any(c == cmd for cmd, _ in self.commands):
                self.cmd = c
            else:
                sys.stderr.write("Unknown command specified: %s\n" % (c))
                self.usage(2)
        if self.cmd is None:
            sys.stderr.write("Missing command\n")
            self.usage(2)

        # Command-line overrides win over values read from the config files.
        merge_config(self.params, self.config_params)
        if self.verbose:
            sys.stdout.write("Config parameters:\n")
            for k, v in self.config_params.items():
                sys.stdout.write(" %s=%s\n" % (k, str(v)))

    def parse_file(self, filepath):
        """Load one YAML file and merge its content into ``config_params``.

        YAML errors are reported on stderr and the file is skipped. An empty
        file is ignored; a file whose top level is not a mapping is reported
        and skipped. I/O errors from ``open`` are not caught here.
        """
        try:
            # The context manager guarantees the file handle is closed.
            with open(filepath, 'r') as stream:
                obj = load(stream, SafeLoader)
        except YAMLError as exc:
            sys.stderr.write("Error: reading configuration file failed: %s\n" % (exc,))
            return
        if obj is None:
            # PyYAML yields None for an empty document: nothing to merge.
            return
        if not isinstance(obj, dict):
            sys.stderr.write("Error: configuration file %s does not contain a mapping\n" % (filepath))
            return
        merge_config(obj, self.config_params)

    def load_config(self):
        """Parse every visible ``*.yml`` / ``*.yaml`` file of the config directory.

        A relative ``self.config`` is resolved against ``self.basedir``. A
        missing directory only produces a warning. Files are processed in
        name order so that later files override earlier ones predictably.
        """
        if os.path.isabs(self.config):
            configdir = self.config
        else:
            configdir = os.path.join(self.basedir, self.config)
        if self.verbose:
            sys.stdout.write("Reading config directory %s\n" % (configdir))
        if not Path(configdir).is_dir():
            sys.stderr.write("Warning: config directory %s does not exist\n" % (configdir))
            return
        with os.scandir(configdir) as it:
            # os.scandir yields entries in arbitrary order; sort for a
            # deterministic merge order.
            entries = sorted(it, key=lambda entry: entry.name)
        for entry in entries:
            name = entry.name
            if name.startswith('.') or not name.endswith(('.yml', '.yaml')):
                continue
            if not entry.is_file():
                continue
            if self.verbose:
                sys.stdout.write("Parsing config file %s\n" % (entry.path))
            self.parse_file(entry.path)

    def get_config_param(self, path, mandatory = False, default = None, type = 'str'):
        """Return the setting at the dot-separated ``path``.

        path      -- e.g. ``"section.sub.key"``
        mandatory -- print a message on stdout when the setting is missing
        default   -- returned when the setting is missing or not convertible
        type      -- ``'str'``, ``'boolean'``, ``'int'`` or ``'float'``; any
                     other value returns the raw object

        For ``'boolean'``, numbers are true when their integer part is
        non-zero; the strings ``''``, ``'0'``, ``'no'``, ``'No'``,
        ``'false'`` and ``'False'`` are false, everything else is true.
        """
        d = self.config_params
        for key in path.split('.'):
            # A non-mapping (including None) on the way means the path does
            # not exist.
            if not isinstance(d, dict) or key not in d:
                if mandatory:
                    sys.stdout.write("Configuration setting '%s' missing\n" % (path))
                return default
            d = d[key]

        if type == 'str':
            return str(d)
        if type == 'boolean':
            if isinstance(d, (int, float)):
                return int(d) != 0
            return str(d) not in ('', '0', 'no', 'No', 'false', 'False')
        if type == 'int':
            try:
                return int(d)
            except (TypeError, ValueError, OverflowError):
                # TypeError: None/list/dict values; OverflowError: infinity.
                sys.stdout.write("Configuration '%s' is not an integer\n" % (path))
                return default
        if type == 'float':
            try:
                return float(d)
            except (TypeError, ValueError):
                sys.stdout.write("Configuration '%s' is not a float\n" % (path))
                return default
            except OverflowError:
                sys.stdout.write("Configuration '%s' overflows a float\n" % (path))
                return default
        return d