2011-12-18 1 views
5

Этот код переносит матрицу четырьмя способами. Первый выполняет последовательную запись, а не последовательные чтения. Второй - наоборот. Следующие два те же, но с записью пропусков кеша. Кажется, что последовательная запись выполняется быстрее, а пропуск кеша быстрее. Что я не понимаю, если кеш пропускается, почему последовательные записи еще быстрее?Использование кэша в матрице транспонируется в c

QueryPerformanceCounter(&before); 
for (i = 0; i < N; ++i) 
    for (j = 0; j < N; ++j) 
     tmp[i][j] = mul2[j][i]; 
QueryPerformanceCounter(&after); 
printf("Transpose 1:\t%ld\n", after.QuadPart - before.QuadPart); 

QueryPerformanceCounter(&before); 
for (j = 0; j < N; ++j) 
    for (i = 0; i < N; ++i) 
    tmp[i][j] = mul2[j][i]; 
QueryPerformanceCounter(&after); 
printf("Transpose 2:\t%ld\n", after.QuadPart - before.QuadPart); 

QueryPerformanceCounter(&before); 
for (i = 0; i < N; ++i) 
    for (j = 0; j < N; ++j) 
     _mm_stream_si32(&tmp[i][j], mul2[j][i]); 
QueryPerformanceCounter(&after); 
printf("Transpose 3:\t%ld\n", after.QuadPart - before.QuadPart); 

QueryPerformanceCounter(&before); 
for (j = 0; j < N; ++j) 
    for (i = 0; i < N; ++i) 
     _mm_stream_si32(&tmp[i][j], mul2[j][i]); 
QueryPerformanceCounter(&after); 
printf("Transpose 4:\t%ld\n", after.QuadPart - before.QuadPart); 

EDIT: выход

Transpose 1: 47603 
Transpose 2: 92449 
Transpose 3: 38340 
Transpose 4: 69597 
+0

Можете ли вы показать нам некоторые номера, а также размеры, которые вы тестируете? – Mysticial

+0

Кэш необходимо обновить при записи, если загружаются связанные строки кэша (что, скорее всего, в простом случае). –

+0

Результаты этих тестов зависят от текущего содержимого кеша и промахов/хитов кэша, что может привести к смещению результатов. Возможно, стоит переделать тесты, но каждый раз начинать с известного состояния кеша. возможно, аннулирование кеша до того, как каждый тест может помочь. –

ответ

4

процессор имеет запись, сочетающий буфер для объединения пишет на строку кэша, чтобы это произошло в одном пакете. В этом случае (кеш пропущен для последовательной записи) этот буфер объединения записи действует как кеш с одной строкой, что делает результаты очень похожими на кеш, которые не пропускаются.

Чтобы быть точным, при пропуске кеша записи все еще происходят в пакетах в память.

Поведение здесь write-combining logic.

0

Чтобы улучшить использование кеша, вы можете использовать не линейную структуру памяти для матрицы. С 4x4 32-битной флоат-плитой можно было транспонировать только с единственным доступом к каждой строке кэша. Плюс в качестве бонусной плитки можно легко переносить с помощью _MM_TRANSPOSE4_PS.

Транспонирование очень большой матрицы по-прежнему очень интенсивно работает с памятью. Он по-прежнему будет иметь ограниченную пропускную способность, но, по крайней мере, загрузка кеш-слов почти оптимальна. Я не знаю, может ли производительность быть оптимизирована. Мое тестирование показывает, что ноутбук за несколько лет удается переставить 16k * 16k (память 1G) примерно за 300 мс.

Я попытался использовать также _mm_stream_sd, но по какой-то причине производительность ухудшилась. Я не понимаю, что невременная память записывает достаточно, чтобы иметь практическое предположение, почему производительность падает с _mm_stream_ps. Возможная причина, конечно, в том, что строка кеша уже находится в кеше L1, готовом для операции записи.

Но на самом деле важная часть с нелинейной матрицей позволила бы полностью исключить транспонирование и просто запустить умножение в дружеском порядке. Но я использую только код, который я использую, чтобы улучшить свои знания об управлении кешем в алгоритмах.

Я еще не попытался протестировать, если предварительная выборка улучшит использование полосы пропускания памяти. Текущий код работает примерно на 0,5 инструкций за цикл (хороший кэш-код работает примерно 2 раза за цикл на этом CPU), что оставляет много бесплатных циклов для инструкций предварительной выборки, что позволяет даже довольно сложный расчет оптимизировать время предварительной выборки во время выполнения.

пример кода из моего теста на транспонирование.

#define MATSIZE 16384 
#define align(val, a) (val + (a - val % a)) 
#define tilewidth 4 
typedef int matrix[align(MATSIZE,tilewidth)*MATSIZE] __attribute__((aligned(64))); 


float &index(matrix m, unsigned i, unsigned j) 
{ 
    /* tiled address calculation */ 
    /* single cache line is used for 4x4 sub matrices (64 bytes = 4*4*sizeof(int) */ 
    /* tiles are arranged linearly from top to bottom */ 
    /* 
    * eg: 16x16 matrix tile positions: 
    * t1 t5 t9 t13 
    * t2 t6 t10 t14 
    * t3 t7 t11 t15 
    * t4 t8 t12 t16 
    */ 
    const unsigned tilestride = tilewidth * MATSIZE; 
    const unsigned comp0 = i % tilewidth; /* i inside tile is least significant part */ 
    const unsigned comp1 = j * tilewidth; /* next part is j multiplied by tile width */ 
    const unsigned comp2 = i/tilewidth * tilestride; 
    const unsigned add = comp0 + comp1 + comp2; 
    return m[add]; 
} 

/* Get start of tile reference */ 
float &tile(matrix m, unsigned i, unsigned j) 
{ 
    const unsigned tilestride = tilewidth * MATSIZE; 
    const unsigned comp1 = j * tilewidth; /* next part is j multiplied by tile width */ 
    const unsigned comp2 = i/tilewidth * tilestride; 
    return m[comp1 + comp2]; 

} 
template<bool diagonal> 
static void doswap(matrix mat, unsigned i, unsigned j) 
{ 
     /* special path to swap whole tile at once */ 
     union { 
      float *fs; 
      __m128 *mm; 
     } src, dst; 
     src.fs = &tile(mat, i, j); 
     dst.fs = &tile(mat, j, i); 
     if (!diagonal) { 
      __m128 srcrow0 = src.mm[0]; 
      __m128 srcrow1 = src.mm[1]; 
      __m128 srcrow2 = src.mm[2]; 
      __m128 srcrow3 = src.mm[3]; 
      __m128 dstrow0 = dst.mm[0]; 
      __m128 dstrow1 = dst.mm[1]; 
      __m128 dstrow2 = dst.mm[2]; 
      __m128 dstrow3 = dst.mm[3]; 

      _MM_TRANSPOSE4_PS(srcrow0, srcrow1, srcrow2, srcrow3); 
      _MM_TRANSPOSE4_PS(dstrow0, dstrow1, dstrow2, dstrow3); 

#if STREAMWRITE == 1 
      _mm_stream_ps(src.fs + 0, dstrow0); 
      _mm_stream_ps(src.fs + 4, dstrow1); 
      _mm_stream_ps(src.fs + 8, dstrow2); 
      _mm_stream_ps(src.fs + 12, dstrow3); 
      _mm_stream_ps(dst.fs + 0, srcrow0); 
      _mm_stream_ps(dst.fs + 4, srcrow1); 
      _mm_stream_ps(dst.fs + 8, srcrow2); 
      _mm_stream_ps(dst.fs + 12, srcrow3); 
#else 
      src.mm[0] = dstrow0; 
      src.mm[1] = dstrow1; 
      src.mm[2] = dstrow2; 
      src.mm[3] = dstrow3; 
      dst.mm[0] = srcrow0; 
      dst.mm[1] = srcrow1; 
      dst.mm[2] = srcrow2; 
      dst.mm[3] = srcrow3; 
#endif 
     } else { 
      __m128 srcrow0 = src.mm[0]; 
      __m128 srcrow1 = src.mm[1]; 
      __m128 srcrow2 = src.mm[2]; 
      __m128 srcrow3 = src.mm[3]; 

      _MM_TRANSPOSE4_PS(srcrow0, srcrow1, srcrow2, srcrow3); 

#if STREAMWRITE == 1 
      _mm_stream_ps(src.fs + 0, srcrow0); 
      _mm_stream_ps(src.fs + 4, srcrow1); 
      _mm_stream_ps(src.fs + 8, srcrow2); 
      _mm_stream_ps(src.fs + 12, srcrow3); 
#else 
      src.mm[0] = srcrow0; 
      src.mm[1] = srcrow1; 
      src.mm[2] = srcrow2; 
      src.mm[3] = srcrow3; 
#endif 
     } 
    } 
} 

static void transpose(matrix mat) 
{ 
    const unsigned xstep = 256; 
    const unsigned ystep = 256; 
    const unsigned istep = 4; 
    const unsigned jstep = 4; 
    unsigned x1, y1, i, j; 
    /* need to increment x check for y limit to allow unrolled inner loop 
    * access entries close to diagonal axis 
    */ 
    for (x1 = 0; x1 < MATSIZE - xstep + 1 && MATSIZE > xstep && xstep; x1 += xstep) 
     for (y1 = 0; y1 < std::min(MATSIZE - ystep + 1, x1 + 1); y1 += ystep) 
      for (i = x1 ; i < x1 + xstep; i += istep) { 
       for (j = y1 ; j < std::min(y1 + ystep, i); j+= jstep) 
       { 
        doswap<false>(mat, i, j); 
       } 
       if (i == j && j < (y1 + ystep)) 
        doswap<true>(mat, i, j); 
      } 

    for (i = 0 ; i < x1; i += istep) { 
     for (j = y1 ; j < std::min(MATSIZE - jstep + 1, i); j+= jstep) 
     { 
      doswap<false>(mat, i, j); 
     } 
     if (i == j) 
      doswap<true>(mat, i, j); 
    } 
    for (i = x1 ; i < MATSIZE - istep + 1; i += istep) { 
     for (j = y1 ; j < std::min(MATSIZE - jstep + 1, i); j+= jstep) 
     { 
      doswap<false>(mat, i, j); 
     } 
     if (i == j) 
      doswap<true>(mat, i, j); 
    } 
    x1 = MATSIZE - MATSIZE % istep; 
    y1 = MATSIZE - MATSIZE % jstep; 

    for (i = x1 ; i < MATSIZE; i++) 
     for (j = 0 ; j < std::min((unsigned)MATSIZE, i); j++) 
        std::swap(index(mat, i, j+0), index(mat, j+0, i)); 

    for (i = 0; i < x1; i++) 
     for (j = y1 ; j < std::min((unsigned)MATSIZE, i) ; j++) 
        std::swap(index(mat, i, j+0), index(mat, j+0, i)); 
}