1. 首页
  2. IT资讯

十九:从库MTS多线程并行回放(一)(笔记)

欢迎关注我的《深入理解MySQL主从原理 32讲 》,如下:

如果图片不能显示可查看下面链接:
https://www.jianshu.com/p/d636215d767f

一、分发调用流程

  ->ev->apply_event(rli); Log_event::apply_event 这里如果是非MTS进行应用 如果MTS  如果是GTID event 进行WORKER线程的分配 ,如果不是则获取WORKER线程        -> 是否是进行 MTS recovery if (rli->is_mts_recovery())           根据 bitmap 设置进行跳过处理             if (rli->is_mts_recovery())//如果是恢复 这个地方就是前面恢复扫描出来的位置                 {                   bool skip=                     bitmap_is_set(&rli->recovery_groups, rli->mts_recovery_index) &&                     (get_mts_execution_mode(::server_id,                                             rli->mts_group_status ==                                             Relay_log_info::MTS_IN_GROUP,                                             rli->current_mts_submode->get_type() ==                                             MTS_PARALLEL_TYPE_DB_NAME)                      == EVENT_EXEC_PARALLEL);                   if (skip)                   {                     DBUG_RETURN(0);                   }                   else                   {                     DBUG_RETURN(do_apply_event(rli));                   }                 }        -> 如果是单线程直接调用 do_apply_event        -> 如果是多线程MTS !!!!!!!!!!!!!!!!          ->Log_event::get_slave_worker 主要是根据不同的EVENT进行不同的操作 包含1、判定是否可以并发 2、判定由哪一个worker进行执行            ->如果是GTID event !!!匿名GTID Event也可以               ->is_gtid_event             ->初始化一个组 Slave_job_group             ->在GAQ中分配队列序号             ->rli->mts_groups_assigned++ 增加             ->使用GTID-event的位置和mts_groups_assigned将GROUP中的master_log_pos位置 和 total_seqno初始化             ->将GROUP加到 GAQ并且分配的 序号 gaq->assigned_group_index= gaq->en_queue(&group);             ->初始化一个Slave_job_item              ->加入到rli->curr_group_da.push_back中             ->进行GTID 模式下 判定是否可以并发              ->schedule_next_event                ->Mts_submode_logical_clock::schedule_next_event 基于COMMIT_ORDER和WRITE SET的都使用这个方法                   主要判断是否可以进行并发并且进行等待                 ->获取GTID EVENT中的last commit和seq number                 ->如果不能进行并发则需要等待last commit > LWM SEQ NUMBER(最新一次除没有提交事物之前的一个事物的seq number)                   ->wait_for_last_committed_trx 进入等待 他会设置一个min_waited_timestamp 作为                     其他事物提交时更新LWM SEQ NUMBER的标记,等待直到last commit<=LWM SEQ NUMBER                     等待标记为                     stage_worker_waiting_for_commit_parent                      Waiting for dependent transaction to commit                     同时还会更新 mts_total_wait_overlap                     my_atomic_add64(&rli->mts_total_wait_overlap, diff_timespec(&ts[1], &ts[0]));                      获取 LWM SEQ NUMBER 的源码注释:                                     the last time index containg lwm                                        +------+                                        | LWM  |                                        |  |   |                                        V  V   V                         GAQ:x  xoooooxxxxxXXXXX...X                                      ^   ^                                      |   | LWM+1                                      |                                      +- tne new current_lwm                               <---- logical (commit) time ----                         here `x' stands for committed, `X' for committed and discarded from                         the running range of the queue, `o' for not committed.           ->如果 query event !!!             ->初始化一个Slave_job_item              ->将其加入到rli->curr_group_da.push_back(job_item);中             ->设置 rli->curr_group_seen_begin= true; 说明找到了query event             ->进行DATABASE模式的分配 不考虑           ->如果是MAP EVENT             ->开始获取WORKER线程到这里已经可以并发执行了,需要进行WORKER线程的获取              ret_worker=rli->current_mts_submode->get_least_occupied_worker(rli, &rli->workers,this);               Mts_submode_logical_clock::get_least_occupied_worker              -> 第一次rli->last_assigned_worker为空 这需要新分配                 -> Mts_submode_logical_clock::get_free_worker 进行分配                    ->循环每一个worker线程,看是否有正在等待处理的event,找到一个没有任何工作的worker线程                      这里也能出是轮询每一个worker线程找到空闲的worker线程就可以了。判断标准就是                       if (w_i->jobs.len == 0)                    -> 如果没有找到,分配失败,进行等待等待为                      stage_slave_waiting_for_workers_to_process_queue                      Waiting for slave workers to process their queues                    -> 循环获取work线程,直到成功                    -> 获取成功后更新信息                       等待的时间:rli->mts_total_wait_worker_avail += diff_timespec(&ts[1], &ts[0]);                       增加一次等待次数:rli->mts_wq_no_underrun_cnt++;                    ->如果开启了参数 slave_preserve_commit_order=1 注册事物                      rli->get_commit_order_manager()->register_trx(worker);                    ->ptr_group->worker_id= ret_worker->id;//设置本次事物组的worker_id 就是分配的工作线程             ->伴随着Woker线程的分配,如果是开启了参数slave_preserve_commit_order需要注册这个事务               if (rli->get_commit_order_manager() != NULL && worker != NULL)                 rli->get_commit_order_manager()->register_trx(worker);//注册事物          ->如果是DEL event               步骤同上 只是不需要分配work线程了因为已经分配了          ->如果是XID event               步骤同上 不过还需要更新group 的checkpoint信息 如下:                 if (!ret_worker->checkpoint_notified) //将GROUP中填写 checkpoint信息                 {                   if (!ptr_group)                     ptr_group= gaq->get_job_group(rli->gaq->assigned_group_index);                   ptr_group->checkpoint_log_name=                      my_strdup(key_memory_log_event, rli->get_group_master_log_name(), MYF(MY_WME));                   ptr_group->checkpoint_log_pos= rli->get_group_master_log_pos();                   ptr_group->checkpoint_relay_log_name=                     my_strdup(key_memory_log_event, rli->get_group_relay_log_name(), MYF(MY_WME));                   ptr_group->checkpoint_relay_log_pos= rli->get_group_relay_log_pos();                   ptr_group->shifted= ret_worker->bitmap_shifted; //checkpoint 后 移动的个数 用于后面提交的时候改变参考Slave_worker::commit_positions    设置参考mts_checkpoint_routine()                   ret_worker->bitmap_shifted= 0;//重置移动量                   ret_worker->checkpoint_notified= TRUE;                 }                 ptr_group->checkpoint_seqno= rli->checkpoint_seqno; //获取seqno 这个值会在chkpt后减去偏移量                 ptr_group->ts= common_header->when.tv_sec + (time_t) exec_time; // Seconds_behind_master related  //checkpoint的时候会将这个值再次传递 mts_checkpoint_routine()                 rli->checkpoint_seqno++;//增加seqno                      到这里 Log_event::get_slave_worker 每个event的处理流程完成,每次都会回到                 Log_event::apply_event         ->Log_event::apply_event 返回到 apply_event_and_update_pos            ->回到apply_event_and_update_pos 下面逻辑MTS才进行 也就是入队到woker中去        开始进入worker 队列,GTID和QUERY EVNET会跟随 MAP EVENT一起进入队列加入了li->curr_group_da中        初始化map event的Slave_job_item         设置ev属于在GAP中的位置 ev->mts_group_idx= rli->gaq->assigned_group_index;        如果是map event的话还会帮助GTID和QUERY event入队        然后自己入队(append_item_to_jobs(job_item, w, rli))        其他event 比如delete event和xid event则自己调用(append_item_to_jobs(job_item, w, rli))        入队        -> append_item_to_jobs(job_item, w, rli)          ->如果入队的event 因为worker线程的队列已经满了则等待:            进入状态stage_slave_waiting_worker_queue            Waiting for Slave Worker queue            wroker队列的大小为:mts_slave_worker_queue_len_max= 16384;            每次等待增加一次            worker->jobs.overfill= TRUE;            worker->jobs.waited_overfill++;            rli->mts_wq_overfill_cnt++;        (rli->is_parallel_exec() && rli->mts_events_assigned % 1024 == 1)         如果每个event的前面的操作操作120秒 则会出现通知 这个警告经常遇到:        从上面我们看到的等待来讲超过120秒的可能有3种        1、由于上一组并发有大事物没有提交           导致不能并发worker线程的等待时间        2、worker线程都在完成工作及在应用上一个事物的event,没有新的worker线程以供新分配        3、worker线程已经分配,但是由于worker线程的分配队列为16384,如果应用比较慢则可能入不了           分配队列,一般也是大事物造成的。  sql_print_information("Multi-threaded slave statistics%s: "                  "seconds elapsed = %lu; "                  "events assigned = %llu; "                  "worker queues filled over overrun level = %lu; "                  "waited due a Worker queue full = %lu; "                  "waited due the total size = %lu; "                  "waited at clock conflicts = %llu "                  "waited (count) when Workers occupied = %lu "                  "waited when Workers occupied = %llu",                  rli->get_for_channel_str(),                  static_cast<unsigned long>                  (my_now - rli->mts_last_online_stat),//消耗总时间 单位秒                  rli->mts_events_assigned,//总的event分配的个数                  rli->mts_wq_overrun_cnt,// worker线程分配队列大于 90%的次数 当前硬编码  14746                  rli->mts_wq_overfill_cnt,    //由于work 分配队列已满造成的等待次数 当前硬编码 16384                  rli->wq_size_waits_cnt, //大Event的个数 一般不会存在                  rli->mts_total_wait_overlap,//由于上一组并行有大事物没有提交导致不能分配worker线程的等待时间 单位纳秒                  rli->mts_wq_no_underrun_cnt, //work线程由于没有空闲的而等待的次数                  rli->mts_total_wait_worker_avail);//work线程由于没有空闲的而等待的时间   单位纳秒      ->回到apply_event_and_update_pos 下面 进行pos的更新 这个pos是 event_relay_log_pos ,不会出现在show slave或者其他地方          更新内部变量读取到的relay log位置和名字 这个不用于外部访问           uint event_relay_log_number; 这两个是正在执行的relay log的位置           ulonglong event_relay_log_pos;  

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/7728585/viewspace-2649958/,如需转载,请注明出处,否则将追究法律责任。

主题测试文章,只做测试使用。发布者:深沉的少年,转转请注明出处:http://www.cxybcw.com/183958.html

联系我们

13687733322

在线咨询:点击这里给我发消息

邮件:1877088071@qq.com

工作时间:周一至周五,9:30-18:30,节假日休息

QR code