Низкая производительность для обработки сигналов


У меня есть программа делать много итераций (тысяч до нескольких миллионов до сотен миллионов). Это займет довольно много времени (несколько минут, до нескольких дней), и несмотря на все мои усилия, чтобы оптимизировать это, я все еще немного застрял.

Профиль: через cProfile через консоль вызвать

ncalls     tottime  percall  cumtime  percall filename:lineno(function)
    500/1    0.018    0.000  119.860  119.860 {built-in method builtins.exec}
        1    0.006    0.006  119.860  119.860 Simulations_profiling.py:6(<module>)
      6/3    0.802    0.134  108.302   36.101 Simulations_profiling.py:702(optimization)
    38147    0.581    0.000  103.411    0.003 Simulations_profiling.py:270(overlap_duo_combination)
   107691   28.300    0.000  102.504    0.001 Simulations_profiling.py:225(duo_overlap)
 12615015   69.616    0.000   69.616    0.000 {built-in method builtins.round}

Первый вопрос, Какие 2 первые строчки? Я предположил, что это была программа под названием.

Я собираюсь заменить round() метод сравнения допуска в моей if / else заявления, таким образом, избегая в этот раз расход топлива. Я хотел бы продолжить оптимизацию следующие 2 функции, но я не могу найти новый подход.

import itertools
import numpy as np

class Signal:
    def __init__(self, fq, t0, tf, width=0.3):
        self.fq = fq                                    # frequency in Hz
        self.width = float(width)                       # cathodic phase width in ms
        self.t0 = t0                                    # Instant of the first pulse in ms
        self.tf = tf                                    # End point in ms

        # List of instant at which a stim pulse is triggered in ms
        self.timeline = np.round(np.arange(t0, self.tf, 1/fq*1000), 3)

    def find_closest_t(self, t):
        val = min(self.timeline, key=lambda x:abs(x-t))
        id = np.where(self.timeline==val)[0][0]

        if val <= t or id == 0:
            return val, id
        else:
            return self.timeline[id-1], id-1

def duo_overlap(S1, S2, perc):

    pulse_id_t1, pulse_id_t2 = [], []

    start = max(S1.t0, S2.t0) - max(S1.width, S2.width)
    if start <= 0:
        start = 0
        start_id_S1 = 0
        start_id_S2 = 0
    else:
        start_id_S1 = S1.find_closest_t(start)[1]
        start_id_S2 = S2.find_closest_t(start)[1]

    stop = min(S1.tf, S2.tf) + max(S1.width, S2.width)

    for i in range(start_id_S1, len(S1.timeline)):
        if S1.timeline[i] > stop:
            break

        for j in range(start_id_S2, len(S2.timeline)):
            if S2.timeline[j] > stop:
                break

            d = round(abs(S2.timeline[j] - S1.timeline[i]), 3)  # Computation of the distance of the 2 point

            if S1.timeline[i] <= S2.timeline[j] and d < S1.width:
                pulse_id_t1.append(i)
                pulse_id_t2.append(j)
                continue

            elif S1.timeline[i] >= S2.timeline[j] and d < S2.width:
                pulse_id_t1.append(i)
                pulse_id_t2.append(j)
                continue

            else:
                continue

    return pulse_id_t1, pulse_id_t2

def overlap_duo_combination(signals, perc=0):

    overlap = dict()
    for i in range(len(signals)):
        overlap[i] = list()

    for subset in itertools.combinations(signals, 2):
        p1, p2 = duo_overlap(subset[0], subset[1], perc = perc)
        overlap[signals.index(subset[0])] += p1
        overlap[signals.index(subset[1])] += p2

    return overlap

Описание программы:

У меня есть прямоугольный сигнал ширины Signal.width и частоты Signal.fq начиная с Signal.t0 и заканчивая Signal.tf. Во время инициализации Signal например, timeline вычисляется: это 1Д-массив чисел с плавающей точкой, в которой each number corresponds to the instant at which a pulse is triggered.

Пример:

IN: Signal(50, 0, 250).timeline
OUT: array([  0.,  20.,  40.,  60.,  80., 100., 120., 140., 160., 180., 200., 220., 240.])

A pulse is triggered at t = 0, t = 20, t = 40, ... Each pulse has a width of 0.3.

duo_overlap() принимает 2 сигнала на входе (и процент, который мы будем держать на 0 в этом примере. Эта функция вычисляет идентификатор импульса на С1 и С2 (ID в массиве сроки), которые перекрываются друг с другом.

Пример:

Если импульс начинается в момент T = 0 на S1 и пульса начинается при t = 0.2 для С2, с 0.2 - 0 = 0.2 < 0.3 (С1.ширина), в 2 импульсы перекрываются.

Я пытался оптимизировать эту функцию зацикливания только на удостоверении личности, в котором они могут пересекаться (start_id, stop) вычисляется впереди.Но как вы можете видеть, эта функция по-прежнему очень трудоемкая из-за большого количества звонков.

Последняя функция, overlap_duo_combination() принимает N сигналов во входных данных как список (или кортеж / итерируемый) (2 <= Н <= 6 на практике) и создает dict() в котором ключом является идентификатор сигнала во входном списке, а значение-список перекрывающихся импульсов ИД (сравнение 2 по 2 сигналов во входном списке).

Пример:

Входной сигнал: сигналы = (С1, С2, С3) импульсный п°2 из С1 пересекаются с широтно н°3 С2 и импульсный П°3 С2 пересекаются с широтно н°5 С3.

Выход: дикт[0] = [2] / дикт[1] = [3, 3] / дикт[2] = [5]

В 3 выскакивает дважды S2, потому что это будет добавить первую плитку duo_overlap() называется на S1 и S2, а второй раз, когда он caleed на S2 и S3. Я не хочу, чтобы избежать дубликатов, так это информацию о том, сколько разных импульсов накладываются друг на друга (в данном случае 2 импульсов накладываются друг на друга с пульсом П°3 С2 в).

Бы у вас есть идея, предложение, код или что-то redue сложность этой части кода?

В настоящее время я рассматриваю внедрение это, так как у меня в распоряжении компании NVIDIA 1080 ти, но я не знаю языка Си. Может быть стоит переключиться на ГПУ это внутренняя функция, которая не займет много времени для выполнения, когда называют, но называют тысячи раз?

Спасибо, что прочитали такой длинный пост, и спасибо за помощь!

Редактировать

После замечания, реализацию и NumPy из duo_overlap:

def duo_overlap_np(S1, S2, perc):
    p1_overlapping = np.zeros(shape = (len(S1.timeline)))
    p2_overlapping = np.zeros(shape = (len(S2.timeline)))

    start = max(S1.t0, S2.t0) - max(S1.width, S2.width)
    if start <= 0:
        start = 0
        start_id_S1 = 0
        start_id_S2 = 0
    else:
        start_id_S1 = S1.find_closest_t(start)[1]
        start_id_S2 = S2.find_closest_t(start)[1]

    stop = min(S1.tf, S2.tf) + max(S1.width, S2.width)

    for i in range(start_id_S1, len(S1.timeline)):
        if S1.timeline[i] > stop:
            break

        for j in range(start_id_S2, len(S2.timeline)):
            if S2.timeline[j] > stop:
                break

            d = round(abs(S2.timeline[j] - S1.timeline[i]), 3)  # Computation of the distance of the 2 point

            if S1.timeline[i] <= S2.timeline[j] and d < S1.width:
                p1_overlapping[i] = 1
                p2_overlapping[j] = 1
                continue

            elif S1.timeline[i] >= S2.timeline[j] and d < S2.width:
                p1_overlapping[i] = 1
                p2_overlapping[j] = 1
                continue

            else:
                continue

    return list(np.nonzero(p1_overlapping)[0]), list(np.nonzero(p2_overlapping)[0])

Я вполне уверен, что я не получить это право, так как это не сильно улучшает ситуацию. На 50 звонков, я получаю следующие сроки в секунду:

 Old: 8.20273494720459
 New: 8.188030004501343

EDIT2: в отношении ответа, правильный linspace() это:

import numpy as np

t0 = 0
tf = 300
fq = 60

t1 = np.round(np.arange(t0, tf, 1/fq*1000), 3)
t2 = np.round(np.linspace(t0, int(tf-(1/fq*1000)), int((tf-t0)*fq/1000)), 1)

И для find_closest_t:

def find_closest_t(timeline, t):
    id = np.searchsorted(timeline, t)
    if id == 0:
        return timeline[0], id
    else:
        return timeline[id-1], id-1


Комментарии
2 ответа

У меня есть только несколько незначительных вещей, чтобы предложить

Сделав несколько быстрых испытаний, НП.linspace значительно быстрее (около 60х), чем НП.создать

self.timeline = np.round(np.linspace(t0, self.tf, fq/1000), 3)

для find_closest_t, использовать тот факт, что личность.временная шкала находится в отсортированном порядке, так и NumPy имеет функцию для этого: https://docs.scipy.org/doc/numpy-1.14.0/reference/generated/numpy.searchsorted.html это позволит произвести поиск займет o(журнал(N)) вместо o(n), где N-это размер шкалы. Вы можете использовать его, чтобы найти наибольшее значение, которое меньше чем или равно T, а затем просто проверяем следующее значение найти наименьшее, что больше, чем T. Наиболее близким к т-одна из Т

ind1 = np.searchsorted(self.timeline, t)
val1 = np.abs(self.timeline[ind1]-t)
ind2 = ind1+1
val2 = np.abs(self.timeline[ind2]-t)
ind, val = ind1, val1 if val1 < val2 else ind2, val2

Вы должны были бы добавить условную, чтобы проверить, чтобы убедиться, что ind1 не последний индекс, или после нее (в обоих случаях, последний показатель должен быть Вал и IND)

если он не в отсортированном порядке, и NumPy функции оптимизированы, чтобы быть быстрее на массивах, чем те, которые предназначены для любого итератора:

val = np.min(np.abs(self.timeline-t))

не точно такой же, как ваши расчеты Вэл, но быстрее

Для петель в duo_overlap_np, вместо того, чтобы делать С1.хронология[я] > стоп, я бы предложил:

s1_high = np.searchsorted(S1.timeline, stop, side='right')
s2_high = np.searchsorted(S2.timeline, stop, side='right')
for i in range(start_id_S1, s1_high):
for j in range(start_id_S2, s2_high):

Я не думаю, что избавившись от булевых проверить бы сохранить все, что много раз, хотя поскольку для С2 проверить вы используете его снова и снова в петлю, он мог сэкономить немного времени

наконец, по возвращении, преобразования массивов numpy в списки также занимает немного времени вычислений. Не уверен, что простым способом можно было бы изменить, что в этом случае, хотя

4
ответ дан 20 марта 2018 в 11:03 Источник Поделиться

Стиль кода


  • вместо перебора индексов такой for j in range(start_id_S2, len(S2.timeline)): вы можете использовать enumerate как это: for i, s1 in enumerate(S1.timeline[start_id_S1:], start_id_S1):

  • нет необходимости для всех continue заявления и окончательной else статья в последнем if-часть

производительности

Вы также можете использовать тот факт, что импульсы для того, чтобы перебрать сигналов одновременно. Таким образом, вы не будете иметь П! сравнение, где вы перебрать все дерево

Для этого я сделал небольшое изменение в ваш Signal класс:

class Signal:
def __init__(self, freq, start, end, width=0.3):
self.freq = freq # frequency in Hz
self.width = float(width) # cathodic phase width in ms
self.start = start # Instant of the first pulse in ms
self.end = end # End point in ms

# List of instant at which a stim pulse is triggered in ms
self.timeline = np.round(np.arange(start, end, 1000/freq), 3)
self.pulses = np.stack((self.timeline, self.timeline + self.width), axis=1)

def find_closest_t(self, t):
val = min(self.timeline, key=lambda x:abs(x-t))
id = np.where(self.timeline==val)[0][0]

if val <= t or id == 0:
return val, id
else:
return self.timeline[id-1], id-1

def find_closest_t_np(self, t):
idx = max(np.searchsorted(self.timeline, t) - 1, 0)
return idx

def __iter__(self):
return iter(self.pulses)
# or yield from map(tuple, self.pulses) # if you need tuples

def __bool__(self):
return bool(self.timeline.size)

Чтобы перебрать все сигналы, в то же время, мы,


  1. собрать словарь с сигналами и их первые импульсы.

  2. искать совпадения импульсов

  3. если это новая совпадения, выход его

  4. заранее итерации по сигналу, где конец импульса первично

  5. если этот итератор будет исчерпан, снять сигнал с дикт с сигналами

код:

from collections import namedtuple

def duo_overlap_iter(signals, perc=0):
pulse = namedtuple('Pulse', 'name iter index start end ')
iters = ((i, iter(signal)) for i, signal in enumerate(signals) if signal)
iters = {name: pulse(name, it, 0, *next(it)) for name, it in iters}
seen = set()

while iters:
for overlap in find_overlap(iters.values()):
if overlap not in seen:
yield overlap
seen.add(overlap)
try:
p0 = min(iters.values(), key=lambda x: (x.end, -x.start))
iters[p0.name] = pulse(p0.name, p0.iter, p0.index + 1, *next(p0.iter))
except StopIteration:
del iters[p0.name]

Чтобы найти накладок, мы используем itertools.combinations. Мы отдаем перекрытием как frozenset С именем и индексом соответствующего сигнала

def find_overlap(pulses):
for p0, p1 in combinations(pulses, 2):
p = frozenset(((p0.name, p0.index), (p1.name, p1.index)))
if p1.start <= p0.end and p0.start <= p1.end:
yield p

Образец Данных

S0 = Signal(20 , 100, 0,)  # empty
S1 = Signal(50, 0, 250)
S2 = Signal(30, 10, 300, 2)
S3 = Signal(20, -10, 280, 2)
signals = S0, S1, S2, S3

результат выборки

list(duo_overlap_iter(signals))


[frozenset({(1, 2), (3, 1)}), frozenset({(3, 3), (1, 7)}), frozenset({(1, 12), (3, 5)})]

Окончательные результаты

Чтобы получить результаты в том, что ваш код представляет его, вы можете сделать что-то вроде этого:

def overlap_duo_comb_iter(signals, perc=0):

overlap = {i: [] for i, _ in enumerate(signals)}

for (s0, i0), (s1, i1) in duo_overlap_iter(signals):
overlap[s0].append(i0)
overlap[s1].append(i1)

return overlap

код библиотеки numpy

Во время моего пересмотра сроков, я заметил отредактированного кода с использованием библиотеки numpy, вы еще не используете то сигналы сортируются. Вы перебирать весь таймлайн вместо того, чтобы остановиться после 2-го сигнала не может больше пересекаться.

def duo_overlap_np(S1, S2, perc):
p1_overlapping = np.zeros_like(S1.timeline)
p2_overlapping = np.zeros_like(S2.timeline)

start = max(S1.start, S2.start)
start_id_S1 = S1.find_closest_t_np(start)

stop = min(S1.pulses[-1][1], S2.pulses[-1][1])
for i, (s1, s1_end) in enumerate(S1.pulses[start_id_S1:], start_id_S1):
if s1 > stop:
break

start_id_S2 = S2.find_closest_t_np(s1)
for j, (s2, s2_end) in enumerate(S2.pulses[start_id_S2:], start_id_S2):
if s2 > s1_end:
break
if s1 > s2_end:
continue
p1_overlapping[i] = 1
p2_overlapping[j] = 1

return list(np.nonzero(p1_overlapping)[0]), list(np.nonzero(p2_overlapping)[0])

Тайминги

print(overlap_duo_combination(signals))
% timeit overlap_duo_combination(signals)


{0: [], 1: [2, 7, 12], 2: [], 3: [1, 3, 5]}
1.33 ms ± 72.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

print(overlap_duo_combination(signals, func=duo_overlap_np))
assert overlap_duo_combination(signals) == overlap_duo_combination(signals, func=duo_overlap_np)
% timeit overlap_duo_combination(signals, func=duo_overlap_np)


{0: [], 1: [2, 7, 12], 2: [], 3: [1, 3, 5]}
267 µs ± 4.75 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

print(overlap_duo_comb_iter(signals))
assert overlap_duo_combination(signals) == overlap_duo_comb_iter(signals,)
% timeit overlap_duo_comb_iter(signals)


{0: [], 1: [2, 7, 12], 2: [], 3: [1, 3, 5]}
600 µs ± 12.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

print(list(duo_overlap_iter(signals)))
% timeit list(duo_overlap_iter(signals))


[frozenset({(1, 2), (3, 1)}), frozenset({(3, 3), (1, 7)}), frozenset({(1, 12), (3, 5)})]
605 µs ± 33.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

Код можно найти на гитхабе

Заключение

Адаптированный код и NumPy голодном для этого набора сигналов, но работает только с фиксированной длиной импульса. Поэтому, если ваши импульсы фиксированной длины, используйте numpy код, если вы просто получите импульсов, с начала и конца, мой код может обрабатывать эти тоже

1
ответ дан 21 марта 2018 в 12:03 Источник Поделиться