1. 首页
  2. IT资讯

Linux libaio 异步I/O

水平有限,错误请指出


最近准备仔细看看innodb 异步I/O的实现,而在LINUX平台下Innodb中一般我们使用的都是libaio叫做LINUX NATIVE AIO,这有别于POSIX实现的AIO,因为以前对异步I/O并不熟悉,因为在很多LINUX 系统编程书籍上都没有介绍,而网上也是资料不多。当然其好处还是非常明显的,能够在使用O_DIRECT 打开文件的情况下,保证性能而不是消耗CPU资源在等待I/O落盘上,在MYSQL中如果使用了O_DIRECT打开了数据文件那么异步I/O将会发挥作用。当然MYSQL有一个自己模拟的异步I/O但是在现在支持LIBAIO的LINUX中一般都是 NATIVE AIO了。所以我仔细看了一下LINUX NATIVE AIO。
我主要参考如下文章:
http://www.360doc.com/content/14/0109/14/9008018_343854180.shtml
应该是谷歌的工程师写的,他最后列子我也详细的研究了一遍并且进行了修改/编译加上了中文注释。
吐槽一把Linux系统编程久了不用好多都模糊了,明年一定要好好复习一下。

简书地址
http://www.jianshu.com/p/f6371735e520


一、基本数据结构简介

  • io_context_t:它是一个成为上下文的结构,在内部它包含一个完成队列,在线程之间是可以共享的。
  • iocb:单次读写操作需求,下面是主要的一些定义
void* data;  short aio_lio_opcode;  int aio_fildes;  union  {   strcut   {     void* buf;     unsigned long nbytes;     long long offset;   } c;  } u; 

data:是一个用户定义传入数据
aio_lio_opcode:是一个标示可以取

IO_CMD_PREAD:读  IO_CMD_PWRITE:写  或者其他支持的标志 

aio_fileds:是iocb读取或者写入的文件描述符fd
u.c.buf:是一个读取或者写入的内存数据指针
u.c.nbytes:内存数据字节长度
u.c.offset:读取文件的偏移量

其次union u中实际包含其他的可能的I/O类型如下,有兴趣的需要在看看

 union {                 struct io_iocb_common           c;                 struct io_iocb_vector           v;                 struct io_iocb_poll             poll;                 struct io_iocb_sockaddr saddr;         } u; 

iocb应该使用io_prep_pread和io_prep_pwrite进行初始化如下:

void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset);  void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); 

我们发现 int fd,void *buf, size_t count, long long offset 刚好对应了

aio_fileds:  u.c.buf:  u.c.nbytes:  u.c.offset: 

而aio_lio_opcode可以从调用方式(io_prep_pread/io_prep_pwrite)看出来。

  • io_event

void* data:和iocb中的用户数据同一指针
struct iocb *obj:也就是iocb
long loing res:读或者写的字节数

二、相关函数

  • 1、建立一个io_context_t上下文
    int io_setup(unsigned nr_events, io_context_t *ctxp);

  • nr_events:本io_context_t支持的最大event最大队列,注意和后面的io_getevents的nr兼容.

  • ctxp:一根指针io_context_t用于初始化,但是这个io_context_t必须提前建立好并且赋值为0.
    LINUX MAIN PAGE:

DESCRIPTION        io_setup()  creates  an  asynchronous  I/O  context capable of receiving at least nr_events.  ctxp must not point to an AIO context that already        exists, and must be initialized to 0 prior to the call.  On successful creation of the AIO context, *ctxp is filled in with the  resulting  han-        dle.    RETURN VALUE        On success, io_setup() returns 0.  For the failure return, see NOTES.  ERRORS        EAGAIN The specified nr_events exceeds the user's limit of available events.        EFAULT An invalid pointer is passed for ctxp.        EINVAL ctxp is not initialized, or the specified nr_events exceeds internal limits.  nr_events should be greater than 0.        ENOMEM Insufficient kernel resources are available.        ENOSYS io_setup() is not implemented on this architecture. 
  • 2、销毁一个io_context_t上下文
    这个没啥好说的看看原型和LINUX MAIN PAGE即可
    int io_destroy(aio_context_t ctx);
    LINUX MAIN PAGE:
DESCRIPTION         io_destroy()  removes  the  asynchronous  I/O context from the list of I/O contexts and then destroys it.  io_destroy() can also cancel any out-         standing asynchronous I/O actions on ctx and block on completion.    RETURN VALUE         On success, io_destroy() returns 0.  For the failure return, see NOTES.    ERRORS         EFAULT The context pointed to is invalid.         EINVAL The AIO context specified by ctx is invalid.         ENOSYS io_destroy() is not implemented on this architecture. 
  • 3、提交异步I/O操作
    int io_submit(io_context_t ctx_id, long nr, struct iocb **iocbpp);

  • ctx_id:异步I/O上下文

  • nr:后面iocb数组的长度

  • iocbpp:也就是iocb的数组
    我们可以发现一个io_submit可以提交多个iocb异步I/O需求,但是它们之间是没有顺序的,如果提交多个iocb需求可以显著的提高性能,正常情况下其不会被堵塞,如果被堵塞可能由于没有使用O_DIRECT打开文件导致

LINUX MAIN PAGE:

DESCRIPTION         io_submit()  queues  nr  I/O request blocks for processing in the AIO context ctx_id.  iocbpp should be an array of nr AIO control blocks, which         will be submitted to context ctx_id.  RETURN VALUE         On success, io_submit() returns the number of iocbs submitted (which may be 0 if nr is zero).  For the failure return, see NOTES.  ERRORS         EAGAIN Insufficient resources are available to queue any iocbs.         EBADF  The file descriptor specified in the first iocb is invalid.         EFAULT One of the data structures points to invalid data.         EINVAL The aio_context specified by ctx_id is invalid.  nr is less than 0.  The iocb at *iocbpp[0] is not properly initialized, or the operation                specified is invalid for the file descriptor in the iocb.         ENOSYS io_submit() is not implemented on this architecture. 
  • 4、获取I/O完成状态
    int io_getevents(io_context_t ctx_id, long min_nr, long nr, struct io_event *events, struct timespec *timeout);

  • ctx_id:上下文

  • min_nr:io_getevents函数调用在达到min_nr将会返回结果io_events

  • nr:最大的io_event返回个数

  • events:它是一个返回io_event的一个数组

  • timeout:调用io_getevents堵塞的最大时间,如果达到这个值io_getevents函数调用 ,将会提前结束,返回实际的events数组和个数,可能会少于nr。
    LINUX MAIN PAGE:

DESCRIPTION         io_getevents()  attempts  to  read  at least min_nr events and up to nr events from the completion queue of the AIO context specified by ctx_id.         timeout specifies the amount of time to wait for events, where a NULL timeout waits until at least min_nr events  have  been  seen.   Note  that         timeout is relative and will be updated if not NULL and the operation blocks.    RETURN VALUE         On success, io_getevents() returns the number of events read: 0 if no events are available, or less than min_nr if the timeout has elapsed.  For         the failure return, see NOTES.    ERRORS         EFAULT Either events or timeout is an invalid pointer.         EINVAL ctx_id is invalid.  min_nr is out of range or nr is out of range.         EINTR  Interrupted by a signal handler; see signal(7).         ENOSYS io_getevents() is not implemented on this architecture. 

三、列子

如果要明白呢一个系统函数的使用最重要的还是看看它的使用套路,下面的列子最好能够好好看看

点击(此处)折叠或打开

  1. /*************************************************************************
  2.   > File Name: test.cpp
  3.   > Author: gaopeng QQ:22389860 all right reserved
  4.   > Mail: gaopp_200217@163.com
  5.   > Created Time: Fri 17 Nov 2017 12:52:56 AM CST
  6.  ************************************************************************/
  7. #include<iostream>
  8. #include<libaio.h>
  9. #include<stdlib.h>
  10. #include<stdio.h>
  11. #include<sys/stat.h>
  12. #include<sys/types.h>
  13. #include<fcntl.h>
  14. #define PAGE_SIZE 1<<12
  15. #define CHECK_ERROR(r,w,fs,f) (check_error(r,w,fs,f,__FILE__, __LINE__))
  16. #define MAX_NR 10000
  17. #define MIN_NR 1
  18. /*
  19.  * ret:fun ret
  20.  * what:what value
  21.  * sf_flag:succuess or failure
  22.  * 1 succuess
  23.  * 0 failure
  24.  *
  25.  */
  26. int check_error(int ret,int what,int sf_flag,const char* fun, const char* szFile, const int iLine)
  27. {
  28.     if(sf_flag == 1)
  29.     {
  30.         if(ret != what)
  31.         {
  32.             printf(“error file:%s line:%d”,szFile,iLine);
  33.             perror(fun);
  34.             exit(1);
  35.         }
  36.     }
  37.     else if(sf_flag == 0)
  38.     {
  39.         if(ret == what)
  40.         {
  41.             printf(“error file:%s line:%d”,szFile,iLine);
  42.             perror(fun);
  43.             exit(1);
  44.         }
  45.     }
  46.     return 0;
  47. }
  48. class AIOre
  49. {
  50.     public:
  51.                 int* buffer;
  52.         virtual void Complete(int ret) = 0;
  53.         AIOre()
  54.         {
  55.                         int ret = posix_memalign((void**)(&buffer),PAGE_SIZE,PAGE_SIZE);/* 分配一个4096(bytes)大小的4096对其内存空间 */
  56.             CHECK_ERROR(ret,1,0,“posix_memalign”);
  57.         }
  58.         virtual ~AIOre()
  59.         {
  60.                         printf(“Virtual AIOre destory function to free this buffer!”);
  61.             free(buffer);
  62.         }
  63. };
  64. class Adder
  65. {
  66.         public:
  67.                 virtual void Add(int amount) = 0;
  68.                 virtual ~Adder(){};
  69. };
  70. class AIORead:public AIOre
  71. {
  72.     private:
  73.         Adder* adder;//父类指针
  74.     public:
  75.         AIORead(Adder* adder):AIOre()//父类指针指向子类对象
  76.                 {
  77.                        this>adder = adder;
  78.                 }
  79.         virtual void Complete(int res)
  80.         {
  81.             //return check
  82.             int value = buffer[0];
  83.                         printf(“Read of %d Completed %d res “,value,res);
  84.             //多态
  85.             adder>Add(value);
  86.         }
  87. };
  88. class AIOWrite:public AIOre
  89. {
  90.     private:
  91.         int value;
  92.     public:
  93.         AIOWrite(int value):AIOre()
  94.                 {
  95.         buffer[0] = value;
  96.                 this>value = value;
  97.                 }
  98.         virtual void Complete(int res)
  99.         {
  100.             //error check
  101.                         printf(“Write of %d Completed %d n”,value,res);
  102.         }
  103. };
  104. class AIOAdder:public Adder
  105. {
  106.     public:
  107.         int fd;
  108.         io_context_t ioctx;
  109.         int counter; /* 偏移量 */
  110.                 int reap_counter; /* event个数 */
  111.                 int sum; /* */
  112.                 int length; /* 文件大小/PAGE_SIZE */
  113.         AIOAdder(int length)
  114.         {
  115.             ioctx = 0;//必须初始化为0
  116.             counter = 0;
  117.                         reap_counter = 0;
  118.             sum = 0;
  119.             this>length = length;
  120.         }
  121.                 void init() /* 初始化打开文件并且预分配文件大小 */
  122.         {
  123.             printf(“Open filen”);
  124.                         fd = open(“test”,O_RDWR|O_DIRECT|O_CREAT,0644); //必须包含O_DIRECT
  125.             CHECK_ERROR(fd,0,1,“open”);
  126.             printf(“Allocating enough space for the sumn”);
  127.             {
  128.                 int ret = fallocate(fd,0,0,PAGE_SIZE*length);/* 预先分配length*4096大小的文件 */
  129.                 CHECK_ERROR(fd,1,0,“fallocate”);
  130.             }
  131.             printf(“Setting the io Contextn”);
  132.             {
  133.                                 int ret = io_setup(100,&ioctx); /* 初始化ioctx*/
  134.                 CHECK_ERROR(ret,1,0,“io_setup”);
  135.             }
  136.         }
  137.         virtual void Add(int amount)
  138.         {
  139.             sum += amount;
  140.             printf(“Adding %d for toal of %d n”,amount,sum);
  141.         }
  142.         
  143.         void submitwr()
  144.         {
  145.             printf(“submitting a wirte to %d n”,counter);
  146.             struct iocb iocb;//建立一个异步I/O需求
  147.             struct iocb* iocbs = &iocb;
  148.                         AIORe *req = new AIOWrite(counter); /* 这里使用counter去初始化buffer buffer 4K大小 但是counter只有4 BYTES */
  149.             /* void io_prep_pwrite(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); */
  150.             /* 初始化这个异步I/O需求 counter为偏移量 */
  151.             io_prep_pwrite(&iocb,fd,req>buffer,PAGE_SIZE,counter*PAGE_SIZE);
  152.                         iocb.data=req; /* 用户指针实际上就是本次提交Write操作的类对象指针用于释放buffer */
  153.             int res = io_submit(ioctx,1,&iocbs);/* 提交这个I/O不会堵塞 */
  154.             CHECK_ERROR(ret,1,0,“io_submit”);
  155.         }
  156.         void writefile()
  157.         {
  158.             reap_counter = 0;
  159.                         for(counter = 0;counter < length;counter++) /* 偏移量不断增加不断写入 */
  160.             {
  161.                                 submitwr(); /* 异步提交操作 实际在多线程下本线程提交后则可以干其他事情了不会堵塞等待而耗费CPU */
  162.                                 reap(); /* 获得i/o状态 */
  163.             }
  164.                         reapremain();
  165.         }
  166.         void submitrd()
  167.         {
  168.             printf(“submitting a read from %d n”,counter);
  169.             struct iocb iocb;//建立一个异步I/O需求
  170.             struct iocb* iocbs = &iocb;
  171.             AIORe *req = new AIORead(this);
  172.             /* void io_prep_pread(struct iocb *iocb, int fd, void *buf, size_t count, long long offset); */
  173.             io_prep_pread(&iocb,fd,req>buffer,PAGE_SIZE,counter*PAGE_SIZE);
  174.             iocb.data = req;
  175.             int res = io_submit(ioctx,1,&iocbs);
  176.             CHECK_ERROR(ret,1,0,“io_submit”);
  177.             printf(“test:%p %pn”,&iocb,iocbs);
  178.         }
  179.         void readfile()
  180.         {
  181.             reap_counter = 0;
  182.             for(counter=0;counter<length;counter++)
  183.             {
  184.                 submitrd();
  185.                 reap();//here paramter used
  186.             }
  187.             reap remaining();
  188.         }
  189.         int doreap(int min_nr)
  190.         {
  191.             printf(“Reap between %ld and %ld io_eventsn”,min_nr,max_nr);//what mean
  192.                         struct io_event* events = new io_event[MAX_NR]; /* pending I/O max support */
  193.             struct timespec timeout;
  194.             timeout.tv_sec = 0;
  195.             timeout.tv_nsec = 100000000;
  196.             int num_events;
  197.             printf(“Calling io_getevents”);
  198.                         num_events = io_getevents(ioctx,min_nr,MAX_NR,events,&timeout); /* 获得异步I/O event个数 */
  199.                         CHECK_ERROR(num_events,1,0,“io_getevents”);
  200.             printf(“Calling completion function on results”);
  201.                         for(int i = 0;i<num_events;i++) /* 开始获取每一个event并且做相应处理 */
  202.             {
  203.                                 struct io_event event = events[i];
  204.                                 AIORe* req = (AIORe*)(event.data); /* 多态AIORe可以是度或者写及 AIOWrite/AIORead */
  205.                 req>Complete(event.res);
  206.                                 delete req; /* 到这里一次I/O就完成了,删除内存对象包含buffer */
  207.             }
  208.             delete events;
  209.             printf(“Reaped %ld io_getevents”,num_events);
  210.                         reap_counter = num_events+reap_counter; /* 将event个数汇总 */
  211.                         return num_events; /* 返回本次获取的event个数 */
  212.         }
  213.     
  214.         void reap()
  215.         {
  216.                         if(counter >= MIN_NR) /* 如果大于了min_nr才开始reap */
  217.             {
  218.                                 doreap(MIN_NR);
  219.             }
  220.         }
  221.                 void reapremain() /* 做最后的reap */
  222.         {
  223.             while(reap_counter<length)
  224.             {
  225.                 doreap(1);
  226.             }
  227.         }
  228.         ~AIOAdder()
  229.         {
  230.             printf(“Closing AIO context and file”);
  231.             io_destroy(ioctx);
  232.             colse(fd);
  233.         }
  234.                 int Sum()
  235.         {
  236.             printf(“Writing consecutive integers to file”);
  237.             writefile();
  238.             printf(“Rriting consecutive integers to file”);
  239.             readfile();
  240.             return sum;
  241.         }
  242. };
  243. int main()
  244. {
  245.     AIOAdder adder(10000);
  246.     adder.init();/* 文件预先分配大小 */
  247.     adder.writefile();
  248.     adder.readfile();
  249. }

作者微信

  

<length) {="" doreap(1);="" }="" ~aioadder()="" printf("closing="" aio="" context="" and="" file");="" io_destroy(ioctx);="" colse(fd);="" int="" sum()="" printf("writing="" consecutive="" integers="" to="" writefile();="" printf("rriting="" readfile();="" return="" sum;="" };="" main()="" aioadder="" adder(10000);="" adder.init();="" *="" 文件预先分配大小="" adder.writefile();="" adder.readfile();="" }

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/7728585/viewspace-2147684/,如需转载,请注明出处,否则将追究法律责任。

主题测试文章,只做测试使用。发布者:深沉的少年,转转请注明出处:http://www.cxybcw.com/183484.html

联系我们

13687733322

在线咨询:点击这里给我发消息

邮件:1877088071@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code