2016-10-09 12 views
0

Я пытаюсь выяснить, что я делаю неправильно при попытке сравнить и скопировать блок устройства с помощью pthreads параллельно. похоже, что я выхожу из SYNC, и фаза сравнения работает неправильно. любая помощь будет оцененаКопирование блочного устройства с несколькими потоками

#ifndef __dbg_h__ 
#define __dbg_h__ 

#define _LARGEFILE64_SOURCE 
#define _GNU_SOURCE 
#include <sys/types.h> 
#include <unistd.h> 
#include <fcntl.h> 
#include <linux/fs.h> 
#include <pthread.h> 
#include <stdio.h> 
#include <errno.h> 
#include <string.h> 
#include <stdint.h> 

#ifdef NDEBUG 
#define debug(M, ...) 
#else 
#define debug(M, ...) fprintf(stderr, "DEBUG %s %s %s:%d: " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, ##__VA_ARGS__) 
#endif 

#define clean_errno() (errno == 0 ? "None" : strerror(errno)) 

#define log_err(M, ...) fprintf(stderr, "[ERROR] %s %s (%s:%d: errno: %s) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__) 

#define log_warn(M, ...) fprintf(stderr, "[WARN] %s %s (%s:%d: errno: %s) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, clean_errno(), ##__VA_ARGS__) 

#define log_info(M, ...) fprintf(stderr, "[INFO] %s %s (%s:%d) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, ##__VA_ARGS__) 

#endif 

#define blksize 8192 

void *compare_devices(void *arguments); 

struct arg_struct { 
    char device_name_a[1024]; 
    char device_name_b[1024]; 
    off_t start_offset; 
    off_t end_offset; 
    int thread_number; 
    int sub_total; 
}; 

int main(int argc, char **argv) 
{ 
    int fd; 
    long numblocks=0; 
    int i; 
    int err; 
    long total_different_size=0; 
    int number_of_threads=16; 
    struct arg_struct compare_devices_args[number_of_threads]; 
    fd = open(argv[1], O_RDONLY); 
    ioctl(fd, BLKGETSIZE, &numblocks); 
    close(fd); 
    log_info("Number of blocks: %lu, this makes %.3f GB\n", 
     numblocks, 
     (double)numblocks * 512.0/(1024 * 1024 * 1024)); 
    //read_whole_device(argv[1]); 
    long number_of_bytes_per_thread; 
    number_of_bytes_per_thread=numblocks*512/(long)number_of_threads; 
    pthread_t tid[number_of_threads]; 

    for (i=0; i<number_of_threads; i++) { 
     strcpy(compare_devices_args[i].device_name_a, argv[1]); 
     strcpy(compare_devices_args[i].device_name_b, argv[2]); 
     compare_devices_args[i].start_offset=(long)(i*number_of_bytes_per_thread); 
     compare_devices_args[i].end_offset=(long)((i+1)*number_of_bytes_per_thread-1); 
     compare_devices_args[i].thread_number=i+1; 
     err = pthread_create(&(tid[i]), NULL, &compare_devices,(void*)&compare_devices_args[i]); 
     if (err != 0) { 
      printf("\ncan't create thread :[%s]", strerror(err)); 
      return -1; 
     } 

    } 
    for (i=0; i<number_of_threads; i++) { 
     pthread_join(tid[i], NULL); 
     total_different_size+=(long)compare_devices_args[i].sub_total; 
    } 
    printf ("Total of Different size between devices - %ld\n",total_different_size); 
}; 


int read_n_bytes_from(int fd, off_t pos, char *buf, int n) 
{ 
     if (lseek(fd, pos, SEEK_SET) >= 0) 
      return read(fd, buf, n); 
     else 
      return -1; 
} 

void read_whole_device(char* device_name) 
{ 
     int fd; 
     char buf[1024*1024]; 
     size_t size; 
     int counter=0; 
     log_info("reading device %s\n",device_name); 
     fd = open(device_name, O_RDONLY|O_NONBLOCK); 
     //ssize_t size = read(fd, &buf, 1024); 
     lseek(fd, 0, SEEK_SET); 
     while ((size=read(fd, buf, 1024*1024)) > 0) { 
       printf("Read buffer %d - %d\n", size, counter); 
       counter++; 
     } 
     close(fd); 
} 

void write_block_to_device(int fd,off_t pos,char* buf, int n) 
{ 
     if (lseek(fd, pos, SEEK_SET) >= 0) { 
       //write 
     } 
} 

void compare_buffer(char* buf_first,char* buf_second,int length, int blk_size, int* result) 
{ 
     int i; 
     char buf_cpy[blk_size]; 
     for (i=0; i<=(length/blk_size);i++) 
     { 
       if (memcmp(buf_first+(blk_size*i), buf_second+(blk_size*i), blk_size) != 0) { 
         //printf ("Block %d is different\n",i); 
         result[i]=1; 
       } 
       else { 
         result[i]=0; 
       } 
     } 
} 

void *compare_devices(void *arguments) 
{ 
     struct arg_struct *args = arguments; 
     int fd_first,fd_second; 
     char buf_first[1024*1024]; 
     char buf_second[1024*104]; 
     int size_first,size_second; 
     int counter=0; 
     int result[128]; 
     int memcmp_result; 
     int i; 
     off_t pos,pos_first,pos_second; 
     int total_number_of_different_blocks,difference_in_mb; 
     long number_of_mb_to_scan=0; 
     total_number_of_different_blocks=0; 
     log_info("Thread %d - compare devices %s,%s - start %ld end %ld\n",args->thread_number,args->device_name_a,args->device_name_b,args->start_offset,args->end_offset); 
     fd_first = open(args->device_name_a, O_RDONLY); 
     fd_second = open(args->device_name_b, O_RDONLY); 
     log_info("Thread %d - %d %d\n",args->thread_number,fd_first,fd_second); 
     log_info("Thread %d - before lseek - %ld %ld\n",args->thread_number,lseek(fd_first, 0, SEEK_CUR),lseek(fd_second, 0, SEEK_CUR)); 
     lseek(fd_first, (off_t)args->start_offset, SEEK_SET); 
     lseek(fd_second, (off_t)args->start_offset, SEEK_SET); 
     log_info("Thread %d - after lseek - %ld %ld\n",args->thread_number,lseek(fd_first, 0, SEEK_CUR),lseek(fd_second, 0, SEEK_CUR)); 
     log_info("Thread %d - start %ld , %ld\n",args->thread_number,lseek(fd_first, 0, SEEK_CUR),lseek(fd_second, 0, SEEK_CUR)); 
     number_of_mb_to_scan=(args->end_offset-args->start_offset)/1024/1024; 
     log_info("Thread %d - Number of MB to scan %ld - start offset %ld\n",args->thread_number,number_of_mb_to_scan,lseek(fd_second, 0, SEEK_CUR)); 
     memset(buf_first, 0, sizeof(buf_first)); 
     memset(buf_second, 0, sizeof(buf_second)); 
     while ((size_first=read(fd_first, buf_first, 1024*1024)) > 0) { 
       size_second=read(fd_second,buf_second, 1024*1024); 
       pos_first=lseek(fd_first, 0, SEEK_CUR); 
       pos_second=lseek(fd_second, 0, SEEK_CUR); 
       log_info("Thread %d - fd (%d,%d) pos_first %lld, pos_second %lld (%d - %d) - read %d,%d\n",args->thread_number,fd_first,fd_second,(unsigned long long)pos_first-1024*1024,(unsigned long long)pos_second-1024*1024,sizeof(off_t),sizeof(pos),size_first,size_second); 
       //log_info("Thread %s - Hash Buf A - %ld Hash Buf B - %ld",args->thread_number,adler32(buf_first,size_first),adler32(buf_second,size_second)); 
       if ((memcmp_result=memcmp(buf_first, buf_second, sizeof(buf_second))) != 0) { 
         log_info("Thread %d - Found 1MB chunck which is different at pos %lld - %d (%d)\n",args->thread_number,(unsigned long long)pos_first-1024*1024, memcmp_result,sizeof(buf_first)); 
         compare_buffer(buf_first,buf_second,1024*1024,blksize,result); 
         for (i=0; i<=128; i++) { 
           if (result[i] == 1) { 
             //printf ("%d,",result[i]); 
             total_number_of_different_blocks++; 
           } 
         } 
       } 
       if (pos_first > args->end_offset) { 
         break; 
       } 
       memset(buf_first, 0, sizeof(buf_first)); 
       memset(buf_second, 0, sizeof(buf_second)); 
       counter++; 
     } 
     log_info("Thread %d - Number of MB to scan %ld - end offset %ld\n",args->thread_number,number_of_mb_to_scan,lseek(fd_first, 0, SEEK_CUR)); 
     log_info("Thread %d - Completed - Scanning %d of 1MB chuncks\n",args->thread_number,counter); 
     log_info("Thread %d - Diffence of size between %s to %s starting at %ld and ending at %ld- %d different blocks of 8k - %dMB\n",args->thread_number,args->device_name_a,args->device_name_b,args->start_offset,args->end_offset,total_number_of_different_blocks,total_number_of_different_blocks*8192/1024/1024); 
     args->sub_total=total_number_of_different_blocks/8/1024; 
     close(fd_first); 
     close(fd_second); 
     pthread_exit(arguments); 
} 

Обновленная версия:

#ifndef __dbg_h__ 
#define __dbg_h__ 

#define _LARGEFILE64_SOURCE 
#define _GNU_SOURCE 
#define _FILE_OFFSET_BITS 64 
#include <sys/types.h> 
#include <unistd.h> 
#include <fcntl.h> 
#include <linux/fs.h> 
#include <pthread.h> 
#include <stdio.h> 
#include <errno.h> 
#include <string.h> 
#include <stdint.h> 
#include <getopt.h> 
#include <openssl/sha.h> 
#include <sched.h> 

#ifdef NDEBUG 
#define debug(M, ...) 
#else 
#define debug(M, ...) fprintf(stderr, "DEBUG %s %s %s:%d: " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, ##__VA_ARGS__) 
#endif 

#define clean_errno() (errno == 0 ? "None" : strerror(errno)) 

#define log_err(M, ...) fprintf(stderr, "[ERROR] %s %s (%s:%d: errno: %s) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, clean_er 
rno(), ##__VA_ARGS__) 

#define log_warn(M, ...) fprintf(stderr, "[WARN] %s %s (%s:%d: errno: %s) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, clean_er 
rno(), ##__VA_ARGS__) 

#define log_info(M, ...) fprintf(stderr, "[INFO] %s %s (%s:%d) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, ##__VA_ARGS__) 

#endif 

#ifndef _TRY_THROW_CATCH_H_ 
#define _TRY_THROW_CATCH_H_ 

#include <setjmp.h> 

#define TRY do { jmp_buf ex_buf__; switch(setjmp(ex_buf__)) { case 0: while(1) { 
#define CATCH(x) break; case x: 
#define FINALLY break; } default: { 
#define ETRY break; } } }while(0) 
#define THROW(x) longjmp(ex_buf__, x) 

#endif /*!_TRY_THROW_CATCH_H_*/ 

unsigned long long chunck_size=1024*1024; 
int blksize=16384; 
int cb=0; 

void *compare_devices(void *arguments); 

struct arg_struct { 
    char device_name_a[1024]; 
    char device_name_b[1024]; 
    off64_t start_offset; 
    off64_t end_offset; 
    int thread_number; 
    int sub_total; 
}; 

int main(int argc, char **argv) 
{ 
    int fd,s_is_set=0,d_is_set=0,copy_blocks=0,num_cpus; 
    int opt; 
    char *source_device,*dest_device,*blksize_string,*number_of_threads_string; 
    long numblocks_src=0,numblocks_dst=0; 
    int i; 
    int err,policy; 
    long total_different_size=0; 
    int number_of_threads=16; 
    int newprio, set_thread_priority=0; 
    pthread_attr_t tattr; 
    struct sched_param param; 
    while ((opt = getopt(argc, argv, "s:d:b:p:t:c")) != -1) { 
    switch (opt) { 
    case 's': 
     source_device = optarg; 
     log_info("source device is %s\n",source_device); 
     s_is_set=1; 
     if(access(source_device, F_OK) != -1) { 
      log_info("source device is accessible\n"); 
     } 
     else { 
      log_err("cannot access source device\n"); 
      return -1; 
     } 
     break; 
    case 'd': 
     dest_device = optarg; 
     log_info("dest device is %s\n",dest_device); 
     d_is_set=1; 
     if(access(dest_device, F_OK) != -1) { 
      log_info("dest device is accessible\n"); 
     } 
     else { 
      log_err("cannot access dest device\n"); 
      return -1; 
     } 
     break; 
    case 'b': 
     blksize = atoi(optarg); 
     if (blksize%4096 != 0) { 
      log_err("Block Size is not multiple of 4k\n"); 
      return -1; 
     } 
     else 
      log_info("block size is %d\n",blksize); 
     break; 
    case 't': 
     number_of_threads = atoi(optarg); 
     num_cpus = sysconf(_SC_NPROCESSORS_ONLN); 
     if (number_of_threads%2 != 0) { 
      log_err("Number of threads are not multiple of 2\n"); 
      return -1; 
     } 
     else 
      log_info("number of threads is %d - number of cores %d\n",number_of_threads,num_cpus); 
      if (number_of_threads > num_cpus) { 
       number_of_threads=num_cpus; 
      } 
     break; 
    case 'c': 
     copy_blocks=1; 
     cb=1; 
     break; 
    case 'p': 
     if (optarg != NULL) { 
      newprio=atoi(optarg); 
      set_thread_priority=1; 
     } 
     break; 
    case ':': 
     log_info("%s: option '-%c' requires an argument\n",argv[0], optopt); 
     break; 
    case '?': 
    default: 
     log_info("%s: option '-%c' is invalid: ignored\n",argv[0], optopt); 
     break; 
    } 
    } 
    if (s_is_set != 1 || d_is_set !=1) { 
     log_err("Source device and Destination device must be specified\n"); 
     return -1; 
    } 
    struct arg_struct compare_devices_args[number_of_threads]; 
    fd = open(source_device, O_RDONLY); 
    ioctl(fd, BLKGETSIZE, &numblocks_src); 
    close(fd); 
    log_info("Number of blocks in source device: %lu, this makes %.3f GB\n", numblocks_src, (double)numblocks_src * 512.0/(1024 * 102 
4 * 1024)); 
    fd = open(dest_device, O_RDONLY); 
    ioctl(fd, BLKGETSIZE, &numblocks_dst); 
    close(fd); 
    log_info("Number of blocks in destination is device: %lu, this makes %.3f GB\n", numblocks_dst, (double)numblocks_dst * 512.0/(10 
24 * 1024 * 1024)); 
    if (numblocks_src > numblocks_dst) { 
     log_info("Number of blocks in source device is larger then number of blocks in destination device"); 
     return -1; 
    } 
    long number_of_bytes_per_thread; 
    number_of_bytes_per_thread=numblocks_src*512/(long)number_of_threads; 
    pthread_t tid[number_of_threads]; 
    for (i=0; i<number_of_threads; i++) { 
     strcpy(compare_devices_args[i].device_name_a, source_device); 
     strcpy(compare_devices_args[i].device_name_b, dest_device); 
     compare_devices_args[i].start_offset=(long)(i*number_of_bytes_per_thread); 
     compare_devices_args[i].end_offset=(long)((i+1)*number_of_bytes_per_thread-1); 
     compare_devices_args[i].thread_number=i+1; 
     if (set_thread_priority == 1) { 
      err = pthread_attr_init (&tattr); 
      policy = SCHED_RR; 
      err = pthread_attr_setschedpolicy(&tattr, policy); 
      if (err != 0) { 
       log_err("\nThread %d - Unable to set scheduler policy attributes :[%s]", i+1, strerror(err)); 
      } 
      err = pthread_attr_getschedparam (&tattr, &param); 
      if (err != 0) { 
       log_err("\nThread %d - Unable to get scheduler priority attributes :[%s]", i+1, strerror(err)); 
      } 
      log_info("\nThread %d - Old thread priority is %d",i+1,param.sched_priority); 
      param.sched_priority = newprio; 
      log_info("\nThread %d - Setting pthread priority to %d",i+1,newprio); 
      err = pthread_attr_setschedparam (&tattr, &param); 
      if (err != 0) { 
       log_err("\nThread %d - Unable to set scheduler attributes :[%s]", i+1, strerror(err)); 
      } 
      err = pthread_create(&(tid[i]), &tattr, &compare_devices,(void*)&compare_devices_args[i]); 
     } 
     else 
     { 
      err = pthread_create(&(tid[i]), NULL, &compare_devices,(void*)&compare_devices_args[i]); 
     } 
     if (err != 0) { 
      log_err("\nThread %d - can't create thread :[%s]", i+1, strerror(err)); 
      return -1; 
     } 

    } 
    for (i=0; i<number_of_threads; i++) { 
     pthread_join(tid[i], NULL); 
     total_different_size+=(long)compare_devices_args[i].sub_total; 
    } 
    log_info ("Total of Different size between devices - %ld\n",total_different_size); 
}; 


void compare_buffer(char* buf_first,char* buf_second,int length, int blk_size, int* result) 
{ 
     int i; 
     char buf_cpy[blk_size]; 
     for (i=0; i<=(length/blk_size);i++) 
     { 
       if (memcmp(buf_first+(blk_size*i), buf_second+(blk_size*i), blk_size) != 0) { 
         result[i]=1; 
       } 
       else { 
         result[i]=0; 
       } 
     } 
} 

void *compare_devices(void *arguments) 
{ 
     struct arg_struct *args = arguments; 
     int fd_first,fd_second; 
     char buf_first[chunck_size]; 
     char buf_second[chunck_size]; 
     int size_first,size_second; 
     unsigned int counter=0; 
     int result[chunck_size/blksize]; 
     int memcmp_result; 
     unsigned long long i; 
     off64_t pos,pos_first,pos_second; 
     unsigned long long total_number_of_different_blocks,difference_in_mb; 
     long number_of_mb_to_scan=0; 
     total_number_of_different_blocks=0; 
     log_info("Thread %d - compare devices %s,%s - start %llu end %llu\n",args->thread_number,args->device_name_a,args->device_nam 
e_b,args->start_offset,args->end_offset); 
     fd_first = open(args->device_name_a, O_RDONLY); 
     if (cb == 0) { 
      fd_second = open(args->device_name_b, O_RDONLY); 
     } 
     else 
      fd_second = open(args->device_name_b, O_RDWR); 
     lseek64(fd_first, (off64_t)args->start_offset, SEEK_SET); 
     lseek64(fd_second, (off64_t)args->start_offset, SEEK_SET); 
     number_of_mb_to_scan=(args->end_offset-args->start_offset)/chunck_size; 
     debug("Thread %d - Number of MB to scan %ld - start offset %ld\n",args->thread_number,number_of_mb_to_scan,lseek64(fd_second, 
0, SEEK_CUR)); 
     memset(buf_first, 0, sizeof(buf_first)); 
     memset(buf_second, 0, sizeof(buf_second)); 
     while ((size_first=read(fd_first, buf_first, chunck_size)) > 0 && counter <= number_of_mb_to_scan) { 
       size_second=read(fd_second,buf_second, chunck_size); 
       pos_first=lseek64(fd_first, 0, SEEK_CUR); 
       pos_second=lseek64(fd_second, 0, SEEK_CUR); 
       if ((memcmp_result=memcmp(buf_first, buf_second, sizeof(buf_second))) != 0) { 
         debug("Thread %d - Found 1MB chunck which is different at pos %llu - %d (%d)\n",args->thread_number,(unsigned 
long long)pos_first-chunck_size, memcmp_result,sizeof(buf_first)); 
         compare_buffer(buf_first,buf_second,chunck_size,blksize,result); 
         for (i=0; i<=chunck_size/blksize; i++) { 
           if (result[i] == 1) { 
             if (cb == 1) { 
              debug("Thread %d - chunck %d - copying block %d - location of block in device file %llu", 
args->thread_number, counter+1, i, args->start_offset+(unsigned long long)(chunck_size*counter)+(unsigned long long)(i*blksize)); 
              lseek64(fd_first,args->start_offset+(off64_t)(chunck_size*counter)+(off64_t)(i*blksize),S 
EEK_SET); 
              lseek64(fd_second,args->start_offset+(off64_t)(chunck_size*counter)+(off64_t)(i*blksize), 
SEEK_SET); 
              size_first=read(fd_first, buf_first, blksize); 
              size_second=write(fd_second,buf_first,size_first); 
              if (size_second != blksize) { 
               log_err("Thread %d - Houston we have a problem in copying block number %llu",args->th 
read_number, (unsigned long long)args->start_offset+(unsigned long long)(chunck_size*counter)+(unsigned long long)(i*blksize)); 
              } 
             } 
             total_number_of_different_blocks++; 
           } 
         } 
       } 
       if (cb == 1) { 
         lseek64(fd_second,(off64_t)args->start_offset+(off64_t)(chunck_size*(counter+1)),SEEK_SET); 
         lseek64(fd_first,(off64_t)args->start_offset+(off64_t)(chunck_size*(counter+1)),SEEK_SET); 
       } 
       if (pos_first > args->end_offset) { 
         break; 
       } 
       memset(buf_first, 0, sizeof(buf_first)); 
       memset(buf_second, 0, sizeof(buf_second)); 
       debug("Thread %d - chunck %d out of %d done - %llu %llu", args->thread_number, counter+1, number_of_mb_to_scan,pos_fi 
rst,args->end_offset); 
       counter++; 
     } 
     log_info("Thread %d - Number of MB to scan %ld - end offset %llu\n",args->thread_number,number_of_mb_to_scan,lseek64(fd_first 
, 0, SEEK_CUR)); 
     log_info("Thread %d - Completed - Scanning %d of 1MB chuncks\n",args->thread_number,counter); 
     log_info("Thread %d - Difference of size between %s to %s starting at %llu and ending at %llu- %d different blocks of %d - %d 
MB\n",args->thread_number,args->device_name_a,args->device_name_b,args->start_offset,args->end_offset,total_number_of_different_block 
s,blksize,total_number_of_different_blocks*blksize/chunck_size); 
     args->sub_total=total_number_of_different_blocks*blksize/1024; 
     close(fd_first); 
     close(fd_second); 
     pthread_exit(arguments); 
} 
+0

Самое первое, что нужно сделать, это добавить проверку на ошибки * все * соответствующие вызовы функций. В случае, если они не ввели его в журнал, указав, почему они потерпели неудачу (log 'errno' или просто использовать' perror() '. Это отладка бесплатно. – alk

+0

Работает ли код так, как ожидается, при использовании только *** одного *** потока? – alk

+0

Код выполняется, но не ведет себя так, как ожидалось. При попытке копирования между двумя идентичными лунками - он не идентифицирует, что блоки одинаковы. Некоторые из lseek не работают по назначению – curious

ответ

0
#ifndef __dbg_h__ 
#define __dbg_h__ 

#define _LARGEFILE64_SOURCE 
#define _GNU_SOURCE 
#define _FILE_OFFSET_BITS 64 
#include <sys/types.h> 
#include <unistd.h> 
#include <fcntl.h> 
#include <linux/fs.h> 
#include <pthread.h> 
#include <stdio.h> 
#include <errno.h> 
#include <string.h> 
#include <stdint.h> 
#include <getopt.h> 
#include <openssl/sha.h> 
#include <sched.h> 

#ifdef NDEBUG 
#define debug(M, ...) 
#else 
#define debug(M, ...) fprintf(stderr, "DEBUG %s %s %s:%d: " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, ##__VA_ARGS__) 
#endif 

#define clean_errno() (errno == 0 ? "None" : strerror(errno)) 

#define log_err(M, ...) fprintf(stderr, "[ERROR] %s %s (%s:%d: errno: %s) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, clean_er 
rno(), ##__VA_ARGS__) 

#define log_warn(M, ...) fprintf(stderr, "[WARN] %s %s (%s:%d: errno: %s) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, clean_er 
rno(), ##__VA_ARGS__) 

#define log_info(M, ...) fprintf(stderr, "[INFO] %s %s (%s:%d) " M "\n", __DATE__, __TIME__, __FILE__, __LINE__, ##__VA_ARGS__) 

#endif 

#ifndef _TRY_THROW_CATCH_H_ 
#define _TRY_THROW_CATCH_H_ 

#include <setjmp.h> 

#define TRY do { jmp_buf ex_buf__; switch(setjmp(ex_buf__)) { case 0: while(1) { 
#define CATCH(x) break; case x: 
#define FINALLY break; } default: { 
#define ETRY break; } } }while(0) 
#define THROW(x) longjmp(ex_buf__, x) 

#endif /*!_TRY_THROW_CATCH_H_*/ 

unsigned long long chunck_size=1024*1024; 
int blksize=16384; 
int cb=0; 

void *compare_devices(void *arguments); 

struct arg_struct { 
    char device_name_a[1024]; 
    char device_name_b[1024]; 
    off64_t start_offset; 
    off64_t end_offset; 
    int thread_number; 
    int sub_total; 
}; 

int main(int argc, char **argv) 
{ 
    int fd,s_is_set=0,d_is_set=0,copy_blocks=0,num_cpus; 
    int opt; 
    char *source_device,*dest_device,*blksize_string,*number_of_threads_string; 
    long numblocks_src=0,numblocks_dst=0; 
    int i; 
    int err,policy; 
    long total_different_size=0; 
    int number_of_threads=16; 
    int newprio, set_thread_priority=0; 
    pthread_attr_t tattr; 
    struct sched_param param; 
    while ((opt = getopt(argc, argv, "s:d:b:p:t:c")) != -1) { 
    switch (opt) { 
    case 's': 
     source_device = optarg; 
     log_info("source device is %s\n",source_device); 
     s_is_set=1; 
     if(access(source_device, F_OK) != -1) { 
      log_info("source device is accessible\n"); 
     } 
     else { 
      log_err("cannot access source device\n"); 
      return -1; 
     } 
     break; 
    case 'd': 
     dest_device = optarg; 
     log_info("dest device is %s\n",dest_device); 
     d_is_set=1; 
     if(access(dest_device, F_OK) != -1) { 
      log_info("dest device is accessible\n"); 
     } 
     else { 
      log_err("cannot access dest device\n"); 
      return -1; 
     } 
     break; 
    case 'b': 
     blksize = atoi(optarg); 
     if (blksize%4096 != 0) { 
      log_err("Block Size is not multiple of 4k\n"); 
      return -1; 
     } 
     else 
      log_info("block size is %d\n",blksize); 
     break; 
    case 't': 
     number_of_threads = atoi(optarg); 
     num_cpus = sysconf(_SC_NPROCESSORS_ONLN); 
     if (number_of_threads%2 != 0) { 
      log_err("Number of threads are not multiple of 2\n"); 
      return -1; 
     } 
     else 
      log_info("number of threads is %d - number of cores %d\n",number_of_threads,num_cpus); 
      if (number_of_threads > num_cpus) { 
       number_of_threads=num_cpus; 
      } 
     break; 
    case 'c': 
     copy_blocks=1; 
     cb=1; 
     break; 
    case 'p': 
     if (optarg != NULL) { 
      newprio=atoi(optarg); 
      set_thread_priority=1; 
     } 
     break; 
    case ':': 
     log_info("%s: option '-%c' requires an argument\n",argv[0], optopt); 
     break; 
    case '?': 
    default: 
     log_info("%s: option '-%c' is invalid: ignored\n",argv[0], optopt); 
     break; 
    } 
    } 
    if (s_is_set != 1 || d_is_set !=1) { 
     log_err("Source device and Destination device must be specified\n"); 
     return -1; 
    } 
    struct arg_struct compare_devices_args[number_of_threads]; 
    fd = open(source_device, O_RDONLY); 
    ioctl(fd, BLKGETSIZE, &numblocks_src); 
    close(fd); 
    log_info("Number of blocks in source device: %lu, this makes %.3f GB\n", numblocks_src, (double)numblocks_src * 512.0/(1024 * 102 
4 * 1024)); 
    fd = open(dest_device, O_RDONLY); 
    ioctl(fd, BLKGETSIZE, &numblocks_dst); 
    close(fd); 
    log_info("Number of blocks in destination is device: %lu, this makes %.3f GB\n", numblocks_dst, (double)numblocks_dst * 512.0/(10 
24 * 1024 * 1024)); 
    if (numblocks_src > numblocks_dst) { 
     log_info("Number of blocks in source device is larger then number of blocks in destination device"); 
     return -1; 
    } 
    long number_of_bytes_per_thread; 
    number_of_bytes_per_thread=numblocks_src*512/(long)number_of_threads; 
    pthread_t tid[number_of_threads]; 
    for (i=0; i<number_of_threads; i++) { 
     strcpy(compare_devices_args[i].device_name_a, source_device); 
     strcpy(compare_devices_args[i].device_name_b, dest_device); 
     compare_devices_args[i].start_offset=(long)(i*number_of_bytes_per_thread); 
     compare_devices_args[i].end_offset=(long)((i+1)*number_of_bytes_per_thread-1); 
     compare_devices_args[i].thread_number=i+1; 
     if (set_thread_priority == 1) { 
      err = pthread_attr_init (&tattr); 
      policy = SCHED_RR; 
      err = pthread_attr_setschedpolicy(&tattr, policy); 
      if (err != 0) { 
       log_err("\nThread %d - Unable to set scheduler policy attributes :[%s]", i+1, strerror(err)); 
      } 
      err = pthread_attr_getschedparam (&tattr, &param); 
      if (err != 0) { 
       log_err("\nThread %d - Unable to get scheduler priority attributes :[%s]", i+1, strerror(err)); 
      } 
      log_info("\nThread %d - Old thread priority is %d",i+1,param.sched_priority); 
      param.sched_priority = newprio; 
      log_info("\nThread %d - Setting pthread priority to %d",i+1,newprio); 
      err = pthread_attr_setschedparam (&tattr, &param); 
      if (err != 0) { 
       log_err("\nThread %d - Unable to set scheduler attributes :[%s]", i+1, strerror(err)); 
      } 
      err = pthread_create(&(tid[i]), &tattr, &compare_devices,(void*)&compare_devices_args[i]); 
     } 
     else 
     { 
      err = pthread_create(&(tid[i]), NULL, &compare_devices,(void*)&compare_devices_args[i]); 
     } 
     if (err != 0) { 
      log_err("\nThread %d - can't create thread :[%s]", i+1, strerror(err)); 
      return -1; 
     } 

    } 
    for (i=0; i<number_of_threads; i++) { 
     pthread_join(tid[i], NULL); 
     total_different_size+=(long)compare_devices_args[i].sub_total; 
    } 
    log_info ("Total of Different size between devices - %ld\n",total_different_size); 
}; 


void compare_buffer(char* buf_first,char* buf_second,int length, int blk_size, int* result) 
{ 
     int i; 
     char buf_cpy[blk_size]; 
     for (i=0; i<=(length/blk_size);i++) 
     { 
       if (memcmp(buf_first+(blk_size*i), buf_second+(blk_size*i), blk_size) != 0) { 
         result[i]=1; 
       } 
       else { 
         result[i]=0; 
       } 
     } 
} 

void *compare_devices(void *arguments) 
{ 
     struct arg_struct *args = arguments; 
     int fd_first,fd_second; 
     char buf_first[chunck_size]; 
     char buf_second[chunck_size]; 
     int size_first,size_second; 
     unsigned int counter=0; 
     int result[chunck_size/blksize]; 
     int memcmp_result; 
     unsigned long long i; 
     off64_t pos,pos_first,pos_second; 
     unsigned long long total_number_of_different_blocks,difference_in_mb; 
     long number_of_mb_to_scan=0; 
     total_number_of_different_blocks=0; 
     log_info("Thread %d - compare devices %s,%s - start %llu end %llu\n",args->thread_number,args->device_name_a,args->device_nam 
e_b,args->start_offset,args->end_offset); 
     fd_first = open(args->device_name_a, O_RDONLY); 
     if (cb == 0) { 
      fd_second = open(args->device_name_b, O_RDONLY); 
     } 
     else 
      fd_second = open(args->device_name_b, O_RDWR); 
     lseek64(fd_first, (off64_t)args->start_offset, SEEK_SET); 
     lseek64(fd_second, (off64_t)args->start_offset, SEEK_SET); 
     number_of_mb_to_scan=(args->end_offset-args->start_offset)/chunck_size; 
     debug("Thread %d - Number of MB to scan %ld - start offset %ld\n",args->thread_number,number_of_mb_to_scan,lseek64(fd_second, 
0, SEEK_CUR)); 
     memset(buf_first, 0, sizeof(buf_first)); 
     memset(buf_second, 0, sizeof(buf_second)); 
     while ((size_first=read(fd_first, buf_first, chunck_size)) > 0 && counter <= number_of_mb_to_scan) { 
       size_second=read(fd_second,buf_second, chunck_size); 
       pos_first=lseek64(fd_first, 0, SEEK_CUR); 
       pos_second=lseek64(fd_second, 0, SEEK_CUR); 
       if ((memcmp_result=memcmp(buf_first, buf_second, sizeof(buf_second))) != 0) { 
         debug("Thread %d - Found 1MB chunck which is different at pos %llu - %d (%d)\n",args->thread_number,(unsigned 
long long)pos_first-chunck_size, memcmp_result,sizeof(buf_first)); 
         compare_buffer(buf_first,buf_second,chunck_size,blksize,result); 
         for (i=0; i<=chunck_size/blksize; i++) { 
           if (result[i] == 1) { 
             if (cb == 1) { 
              debug("Thread %d - chunck %d - copying block %d - location of block in device file %llu", 
args->thread_number, counter+1, i, args->start_offset+(unsigned long long)(chunck_size*counter)+(unsigned long long)(i*blksize)); 
              lseek64(fd_first,args->start_offset+(off64_t)(chunck_size*counter)+(off64_t)(i*blksize),S 
EEK_SET); 
              lseek64(fd_second,args->start_offset+(off64_t)(chunck_size*counter)+(off64_t)(i*blksize), 
SEEK_SET); 
              size_first=read(fd_first, buf_first, blksize); 
              size_second=write(fd_second,buf_first,size_first); 
              if (size_second != blksize) { 
               log_err("Thread %d - Houston we have a problem in copying block number %llu",args->th 
read_number, (unsigned long long)args->start_offset+(unsigned long long)(chunck_size*counter)+(unsigned long long)(i*blksize)); 
              } 
             } 
             total_number_of_different_blocks++; 
           } 
         } 
       } 
       if (cb == 1) { 
         lseek64(fd_second,(off64_t)args->start_offset+(off64_t)(chunck_size*(counter+1)),SEEK_SET); 
         lseek64(fd_first,(off64_t)args->start_offset+(off64_t)(chunck_size*(counter+1)),SEEK_SET); 
       } 
       if (pos_first > args->end_offset) { 
         break; 
       } 
       memset(buf_first, 0, sizeof(buf_first)); 
       memset(buf_second, 0, sizeof(buf_second)); 
       debug("Thread %d - chunck %d out of %d done - %llu %llu", args->thread_number, counter+1, number_of_mb_to_scan,pos_fi 
rst,args->end_offset); 
       counter++; 
     } 
     log_info("Thread %d - Number of MB to scan %ld - end offset %llu\n",args->thread_number,number_of_mb_to_scan,lseek64(fd_first 
, 0, SEEK_CUR)); 
     log_info("Thread %d - Completed - Scanning %d of 1MB chuncks\n",args->thread_number,counter); 
     log_info("Thread %d - Difference of size between %s to %s starting at %llu and ending at %llu- %d different blocks of %d - %d 
MB\n",args->thread_number,args->device_name_a,args->device_name_b,args->start_offset,args->end_offset,total_number_of_different_block 
s,blksize,total_number_of_different_blocks*blksize/chunck_size); 
     args->sub_total=total_number_of_different_blocks*blksize/1024; 
     close(fd_first); 
     close(fd_second); 
     pthread_exit(arguments); 
} 
+0

Рад, что вы хотите помочь здесь. Только ответы на код чаще всего не очень полезны. Не могли бы вы использовать короткий параграф или два, чтобы объяснить, как ваш код отвечает на вопрос и помогает айзеру? Спасибо заранее. –

+0

Проблема с исходным кодом заключалась в том, что буфер не был распределен правильно: char buf_sec Зонд [1024 * 104]; в функции compare_devices. он был изменен на char buf_second [1024 * 1024]. – curious