226 lines
6.8 KiB
Python
226 lines
6.8 KiB
Python
# -*- coding: UTF-8 -*-
|
|
|
|
import os
|
|
import sys
|
|
import getopt
|
|
import json
|
|
from yaml import load, YAMLError
|
|
try:
|
|
from yaml import CSafeLoader as SafeLoader
|
|
except ImportError:
|
|
from yaml import SafeLoader
|
|
from pathlib import Path
|
|
|
|
def _merge_config(src, dest):
|
|
for key, value in src.items():
|
|
if isinstance(value, dict):
|
|
node = dest.setdefault(key, {})
|
|
_merge_config(value, node)
|
|
else:
|
|
dest[key] = value
|
|
return dest
|
|
|
|
def _parse_config_set(params, a):
|
|
a = a.strip()
|
|
if a.startswith('['):
|
|
return "config parameters are not of list type"
|
|
if a.startswith('{'):
|
|
try:
|
|
value = json.loads(a)
|
|
_merge_config(value, params)
|
|
return None
|
|
except json.JSONDecodeError as e:
|
|
return "failed to decode JSON struct: %s" % (e)
|
|
|
|
parts = a.partition('=')
|
|
key = parts[0].strip()
|
|
operator = parts[1].strip()
|
|
value = parts[2].strip()
|
|
if (operator != '='):
|
|
return "format required: key=jsonvalue or JSON struct"
|
|
if (key == ''):
|
|
return "format required: key=jsonvalue; key may not be an empty string"
|
|
try:
|
|
value = json.loads(value)
|
|
except json.JSONDecodeError as e:
|
|
return "failed to decode JSON value: %s" % (e)
|
|
|
|
key_list = key.split('.')
|
|
p = params
|
|
while True:
|
|
key = key_list.pop(0)
|
|
more = len(key_list) > 0
|
|
if not more:
|
|
p[key] = value
|
|
break
|
|
p = params.setdefault(key, {})
|
|
return None
|
|
|
|
class Config():
|
|
def __init__(self, scriptname, progver, commands):
|
|
scriptname = os.path.realpath(scriptname)
|
|
self.basedir = os.path.dirname(scriptname)
|
|
self.progname = os.path.basename(scriptname)
|
|
self.progver = progver
|
|
self.commands = commands
|
|
# options
|
|
self.help = False
|
|
self.version = False
|
|
self.verbose = False
|
|
self.force = False
|
|
self.config = "config"
|
|
self.params = {}
|
|
# command
|
|
self.cmd = None
|
|
# config params
|
|
self.config_params = {}
|
|
|
|
def usage(self, exitcode):
|
|
usage = """Usage: """ + self.progname + """ [<options>] <command> [<options>]
|
|
|
|
Commands:
|
|
"""
|
|
for cmd, desc in self.commands:
|
|
usage += " " + cmd + (' ' if len(cmd) >= 31 else ' ' * (31-len(cmd))) + desc + "\n"
|
|
usage += """
|
|
Options:
|
|
-h, --help Print this help
|
|
-V, --version Print version number
|
|
-c, --config dir Specify config directory to read [""" + self.config + """]
|
|
-s, --set key=jsonvalue Override config parameter 'key' with JSON format value
|
|
-s, --set jsonstruct Override config parameters providing JSON struct
|
|
-f, --force Force recreating result
|
|
-v, --verbose Print verbose messages
|
|
"""
|
|
sys.stderr.write(usage)
|
|
if exitcode is not None:
|
|
sys.exit(exitcode)
|
|
|
|
def parse(self, argv):
|
|
try:
|
|
(opt, args) = getopt.gnu_getopt(argv[1:], "hVvc:s:f", [
|
|
"help", "version", "verbose", "config=", "set=", "force"
|
|
])
|
|
except getopt.GetoptError as e:
|
|
sys.stderr.write("Error: %s\n" % str(e))
|
|
self.usage(2)
|
|
|
|
for o, a in opt:
|
|
if o == '-h' or o == "--help":
|
|
self.help = True
|
|
elif o == '-V' or o == "--version":
|
|
self.version = True
|
|
elif o == '-v' or o == "--verbose":
|
|
self.verbose = True
|
|
elif o == '-f' or o == "--force":
|
|
self.force = True
|
|
elif o == '-c' or o == "--config":
|
|
if (a == ''):
|
|
sys.stderr.write("Error: option %s requires a non-empty argument\n" % (o))
|
|
self.usage(2)
|
|
self.config = a
|
|
elif o == '-s' or o == "--set":
|
|
error = _parse_config_set(self.params, a)
|
|
if error:
|
|
sys.stderr.write("Error: option %s argument is invalid: %s\n" % (o, error))
|
|
self.usage(2)
|
|
|
|
if self.version:
|
|
sys.stdout.write("Version: %s\n" % (self.progver))
|
|
if self.help:
|
|
self.usage(None)
|
|
if self.version or self.help:
|
|
sys.exit(0)
|
|
|
|
self.load_config()
|
|
|
|
for c in args:
|
|
if self.cmd != None:
|
|
sys.stderr.write("Multiple commands specified: %s vs %s\n" % (self.cmd, c))
|
|
self.usage(2)
|
|
for cmd, _ in self.commands:
|
|
if c == cmd:
|
|
self.cmd = c
|
|
break
|
|
if self.cmd == None:
|
|
sys.stderr.write("Unknown command specified: %s\n" % (c))
|
|
self.usage(2)
|
|
if self.cmd == None:
|
|
sys.stderr.write("Missing command\n")
|
|
self.usage(2)
|
|
|
|
_merge_config(self.params, self.config_params)
|
|
if self.verbose:
|
|
sys.stdout.write("Config parameters:\n")
|
|
for k, v in self.config_params.items():
|
|
sys.stdout.write(" %s=%s\n" % (k, str(v)))
|
|
|
|
def parse_file(self, filepath):
|
|
try:
|
|
obj = load(open(filepath, 'r'), SafeLoader)
|
|
_merge_config(obj, self.config_params)
|
|
except YAMLError as exc:
|
|
sys.stdout.write("Error: reading configuration file failed: %s" % (exc))
|
|
|
|
def load_config(self):
|
|
configdir = self.config if self.config.startswith('/') else self.basedir + '/' + self.config
|
|
if self.verbose:
|
|
sys.stdout.write("Reading config directory %s\n" % (configdir))
|
|
if not Path(configdir).is_dir():
|
|
sys.stderr.write("Warning: config directory %s does not exist\n" % (configdir))
|
|
return
|
|
with os.scandir(configdir) as it:
|
|
for entry in it:
|
|
if not entry.name.startswith('.') and (entry.name.endswith('.yml') or entry.name.endswith('.yaml')) and entry.is_file():
|
|
if self.verbose:
|
|
sys.stdout.write("Parsing config file %s\n" % (entry.path))
|
|
self.parse_file(entry.path)
|
|
|
|
def get_config_param(self, path, mandatory = False, default = None, type = 'str'):
|
|
missing = False
|
|
lpath = path.split('.')
|
|
index = 0
|
|
d = self.config_params
|
|
while index < len(lpath):
|
|
key = lpath[index]
|
|
index += 1
|
|
if d == None:
|
|
missing = True
|
|
break
|
|
if not isinstance(d, dict):
|
|
missing = True
|
|
break
|
|
if key not in d:
|
|
missing = True
|
|
break
|
|
d = d[key]
|
|
if missing:
|
|
if mandatory:
|
|
sys.stdout.write("Configuration setting '%s' missing\n" % (path))
|
|
return default
|
|
if type == 'str':
|
|
return str(d)
|
|
if type == 'boolean':
|
|
if isinstance(d, (int, float)):
|
|
return int(d) != 0
|
|
value = str(d)
|
|
if value == '' or value == '0' or value == 'no' or value == 'No' or value == 'false' or value == 'False':
|
|
return False
|
|
return True
|
|
if type == 'int':
|
|
try:
|
|
return int(d)
|
|
except ValueError as e:
|
|
sys.stdout.write("Configuration '%s' is not an integer\n" % (path))
|
|
return default
|
|
if type == 'float':
|
|
try:
|
|
return float(d)
|
|
except ValueError as e:
|
|
sys.stdout.write("Configuration '%s' is not a float\n" % (path))
|
|
return default
|
|
except OverflowError as e:
|
|
sys.stdout.write("Configuration '%s' is overflows a float\n" % (path))
|
|
return default
|
|
return d
|