"""
Manages finding, running and recoding benchmark results.
This module has shamelessly borrows from
the `airspeed velocity (asv) <http://asv.readthedocs.io/en/latest>`_
file `benchmark.py <https://github.com/spacetelescope/asv/blob/master/asv/benchmark.py>`_.
See the `airspeed velocity (asv) <http://asv.readthedocs.io/en/latest>`_
`LICENSE <https://github.com/spacetelescope/asv/blob/master/LICENSE.rst>`_.
"""
from __future__ import absolute_import
# Licensed under a 3-clause BSD style license - see LICENSE.rst
import mpi4py.MPI as mpi
import sys
import datetime
try:
import cProfile as profile
import pstats
except BaseException:
profile = None
from hashlib import sha256
import inspect
import itertools
import json
import os
import re
import textwrap
import timeit
import gc
from importlib import import_module
from .. import logging as _logging
from .utils.misc import get_process_time_timer
__license__ = "https://github.com/spacetelescope/asv/blob/master/LICENSE.rst"
def _get_attr(source, name, ignore_case=False):
if ignore_case:
attrs = [getattr(source, key) for key in dir(source)
if key.lower() == name.lower()]
if len(attrs) > 1:
raise ValueError(
"{0} contains multiple {1} functions.".format(
source.__name__, name))
elif len(attrs) == 1:
return attrs[0]
else:
return None
else:
return getattr(source, name, None)
def _get_all_attrs(sources, name, ignore_case=False):
for source in sources:
val = _get_attr(source, name, ignore_case=ignore_case)
if val is not None:
yield val
def _get_first_attr(sources, name, default, ignore_case=False):
for val in _get_all_attrs(sources, name, ignore_case=ignore_case):
return val
return default
[docs]def get_setup_cache_key(func):
if func is None:
return None
return '{0}:{1}'.format(inspect.getsourcefile(func),
inspect.getsourcelines(func)[1])
[docs]def get_source_code(items):
"""
Extract source code of given items, and concatenate and dedent it.
"""
sources = []
prev_class_name = None
for func in items:
try:
lines, lineno = inspect.getsourcelines(func)
except TypeError:
continue
if not lines:
continue
src = "\n".join(line.rstrip() for line in lines)
src = textwrap.dedent(src)
class_name = None
if inspect.ismethod(func):
# Add class name
if hasattr(func, 'im_class'):
class_name = func.im_class.__name__
elif hasattr(func, '__qualname__'):
names = func.__qualname__.split('.')
if len(names) > 1:
class_name = names[-2]
if class_name and prev_class_name != class_name:
src = "class {0}:\n {1}".format(
class_name, src.replace("\n", "\n "))
elif class_name:
src = " {1}".format(
class_name, src.replace("\n", "\n "))
sources.append(src)
prev_class_name = class_name
return "\n\n".join(sources).rstrip()
[docs]class Benchmark(object):
"""
Represents a single benchmark.
"""
# The regex of the name of function or method to be considered as
# this type of benchmark. The default in the base class, will
# match nothing.
name_regex = re.compile('^$')
[docs] def __init__(self, name, func, attr_sources):
self.name = name
self.func = func
self.pretty_name = getattr(func, "pretty_name", name)
self._attr_sources = list(attr_sources)
self._setups = None
self._teardowns = None
self._setup_cache = None
self.setup_cache_key = None
self.setup_cache_timeout = None
self.timeout = None
self.code = None
self.version = None
self.type = None
self.unit = None
self._redo_setup_next = False
self._params = None
self.param_names = None
self._current_params = None
self._comm = None
self._setup_error = None
self._profiler = None
[docs] def initialise(self):
"""
"""
if self._setups is None:
if isinstance(self._attr_sources[-1], str):
self._attr_sources[-1] = import_module(self._attr_sources[-1])
self._setups = list(_get_all_attrs(self._attr_sources, 'setup', True))[::-1]
self._teardowns = list(_get_all_attrs(self._attr_sources, 'teardown', True))
self._setup_cache = _get_first_attr(self._attr_sources, 'setup_cache', None)
self.setup_cache_key = get_setup_cache_key(self._setup_cache)
self.setup_cache_timeout = _get_first_attr([self._setup_cache], "timeout", None)
self.timeout = _get_first_attr(self._attr_sources, "timeout", 60.0)
self.code = get_source_code([self.func] + self._setups + [self._setup_cache])
if sys.version_info[0] >= 3:
code_text = self.code.encode('utf-8')
else:
code_text = self.code
code_hash = sha256(code_text).hexdigest()
self.version = str(_get_first_attr(self._attr_sources, "version", code_hash))
self.type = "base"
self.unit = "unit"
self._redo_setup_next = False
self._params = _get_first_attr(self._attr_sources, "params", [])
self.param_names = _get_first_attr(self._attr_sources, "param_names", [])
self._current_params = ()
# Enforce params format
try:
self.param_names = [str(x) for x in list(self.param_names)]
except ValueError:
raise ValueError("%s.param_names is not a list of strings" % (self.name,))
try:
self._params = list(self._params)
except ValueError:
raise ValueError("%s.params is not a list" % (self.name,))
if self._params and not isinstance(self._params[0], (tuple, list)):
# Accept a single list for one parameter only
self._params = [self._params]
else:
self._params = [[item for item in entry] for entry in self._params]
if len(self.param_names) != len(self._params):
self.param_names = self.param_names[:len(self._params)]
self.param_names += ['param%d' % (k + 1,) for k in range(len(self.param_names),
len(self._params))]
# Exported parameter representations
self.params_repr = [[repr(item) for item in entry] for entry in self._params]
@property
def root_rank(self):
"""
An :samp:`int` indicating the *root* rank process of :attr:`comm`.
"""
return 0
@property
def comm(self):
"""
The :obj:`mpi4pi.MPI.Comm` used for synchronization.
"""
return self._comm
@comm.setter
def comm(self, comm):
self._comm = comm
@property
def setup_error(self):
"""
The error which occured during :meth:`do_setup`, :samp:`None` if no error occurred.
"""
return self._setup_error
[docs] def barrier(self):
"""
Barrier.
"""
self.comm.barrier()
[docs] def bcast(self, value):
"""
Broadcast value from :attr:`root_rank` to all ranks of :attr:`comm`.
:rtype: :obj:`object`
:return: value on rank :attr:`root_rank` rank process.
"""
return self.comm.bcast(value, self.root_rank)
@property
def params(self):
"""
The list of benchmark parameters.
"""
self.initialise()
return self._params
@property
def current_params(self):
"""
The current set of parameters, set via :meth:`set_param_idx`.
"""
return self._current_params
[docs] def set_param_idx(self, param_idx):
"""
Set the parameter combo via the index :samp:`{param_idx}`.
:raises ValueError: if :samp:`param_idx` is out of range.
"""
self.initialise()
try:
self._current_params, = itertools.islice(
itertools.product(*self._params),
param_idx, param_idx + 1)
except ValueError:
raise ValueError(
"Invalid benchmark parameter permutation index: %r" % (param_idx,))
[docs] def insert_param(self, param):
"""
Insert a parameter at the front of the parameter list.
"""
self.initialise()
self._current_params = tuple([param] + list(self._current_params))
def __repr__(self):
return '<{0} {1}>'.format(self.__class__.__name__, self.name)
[docs] def do_setup(self):
self.initialise()
self._setup_error = None
try:
for setup in self._setups:
setup(*self._current_params)
except NotImplementedError as e:
# allow skipping test
self._setup_error = e
return True
return False
[docs] def redo_setup(self):
self.initialise()
if not self._redo_setup_next:
self._redo_setup_next = True
return
self.do_teardown()
self.do_setup()
[docs] def do_teardown(self):
for teardown in self._teardowns:
teardown(*self._current_params)
[docs] def do_setup_cache(self):
if self._setup_cache is not None:
return self._setup_cache()
[docs] def do_run(self):
return self.run(*self._current_params)
[docs] def do_profile_run(self):
if profile is None:
raise RuntimeError("cProfile could not be imported")
self._profiler = None
self.redo_setup()
if self.comm.rank == self.root_rank:
self._profiler = profile.Profile()
self._profiler.disable()
self.run(*self._current_params)
profiler = self._profiler
self._profiler = None
return profiler
[docs]class TimeBenchmark(Benchmark):
"""
Represents a single benchmark for timing.
"""
name_regex = re.compile(
'^(Time[A-Z_].+)|(time_.+)$')
[docs] def __init__(self, name, func, attr_sources):
Benchmark.__init__(self, name, func, attr_sources)
self.type = "time"
self.unit = "seconds"
self._attr_sources = attr_sources
self._repeat = None
self._number = None
self._goal_time = None
self._warmup_time = None
self._default_timer = None
self._timer = None
self._wall_timer = None
def _load_vars(self):
self._repeat = _get_first_attr(self._attr_sources, 'repeat', 0)
self._number = int(_get_first_attr(self._attr_sources, 'number', 0))
self._goal_time = _get_first_attr(self._attr_sources, 'goal_time', 0.1)
self._warmup_time = _get_first_attr(self._attr_sources, 'warmup_time', -1)
self._timer = _get_first_attr(self._attr_sources, 'timer', self.default_timer)
self._wall_timer = _get_first_attr(self._attr_sources, 'wall_timer', mpi.Wtime)
@property
def default_timer(self):
"""
An :obj:`callable` used to measure benchmark duration, e.g. :func:`time.process_time`.
"""
if self._default_timer is None:
self._default_timer = get_process_time_timer()
return self._default_timer
@default_timer.setter
def default_timer(self, timer):
self._default_timer = timer
@property
def repeat(self):
if self._repeat is None:
self._load_vars()
return self._repeat
@property
def number(self):
if self._number is None:
self._load_vars()
return self._number
@property
def goal_time(self):
if self._goal_time is None:
self._load_vars()
return self._goal_time
@property
def warmup_time(self):
if self._warmup_time is None:
self._load_vars()
return self._warmup_time
@property
def timer(self):
if self._timer is None:
self._load_vars()
return self._timer
@property
def wall_timer(self):
if self._wall_timer is None:
self._load_vars()
return self._wall_timer
[docs] def wall_time(self):
"""
Return *current* time in seconds.
"""
return self.wall_timer()
[docs] def do_setup(self):
result = Benchmark.do_setup(self)
# For parameterized tests, setup() is allowed to change these
self._load_vars()
return result
[docs] def run(self, *param):
number = self.number
repeat = self.repeat
if repeat == 0:
repeat = 10
warmup_time = self.warmup_time
if warmup_time < 0:
if '__pypy__' in sys.modules:
warmup_time = 1.0
else:
# Transient effects exist also on CPython, e.g. from
# OS scheduling
warmup_time = 0.1
if param:
def func():
return self.func(*param)
else:
func = self.func
timer = \
timeit.Timer(
stmt=func,
setup=self.redo_setup,
timer=self.timer
)
samples, number, samples_pre_barrier, samples_post_barrier = \
self.benchmark_timing(timer, repeat, warmup_time, number=number)
samples_list = [samples, samples_pre_barrier, samples_post_barrier]
for i in range(len(samples_list)):
samples_list[i] = [s / number for s in samples_list[i]]
return \
{
'samples': samples_list[0],
'number': number,
'wall_samples_pre_barrier': samples_list[1],
'wall_samples_post_barrier': samples_list[2]
}
[docs] def benchmark_timing(self, timer, repeat, warmup_time, number=0):
goal_time = self.goal_time
start_time = self.bcast(self.wall_time())
max_time = start_time + min(warmup_time + 1.3 * repeat * goal_time,
self.timeout - 1.3 * goal_time)
def too_slow():
# too slow, don't take more samples
return self.bcast(self.wall_time()) > max_time
if number == 0:
# Select number & warmup.
#
# This needs to be done at the same time, because the
# benchmark timings at the beginning can be larger, and
# lead to too small number being selected.
number = 1
while True:
self._redo_setup_next = False
self.barrier()
gc.disable()
start = self.wall_time()
timing = timer.timeit(number)
self.barrier()
gc.enable()
end = self.wall_time()
wall_time, timing = self.bcast((end - start, timing))
actual_timing = max(wall_time, timing)
if actual_timing >= goal_time:
if self.bcast(self.wall_time()) > start_time + warmup_time:
break
else:
try:
p = min(10.0, max(1.1, goal_time / actual_timing))
except ZeroDivisionError:
p = 10.0
number = max(number + 1, int(p * number))
if too_slow():
return [timing], number, [timing], [timing]
elif warmup_time > 0:
# Warmup
while True:
self._redo_setup_next = False
gc.disable()
timing = self.bcast(timer.timeit(number))
gc.enable()
if self.bcast(self.wall_time()) >= start_time + warmup_time:
break
if too_slow():
return [timing], number, [timing], [timing]
# Collect samples
samples = []
wall_samples_pre_barrier = []
wall_samples_post_barrier = []
for j in range(repeat):
self.barrier()
gc.disable()
start = self.wall_time()
if self._profiler is not None:
self._profiler.enable()
timing = timer.timeit(number)
if self._profiler is not None:
self._profiler.disable()
end_pre_barrier = self.wall_time()
self.barrier()
end_post_barrier = self.wall_time()
gc.enable()
samples.append(timing)
wall_samples_pre_barrier.append(end_pre_barrier - start)
wall_samples_post_barrier.append(end_post_barrier - start)
if too_slow():
break
return samples, number, wall_samples_pre_barrier, wall_samples_post_barrier
benchmark_types = [
TimeBenchmark,
]
[docs]def disc_files(root, package=''):
"""
Iterate over all .py files in a given directory tree.
"""
if os.path.isdir(root):
filename_list = os.listdir(root)
elif os.path.isfile(root):
root, filename = os.path.split(root)
filename_list = [filename, ]
package = ".".join(package.strip().rstrip(".").split(".")[:-1]) + "."
else:
raise ValueError("root=%s not an existing file or directory." % root)
for filename in filename_list:
path = os.path.join(root, filename)
if os.path.isfile(path):
filename, ext = os.path.splitext(filename)
if ext == '.py':
module = import_module(package + filename)
yield module
elif os.path.isdir(path):
for x in disc_files(path, package + filename + "."):
yield x
def _get_benchmark(attr_name, module, klass, func):
try:
name = func.benchmark_name
except AttributeError:
name = None
search = attr_name
else:
search = name.split('.')[-1]
for cls in benchmark_types:
if cls.name_regex.match(search):
break
else:
return
# relative to benchmark_dir
mname = module.__name__.split('.', 1)[1]
if klass is None:
if name is None:
name = ".".join([mname, func.__name__])
sources = [func, module]
else:
instance = klass()
func = getattr(instance, func.__name__)
if name is None:
name = ".".join([mname, klass.__name__, func.__name__])
sources = [func, instance, module.__name__]
return cls(name, func, sources)
[docs]def disc_benchmarks(root, package=None):
"""
Discover all benchmarks in a given directory tree, yielding Benchmark
objects
For each class definition, looks for any methods with a
special name.
For each free function, yields all functions with a special
name.
"""
if package is None:
package = os.path.basename(root)
for module in disc_files(root, package + '.'):
for attr_name, module_attr in (
(k, v) for k, v in module.__dict__.items()
if not k.startswith('_')
):
if inspect.isclass(module_attr):
for name, class_attr in inspect.getmembers(module_attr):
if (inspect.isfunction(class_attr) or
inspect.ismethod(class_attr)):
benchmark = _get_benchmark(name, module, module_attr,
class_attr)
if benchmark is not None:
yield benchmark
elif inspect.isfunction(module_attr):
benchmark = _get_benchmark(attr_name, module, None, module_attr)
if benchmark is not None:
yield benchmark
[docs]def create_runner_argument_parser():
"""
"""
import argparse
ap = argparse.ArgumentParser("mpi-array-benchmarks")
ap.add_argument(
"-d", "--discover",
action='store_true',
help="Only discover benchmarks, do not run them."
)
ap.add_argument(
"-q", "--quick",
action='store_true',
help="Quick benchmark run, only execute each benchmark once, skip repeats.",
default=False
)
ap.add_argument(
"-p", "--profile",
action='store_true',
help="Profile the run.",
default=False
)
ap.add_argument(
"--profile_file",
action='store',
help="Name of file to store profile data.",
default="mpia_profile.dat"
)
ap.add_argument(
"-o", "--results_file",
action='store',
help="Name of the JSON file where benchmark results are to be stored.",
default="mpia_bench_results.json"
)
ap.add_argument(
"-b", "--benchmarks_file",
action='store',
help="Name of the JSON file where individual benchmark details are to be stored.",
default="mpia_benchmarks.json"
)
ap.add_argument(
"-t", "--default_timer",
action='store',
help="The default timer used to measure benchmark duration.",
choices=["Wtime", "process_time"],
default="process_time"
)
ap.add_argument(
"-f", "--filter_regex",
action='store',
help="A regular expression for filtering which benchmark(s) to run.",
default=".*"
)
ap.add_argument(
"module_name",
nargs="*",
help="Name of modules to search for benchmarks.",
default=[]
)
return ap
[docs]def root_and_package_from_name(module_name):
"""
Returns root filename for the package/module named by :samp:`{module_name}`.
"""
module = import_module(module_name)
root = module.__file__
dir, filename = os.path.split(root)
if filename.find("__init__.py") == 0:
root = dir
return root, module.__name__
[docs]class BenchmarkRunner(object):
"""
Discovers and runs benchmarks.
"""
[docs] def __init__(self, argv=None):
"""
Initialise.
:type argv: :obj:`list` of :obj:`str`
:param argv: Command line arguments, parsed with the :obj:`argparse.ArgumentParser`
instance returned by :func:`create_argument_parser`.
"""
if argv is None:
argv = []
arg_parser = self.create_argument_parser()
self._args = arg_parser.parse_args(args=argv)
self._filter_name_regex = None
self._root_logger = None
self._rank_logger = None
self._bench_module_names = None
self._bench_results = None
self._benchmarks = None
self._profile_stats = None
if self._args.default_timer == "Wtime":
self._default_timer = mpi.Wtime
elif self._args.default_timer == "process_time":
self._default_timer = get_process_time_timer()
else:
raise ValueError("Invalid default_timer = '%s'." % self._args.default_timer)
@property
def root_logger(self):
"""
A :obj:`logging.Logger` object for logging root-rank process messages.
"""
if self._root_logger is None:
self._root_logger = \
_logging.get_root_logger(__name__ + "." + self.__class__.__name__, comm=self.comm)
return self._root_logger
@property
def rank_logger(self):
"""
A :obj:`logging.Logger` object for logging all rank process messages.
"""
if self._rank_logger is None:
self._rank_logger = \
_logging.get_rank_logger(__name__ + "." + self.__class__.__name__, comm=self.comm)
return self._rank_logger
@property
def default_timer(self):
"""
An :obj:`callable` used to measure benchmark duration, e.g. :func:`time.process_time`.
"""
return self._default_timer
@property
def comm(self):
"""
A :obj:`mpi4py.MPI.Comm` object, typically :attr:`mpi4py.MPI.COMM_WORLD`.
"""
return mpi.COMM_WORLD
@property
def root_rank(self):
"""
An :obj:`int` indicating the rank of the master MPI (root) process.
"""
return 0
@property
def is_root_rank(self):
"""
A :obj:`bool`, if :samp:`True` this is running on the :attr:`root_rank` MPI rank.
"""
return (self.comm.rank == self.root_rank)
@property
def do_quick_run(self):
"""
A :obj:`bool`, if :samp:`True`, performs a *quick* run. Each benchmark
is only executed once.
"""
return self._args.quick
@property
def do_profile(self):
"""
A :obj:`bool`, if :samp:`True`, performs a *profile* run in addition to the run.
"""
return self._args.profile
@property
def profile_file_name(self):
"""
A :obj:`str` indicating the file name in which profile info is saved.
"""
return self._args.profile_file
@property
def profile_stats(self):
"""
A :obj:`pstats.Stats` object in which profile info is accumulated.
"""
return self._profile_stats
[docs] def create_profile_stats(self, profile_file_name):
"""
Factory function for creating a :obj:`pstats.Stats` instance.
:type profile_file_name: :obj:`str`
:param profile_file_name: Name of file which contains profile data.
:rtype: :obj:`pstats.Stats`
:return: Stats object initialised with data from :samp:`{profile_file_name}`.
"""
if sys.version_info[0] <= 2:
from StringIO import StringIO
else:
from io import StringIO
profile_stats = pstats.Stats(profile_file_name, stream=StringIO())
return profile_stats
@property
def discover_only(self):
"""
A :obj:`bool`, if :samp:`True` only *discover* tests and do not run them.
"""
return self._args.discover
@property
def bench_results_file_name(self):
"""
A :obj:`str`, name of file where benchmark results are written.
"""
return self._args.results_file
@property
def bench_results(self):
"""
A :obj:`list` of benchmark results.
"""
return self._bench_results
@property
def benchmarks(self):
"""
The :obj:`list` of :obj:`Benchmark` objects.
"""
return self._benchmarks
@property
def benchmark_module_names(self):
"""
The :obj:`list` of module names which are searched to discover benchmarks.
"""
if self._bench_module_names is None:
self._bench_module_names = \
[
name for name in self._args.module_name
if not (os.path.split(name)[1].find("__main__") >= 0)
]
if len(self._bench_module_names) <= 0:
self._bench_module_names = ["mpi_array.benchmarks", ]
return self._bench_module_names
@property
def benchmarks_file_name(self):
"""
A :obj:`str`, name of file where benchmark results are written.
"""
return self._args.benchmarks_file
@property
def filter_name_regex(self):
"""
The :obj:`re.RegularExpression` used to filter the benchmarks by name.
"""
if self._filter_name_regex is None:
import re as _re
self._filter_name_regex = _re.compile(self._args.filter_regex)
return self._filter_name_regex
[docs] def filter(self, b):
"""
Returns :samp:`True` if the :obj:`Benchmark` :samp:`{b}` passes
the filtering criterion.
"""
return self.filter_name_regex.match(b.name) is not None
[docs] def create_argument_parser(self):
"""
Creates :obj:`argparse.ArgumentParser` for handling command line.
:rtype: :obj:`argparse.ArgumentParser`
:return: A :obj:`argparse.ArgumentParser` for parsing command line.
.. seealso:: :func:`create_runner_argument_parser`
"""
return create_runner_argument_parser()
[docs] def discover_benchmarks(self):
"""
Find benchmarks, store :obj:`Benchmark` objects in :attr:`benchmarks`.
"""
if self.is_root_rank:
self._benchmarks = []
for module_name in self.benchmark_module_names:
root, package = root_and_package_from_name(module_name)
self._benchmarks += [b for b in disc_benchmarks(root, package) if self.filter(b)]
else:
self._benchmarks = None
[docs] def handle_profile(self, profiler):
"""
"""
if profiler is not None:
profiler.dump_stats(self.profile_file_name)
if self._profile_stats is None:
self._profile_stats = self.create_profile_stats(self.profile_file_name)
else:
self._profile_stats.add(self.profile_file_name)
[docs] def run_benchmarks(self):
"""
Runs the benchmarks, results are stored in :attr:`bench_results`.
Assumes benchmarks have already been *discovered*.
"""
class QuickBenchmarkAttrs(object):
"""
Over-rides for :attr:`repeat` and :attr:`number` for *quick* benchmark run.
"""
repeat = 1
number = 1
self._bench_results = None
benchmarks = self.benchmarks
if benchmarks is not None:
benchmarks = sorted(benchmarks, key=lambda x: x.name)
benchmarks = self.comm.bcast(benchmarks, self.root_rank)
results = []
self.root_logger.info(
"Running %4d benchmarks, COMM_WORLD.size=%4d (benchmarks found in modules %s)...",
len(benchmarks) if benchmarks is not None else 0,
mpi.COMM_WORLD.size,
self.benchmark_module_names
)
self.root_logger.info(
"%68s,%16s,%16s,%6s,%8s, %s",
"benchmark.name",
"min sample (sec)",
"max sample (sec)",
"repeat",
"number",
"params"
)
if benchmarks is not None:
for benchmark in benchmarks:
if self.do_quick_run:
benchmark._attr_sources.insert(0, QuickBenchmarkAttrs)
benchmark.default_timer = self.default_timer
benchmark.comm = self.comm
benchmark.initialise()
if (benchmark.params is not None) and (len(benchmark.params) > 0):
param_iter = enumerate(itertools.product(*benchmark.params))
else:
param_iter = [(None, None)]
for param_idx, params in param_iter:
started_at = datetime.datetime.utcnow()
if param_idx is not None:
benchmark.set_param_idx(param_idx)
skip = False
if (
(not skip)
and
self.do_quick_run
and
(param_idx is not None)
and
(param_idx > 0)
):
skip = True
skip_reason = "quick run, only first param run."
else:
skip = benchmark.do_setup()
skip_reason = benchmark.setup_error
try:
if skip:
result = \
{
'samples': None,
'number': 0,
'wall_samples_pre_barrier': None,
'wall_samples_post_barrier': None
}
else:
result = benchmark.do_run()
if self.do_profile:
profiler = benchmark.do_profile_run()
self.handle_profile(profiler)
finally:
benchmark.do_teardown()
if not skip:
self.root_logger.info(
"%68s,%16.8f,%16.8f,%6d,%8d, %s",
benchmark.name,
min(result["samples"]),
max(result["samples"]),
benchmark.repeat,
result["number"],
[
("%s=%s" % (benchmark.param_names[i], params[i]))
for i in range(len(params))
] if params is not None else None
)
else:
self.root_logger.info(
"%68s,%49s, %s",
benchmark.name,
"skipped...%s" % skip_reason,
[
("%s=%s" % (benchmark.param_names[i], params[i]))
for i in range(len(params))
] if params is not None else None
)
result["skip_reason"] = None
if skip:
result["skip_reason"] = str(skip_reason)
result["walltime_finished_at"] = str(datetime.datetime.utcnow())
result["walltime_started_at"] = str(started_at)
result["comm_name"] = self.comm.Get_name()
result["comm_size"] = self.comm.size
result["bench_name"] = benchmark.name
result["params"] = \
{
benchmark.param_names[i]: repr(benchmark.current_params[i])
for i in range(len(benchmark.param_names))
}
results.append(result)
if self.root_rank == self.comm.rank:
self._bench_results = results
[docs] def run(self):
"""
Discover and run benchmarks.
"""
self.discover_benchmarks()
if not self.discover_only:
self.run_benchmarks()
[docs] def write_bench_results(self, bench_results, bench_results_file_name):
"""
Writes the benchmark results :samp:`{bench_results}` as :mod:`json`
string to file named :samp:`{bench_results_file_name}`.
"""
if (bench_results_file_name is not None) and (bench_results is not None):
with open(bench_results_file_name, "wt") as fd:
json.dump(bench_results, fd, indent=2, sort_keys=True)
[docs] def write_benchmarks(self, benchmarks, benchmarks_file_name):
"""
Writes individual :obj:`Benchmark` elements of :samp:`{benchmarks}`
as :mod:`json` string to file named :samp:`{benchmarks_file_name}`.
"""
if (benchmarks_file_name is not None) and (benchmarks is not None):
benchmark_dicts = \
[
dict(
(k, v) for (k, v) in benchmark.__dict__.items()
if isinstance(v, (str, int, float, list, dict, bool)) and not
k.startswith('_')
)
for benchmark in benchmarks
]
with open(benchmarks_file_name, "wt") as fd:
json.dump(benchmark_dicts, fd, indent=2, sort_keys=True)
[docs] def write_profile_stats(self, profile_stats, profile_file_name):
"""
Dump the profile stats from the :obj:`pstats.Stats` object to file.
:type profile_stats: :obj:`pstat.Stats`
:param profile_stats: The stats to dump to file.
:type profile_file_name: :obj:`str`
:param profile_file_name: The name of the file where stats are dumped
(overwritten if it exists).
.. seealso: :meth:`pstat.Stats.dump_stats`
"""
if (profile_stats is not None) and (profile_file_name is not None):
profile_stats.dump_stats(profile_file_name)
[docs] def log_profile_stats(self, profile_stats, sort_keys=('cumtime',)):
"""
Logs the output from :meth:`pstats.Stats.print_stats`.
:type profile_stats: :obj:`pstat.Stats`
:param profile_stats: The stats to dump to file.
"""
if profile_stats is not None:
profile_stats.sort_stats(*sort_keys)
profile_stats.print_stats()
s = profile_stats.stream.getvalue()
self.rank_logger.info(s)
[docs] def run_and_write_results(self):
"""
Discovers, runs and records benchmark results in files.
"""
self.run()
if self.bench_results is not None:
self.write_bench_results(self.bench_results, self.bench_results_file_name)
if self.benchmarks is not None:
self.write_benchmarks(self.benchmarks, self.benchmarks_file_name)
if self.do_profile:
self.write_profile_stats(self._profile_stats, self.profile_file_name)
self.log_profile_stats(self._profile_stats)
[docs]def run_main(argv):
"""
Runs the benchmarks.
:type argv: :obj:`list` of :obj:`str`
:param argv: The command line arguments (e.g. :samp:`sys.argv`).
.. seealso:: :obj:`BenchmarkRunner`
"""
runner = BenchmarkRunner(argv=argv)
runner.run_and_write_results()
__all__ = [s for s in dir() if not s.startswith('_')]