Обработка больших данных временных рядов


Я в настоящее время обработки некоторых крупных временных рядов данных с пандами, и у меня есть функция, которая невыносимо медленно, и я уверен, что это можно сделать быстрее.

Проблема: я учусь на фабрику, которая производит вещи. Она работает непрерывно всю неделю, но по выходным он выключается. До конца недели, и в начале нового, завод ведет себя по-разному, что мешает анализу, что я делаю, и поэтому я хочу, чтобы отфильтровать окно времени по этим выходным.

У меня есть большие таблицы данных, назовем его df, чьи строки являются изделия, произведенные и столбцы являются их различными атрибутами, одним из которых является время, в которое он был произведен, df['timeProduced']. Эти изделия изготавливаются на нерегулярно расположенные точки во времени.Я хочу отбросить строк в таблице, timeProduced вход был рядом с одним из этих периодов выключения. Фактические данные, являются конфиденциальными, но это выглядит примерно так:

index   partId        colour   timeProduced  \ ...
1       '026531|352'  Red    2017-02-01 00:00:02   
2       '026531|353'  Blue   2017-02-01 00:00:03   
3       '026531|354'  Blue   2017-02-01 00:00:05   
4       '026531|355'  Green  2017-02-01 00:00:09   

Это занимает десятки минут, чтобы хруст через миллион записей. Я знаю, что это медленно, потому что он вовсе не векторизовать, но я не уверен, как это сделать чисто и NumPy векторизация реализации/панд. Любые идеи?

def dropIrregularGaps(series, gapLength, runIn):
    '''
    Designed for time-series data where there is points sampled at irregular time intervals.
    Detects adjacent points that are sampled too far apart, and then removes points on either 
    side of the gap which are within a defined runIn period.
    Assumes timeseries data is already sorted. If not, will deliver garbage. 
    series is a pandas series object, with values as pandas DateTime objects.
    gapLength is the amount of time that is considered to be a shutdown
    runIn is the length of time to remove on either side of the gap.
    returns a list of indices that are valid
    '''
    samples = list(series)
    indices = list(series.index)

    prev = samples[0]
    ind = indices[0]
    allGoodIndices = []
    currentGoodIndices = [ind]
    currentGoodTimes = [prev]
    skipPoint = None

    for new, ind in zip(samples[1:], indices[1:]):
        if skipPoint:
            if new - skipPoint >= runIn:
                # if a gap has been detected, skip over all points until the current
                # point is past the run-in period.
                skipPoint = None
                currentGoodIndices = [ind]
                currentGoodTimes = [new]

        elif new - prev > gapLength:
            # if a gap is detected. cut out the cooldown period from the list,
            # and add what remains to the list of goodIndices.
            endPoint = currentGoodTimes[-1]
            while currentGoodTimes and (endPoint - currentGoodTimes[-1] < runIn):
                del (currentGoodTimes[-1])
                del (currentGoodIndices[-1])

            allGoodIndices += currentGoodIndices
            currentGoodIndices = []
            currentGoodTimes = []
            skipPoint = new
        else:
            currentGoodIndices += [ind]
            currentGoodTimes += [new]

        prev = new

    allGoodIndices += currentGoodIndices

    return allGoodIndices

Я пользоваться этой функцией, взяв мои таблицы данных, и работает:

result = dropIrregularGaps(df['timeProduced'],pd.Timedelta('4 hours'), pd.Timedelta('8 hours 0 minutes'))

Я затем использовать результат для индекса в таблице данных, давая мне таблицы данных без запуска/периода восстановления.

df = df.loc[result]


507
3
задан 9 апреля 2018 в 04:04 Источник Поделиться
Комментарии
1 ответ

Некоторые общие советы

функции

разделить работу в функции, так что вы можете проверить каждую часть по отдельности, каждый делает определенную работу, которая может быть проверена индивидуально

Пеп-8

Попробуйте следовать рекомендациям

Мой алгоритм

Если ваши панды версия >= 0.20, вы можете использовать pandas.merge_asof Если у вас есть ряд с конца и начала рабочей недели

фиктивные данные

np.random.seed(1)
gap_max, run_in = 3, 2
indices = [0, 1, 2, 3, 7, 8, 9, 13, 15, 16, 17, 18]
values = np.random.random(size = len(indices))
data = pd.DataFrame({'time': time, 'values': values})


     time  values
0 0 0.417022004702574
1 1 0.7203244934421581
2 2 0.00011437481734488664
3 3 0.30233257263183977
4 7 0.14675589081711304
5 8 0.0923385947687978
6 9 0.1862602113776709
7 13 0.34556072704304774
8 15 0.39676747423066994
9 16 0.538816734003357
10 17 0.4191945144032948
11 18 0.6852195003967595

так что для этих данных, мы ожидаем, что значения 1, 2, 3, 7, 8, 9, 13, 15 за

Выявление пробелов

Этот разрыв может быть найден с помощью DataFrame.shift.

def find_weekend(times, gap_max):
gap = times - times.shift(1) > gap_max
week_start = times[gap]
weekend_start = times[gap.shift(-1).fillna(False)]
return weekend_start, week_start
find_weekend(data['time'], gap_max)


 3    3
6 9
Name: index, dtype: int64,
4 7
7 13
Name: index, dtype: int64

Обозначение начала данных как начало недели может быть сделано путем добавления gap.iloc[0] = True в качестве 2-й линии. Обозначение конца данных, а также в конце недели может быть сделано путем изменения в .fillna(True)

слияние с данными

Так merge_asof ожидает DataFrameS, то сначала надо сделать некоторые преобразования

def drop_irregular_gaps(data, gap_max, run_in):
weekend_start, week_start = find_weekend(data[time_label], gap_max)
df_week_end = weekend_start.to_frame(name=time_label).assign(run_out=True)
df_week_start = week_start.to_frame(name=time_label).assign(run_in=True)
df_data = data[[time_label]]

Затем мы можем использовать 2 сливает, один вперед, чтобы отметить конец недели, один назад, чтобы отметить начало недели

    before_weekend = pd.merge_asof(
df_data, df_week_end,
on=time_label, direction='forward', tolerance=run_in,
).set_index(time_label)['run_out'].fillna(False).values
after_weekend = pd.merge_asof(
df_data, df_week_start,
on=time_label, direction='backward', tolerance=run_in,
).set_index(time_label)['run_in'].fillna(False).values

Эти 2 массива с True как значение, если они находятся в run_in или run_out период


array([False,  True,  True,  True,  True,  True,  True, False, False,
False, False, False], dtype=bool),
array([False, False, False, False, True, True, True, True, True,
False, False, False], dtype=bool)

Далее мы просто нам or и not для логического индексирования

    to_drop = before_weekend | after_weekend
return data[~to_drop]

drop_irregular_gaps(data, gap_max, run_in)


      time    values
0 0 0.417022004702574
9 16 0.538816734003357
10 17 0.4191945144032948
11 18 0.6852195003967595

Это может быть легко адаптирована к 2 отдельные значения run_in

Данных datetime

Алгоритм должен быть агностиком о том, time_label данные-это числовые или datetime. Я проверил этот алгоритм также работает с фиктивными данными

data_start = pd.Timestamp('20180101')
time = data_start + pd.to_timedelta([0, 1, 2, 3, 7, 8, 9, 13, 15, 16, 17, 18], unit='day')
gap_max, run_in = pd.to_timedelta(3, unit='day'), pd.to_timedelta(2, unit='day')
values = np.random.random(size = len(indices))
data = pd.DataFrame({'time': time, 'values': values})
drop_irregular_gaps(data, gap_max, run_in)


      time        values
0 2018-01-01 0.417022004702574
9 2018-01-17 0.538816734003357
10 2018-01-18 0.4191945144032948
11 2018-01-19 0.6852195003967595

альтернатива без merge_asof

Так видимо merge_asof не работает так хорошо с повторяющимися данными, вот вариант с петлей. Если есть много выходных, это может быть медленнее, но я думаю, это все равно будет быстрее, чем исходный код

def mark_runin(time, week_endpoints, run_in, direction='backward'):
mask = np.zeros_like(time, dtype=bool)
for point in week_endpoints:
interval = (point, point + run_in) if direction == 'forward' else (point - run_in, point)
mask |= time.between(*interval).values
return mask
mark_runin(time, weekend_start, run_in)


array([False,  True,  True,  True,  True,  True,  True, False, False, False, False, False], dtype=bool)

def drop_irregular_gaps2(data, gap_max, run_in, time_label = 'time'):
times = data[time_label]
weekend_start, week_start = find_weekend(times, gap_max)
before_weekend = mark_runin(times, weekend_start, run_in, direction = 'backward')
after_weekend = mark_runin(times, week_start, run_in, direction = 'forward')
to_drop = before_weekend | after_weekend
return data[~to_drop]
drop_irregular_gaps2(data, gap_max, run_in)


  time        values
0 2018-01-01 0.417022004702574
9 2018-01-17 0.538816734003357
10 2018-01-18 0.4191945144032948
11 2018-01-19 0.6852195003967595

2
ответ дан 10 апреля 2018 в 09:04 Источник Поделиться