2013-09-24 5 views
6

У меня есть разреженный массив a (в основном нули):разреженное сжатие массива с использованием SIMD (AVX2)

unsigned char a[1000000]; 

, и я хотел бы создать массив b индексов для ненулевых элементов a с использованием инструкций SIMD на архитектуре Intel x64 с AVX2. Я ищу советы, как сделать это эффективно. В частности, существуют ли SIMD-инструкции для получения позиций последовательных ненулевых элементов в регистре SIMD, расположенных последовательно?

+1

Непосредственно, но вы можете 'pcmpeqb' его против нуля, затем' pmovmskb', что в обычном регистре и извлечь первый индекс с помощью 'bsf' (а затем второй и т. Д., Надеюсь, не слишком много) – harold

+1

Вам нужно быть более конкретным, чем просто «SIMD» - какую архитектуру вы планируете?x86, ARM, PowerPC, POWER и некоторые GPGPU имеют разные SIMD-расширения. Также x86 имеет несколько SIMD-расширений: MMX, SSE, SSE2, SSE3, SSSE3, SSE4, AVX, AVX2 и т. Д. (Обратите внимание, что AVX2 имеет инструкции SIMD, которые могут быть полезны в этом контексте). –

+0

@Paul R Извините. Я отредактировал свой вопрос - AVX2 является приемлемым. –

ответ

0

Хотя набор инструкций AVX2 имеет множество инструкций GATHER, но его производительность слишком медленная. И самый эффективный способ сделать это - обработать массив вручную.

1

Если вы ожидаете число ненулевых элементов очень низкое (то есть намного меньше, чем 1%), то вы можете просто проверить каждый 16-байтовый кусок за то, что отлично от нуля:

int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
    if (mask != 65535) { 
     //store zero bits of mask with scalar code 
    } 

Если процент хороших элементов достаточно мала, стоимость неверно предсказанных ветвей и стоимость медленного скалярного кода внутри «if» были бы незначительными.


Что касается хорошего общего решения, сначала рассмотрите реализацию уплотнения потока SSE. Она удаляет все нулевые элементы из массива байтов (идея взята из here):

__m128i shuf [65536]; //must be precomputed 
char cnt [65536]; //must be precomputed 

int compress(const char *src, int len, char *dst) { 
    char *ptr = dst; 
    for (int i = 0; i < len; i += 16) { 
     __m128i reg = _mm_load_si128((__m128i*)&src[i]); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]); 
     _mm_storeu_si128((__m128i*)ptr, compressed); 
     ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask); 
    } 
    return ptr - dst; 
} 

Как вы видите, (_mm_shuffle_epi8 + таблица поиска) может сделать чудеса. Я не знаю другого способа векторизации структурно сложного кода, такого как сжатие потока.


Теперь единственная оставшаяся проблема с вашим запросом заключается в том, что вы хотите получить индексы. Каждый индекс должен храниться в 4-байтовом значении, поэтому кусок из 16 входных байтов может выдавать до 64 байтов вывода, которые не вписываются в один регистр SSE.

Один из способов справиться с этим - честно распаковать выход на 64 байта. Таким образом, вы заменяете reg на константу (0,1,2,3,4, ..., 15) в коде, затем распаковываете регистр SSE в 4 регистра и добавляете регистр с четырьмя значениями i. Это потребует гораздо больших инструкций: 6 инструкций по распаковке, 4 добавления и 3 магазина (один уже есть). Что касается меня, это огромные накладные расходы, особенно если вы ожидаете менее 25% ненулевых элементов.

В качестве альтернативы вы можете ограничить количество ненулевых байтов, обрабатываемых одноименной итерацией, на 4, так что для вывода всегда достаточно одного регистра. Вот код образца:

__m128i shufMask [65536]; //must be precomputed 
char srcMove [65536]; //must be precomputed 
char dstMove [65536]; //must be precomputed 

int compress_ids(const char *src, int len, int *dst) { 
    const char *ptrSrc = src; 
    int *ptrDst = dst; 
    __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15); 
    __m128i base = _mm_setzero_si128(); 
    while (ptrSrc < src + len) { 
     __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc); 
     __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); 
     int mask = _mm_movemask_epi8(zeroMask); 
     __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]); 
     __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128()); 
     ids32 = _mm_add_epi32(ids32, base); 
     _mm_storeu_si128((__m128i*)ptrDst, ids32); 
     ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4); 
     ptrSrc += srcMove[mask]; //no alternative without LUT 
     base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask])); 
    } 
    return ptrDst - dst; 
} 

Одним из недостатков этого подхода состоит в том, что теперь каждая последующая итерация цикла не может начинаться до тех пор, пока строка ptrDst += dstMove[mask]; выполняется на предыдущей итерации. Таким образом, критический путь резко возрос. Аппаратная гиперпоточность или ее ручная эмуляция могут удалить это наказание.


Итак, как вы видите, есть много вариантов этой основной идеи, все из которых решить вашу проблему с разной степенью эффективности. Вы также можете уменьшить размер LUT, если вам это не нравится (опять же, за счет снижения производительности пропускной способности).

Этот подход нельзя полностью распространить на более широкие регистры (т.AVX2 и AVX-512), но вы можете попытаться объединить команды нескольких последовательных итераций в одну инструкцию AVX2 или AVX-512, что немного увеличит пропускную способность.

Примечание: я не тестировал какой-либо код (поскольку предварительное вычисление LUT правильно требует заметного усилия).

+0

Было бы неплохо увидеть, как ваш подход LUT сравнивается с моим [ответом] (http://stackoverflow.com/a/41958528/2439725) на основе инструкций манипулятора бит (BMI1 и BMI2). – wim

2

Пять способов вычислить индексы ненулевых являются:

  • Semi Векторизованных петля: Загрузить вектор SIMD с гольцов, сравнить с нулем и применить movemask. Используйте небольшой скалярный цикл, если любой из символов отличен от нуля (также предлагается @stgatilov). Это хорошо работает для очень редких массивов. Функция arr2ind_movmsk в приведенном ниже коде использует инструкции BMI1 для скалярной петли.

  • Векторизованный цикл: процессоры Intel Haswell и более новая поддержка наборов команд BMI1 и BMI2. BMI2 содержит инструкция pext (Parallel bits extract, see wikipedia link), , которая оказывается полезной здесь. См. arr2ind_pext в приведенном ниже коде.

  • Классический скалярный цикл с оператором if: arr2ind_if.

  • Скалярная петля без веток: arr2ind_cmov.

  • Таблица поиска: @stgatilov показывает, что вместо команд pdep и других целых чисел можно использовать таблицу поиска . Это может работать хорошо, однако таблица поиска довольно велика: она не подходит для кеша L1. Не тестировалось здесь. См. Также обсуждение here.


/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c 

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros: 
       ./a.out 20000 25 
*/ 

#include <stdio.h> 
#include <immintrin.h> 
#include <stdint.h> 
#include <omp.h> 
#include <string.h> 


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0, k; 
    __m256i msk; 
    m0=0; 
    for (i=0;i<n;i=i+32){        /* Load 32 bytes and compare with zero:   */ 
     msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256()); 
     k=_mm256_movemask_epi8(msk); 
     k=~k;           /* Search for nonzero bits instead of zero bits. */ 
     while (k){ 
     ind[m0]=i+_tzcnt_u32(k);      /* Count the number of trailing zero bits in k. */ 
     m0++; 
     k=_blsr_u32(k);        /* Clear the lowest set bit in k.     */ 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    uint64_t  cntr_const = 0xFEDCBA; 
    __m256i  shft  = _mm256_set_epi64x(0x04,0x00,0x04,0x00); 
    __m256i  vmsk  = _mm256_set1_epi8(0x0F); 
    __m256i  cnst16  = _mm256_set1_epi32(16); 
    __m256i  shf_lo  = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02, 
              0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00); 
    __m256i  shf_hi  = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06, 
              0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04); 
    __m128i  pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);            

    __m256i  i_vec  = _mm256_setzero_si256(); 
    m0=0; 
    for (i=0;i<n;i=i+16){ 
     __m128i v   = _mm_load_si128((__m128i *)&a[i]);      /* Load 16 bytes.                    */ 
     __m128i msk  = _mm_cmpeq_epi8(v,_mm_setzero_si128());    /* Generate 16x8 bit mask.                  */ 
       msk  = _mm_srli_epi64(msk,4);        /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_shuffle_epi8(msk,pshufbcnst);      /* Pack 16x8 bit mask to 16x4 bit mask.               */ 
       msk  = _mm_xor_si128(msk,_mm_set1_epi32(-1));    /* Invert 16x4 mask.                   */ 
     uint64_t msk64  = _mm_cvtsi128_si64x(msk);        /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/ 
     int  p   = _mm_popcnt_u64(msk64)>>2;        /* p is the number of nonzeros in 16 bytes of a.            */ 
     uint64_t cntr  = _pext_u64(cntr_const,msk64);       /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */ 
                        /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.     */ 
     __m256i cntr256 = _mm256_set1_epi64x(cntr); 
       cntr256 = _mm256_srlv_epi64(cntr256,shft); 
       cntr256 = _mm256_and_si256(cntr256,vmsk); 
     __m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo); 
     __m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi); 
       cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo); 
       cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi); 

          _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);  /* Note that the stores of iteration i and i+16 may overlap.               */ 
          _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
       m0   = m0+p; 
       i_vec  = _mm256_add_epi32(i_vec,cnst16); 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     if (a[i]!=0){ 
     ind[m0]=i; 
     m0=m0+1; 
     } 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){ 
    int i, m0; 
    m0=0; 
    for (i=0;i<n;i++){ 
     ind[m0]=i; 
     m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */ 
    } 
    *m=m0; 
    return 0; 
} 


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i; 
    for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%u\n",i,ind[i],a[ind[i]]); 
    printf("\n"); fflush(stdout); 
    return 0; 
} 


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){ 
    int i;        /* Compute a hash to compare the results of different methods. */ 
    unsigned int chk=0; 
    for (i=0;i<m;i++){ 
     chk=((chk<<1)|(chk>>31))^(ind[i]); 
    } 
    printf("chk = %10X\n",chk); 
    return 0; 
} 



int main(int argc, char **argv){ 
int n, i, m; 
unsigned int j, k, d; 
unsigned char *a; 
int *ind; 
double t0,t1; 
int meth, nrep; 
char txt[30]; 

sscanf(argv[1],"%d",&n);   /* Length of array a.         */ 
n=n>>5;        /* Adjust n to a multiple of 32.       */ 
n=n<<5; 
sscanf(argv[2],"%u",&d);   /* The approximate fraction of nonzeros in a is: d/1024 */ 
printf("n=%d, d=%u\n",n,d); 

a=_mm_malloc(n*sizeof(char),32); 
ind=_mm_malloc(n*sizeof(int),32);  

            /* Generate a pseudo random array a.      */ 
j=73659343;     
for (i=0;i<n;i++){ 
    j=j*653+1; 
    k=(j & 0x3FF00)>>8;    /* k is a pseudo random number between 0 and 1023  */ 
    if (k<d){ 
     a[i] = (j&0xFE)+1;   /* Set a[i] to nonzero.         */ 
    }else{ 
     a[i] = 0; 
    } 

} 

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u\n",i,a[i]);}} printf("\n"); */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: "; 
char txt1[]="arr2ind_pext: "; 
char txt2[]="arr2ind_if:  "; 
char txt3[]="arr2ind_cmov: "; 

nrep=10000;         /* Repeat a function nrep times to make relatively accurate timings possible.       */ 
               /* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519     */ 
               /* With nrep=10000:  ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513     */ 
printf("nrep = \%d \n\n",nrep); 
arr2ind_movmsk(a,n,ind,&m);     /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */ 
for (meth=0;meth<4;meth++){ 
    t0=omp_get_wtime(); 
    switch (meth){ 
     case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);   strcpy(txt,txt0); break; 
     case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);   strcpy(txt,txt1); break; 
     case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);    strcpy(txt,txt2); break; 
     case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);   strcpy(txt,txt3); break; 
     default: ; 
    } 
    t1=omp_get_wtime(); 
    printf("method = %s ",txt); 
    /* print_chk(a,ind,m); */ 
    printf(" elapsed time = %6.2f\n",t1-t0); 
} 
print_nonz(a, ind, 2);           /* Do something with the results     */ 
printf("density = %f %% \n\n",((double)m)/((double)n)*100);  /* Actual nonzero density of array a.   */ 

/* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros.      */ 

return 0; 
} 

/* 
With nrep=1000000: 
./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519  
With nrep=10000: 
./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513  
*/ 


Код был протестирован с размером массива п = 10016 (данные помещается в кэш L1) и п = 1 млн, с различных ненулевых плотностей около 0,5%, 5% и 50%. Для точного времени функции назывались 1000000 и 10000 раз, соответственно.


Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500 
        0.53%  5.1%  50.0% 
arr2ind_movmsk:  0.27  0.53  4.89 
arr2ind_pext:   1.44  1.59  1.45 
arr2ind_if:   5.93  8.95  33.82 
arr2ind_cmov:   6.82  6.83  6.82 

Time in seconds, size n=1000000, 1e4 function calls. 

        0.49%  5.1%  50.1% 
arr2ind_movmsk:  0.57  2.03  5.37 
arr2ind_pext:   1.47  1.47  1.46 
arr2ind_if:   5.88  8.98  38.59 
arr2ind_cmov:   6.82  6.81  6.81 


В этих примерах векторизованные петли быстрее, чем скалярных петель. Производительность arr2ind_movmsk сильно зависит от плотности a. Это только быстрее, чем arr2ind_pext, если плотность достаточно мала. Точка безубыточности также зависит от размера массива n. Функция 'arr2ind_if' явно страдает от неудачного предсказания ветвления при 50% ненулевой плотности.