久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

mysql的thread

164次閱讀
沒有評論

共計 9366 個字符,預(yù)計需要花費 24 分鐘才能閱讀完成。

本篇內(nèi)容主要講解“mysql 的 thread_running 數(shù)量分析”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓丸趣 TV 小編來帶大家學(xué)習(xí)“mysql 的 thread_running 數(shù)量分析”吧!

thread pool 的原理:

已在 server 層完成解析;

層創(chuàng)建多組常駐線程,用于接收客戶端連接發(fā)送的 query 并代為執(zhí)行,而不是為每個連接單獨創(chuàng)建一個線程。

層進行 running thread 數(shù)量判斷,如果達到閾值則直接報錯或 sleep。

狀態(tài)變量記錄了當(dāng)前并發(fā)執(zhí)行 stmt/command 的數(shù)量,執(zhí)行前加 1 執(zhí)行后減 1;

突然飆高的誘因:

客戶端連接暴增;

系統(tǒng)性能瓶頸,如 CPU,IO 或者 mem swap;

異常 sql;

會表現(xiàn)出 hang 住的假象。

執(zhí)行,為此引入兩個閾值 low_watermark 和 high_watermark,以及變量 threads_running_ctl_mode(selects 或者 all);

前,檢查 thread_running,

若其已達 high_watermark 閾值則直接拒絕執(zhí)行并返回錯誤:mysql
server is too busy

若其位于 low 和 high 之間,則 sleep 5ms,然后繼續(xù)嘗試,累計等待 100ms 后則執(zhí)行

對于已經(jīng)開啟事務(wù)和 super 用戶,不做限制

控制 query 類型:SELECTS/ALL,默認(rèn)為 SELECTS,表示只影響 SELECT 語句

源碼見注 1

優(yōu)化為基于 FIFO 的 cond-wait/signal(實現(xiàn) 8 個 FIFO);

高水位限流(這點保持不變);

低水位優(yōu)化;其他解決方案:mariadb 開發(fā) thread pool,percona 在其上實現(xiàn)了優(yōu)先隊列;

優(yōu)勢:思路與 thread pool 一致,但代碼更簡潔(不到 1000 行);而且增加了特定 query 的過濾;

代碼見注 2

新增 thread_active 記錄并發(fā)線程數(shù),位于 mysql_execute_command(sql 解析之后),高水位則在 query 解析之前判斷;

只統(tǒng)計 select/DML,而 commit/rollback 則放過。

采用 FIFO,當(dāng) thread_active =
thread_running_low_watermark 時進程進入 FIFO 等待,其他線程執(zhí)行完 sql 后喚醒 FIFO;

內(nèi),同時引入 threads_running_wait_timeout 控制線程在 FIFO 最大等待時間,超時則直接報錯返回。

引入 8 個 FIFO,降低了進出 FIFO 的鎖競爭,線程采用 RR 分配到不同 fifo,每個隊列限制并發(fā)運行線程為 threads_running_low_watermark/8。

,開始執(zhí)行 query,[解析后進行低水位判斷,若通過則執(zhí)行],執(zhí)行當(dāng)前 sql 完畢后,thread 可能發(fā)起新 query,則重復(fù) [] 過程。

:進入 FIFO 排隊最長時間,等待超時后 sql 被拒,默認(rèn) 100,單位為毫秒 ms。

當(dāng)前并發(fā) SELECT/INSERT/UPDATE/DELETE 執(zhí)行的線程數(shù)目;

:當(dāng)前進入到 FIFO 中等待的線程數(shù)目;

未打補丁版本,設(shè)置 innodb_thread_concurrency=0

未打補丁版本,innodb_thread_concurrency=32

低水位限流補丁版本(活躍線程數(shù)不超過 64)

注 1

http://www.gpfeng.com/wp-content/uploads/2013/09/threads_running_control.txt

+static my_bool thread_running_control(THD *thd, ulong tr)
+{
+  int slept_cnt= 0;
+  ulong tr_low, tr_high;
+  DBUG_ENTER(thread_running_control
+  
+  /* 
+    Super user/slave thread will not be affected at any time,
+    transactions that have already started will continue.
+  */
+  if (thd- security_ctx- master_access SUPER_ACL|| – 對于 super 權(quán)限的用戶和已經(jīng)開啟的事務(wù)不做限制
+      thd- in_active_multi_stmt_transaction() ||
+      thd- slave_thread)  
+    DBUG_RETURN(FALSE);
+
+  /* 
+    To promise that tr_low will never be greater than tr_high, 
+    as values may be changed between these two statements.
+    eg. 
+        (low, high) = (200, 500)
+        1. read low = 200
+        2. other sessions: set low = 20; set high = 80
+        3. read high = 80
+    Don t take a lock here to avoid lock contention.
+  */
+  do 
+  {
+    tr_low= thread_running_low_watermark;
+    tr_high= thread_running_high_watermark;
+
+  } while (tr_low tr_high);
+
+check_buzy:

+  /* tr_high is promised to be non-zero.*/ 
+  if ((tr_low == 0 tr tr_high) || (tr_low != 0 tr tr_low))
+    DBUG_RETURN(FALSE);
+  
+  if (tr = tr_high)
+  { 
+    int can_reject= 1;
+
+    /* thread_running_ctl_mode: 0 – SELECTS, 1 – ALL. */
+    if (thread_running_ctl_mode == 0)
+    {
+      int query_is_select= 0;
+      if (thd- query_length() = 8)
+      {
+        char *p= thd- query();  – 讀取 query text 的前 6 個字符,以判斷是否為 select
+        if (my_toupper(system_charset_info, p[0]) == S
+            my_toupper(system_charset_info, p[1]) == E
+            my_toupper(system_charset_info, p[2]) == L
+            my_toupper(system_charset_info, p[3]) == E
+            my_toupper(system_charset_info, p[4]) == C
+            my_toupper(system_charset_info, p[5]) == T )
+
+          query_is_select= 1;
+      }
+
+      if (!query_is_select)
+        can_reject= 0;
+    }
+
+    if (can_reject)
+    {
+      inc_thread_rejected();
+      DBUG_RETURN(TRUE);
+    }
+    else
+      DBUG_RETURN(FALSE);
+  }
+    
+  if (tr_low != 0 tr = tr_low)
+  {
+    /* 
+      If total slept time exceed 100ms and thread running does not
+      reach high watermark, let it in.
+    */
+    if (slept_cnt = 20)
+      DBUG_RETURN(FALSE);
+    
+    dec_thread_running()
+    
+    /* wait for 5ms. */
+    my_sleep(5000UL); 
+
+    slept_cnt++;
+    tr= inc_thread_running() – 1;
+    
+    goto check_buzy;
+  }
+
+  DBUG_RETURN(FALSE);
+}
+
+/**
   Perform one connection-level (COM_XXXX) command.
   @param command         type of command to perform
@@ -1016,7 +1126,8 @@
   thd- set_query_id(get_query_id());
   if (!(server_command_flags[command] CF_SKIP_QUERY_ID))
     next_query_id();
–  inc_thread_running();
+  /* remember old value of thread_running for *thread_running_control*. */
+  int32 tr= inc_thread_running() – 1;
   if (!(server_command_flags[command] CF_SKIP_QUESTIONS))
     statistic_increment(thd- status_var.questions, LOCK_status);

@@ -1129,6 +1240,13 @@
   {
     if (alloc_query(thd, packet, packet_length))
       break;                                 // fatal error is set
+
+    if (thread_running_control(thd, (ulong)tr))
+    {
+      my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));
+      break;
+    }
+
     MYSQL_QUERY_START(thd- query(), thd- thread_id, (char *) (thd- db ? thd- db :),   thd- security_ctx- priv_user[0])

注 2  
http://www.gpfeng.com/wp-content/uploads/2014/01/tr-control.diff_.txt  
+/**
   Perform one connection-level (COM_XXXX) command.
 
   @param command         type of command to perform
@@ -1177,7 +1401,7 @@
     command= COM_SHUTDOWN;
   }
   thd- set_query_id(next_query_id());
–  inc_thread_running();
+  int32 tr= inc_thread_running();
 
   if (!(server_command_flags[command] CF_SKIP_QUESTIONS))
     statistic_increment(thd- status_var.questions, LOCK_status);
@@ -1209,6 +1433,15 @@
     goto done;
   }
 
+  if (command == COM_QUERY alloc_query(thd, packet, packet_length))
+    goto endof_case;                 // fatal error is set
+
+  if (thread_running_control_high(thd, tr))
+  {
+    my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, MYF(0));
+    goto endof_case;
+  }
+
   switch (command) {
   case COM_INIT_DB:
   {
@@ -1311,8 +1544,6 @@
   }
   case COM_QUERY:
   {
–    if (alloc_query(thd, packet, packet_length))
–      break;                                 // fatal error is set
     MYSQL_QUERY_START(thd- query(), thd- thread_id,
                       (char *) (thd- db ? thd- db :),
                        thd- security_ctx- priv_user[0],
@@ -1751,6 +1982,7 @@
     my_message(ER_UNKNOWN_COM_ERROR, ER(ER_UNKNOWN_COM_ERROR), MYF(0));
     break;
   }
+endof_case:
 
 done:
   DBUG_ASSERT(thd- derived_tables == NULL
@@ -2502,12 +2734,37 @@
   Opt_trace_array trace_command_steps(thd- opt_trace, steps
 
   DBUG_ASSERT(thd- transaction.stmt.cannot_safely_rollback() == FALSE);
+  bool count_active= false;
 
   if (need_traffic_control(thd, lex- sql_command))
   {
     thd- killed = THD::KILL_QUERY;
     goto error;
   }
+
+  switch (lex- sql_command) {
+
+  case SQLCOM_SELECT:
+  case SQLCOM_UPDATE:
+  case SQLCOM_UPDATE_MULTI:
+  case SQLCOM_DELETE:
+  case SQLCOM_DELETE_MULTI:
+  case SQLCOM_INSERT:
+  case SQLCOM_INSERT_SELECT:
+  case SQLCOM_REPLACE:
+  case SQLCOM_REPLACE_SELECT:
+    count_active= true;
+    break;
+  default:
+    break;
+  }
+
+  if (count_active thread_running_control_low_enter(thd))
+  {
+    my_error(ER_SERVER_THREAD_RUNNING_TOO_HIGH, myf(0));
+    goto error;
+  }
+
   status_var_increment(thd- status_var.com_stat[lex- sql_command]);
 
   switch (gtid_pre_statement_checks(thd))
@@ -4990,6 +5247,9 @@
 
 finish:
 
+  if (count_active)
+    thread_running_control_low_exit(thd);
+
   DBUG_ASSERT(!thd- in_active_multi_stmt_transaction() ||
                thd- in_multi_stmt_transaction_mode());
 
 

+static my_bool thread_running_control_high(THD *thd, int32 tr)
+{
+  int32 tr_high;
+  DBUG_ENTER(thread_running_control_high
+
+  tr_high= (int32)thread_running_high_watermark;
+
+  /* thread_running_ctl_mode: 0 – SELECTS, 1 – ALL. */
+  if ((!tr_high || tr = tr_high) ||
+      thd- transaction.is_active() ||
+      thd- get_command() != COM_QUERY ||
+      thd- security_ctx- master_access SUPER_ACL ||
+      thd- slave_thread)
+    DBUG_RETURN(FALSE);
+
+  const char *query= thd- query();
+  uint32 len= thd- query_length();
+
+  if ((!has_prefix(query, len, SELECT , 6) thread_running_ctl_mode == 0) || – 不再是逐個字符判斷
+      has_prefix(query, len, COMMIT , 6) ||
+      has_prefix(query, len, ROLLBACK , 8))
+    DBUG_RETURN(FALSE);
+
+  /* confirm again*/
+  if (tr tr_high get_thread_running() tr_high)
+  {
+    __sync_add_and_fetch(thread_rejected, 1);
+    DBUG_RETURN(TRUE);
+  }
+
+  DBUG_RETURN(FALSE);
+}
+
 

+static my_bool thread_running_control_low_enter(THD *thd)
+{
+  int res= 0;
+  int32 tr_low;
+  my_bool ret= FALSE;
+  my_bool slept= FALSE;
+  struct timespec timeout;
+  Thread_conc_queue *queue;
+  DBUG_ENTER(thread_running_control_low_enter
+
+  /* update global status */
+  __sync_add_and_fetch(thread_active, 1);
+
+  tr_low= (int32)queue_tr_low_watermark;
+  queue= thread_conc_queues + thd- query_id % N_THREAD_CONC_QUEUE;
+
+  queue- lock();– 問 1:在進行低水位判斷前,先鎖定 FIFO,避免低水位驗證失敗時無法獲取 FIFO 鎖進而不能放入 FIFO;
+
+retry:
+
+  if ((!tr_low || queue- thread_active tr_low) ||
+      (thd- lex- sql_command != SQLCOM_SELECT thread_running_ctl_mode == 0) ||
+      (!slept (thd- transaction.is_active() ||
+        thd- security_ctx- master_access SUPER_ACL || thd- slave_thread)))
+  {
+    queue- thread_active++; – 判斷是否滿足進入 FIFO 條件,如不滿足則立即更新 thread_active++,解鎖 queue 并退出;
+    queue- unlock();
+    DBUG_RETURN(ret);
+  }
+
+  if (!slept)
+  {
+    queue- unlock();
+
+    /* sleep for 500 us */
+    my_sleep(500);
+    slept= TRUE;
+    queue- lock();
+
+    goto retry;
+  }
+
+  /* get a free wait-slot */
+  Thread_wait_slot *slot= queue- pop_free();
+
+  /* can t find a free wait slot, must let the query enter */
+  if (!slot)– 當(dāng) FIFO 都滿了,即無法把當(dāng)前線程放入,則必須放行讓該 sql 正常執(zhí)行
+  {
+    queue- thread_active++;
+    queue- unlock();
+    DBUG_RETURN(ret);
+  }
+
+  slot- signaled= false;
+  slot- wait_ended= false;
+
+  /* put slot into waiting queue. */
+  queue- push_back_wait(slot);
+  queue- thread_wait++;
+
+  queue- unlock();
+
+  /* update global status */
+  thd_proc_info(thd, waiting in server fifo
+  __sync_sub_and_fetch(thread_active, 1);
+  __sync_add_and_fetch(thread_wait, 1);
+
+  /* cond-wait for at most thread_running_wait_timeout(ms). */
+  set_timespec_nsec(timeout, thread_running_wait_timeout_ns);
+
+  mysql_mutex_lock(slot- mutex);
+  while (!slot- signaled)
+  {
+    res= mysql_cond_timedwait(slot- cond, slot- mutex, timeout);
+    /* no need to signal if cond-wait timedout */
+    slot- signaled= true;
+  }
+  mysql_mutex_unlock(slot- mutex);
+
+  queue- lock();
+  queue- thread_wait–;
+  queue- thread_active++;
+
+  /* remove slot from waiting queue. */
+  queue- remove_wait(slot);
+  /* put slot into the free queue for reuse. */
+  queue- push_back_free(slot);
+
+  queue- unlock();
+
+  /* update global status */
+  __sync_sub_and_fetch(thread_wait, 1);
+  __sync_add_and_fetch(thread_active, 1);
+  thd_proc_info(thd, 0);
+
+  if (res == ETIMEDOUT || res == ETIME)
+  {
+    ret= TRUE; // indicate that query is rejected.
+    __sync_add_and_fetch(thread_rejected, 1);
+  }
+
+  DBUG_RETURN(ret);
+}

到此,相信大家對“mysql 的 thread_running 數(shù)量分析”有了更深的了解,不妨來實際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-07-19發(fā)表,共計9366字。
轉(zhuǎn)載說明:除特殊說明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 印江| 揭东县| 辰溪县| 华蓥市| 麦盖提县| 恩施市| 天台县| 通海县| 黎川县| 龙州县| 诏安县| 东安县| 新闻| 绥宁县| 奉新县| 临安市| 金平| 孟津县| 富裕县| 神木县| 将乐县| 五台县| 秀山| 铁岭市| 和静县| 周口市| 黄龙县| 石景山区| 利川市| 淄博市| 寿光市| 桐城市| 青田县| 普兰店市| 河津市| 博客| 渝北区| 江阴市| 辉南县| 墨玉县| 柳江县|