Производительность: повторные зачистки больших двоичных данных


Как часть большого алгоритма выборки схеме, я несколько раз перебрав весь набор данных. Когда данных растет большой, это становится довольно большой узким местом в моем алгоритме и я ищу способы, чтобы сделать его быстрее.

Некоторые вещи, которые я рассматриваю, являются:

  • Каковы наиболее эффективные типы данных? Например, для переменной x Я использую std::vector< std::vector<bool> > поскольку данные являются бинарными; 0 или 1. Однако, это приводит к множеству преобразований типа. Есть ли лучший способ?
  • Могу ли я использовать итераторы для перебора более эффективно данные?

Все предложения приветствуются!

Редактировать: производительность критическая функция выполняет следующие действия:

Цель кодекса заключается в имитации набора данных размера Nп ня, одновременно оценивая вероятность соотношения. Смоделированные новые данные Бинарные, и создается rlogis(0.0, 1.0) <= val ? 1 : 0. Последующая оценка соотношения происходит через acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];. Эти два шага можно считать оптимальным с точки зрения производительности мне; мой вопрос в первую очередь об окружающих структур.

Код:

#include <iostream>
#include <vector>
#include <chrono>

using namespace std; // <-- only here for the example; I don't normally do this

typedef unsigned int uint;
typedef std::vector<double> vec;
typedef std::vector<vec> mat;

// will be replaced by actual random number generator
double rlogis() {
    return rand() % 5 - 2;
}


// improving performance of this function is my goal
double orig(const uint np, const uint ni, const uint nd,
               const mat& theta, const mat& C,
               const vec& delta, const vec& lambdaNew,
               const int d, std::vector<std::vector<bool> >& x) {

    double acc = 0.0;
    bool xnew;
    vec tmp(nd);
    for (uint j = 0; j < ni; ++j) {

        for (uint d2 = 0; d2 < nd; ++d2) {
            tmp[d2] = lambdaNew[d2] * C[j][d2];
        }

        for (uint i = 0; i < np; ++i) {

            double val = delta[j];
            for (uint d2 = 0; d2 < nd; ++d2) {
                val += tmp[d2] * theta[i][d2];
            }

            xnew = rlogis() <= val;
            acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];

        }
    }
    return acc;
}

// wrapper function for timing
int main() {

    // dimensions of the data
    const uint np = 2E4; // this could be larger, easily 1E5
    const uint ni = 200; // this stays constant
    const uint nd = 3;   // could be 4-5, but not 10

    // objects
    std::vector<double> delta(ni);
    std::vector<double> lambdaNew(nd);
    vector< std::vector<double> > theta(np, std::vector<double>(nd));
    vector< std::vector<double> > C(ni, std::vector<double>(nd));
    std::vector< std::vector<bool> > x(np, std::vector<bool>(ni));

    // fill objects with values (exact values are irrelevant)
    srand(1);
    int d = rand() % nd;
    for (uint i = 0; i < np; ++i) {
        for (uint d = 0; d < nd; ++d) {
            theta[i][d] = 2 * rand() - 1;
        }
        for (uint j = 0; j < ni; ++j) {
            x[i][j] = bool(int(rand() + .5));
        }
    }
    for (uint j = 0; j < ni; ++j) {
        delta[j] = 2 * rand() - 1;
        for (uint d = 0; d < nd; ++d) {
            C[j][d] = 2 * rand() - 1;
        }
    }
    for (uint d = 0; d < nd; ++d) {
        lambdaNew[d] = 5 * rand() + 1;
    }

    // start timing
    {
        srand(2);
        auto start = chrono::steady_clock::now();
        auto ans = orig(np, ni, nd, theta, C, delta, lambdaNew, d, x);
        auto time1 = chrono::steady_clock::now() - start;

        cout << "Orig: " << ans << "\n"
             << "Time: " << chrono::duration <double, milli> (time1).count() << " ms\n" << endl;
    }

    return 0;
}


439
5
задан 9 марта 2018 в 03:03 Источник Поделиться
Комментарии
3 ответа

Вы не приводите никаких эксплуатационных измерений, так что каждый ответ здесь-это в основном догадки, но я подозреваю, что отсутствие скорости у вас возникли как-то связано с std::vector<bool>.

Проблема в том, что std::vector<bool> это не совсем то, чем кажется. В то время как вектор любой другой тип действительно работает в байтах, специализацию bool работает с отдельными битами, упаковка их как можно плотнее. Это, конечно, здорово, если вы хотите уменьшить использование памяти, но это также очень медленно, поскольку вы должны маски и сдвиг битов вокруг каждый раз, когда вы хотите получить доступ к значению.

По этой причине, в сочетании с тем, что он не ведет себя как обычный вектор в определенных случаях (которые, к сожалению, не совсем редкий), std::vector<bool> в основном рассматривается как недостаток дизайна в стандартной библиотеке и не вижу особого смысла.

Вы должны попытаться заменить std::vector<bool> С подобной тип фитинга, например std::vector<char>и измерения, чтобы определить, является ли и насколько это устраняет проблему.

Если вы еще не убеждены, вот быстрый и грязный тест, чтобы показать, насколько медленнее std::vector<bool> это.

6
ответ дан 9 марта 2018 в 04:03 Источник Поделиться

Я компактный код для удобства чтения, и добавить константные значения:

    // loop1, ni = 200; this stays constant
for (uint j = 0; j < ni; ++j) {

// loop2, nd = 3-5
for (uint d2 = 0; d2 < nd; ++d2) tmp[d2] = lambdaNew[d2] * C[j][d2];

//loop3 np = 2E4, easily 1E5
for (uint i = 0; i < np; ++i) {

//loop4, nd = 3-5
double val = delta[j];
for (uint d2 = 0; d2 < nd; ++d2) val += tmp[d2] * theta[i][d2];

xnew = rlogis() <= val;
acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];
}
}

Это будет объем оперативной памяти, так вот размеры объекта:

double delta[200];
double lambdaNew[3...5]
double C[200][3...5]

double theta[2e4...1e5][3...5] 4.0 MB
bool x[2e4...1e5][200] 2.5 MB

Поскольку тета прочитано 200 раз нам очень хочется, чтобы поместиться в кэш (в противном случае она должна быть извлечена из памяти каждый раз), но его большой размер может быть проблемой.

Кроме того, тета-это вектор векторов и я считаю, что реализация будет выделять каждую строку матрицы отдельно с новой[], который будет добавить некоторые заголовки и кладем ее в ближайший допустимый размер блока. Это проблематично здесь как линии очень мала (3-5 дублей), поэтому кучи операций управления памятью рискует оказаться значительным. См. здесь для некоторых чисел.

ОЗУ ДДР оптимизирован для высокой пропускной способности лопнуть или последовательного чтения, и совершенно хреново на случайный доступ. То, что вы хотите прочитать вашу матрицу последовательно, но здесь этого не будет, вместо процессора будет читать каждую строку, а затем перейти к следующему указателю. Скорее всего из-за предвыборка и т. д., доступ все равно будет последовательным, но, возможно, половина пропускной способности оперативной памяти будет потрачено впустую на бесполезные вещи, как заголовки и отступы. Хуже, поскольку кэш-память работает по линии кэша, драгоценные кэш будет тратится на заголовки и отступы. Кэш имеет жизненно важное значение, вы не хотите заполнять его бесполезными вещами, вместо того, чтобы вы хотите максимально использовать его.

Поэтому:


  • С помощью соответствующей матрицы выделяется как единый непрерывный блок памяти, который будет читать последовательно без каких-либо отверстий или прыгает или дурит указатель косвенной адресации

  • Использовать поплавки вместо удваивается, если вы можете, если это делает его пригодным в кэше L2, то это будет намного быстрее.

Теперь, вы хотите loop4 быть развернут и с использованием соответствующих векторных инструкций и SIMD. Если Северная Дакота-это известно во время компиляции константы, шаблон функции и поставить его в качестве параметра, а затем проверить ваши флаги компилятора и убедиться, что компилятор разворачивает правильно петлю, глядя на разобранный выход. Я считаю, что новый вкус ГСП имеет "скалярное произведение" наставление, которое бы идеально. Если Северная Дакота может варьироваться в крошечном диапазоне как 3..5, наверное, стоит для компиляции шаблонных версий с ND=3,4,5 с петель раскатали. Это должно быть очень быстро. Еще раз проверьте, если вы можете использовать поплавки вместо парном разряде.

Это должно превратить loop4 в смехотворно низкое количество инструкций SSE.

Затем следует профиль, и Ваше следующее узкое место, вероятно, будет генератор случайных чисел в rlogis. Попробуйте найти быстрая. Вы должны проверить его срок. Период слчис() не может быть достаточно для ваших нужд. Если ты подлый ублюдок, ты еще ядро генерирует случайные числа и использовать ФИФО, но это требует некоторого навыка, чтобы убедиться, что издержки на синхронизацию не отменяет выгоды.

Теперь, логического массива.

Поскольку она доступна как Х[я][Дж], а i-индекс внутреннего, вы должны перенести ее и поставить индекса внутреннего цикла в прошлом, как в Х[Дж][я]. Это важно, так как массив большой, и [Дж] и идет до 200, чтобы ваш текущий код не открыть последовательно в памяти, а он будет делать случайные пропуски.

Случайным ОЗУ ДДР стоимость доступа составляет огромный! Он может быть до 100 наносекунд. Л1-Л2 кэшей принимайте 1-3 НС. 100нс на процессоре 3 ГГц составляет 300 циклов! Ваш процессор может выполнять несколько инструкций за один цикл... один случайный доступ к памяти может стоить как 1000 инструкции или больше, если вы реально не повезло... последовательные доступы ваш друг!

Излишне говорить, что массив bool с одним значением одного бита должен несколько бит сложа операций в Доступ, так что, используя один байт на значение быстрее... если сейчас в 8 раз больше, выбора больше не помещается в кэш. Если ДДР ОЗУ должен быть доступен, тогда он будет намного медленнее, чем сложа биты.

loop2 можно раскатал также.

loop3 может быть распараллелен с помощью OpenMP, но она так коротка, я не уверен, что будут какие-то успехи. Это зависит от того, насколько дорогостоящим rlogis() является. Вы всегда можете попробовать и сравнить.

Все это также может работать на ГПУ. В этом случае loop3 может быть распараллелен по максимуму.

Помните, вещи, как с помощью итераторов и указателей может иметь значение немного, но узоров доступ к памяти, кэширование, случайный/последовательный, и т. д., реальных проблем в вашем случае. Просто транспонирование матрицы может иметь огромное влияние на производительность, если доступ к памяти стал последовательный, а не случайный/пропускает.

Модули DDR4 может дать вам 60 ГБ/с при последовательном доступе... но случайный байтовый доступ 10м/с. Оуч.

4
ответ дан 9 марта 2018 в 09:03 Источник Поделиться

Интерфейс представляет собой настоящий беспорядок между C и C++. Почему вы предлагаете ni, nd и np когда они на самом деле размеры соответствующих массивах? Также вы должны не использовать std::vector<bool> особенно если вы используете его в качестве поддельного целое

Что приводит к следующему:

double orig(const mat& theta, const mat& C,
const vec& delta, const vec& lambdaNew,
const int d, std::vector<std::vector<int> >& x) {

double acc = 0.0;
bool xnew;
vec tmp(lambdaNew.size());
for (std::size_t j = 0; j < C.size(); ++j) {

for (std::size_t d2 = 0; d2 < C[j].size(); ++d2) {
tmp[d2] = lambdaNew[d2] * C[j][d2];
}

for (std::size_t i = 0; i < theta.size(); ++i) {
double val = delta[j];
for (uint d2 = 0; d2 < nd; ++d2) {
val += tmp[d2] * theta[i][d2];
}

xnew = rlogis() <= val;
acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];
}
}
return acc;
}

Почему xnew определена вне цикла он используется, хотя вы никогда не использовать его?

Следующее, что нужно заметить, это то, что tmp на самом деле больше копия lambdaNew. Один может возникнуть соблазн просто написать vec tmp = lambdaNew; а не vec tmp(lambdaNew.size());однако, что мешает возможной оптимизации компилятором, если lambdaNew передается в приятном пути к функция. Поэтому лучше пусть компилятор делать копии

double orig(const mat& theta, const mat& C,
const vec& delta, vec lambdaNew,
const int d, std::vector<std::vector<int> >& x) {

double acc = 0.0;
for (std::size_t j = 0; j < C.size(); ++j) {

for (std::size_t d2 = 0; d2 < C[j].size(); ++d2) {
lambdaNew[d2] *= C[j][d2];
}

for (std::size_t i = 0; i < theta.size(); ++i) {
double val = delta[j];
for (uint d2 = 0; d2 < nd; ++d2) {
val += lambdaNew[d2] * theta[i][d2];
}

bool xnew = rlogis() <= val;
acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];
}
}
return acc;
}

Если вы хотите сделать умножение я на C++, как вы можете использовать std::transform за это.

for (std::size_t d2 = 0; d2 < C[j].size(); ++d2) {
lambdaNew[d2] *= C[j][d2];
}

становится

std::transform(lambdaNew.begin(), 
lambdaNew.end(),
C[j].begin(),
lambdaNew.begin(),
std::multiplies<double>());

Не то, что его довольно, но если вы действительно любите стл... более интересными являются другие стл функции, например std::inner_product. Он делает то, что он говорит, добавляя продукты поэлементно двух диапазонов начальное значение. Так что этот кусок кода:

double val = delta[j];
for (uint d2 = 0; d2 < nd; ++d2) {
val += lambdaNew[d2] * theta[i][d2];
}

становится:

double val = std::inner_product(lambdaNew.begin(),
lambdaNew.end(),
theta[i].begin(),
delta[j]);

Теперь мы вместе в этом

double orig(const mat& theta, const mat& C,
const vec& delta, vec lambdaNew,
const int d, std::vector<std::vector<int> >& x) {

double acc = 0.0;
for (std::size_t j = 0; j < C.size(); ++j) {
std::transform(lambdaNew.begin(),
lambdaNew.end(),
C[j].begin(),
lambdaNew.begin(),
std::multiplies<double>());

for (std::size_t i = 0; i < theta.size(); ++i) {
const double val = std::inner_product(lambdaNew.begin(),
lambdaNew.end(),
theta[i].begin(),
delta[j]);

bool xnew = rlogis() <= val;
acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];
}
}
return acc;
}

Я бы скорее поставил x-xnew инит своего временного, чем определение xnew

bool xnew = rlogis() <= val;
acc += theta[i][d] * (x[i][j] - xnew) * C[j][d];

становится:

const int factor = x[i][j] - (rlogis() <= val ? 0 : 1);
acc += factor * theta[i][d] * C[j][d];

И мы можем положить его вместе:

double orig(const mat& theta, const mat& C,
const vec& delta, vec lambdaNew,
const int d, std::vector<std::vector<int> >& x) {

double acc = 0.0;
for (std::size_t j = 0; j < C.size(); ++j) {
std::transform(lambdaNew.begin(),
lambdaNew.end(),
C[j].begin(),
lambdaNew.begin(),
std::multiplies<double>());

for (std::size_t i = 0; i < theta.size(); ++i) {
const double val = std::inner_product(lambdaNew.begin(),
lambdaNew.end(),
theta[i].begin(),
delta[j]);

const int factor = x[i][j] - (rlogis() <= val ? 0 : 1);
acc += factor * theta[i][d] * C[j][d];
}
}
return acc;
}

Я бы предположил, что там будет только два действительно улучшения. Первый-это переход от vecbool. Вторая-прям копия lambdaNew что обеспечивает обход обнуления ТМП и возможности для продвижения строительства.

2
ответ дан 9 марта 2018 в 06:03 Источник Поделиться