From 74b4e15b12edff1d70d2aeaefe5ef522b0bee73a Mon Sep 17 00:00:00 2001
From: Matthias Diener
Date: Thu, 17 Mar 2022 22:56:46 -0500
Subject: [PATCH] add a wrapper to profile array contexts

---
 examples/wave/wave-op-mpi.py |   2 +-
 grudge/array_context.py      | 444 ++++++++++++++++++++++++++++++++++-
 2 files changed, 444 insertions(+), 2 deletions(-)

diff --git a/examples/wave/wave-op-mpi.py b/examples/wave/wave-op-mpi.py
index ab6c41e5e..8e4b877e5 100644
--- a/examples/wave/wave-op-mpi.py
+++ b/examples/wave/wave-op-mpi.py
@@ -180,7 +180,7 @@ def bump(actx, dcoll, t=0):
 def main(ctx_factory, dim=2, order=3, visualize=False, lazy=False,
          use_quad=False, use_nonaffine_mesh=False):
     cl_ctx = ctx_factory()
-    queue = cl.CommandQueue(cl_ctx)
+    queue = cl.CommandQueue(cl_ctx, properties=cl.command_queue_properties.PROFILING_ENABLE)
     comm = MPI.COMM_WORLD
     num_parts = comm.Get_size()

diff --git a/grudge/array_context.py b/grudge/array_context.py
index 4e1ac8382..8129f047a 100644
--- a/grudge/array_context.py
+++ b/grudge/array_context.py
@@ -103,6 +103,448 @@ class PytatoPyOpenCLArrayContext(_PytatoPyOpenCLArrayContextBase):
 # }}}
 
 
+# {{{ Profiling
+
+import pyopencl as cl
+import loopy as lp
+import pytools
+from pytools.py_codegen import PythonFunctionGenerator
+import numpy as np
+from dataclasses import dataclass
+from typing import Any, Optional
+
+
+class StatisticsAccumulator:
+    """Class that provides statistical functions for multiple values.
+
+    .. automethod:: __init__
+    .. automethod:: add_value
+    .. automethod:: sum
+    .. automethod:: mean
+    .. automethod:: max
+    .. automethod:: min
+    .. autoattribute:: num_values
+    """
+
+    def __init__(self, scale_factor: float = 1) -> None:
+        """Initialize an empty StatisticsAccumulator object.
+
+        Parameters
+        ----------
+        scale_factor
+            Scale returned statistics by this factor.
+        """
+        # Number of values stored in the StatisticsAccumulator
+        self.num_values: int = 0
+
+        self._sum = 0
+        self._min = None
+        self._max = None
+        self.scale_factor = scale_factor
+
+    def add_value(self, v: float) -> None:
+        """Add a new value to the statistics."""
+        if v is None:
+            return
+        self.num_values += 1
+        self._sum += v
+        if self._min is None or v < self._min:
+            self._min = v
+        if self._max is None or v > self._max:
+            self._max = v
+
+    def sum(self) -> Optional[float]:
+        """Return the sum of added values."""
+        if self.num_values == 0:
+            return None
+
+        return self._sum * self.scale_factor
+
+    def mean(self) -> Optional[float]:
+        """Return the mean of added values."""
+        if self.num_values == 0:
+            return None
+
+        return self._sum / self.num_values * self.scale_factor
+
+    def max(self) -> Optional[float]:
+        """Return the max of added values."""
+        if self.num_values == 0:
+            return None
+
+        return self._max * self.scale_factor
+
+    def min(self) -> Optional[float]:
+        """Return the min of added values."""
+        if self.num_values == 0:
+            return None
+
+        return self._min * self.scale_factor
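+
+
+# Usage sketch for StatisticsAccumulator (the values below are made-up kernel
+# times in nanoseconds; with scale_factor=1e-9 the reported statistics come
+# out in seconds):
+#
+#     acc = StatisticsAccumulator(scale_factor=1e-9)
+#     for t_ns in (1200, 800, 1000):
+#         acc.add_value(t_ns)
+#     acc.num_values        # 3
+#     acc.sum()             # 3e-06
+#     acc.mean()            # 1e-06
+#     acc.min(), acc.max()  # (8e-07, 1.2e-06)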
+
+
+@dataclass
+class SingleCallKernelProfile:
+    """Class to hold the results of a single kernel execution."""
+
+    time: int
+    flops: int
+    bytes_accessed: int
+    footprint_bytes: int
+
+
+@dataclass
+class MultiCallKernelProfile:
+    """Class to hold the results of multiple kernel executions."""
+
+    num_calls: int
+    time: StatisticsAccumulator
+    flops: StatisticsAccumulator
+    bytes_accessed: StatisticsAccumulator
+    footprint_bytes: StatisticsAccumulator
+
+
+@dataclass
+class ProfileEvent:
+    """Holds a profile event that has not been collected by the profiler yet."""
+
+    cl_event: cl._cl.Event
+    translation_unit: lp.TranslationUnit
+    args_tuple: tuple
+
+
+class ProfilingActx:
+    """An array context that profiles OpenCL kernel executions.
+
+    .. automethod:: tabulate_profiling_data
+    .. automethod:: call_loopy
+    .. automethod:: get_profiling_data_for_kernel
+    .. automethod:: reset_profiling_data_for_kernel
+
+    Meant to be used as a mix-in with :class:`arraycontext.PyOpenCLArrayContext`.
+
+    .. note::
+
+       Profiling of :mod:`pyopencl` kernels (that is, kernels that do not get
+       called through :meth:`call_loopy`) is restricted to a single instance of
+       this class. If there are multiple instances, only the first one created
+       will be able to profile these kernels.
+    """
+
+    def __init__(self, queue, allocator=None, logmgr: Any = None, **kwargs) -> None:
+        super().__init__(queue, allocator)
+
+        if not queue.properties & cl.command_queue_properties.PROFILING_ENABLE:
+            raise RuntimeError("Profiling was not enabled in the command queue. "
+                "Please create the queue with "
+                "cl.command_queue_properties.PROFILING_ENABLE.")
+
+        # list of ProfileEvents that have not been transferred to profiled
+        # results yet
+        self.profile_events = []
+
+        # dict of kernel name -> list of SingleCallKernelProfile results
+        self.profile_results = {}
+
+        # dict of translation unit/kernel -> dict of args_tuple -> calculated
+        # number of flops, bytes
+        self.kernel_stats = {}
+        self.logmgr = logmgr
+
+        # Only store the first kernel exec hook for elwise kernels
+        # if cl.array.ARRAY_KERNEL_EXEC_HOOK is None:
+        #     cl.array.ARRAY_KERNEL_EXEC_HOOK = self.array_kernel_exec_hook
+
+    def clone(self):
+        """Return a semantically equivalent but distinct version of *self*."""
+        from warnings import warn
+        warn("Cloned ProfilingActx instances cannot "
+             "profile elementwise PyOpenCL kernels.")
+        return type(self)(self.queue, self.allocator, self.logmgr)
+
+    def __del__(self):
+        """Release resources."""
+        del self.profile_events[:]
+        self.profile_results.clear()
+        self.kernel_stats.clear()
+
+    def array_kernel_exec_hook(self, knl, queue, gs, ls, *actual_args, wait_for):
+        """Extract data from the elementwise array kernel."""
+        evt = knl(queue, gs, ls, *actual_args, wait_for=wait_for)
+
+        name = knl.function_name
+
+        args_tuple = tuple(
+            arg.size
+            for arg in actual_args if isinstance(arg, cl.array.Array))
+
+        try:
+            self.kernel_stats[knl][args_tuple]
+        except KeyError:
+            nbytes = 0
+            nops = 0
+            for arg in actual_args:
+                if isinstance(arg, cl.array.Array):
+                    nbytes += arg.size * arg.dtype.itemsize
+                    nops += arg.size
+            res = SingleCallKernelProfile(time=0, flops=nops,
+                bytes_accessed=nbytes, footprint_bytes=nbytes)
+            self.kernel_stats.setdefault(knl, {})[args_tuple] = res
+
+        if self.logmgr and f"{name}_time" not in self.logmgr.quantity_data:
+            self.logmgr.add_quantity(KernelProfile(self, name))
+
+        self.profile_events.append(ProfileEvent(evt, knl, args_tuple))
+
+        return evt
+
+    def _wait_and_transfer_profile_events(self) -> None:
+        # First, wait for completion of all events
+        if self.profile_events:
+            cl.wait_for_events([pevt.cl_event for pevt in self.profile_events])
+
+        # Then, collect all events and store them
+        for t in self.profile_events:
+            t_unit = t.translation_unit
+            if isinstance(t_unit, lp.TranslationUnit):
+                name = t_unit.default_entrypoint.name
+            else:
+                # It's actually a cl.Kernel
+                name = t_unit.function_name
+
+            r = self._get_kernel_stats(t_unit, t.args_tuple)
+            time = t.cl_event.profile.end - t.cl_event.profile.start
+
+            new = SingleCallKernelProfile(time, r.flops, r.bytes_accessed,
+                                          r.footprint_bytes)
+
+            self.profile_results.setdefault(name, []).append(new)
+
+        self.profile_events = []
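+
+    # Setup sketch (assuming an existing OpenCL context cl_ctx): the queue
+    # handed to this mix-in must be created with profiling enabled, as done in
+    # the wave-op-mpi.py change above; otherwise __init__ raises a
+    # RuntimeError.
+    #
+    #     queue = cl.CommandQueue(
+    #         cl_ctx,
+    #         properties=cl.command_queue_properties.PROFILING_ENABLE)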
+
+    def get_profiling_data_for_kernel(self, kernel_name: str) \
+            -> MultiCallKernelProfile:
+        """Return profiling data for kernel `kernel_name`."""
+        self._wait_and_transfer_profile_events()
+
+        time = StatisticsAccumulator(scale_factor=1e-9)
+        gflops = StatisticsAccumulator(scale_factor=1e-9)
+        gbytes_accessed = StatisticsAccumulator(scale_factor=1e-9)
+        fprint_gbytes = StatisticsAccumulator(scale_factor=1e-9)
+        num_calls = 0
+
+        if kernel_name in self.profile_results:
+            knl_results = self.profile_results[kernel_name]
+
+            num_calls = len(knl_results)
+
+            for r in knl_results:
+                time.add_value(r.time)
+                gflops.add_value(r.flops)
+                gbytes_accessed.add_value(r.bytes_accessed)
+                fprint_gbytes.add_value(r.footprint_bytes)
+
+        return MultiCallKernelProfile(num_calls, time, gflops, gbytes_accessed,
+                                      fprint_gbytes)
+
+    def reset_profiling_data_for_kernel(self, kernel_name: str) -> None:
+        """Reset profiling data for kernel `kernel_name`."""
+        self.profile_results.pop(kernel_name, None)
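+
+    # Query sketch (assuming actx is an instance of this class; the kernel
+    # name "diff" is only a placeholder for an entrypoint that was executed
+    # through call_loopy):
+    #
+    #     prof = actx.get_profiling_data_for_kernel("diff")
+    #     prof.num_calls
+    #     prof.time.mean()       # mean execution time in seconds
+    #     actx.reset_profiling_data_for_kernel("diff")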
+
+    def tabulate_profiling_data(self) -> pytools.Table:
+        """Return a :class:`pytools.Table` with the profiling results."""
+        self._wait_and_transfer_profile_events()
+
+        tbl = pytools.Table()
+
+        # Table header
+        tbl.add_row(["Function", "Calls",
+            "Time_sum [s]", "Time_min [s]", "Time_avg [s]", "Time_max [s]",
+            "GFlops/s_min", "GFlops/s_avg", "GFlops/s_max",
+            "BWAcc_min [GByte/s]", "BWAcc_mean [GByte/s]", "BWAcc_max [GByte/s]",
+            "BWFoot_min [GByte]", "BWFoot_mean [GByte]", "BWFoot_max [GByte]",
+            "Intensity (bytes/flop)"])
+
+        # Precision of results
+        g = ".4g"
+
+        total_calls = 0
+        total_time = 0
+
+        for knl in self.profile_results.keys():
+            r = self.get_profiling_data_for_kernel(knl)
+
+            # Extra statistics that are derived from the main values returned
+            # by self.get_profiling_data_for_kernel(). These are already in
+            # GFlops/s and GByte/s respectively, so there is no need to scale
+            # them.
+            flops_per_sec = StatisticsAccumulator()
+            bandwidth_access = StatisticsAccumulator()
+
+            knl_results = self.profile_results[knl]
+            for knl_res in knl_results:
+                flops_per_sec.add_value(knl_res.flops/knl_res.time)
+                bandwidth_access.add_value(knl_res.bytes_accessed/knl_res.time)
+
+            total_calls += r.num_calls
+
+            total_time += r.time.sum()
+
+            time_sum = f"{r.time.sum():{g}}"
+            time_min = f"{r.time.min():{g}}"
+            time_avg = f"{r.time.mean():{g}}"
+            time_max = f"{r.time.max():{g}}"
+
+            if r.footprint_bytes.sum() is not None:
+                fprint_mean = f"{r.footprint_bytes.mean():{g}}"
+                fprint_min = f"{r.footprint_bytes.min():{g}}"
+                fprint_max = f"{r.footprint_bytes.max():{g}}"
+            else:
+                fprint_mean = "--"
+                fprint_min = "--"
+                fprint_max = "--"
+
+            if r.flops.sum() > 0:
+                bytes_per_flop_mean = f"{r.bytes_accessed.sum() / r.flops.sum():{g}}"
+                flops_per_sec_min = f"{flops_per_sec.min():{g}}"
+                flops_per_sec_mean = f"{flops_per_sec.mean():{g}}"
+                flops_per_sec_max = f"{flops_per_sec.max():{g}}"
+            else:
+                bytes_per_flop_mean = "--"
+                flops_per_sec_min = "--"
+                flops_per_sec_mean = "--"
+                flops_per_sec_max = "--"
+
+            bandwidth_access_min = f"{bandwidth_access.min():{g}}"
+            bandwidth_access_mean = f"{bandwidth_access.mean():{g}}"
+            bandwidth_access_max = f"{bandwidth_access.max():{g}}"
+
+            tbl.add_row([knl, r.num_calls, time_sum,
+                time_min, time_avg, time_max,
+                flops_per_sec_min, flops_per_sec_mean, flops_per_sec_max,
+                bandwidth_access_min, bandwidth_access_mean,
+                bandwidth_access_max,
+                fprint_min, fprint_mean, fprint_max,
+                bytes_per_flop_mean])
+
+        tbl.add_row(["Total", total_calls, f"{total_time:{g}}"] + ["--"] * 13)
+
+        return tbl
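+
+    # Reporting sketch (assuming actx is an instance of this class):
+    #
+    #     print(actx.tabulate_profiling_data())
+    #
+    # prints one row per profiled kernel (call count, times, GFlops/s,
+    # bandwidth and footprint estimates) plus a closing "Total" row.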
+
+    def _get_kernel_stats(self, t_unit: lp.TranslationUnit, args_tuple: tuple) \
+            -> SingleCallKernelProfile:
+        return self.kernel_stats[t_unit][args_tuple]
+
+    def _cache_kernel_stats(self, t_unit: lp.TranslationUnit, kwargs: dict) \
+            -> tuple:
+        """Generate the kernel stats for a program with its args."""
+        args_tuple = tuple(
+            (key, value.shape) if hasattr(value, "shape") else (key, value)
+            for key, value in kwargs.items())
+
+        # Are kernel stats already in the cache?
+        try:
+            self.kernel_stats[t_unit][args_tuple]
+            return args_tuple
+        except KeyError:
+            # If not, calculate and cache the stats
+            ep_name = t_unit.default_entrypoint.name
+            executor = t_unit.target.get_kernel_executor(t_unit, self.queue,
+                                                         entrypoint=ep_name)
+            info = executor.translation_unit_info(
+                ep_name, executor.arg_to_dtype_set(kwargs))
+
+            typed_t_unit = executor.get_typed_and_scheduled_translation_unit(
+                ep_name, executor.arg_to_dtype_set(kwargs))
+            kernel = typed_t_unit[ep_name]
+
+            idi = info.implemented_data_info
+
+            param_dict = kwargs.copy()
+            param_dict.update({k: None for k in kernel.arg_dict.keys()
+                               if k not in param_dict})
+
+            param_dict.update(
+                {d.name: None for d in idi if d.name not in param_dict})
+
+            # Generate the wrapper code
+            wrapper = executor.get_wrapper_generator()
+
+            gen = PythonFunctionGenerator("_mcom_gen_args_profile",
+                                          list(param_dict))
+
+            wrapper.generate_integer_arg_finding_from_shapes(gen, kernel, idi)
+            wrapper.generate_integer_arg_finding_from_offsets(gen, kernel, idi)
+            wrapper.generate_integer_arg_finding_from_strides(gen, kernel, idi)
+
+            param_names = kernel.all_params()
+            gen("return {%s}" % ", ".join(
+                f"{repr(name)}: {name}" for name in param_names))
+
+            # Run the wrapper code, save argument values in domain_params
+            domain_params = gen.get_picklable_function()(**param_dict)
+
+            # Get flops/memory statistics
+            op_map = lp.get_op_map(typed_t_unit, subgroup_size="guess")
+            # bytes_accessed = lp.get_mem_access_map(
+            #     typed_t_unit, subgroup_size="guess") \
+            #     .to_bytes().eval_and_sum(domain_params)
+
+            bytes_accessed = 0
+
+            flops = op_map.filter_by(dtype=[np.float32, np.float64]).eval_and_sum(
+                domain_params)
+
+            # Footprint gathering is not yet available in loopy with
+            # kernel callables:
+            # https://github.com/inducer/loopy/issues/399
+            if 0:
+                try:
+                    footprint = lp.gather_access_footprint_bytes(typed_t_unit)
+                    footprint_bytes = sum(
+                        footprint[k].eval_with_dict(domain_params)
+                        for k in footprint)
+
+                except lp.symbolic.UnableToDetermineAccessRange:
+                    footprint_bytes = None
+            else:
+                footprint_bytes = None
+
+            res = SingleCallKernelProfile(
+                time=0, flops=flops, bytes_accessed=bytes_accessed,
+                footprint_bytes=footprint_bytes)
+
+            self.kernel_stats.setdefault(t_unit, {})[args_tuple] = res
+
+        if self.logmgr:
+            if f"{ep_name}_time" not in self.logmgr.quantity_data:
+                self.logmgr.add_quantity(KernelProfile(self, ep_name))
+
+        return args_tuple
+
+    def call_loopy(self, t_unit, **kwargs) -> dict:
+        """Execute the loopy kernel and profile it."""
+        try:
+            t_unit = self._loopy_transform_cache[t_unit]
+        except KeyError:
+            orig_t_unit = t_unit
+            t_unit = self.transform_loopy_program(t_unit)
+            self._loopy_transform_cache[orig_t_unit] = t_unit
+            del orig_t_unit
+
+        evt, result = t_unit(self.queue, **kwargs, allocator=self.allocator)
+
+        if self._wait_event_queue_length is not False:
+            prg_name = t_unit.default_entrypoint.name
+            wait_event_queue = self._kernel_name_to_wait_event_queue.setdefault(
+                prg_name, [])
+
+            wait_event_queue.append(evt)
+            if len(wait_event_queue) > self._wait_event_queue_length:
+                wait_event_queue.pop(0).wait()
+
+        # Generate the stats here so we don't need to carry around the kwargs
+        args_tuple = self._cache_kernel_stats(t_unit, kwargs)
+
+        self.profile_events.append(ProfileEvent(evt, t_unit, args_tuple))
+
+        return result
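+
+    # Cache-key sketch: for a call_loopy invocation whose kwargs contain an
+    # array argument u of shape (ndofs,) and a scalar alpha (placeholder
+    # names), _cache_kernel_stats files the computed statistics under the key
+    #
+    #     (("u", (ndofs,)), ("alpha", alpha))
+    #
+    # so later calls with matching shapes/values reuse them instead of
+    # re-running the loopy statistics pass.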
+
+
+# }}}
+
+
 class MPIBasedArrayContext:
     mpi_communicator: "MPI.Comm"
@@ -246,7 +688,7 @@ def clone(self):
 
 # {{{ distributed + pyopencl
 
-class MPIPyOpenCLArrayContext(PyOpenCLArrayContext, MPIBasedArrayContext):
+class MPIPyOpenCLArrayContext(ProfilingActx, PyOpenCLArrayContext, MPIBasedArrayContext):
     """An array context for using distributed computation with
     :mod:`pyopencl` eager evaluation.