Source code for perfume.perfume

# -*- coding: utf-8 -*-

"""Main module."""

import collections
import time
import uuid

from bokeh import io as bi
from bokeh import models as bm
import bokeh.palettes
from bokeh import plotting as bp
from IPython import display as ipdisplay
import numpy as np
import pandas as pd

from perfume import analyze
from perfume import colors


[docs]class Timer(object): def __enter__(self): self._begin = time.perf_counter() return self def __exit__(self, *exc_info): self._end = time.perf_counter() @property def begin(self): return self._begin @property def end(self): return self._end
[docs] def elapsed_seconds(self): return self.end - self.begin
[docs] @classmethod def time(cls, fn, *args, **kwargs): with cls() as timer: fn(*args, **kwargs) return timer.elapsed_seconds()
[docs]class Display(object): def __init__(self, names, initial_size, width=900, height=480): # Call this once to raise an error early if necessary: self._colors = colors.colors(len(names)) self._start = time.perf_counter() self._initial_size = initial_size self._sources = collections.OrderedDict( [ ( name, { "hist": bm.ColumnDataSource( data={"top": [], "left": [], "right": []} ), "pdf": bm.ColumnDataSource(data={"x": [], "y": []}), "stddev": bm.ColumnDataSource( data={"base": [], "lower": [], "upper": []} ), "median": bm.ColumnDataSource(data={"x": [], "y": []}), }, ) for name in names ] ) self._width = width self._height = height self._plot = None self._elapsed_rendering_seconds = 0.0 self._describe_widget = ipdisplay.HTML("") self._display_id = str(uuid.uuid1())
[docs] def elapsed_rendering_ratio(self): elapsed = time.perf_counter() - self._start return self._elapsed_rendering_seconds / elapsed
[docs] def initialize_plot(self, title): with Timer() as timer: plot = bp.figure( title=title, plot_width=self._width, plot_height=self._height ) plot.xaxis.axis_label = "millis" plot.yaxis.visible = False _colors = iter(self._colors) for name, sources in self._sources.items(): color = next(_colors) plot.quad( top="top", bottom=0, left="left", right="right", source=sources["hist"], alpha=0.3, fill_color=color, line_color=color, ) plot.line( "x", "y", source=sources["pdf"], legend=name, alpha=0.5, line_color=color, line_width=4, ) stddev = bm.Whisker( base="base", lower="lower", upper="upper", source=sources["stddev"], dimension="width", line_alpha=0.7, line_color=color, line_width=2, ) for head in (stddev.lower_head, stddev.upper_head): head.line_color = color head.line_width = 2 head.line_alpha = 0.7 plot.add_layout(stddev) median = bm.Whisker( base="y", lower="x", upper="x", source=sources["median"], dimension="width", line_alpha=0.7, line_color=color, line_width=2, ) for head in (median.lower_head, median.upper_head): head.line_color = color head.line_width = 2 head.line_alpha = 0.7 plot.add_layout(median) self._elapsed_rendering_seconds -= timer.elapsed_seconds() return plot
@staticmethod def _ks_style(s): if np.isnan(s): return "visibility: hidden" else: thresholds = [1.22, 1.36, 1.48, 1.63, 1.73, 1.95] cs = list(reversed(bokeh.palettes.RdYlGn[len(thresholds) + 1])) color = cs[np.searchsorted(thresholds, s)] return "background-color: {}".format(color)
[docs] def update(self, samples): # If this is a module-level import, readthedocs fails because # this triggers an import of _tkinter, which isn't built in to # the python that they use. import seaborn as sns with Timer() as timer: timings = analyze.timings(samples) bucketed_timings = analyze.bucket_resample_timings(samples) for name, sources in self._sources.items(): array = timings[name].values hist, edges = np.histogram(array, density=True, bins="auto") x, y = sns.distributions._statsmodels_univariate_kde( array, "gau", "scott", 200, 3, (-np.inf, np.inf), cumulative=False, ) whisker_height = np.max(y) / 2 lower, median, upper = np.percentile(array, [25., 50., 75.]) sources["hist"].data = { "top": hist, "left": edges[:-1], "right": edges[1:] } sources["pdf"].data = {"x": x, "y": y} sources["stddev"].data = { "base": [whisker_height], "lower": [lower], "upper": [upper], } sources["median"].data = {"x": [median], "y": [whisker_height]} describe_html = ( timings.describe().style.set_precision(3).set_caption( "Descriptive Timing Statistics" ).render() ) if len(self._sources) > 1: ks_frame = analyze.ks_test(timings) ks_bk_frame = analyze.ks_test(bucketed_timings) ks_html = ( ks_frame.style.applymap(self._ks_style).set_precision( 3 ).set_caption( "K-S test" ).render() ) ks_bk_html = ( ks_bk_frame.style.applymap(self._ks_style).set_precision( 2 ).set_caption( "Bucketed K-S test" ).render() ) html = describe_html + ks_html + ks_bk_html self._describe_widget.data = html.replace( "table", 'table style="display:inline"' ) else: self._describe_widget.data = describe_html total_bench_time = timings[self._initial_size:].sum().sum() / 1000. elapsed = time.perf_counter() - self._start num_samples = len(timings.index) title = ( "{} samples, {:.2f} sec elapsed, {:.2f} samples/sec, " "{:.2f}% efficiency" ).format( num_samples, elapsed, (num_samples - self._initial_size) / elapsed, 100. * total_bench_time / elapsed ) if self._plot is None: self._plot = self.initialize_plot(title) bi.show(self._plot, notebook_handle=True) ipdisplay.display( self._describe_widget, display_id=self._display_id ) else: self._plot.title.text = title bi.push_notebook() ipdisplay.update_display( self._describe_widget, display_id=self._display_id ) self._elapsed_rendering_seconds += timer.elapsed_seconds()
def _flatten(l): return [n for sublist in l for n in sublist]
[docs]def bench(*fns, samples=None, efficiency=.9): """Benchmarks functions, displaying results in a Jupyter notebook. Runs ``fns`` repeatedly, collecting timing information, until :exc:`KeyboardInterrupt` is raised, at which point benchmarking stops and the results so far are returned. Parameters ---------- fns : list of callable A list of functions to benchmark and compare samples : pandas.DataFrame Optionally, pass the results of a previous call to :func:`.bench` to continue from its already collected data. efficiency : float Number between 0 and 1. Represents the target portion of time we aim to spend running the functions under test (so, we spend up to :math:`1 - efficiency` time analyzing and rendering plots). Returns ------- pandas.DataFrame A dataframe containing the results so far. The row index is just an autoincrement integer, and the column index is a :class:`~pandas.MultiIndex` where the first level is function name and the second level is ``begin`` or ``end``. """ if samples is None: sample_records = [] else: sample_records = [tuple(r) for r in samples.to_records(index=False)] names = [fn.__name__ for fn in fns] disp = Display(names, len(sample_records)) index = pd.MultiIndex( levels=[names, ("begin", "end")], labels=[ _flatten([(i, i) for i in range(len(names))]), [0, 1] * len(names) ], names=("function", "timing"), ) try: while True: sample = [] for fn in fns: with Timer() as timer: fn() sample.extend((timer.begin, timer.end)) sample_records.append(tuple(t * 1000 for t in sample)) if ( len(sample_records) > 10 and disp.elapsed_rendering_ratio() < (1. - efficiency) ): samples = pd.DataFrame.from_records( iter(sample_records), columns=index ) disp.update(samples) except KeyboardInterrupt: return pd.DataFrame.from_records(iter(sample_records), columns=index)