1. 首页
  2. IT资讯

LINUX 如何实现多线程进行cp复制

关于这个问题,其实际意义有限:一般来说,复制文件时真正的瓶颈在于I/O,不管开启多少个线程,速度都不会快多少。但为了练习多线程编程,
这里给出一种用C++实现的方式,代码附在最后。

实际上就是将一个文件分割为多个片段,开启多个线程同时复制。如果用户指定的并行度大于服务器实际的CPU核数,程序会自动将并行度降为CPU核数;如果文件小于
100M(100000000字节),则并行度始终为1。

root@bogon:/home/gaopeng/mmm# ./parcp log.log log10.log 2       
set parallel:2
Your cpu core is:4
real parallel:2
Will Create 2 Threads
140677902710528:0:174522367:3:4
140677894317824:174522368:349044736:3:4
Copy Thread:140677902710528 work 25%
Copy Thread:140677894317824 work 25%
Copy Thread:140677902710528 work 50%
Copy Thread:140677902710528 work 75%
Copy Thread:140677902710528 work 100%
Copy Thread:140677902710528 work Ok!!
Copy Thread:140677894317824 work 50%
Copy Thread:140677894317824 work 75%
Copy Thread:140677894317824 work Ok!!

复制完成后进行md5验证
root@bogon:/home/gaopeng/mmm# md5sum log.log
f64acc21f7187a865938b340b3eda198  log.log
root@bogon:/home/gaopeng/mmm# md5sum log10.log
f64acc21f7187a865938b340b3eda198  log10.log
可以看出两个文件的md5值一致,校验通过。

代码如下:

点击(此处)折叠或打开

  1. #include<iostream>
  2. #include <map>
  3. #include<stdint.h>
  4. #include<stdio.h>
  5. #include<string.h>
  6. #include<unistd.h>
  7. #include<sys/types.h>
  8. #include<sys/stat.h>
  9. #include <sys/sysinfo.h>
  10. #include<fcntl.h>
  11. #include<errno.h>
  12. #include <time.h>
  13. #include <stdarg.h>
  14. #include <stdlib.h>
  15. #include <pthread.h>
  16. #define MAX_BUFFER 65536
  17. using namespace std;
  18. pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
  19. class thread_info
  20. {
  21.         private:
  22.                 uint64_t start_pos;
  23.                 uint64_t end_pos;
  24.                 int fdr;
  25.                 int fdw;
  26.         public:
  27.                 pthread_t t_id;
  28.                 static int do_id;
  29.         public:
  30.                 thread_info()
  31.                 {
  32.                         start_pos = 0;
  33.                         end_pos = 0;
  34.                         fdr = 0;
  35.                         fdw = 0;
  36.                         t_id = 0;
  37.                 }
  38.                 void set_start(uint64_t a)
  39.                 {
  40.                         start_pos = a;
  41.                 }
  42.                 void set_end(uint64_t a)
  43.                 {
  44.                         end_pos = a;
  45.                 }
  46.                 void set_fd(int a,int b)
  47.                 {
  48.                         fdr = a;
  49.                         fdw = b;
  50.                 }
  51.                 uint64_t get_start(void)
  52.                 {
  53.                         return start_pos;
  54.                 }
  55.                 uint64_t get_stop(void)
  56.                 {
  57.                         return end_pos;
  58.                 }
  59.                 int get_fdr(void)
  60.                 {
  61.                         return fdr;
  62.                 }
  63.                 int get_fdw(void)
  64.                 {
  65.                         return fdw;
  66.                 }
  67.                 void print(void)
  68.                 {
  69.                         cout<<start_pos<<“:”<<end_pos<<“:”<<t_id<<endl;
  70.                 }
  71. };
  72. int thread_info::do_id = 0;
  73. class ABS_dispatch
  74. {
  75.         public:
  76.                 ABS_dispatch()
  77.                 {
  78.                         par_thr = 0;
  79.                         max_cpu = 0;
  80.                 }
  81.                 virtual thread_info* call_thread(void) = 0;
  82.                 virtual void make_init(uint32_t t_n) = 0;
  83.                 uint32_t getcpu(void)
  84.                 {
  85.                         return get_nprocs() ;
  86.                 }
  87.                 virtual ~ABS_dispatch(){
  88.                 }
  89.         protected:
  90.                 uint32_t par_thr;
  91.                 uint32_t max_cpu;
  92. };
  93. class dispatch:public ABS_dispatch
  94. {
  95.         public:
  96.                 typedef multimap<uint64_t,uint64_t>::iterator pair_piece_iterator;
  97.                 dispatch():ABS_dispatch()
  98.         {
  99.                 file_idr = 0;
  100.                 file_idw = 0;
  101.                 file_size = 0;
  102.                 tread_arr_p = NULL;
  103.         }
  104.                 virtual thread_info* call_thread(void);
  105.                 virtual void make_init(uint32_t t_n)
  106.                 {
  107.                         max_cpu = getcpu(); //
  108.                         cout<<“Your cpu core is:”<<max_cpu<<“n”;
  109.                         if(t_n > max_cpu) //parallel
  110.                         {
  111.                                 cout<<“Parallel downgrad to cpu core:”<<max_cpu<<“n”;
  112.                                 par_thr = max_cpu;
  113.                         }
  114.                         else
  115.                         {
  116.                                 par_thr = t_n;
  117.                         }
  118.                 }
  119.                 void set_init(int file_idr,int file_idw)
  120.                 {
  121.                         file_size = lseek(file_idr,0,SEEK_END);
  122.                         if(file_size<100000000)
  123.                         {
  124.                                 cout<<“File small than 100M par = 1” <<“n”;
  125.                                 par_thr = 1;
  126.                         }
  127.                         this>file_idr = file_idr;
  128.                         this>file_idw = file_idw;
  129.                 }
  130.                 uint32_t real_par()
  131.                 {
  132.                         return par_thr;
  133.                 }
  134.                 virtual ~dispatch()
  135.                 {
  136.                         pair_piece.clear();
  137.                         delete [] tread_arr_p;
  138.                 }
  139.         private:
  140.                 int file_idr;
  141.                 int file_idw;
  142.                 multimap<uint64_t,uint64_t> pair_piece;
  143.                 uint64_t file_size;
  144.         public:
  145.                 thread_info* tread_arr_p;
  146. };
  147. static void* do_work(void* argc)
  148. {
  149.         uint64_t b;
  150.         uint64_t e;
  151.         int fdr;
  152.         int fdw;
  153.         char* buffer[MAX_BUFFER]={0};
  154.         thread_info* tread_arr_p;
  155.         uint64_t loopc = 0;
  156.         uint64_t loopc25 = 0;
  157.         uint64_t i = 0;
  158.         int m = 1;
  159.         pthread_t t_id;
  160.         tread_arr_p = static_cast<thread_info*>(argc);
  161.         //临界区 MUTEX
  162.         pthread_mutex_lock(&counter_mutex);
  163.         b = (tread_arr_p+ tread_arr_p>do_id)>get_start();
  164.         e = (tread_arr_p+ tread_arr_p>do_id)>get_stop();
  165.         fdr = (tread_arr_p+ tread_arr_p>do_id)>get_fdr();
  166.         fdw = (tread_arr_p+ tread_arr_p>do_id)>get_fdw();
  167.         t_id = (tread_arr_p+ tread_arr_p>do_id)>t_id ;
  168.         cout<< t_id <<“:”<<b<<“:”<<e<<“:”<<fdr<<“:”<<fdw<<“n”;
  169.         tread_arr_p>do_id++;
  170.         pthread_mutex_unlock(&counter_mutex);
  171.         //临界区
  172.         loopc = e/uint64_t(MAX_BUFFER);
  173.         loopc25 = loopc/(uint64_t)4;
  174.         while(i<loopc)
  175.         {
  176.                 if(i == loopc25*m )
  177.                 {
  178.                         cout<< “Copy Thread:”<<t_id<<” work “<<25*m<<“%n”;
  179.                         m++;
  180.                 }
  181.                 memset(buffer,0,MAX_BUFFER);
  182.                 pread(fdr,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));
  183.                 pwrite(fdw,buffer,MAX_BUFFER,uint64_t(i*MAX_BUFFER));
  184.                 i++;
  185.         }
  186.         memset(buffer,0,MAX_BUFFER);
  187.         pread(fdr,buffer,(euint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));
  188.         pwrite(fdw,buffer,(euint64_t(i*MAX_BUFFER)),uint64_t(i*MAX_BUFFER));
  189.         cout<< “Copy Thread:”<<t_id<<” work Ok!!”<<“n”;
  190.         return NULL;
  191. }
  192. thread_info* dispatch::call_thread()
  193. {
  194.         int i = 0;
  195.         uint64_t temp_size = 0;
  196.         temp_size = file_size/par_thr;
  197.         tread_arr_p = new thread_info[par_thr];
  198.         cout<<“Will Create “<<par_thr<<” Threadsn”;
  199.         //cout<<tread_arr_p<<endl;
  200.         //cout<<sizeof(thread_info)<<endl;
  201.         for(i = 0;i<par_thr1;i++)
  202.         {
  203.                 pair_piece.insert( pair<uint64_t,uint64_t>(temp_size*i,temp_size*(i+1)1 ));
  204.         }
  205.         pair_piece.insert( pair<uint64_t,uint64_t>(temp_size*i,file_size ));
  206.         i = 1;
  207.         for(pair_piece_iterator it =pair_piece.begin();it !=pair_piece.end() ;it++)
  208.         {
  209.                 //cout<<“–Thread: “<<i<<“n”;
  210.                 //cout<<it>first<<“n”;
  211.                 //cout<<it>second<<“n”;
  212.                 //cout<<tread_arr_p+(i1)<<endl;
  213.                 (tread_arr_p+(i1))>set_start(it>first);
  214.                 (tread_arr_p+(i1))>set_end(it>second);
  215.                 (tread_arr_p+(i1))>set_fd(file_idr,file_idw);
  216.                 pthread_create(&((tread_arr_p+(i1))>t_id),NULL,do_work,static_cast<void*>(tread_arr_p));
  217.                 //(tread_arr_p+(i1))>print();
  218.                 i++;
  219.         }
  220.         return tread_arr_p;
  221. }
  222. int main(int argc,char** argv)
  223. {
  224.         dispatch test;
  225.         thread_info* thread_info_p = NULL;
  226.         uint32_t real_par;
  227.         void *tret;
  228.         int fdr = open(argv[1],O_RDONLY|O_NOFOLLOW);
  229.         int fdw = open(argv[2],O_RDWR|O_CREAT|O_EXCL,0755);
  230.         cout<<“Author: gaopeng QQ:22389860 Blog:http://blog.itpub.net/7728585n”;
  231.         if(argc<4)
  232.         {
  233.                 cout<<“USAGE:parcp sourcefile destfile parallen”;
  234.                 return 1;
  235.         }
  236.         if(fdr == 1 || fdw == 1)
  237.         {
  238.                 perror(“open readfile:”);
  239.                 return 1;
  240.         }
  241.         if(fdw == 1)
  242.         {
  243.                 perror(“open wirtefile:”);
  244.                 return 1;
  245.         }
  246.         if(sscanf(argv[3],“%u”,&real_par) == EOF)
  247.         {
  248.                 perror(“sscanf:”);
  249.                 return 1;
  250.         }
  251.         cout<<“set parallel:”<<real_par<<“n”;
  252.         //cout<<lseek(fd,0,SEEK_SET) <<endl;
  253.         test.make_init(real_par);
  254.         test.set_init(fdr,fdw);
  255.         real_par = test.real_par();
  256.         cout<<“real parallel:” <<real_par<<“n”;
  257.         thread_info_p = test.call_thread();
  258.         for(int i = 0 ;i<real_par;i++)
  259.         {
  260.                 //cout<<(thread_info_p+i)>t_id<<endl;
  261.                 pthread_join((thread_info_p+i)>t_id,&tret);
  262.                 //cout<<reinterpret_cast<long>(tret)<<endl;
  263.         }
  264.         close(fdw);
  265.         close(fdr);
  266. }

如果觉得不错 您可以考虑请作者喝杯茶(微信支付):                                作者微信号:

LINUX 如何实现多线程进行cp复制                   

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/7728585/viewspace-2137204/,如需转载,请注明出处,否则将追究法律责任。

主题测试文章,只做测试使用。发布者:深沉的少年,转转请注明出处:http://www.cxybcw.com/183830.html

联系我们

13687733322

在线咨询:点击这里给我发消息

邮件:1877088071@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code