PostgreSQL技術大講堂 - 第33講:并行查詢管理
最新學訊:近期OCP認證正在報名中,因考試人員較多請盡快報名獲取最近考試時間,報名費用請聯系在線老師,甲骨文官方認證,報名從速!
我要咨詢
PostgreSQL從小白到專家,是從入門逐漸能力提升的一個系列教程,內容包括對PG基礎的認知、包括安裝使用、包括角色權限、包括維護管理、、等內容,希望對熱愛PG、學習PG的同學們有幫助,歡迎持續關注CUUG PG技術大講堂。
第33講:并行查詢管理
第33講:11月11日(周六)19:30-20:30,往期文檔及視頻,聯系CUUG
內容 : 并行查詢工作原理與機制、各種并行查詢處理方式
并行查詢概述
· 并行查詢為什么會快?
現代的CPU型號有大量的內核,提供了并行執行更大的可擴性,并行查詢不是因為并行讀取,而是因為數據分散在許多CPU內核上進行處理,F代操作系統為PostgreSQL數據文件提供了良好的緩存。預讀允許從存儲中獲取一個塊,而不僅僅是PG守護進程請求的塊。因此,查詢性能不受磁盤IO的限制。
它消耗CPU周期:從表數據頁逐個讀取行,比較行值和WHERE條件。
并行查詢工作原理與機制
· How does it work?
Processes:查詢執行總是在“leader”進程中開始。一個leader執行所有非并行活動及其對并行處理的貢獻。執行相同查詢的其他進程稱為“worker”進程。并行執行使用動態后臺工作器基礎結構(在9.4中添加)。由于PostgreSQL的其他部分使用進程,而不是線程,因此創建三個工作進程的查詢可能比傳統的執行速度快4倍。
Communication:Workers使用消息隊列(基于共享內存)與leader通信。每個進程有兩個隊列:一個用于錯誤,另一個用于元組。
leader、gather、worker
· gather節點作為子查詢樹的根節點

并行查詢工作原理與機制
· 使用要點
如果所有CPU內核都已飽和,則不要啟用并行執行。并行執行會從其他查詢中竊取CPU時間,從而增加其它查詢的響應時間。
最重要的是,并行處理顯著增加了具有高WORK-MEM值的內存使用量,因為每個hash連接或排序操作占用一個WORK-MEM內存量。
低延遲的OLTP查詢在并行執行時不能再快了。特別是,當啟用并行執行時,返回單行的查詢可能會執行得不好。
Pierian spring對于開發人員來說是一個TPC-H基準。檢查是否有類似的查詢以獲得最佳并行執行。
并行執行只支持不帶鎖謂詞的SELECT查詢。
正確的索引可能是并行順序表掃描的更好選擇。
不支持游標或掛起的查詢。
窗口函數和有序集聚合函數是非并行的。
對IO綁定的工作負載沒有好處。
沒有并行排序算法。但是,使用排序的查詢在某些方面仍然可以并行。
將CTE(替換為…)替換為支持并行執行的子選擇。
外部數據包裝器(FDW)當前不支持并行執行(但它們可以!)
不支持完全外部聯接。
設置最大行數的客戶端禁用并行執行。
如果查詢使用未標記為并行安全的函數,則它將是單線程的。
可序列化事務隔離級別禁用并行執行。
· How many workers to use?
影響wokers數量的參數權重依次順序:
max_parallel_workers_per_gather :每次sql操作workers數量的最大值。
max_parallel_workers:其次,查詢執行器從max_parallel_workers池中可以獲取workers的最大數。
max_worker_processes:這個是workers的頂級限制后臺進程的總數(此參數謹慎修改,根據系統實際的cpu個數(核數)來設置)。
max_parallel_workers_per_gather:理解為每個用戶去銀行取錢金額。
max_parallel_workers:理解為用戶存在銀行中的總存款金額。
max_worker_processes:理解為某個銀行支點可用現金總數。
· How many workers to use?


· 參數針對的是一個session還是整個實例?
第一個會話:

第二個會話:

· 增加worders進程的條件
查詢規劃器可以考慮根據表或索引大小增加或減少工作線程的數量:
min_parallel_table_scan_size
min_parallel_index_scan_size
示例:
set min_parallel_table_scan_size='8MB'
8MB table => 1 worker
24MB table => 2 workers
72MB table => 3 workers
x => log(x / min_parallel_table_scan_size) / log(3) + 1 worker
每一次表比min_parallel_(index| table)掃描大小大3倍,postgres就添加一個worker。workers的數量不是基于成本的!
· 示例
假如一張表的大小是1600MB
1、設置min_parallel_table_scan_size='500MB';
則:Workers Planned: 2
2、設置min_parallel_table_scan_size=‘'200MB';
則:Workers Planned: 3
3、設置min_parallel_table_scan_size=‘‘100MB';
則:Workers Planned: 4
· 改變max_parallel_workers_per_gather進程分配規則
改變workers分配規則:
實際上,系統設置的參數在生產中并不總是合適的,可以使用下面命令覆蓋特定表的workers數量。
ALTER table…SET(parallel_workers=N)
· 動態修改workers參數的值
我們可以在不重新啟動服務器的情況下增加工作線程數
alter system set max_parallel_workers_per_gather=4;
select * from pg_reload_conf();
tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
Finalize Aggregate (cost=1349772.08..1349772.09 rows=1 width=32) (actual time=160146.769..160146.769 rows=1 loops=1)
-> Gather (cost=1349771.65..1349772.06 rows=4 width=32) (actual time=160145.984..160147.581 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
· Why parallel execution is not used?
除了一長串的并行執行限制之外,PostgreSQL還會檢查成本:
“parallel_setup_cost”以避免短查詢的并行執行。它模擬了內存設置、進程啟動和初始通信所花費的時間?梢岳斫鉃閳绦袝r間小于指定的秒的查詢不走并行。
“parallel_tuple_cost”:leader 和 workers 之間的溝通可能需要很長時間。時間與workers發送的元組數成正比。該參數模擬了通信成本。
· Why parallel execution is not used?
示例:
1張表200M數據,總共3百萬行。
查詢語句:explain analyze select sum(sal) from emp4;
1、parallel_setup_cost=10000時
當查詢成本累計時間超過該值時使用并行查詢
2、parallel_setup_cost=20000
當查詢成本累計時間低于該值時使用串行查詢
Serial sequential scan
· 串行順序掃描
tpch=# explain analyze select l_quantity as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------
Seq Scan on lineitem (cost=0.00..1874376.30 rows=58833712 width=5) (actual time=0.523..33309.303 rows=58839715 loops=1)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 1146337
Planning Time: 6.637 ms
Execution Time: 41297.038 ms
(5 rows)
# 順序掃描產生太多沒有聚合的行。因此,查詢由一個CPU核執行。
Parallel sequential scan
· 并行查詢

tpch=# explain analyze select sum(l_quantity) as sum_qty from lineitem where l_shipdate <= date '1998-12-01' - interval '105' day;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate (cost=1349772.08..1349772.09 rows=1 width=32) (actual time=31962.761..31962.762 rows=1 loops=1)
-> Gather (cost=1349771.65..1349772.06 rows=4 width=32) (actual time=31961.980..31962.146 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (cost=1348771.65..1348771.66 rows=1 width=32) (actual time=31951.809..31951.809 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (cost=0.00..1312000.57 rows=14708428 width=5) (actual time=1.491..29217.070 rows=11767943 loop
s=5)
Filter: (l_shipdate <= '1998-08-18 00:00:00'::timestamp without time zone)
Rows Removed by Filter: 229267
Planning Time: 0.666 ms
Execution Time: 31964.069 ms
*在添加SUM()之后,可以清楚地看到4個worker將幫助我們加快查詢速度
· Parallel Aggregation
“Parallel Seq Scan”節點生成用于部分聚合的行。“部分聚合”節點使用SUM()減少這些行。最后,由“Gather”節點從每個worker收集SUM計數器。
最終結果由“Finalize Aggregate”節點計算。
Finalize Aggregate (cost=1349772.08..1349772.09 rows=1 width=32) (actual time=31962.761..31962.762 rows=1 loops=1)
-> Gather (cost=1349771.65..1349772.06 rows=4 width=32) (actual time=31961.980..31962.146 rows=5 loops=1)
Workers Planned: 4
Workers Launched: 4
-> Partial Aggregate (cost=1348771.65..1348771.66 rows=1 width=32) (actual time=31951.809..31951.809 rows=1 loops=5)
-> Parallel Seq Scan on lineitem (cost=0.00..1312000.57 rows=14708428 width=5) (actual time=1.491..29217.070 rows=11767943 loop
s=5)
Nested loop joins
· Parallel Index Only Scan
tpch=# explain (costs off) select c_custkey, count(o_orderkey)
from customer left outer join orders on
c_custkey = o_custkey and o_comment not like '%special%deposits%'
group by c_custkey;
QUERY PLAN
--------------------------------------------------------------------------------------
Finalize GroupAggregate
Group Key: customer.c_custkey
-> Gather Merge
Workers Planned: 4
-> Partial GroupAggregate
Group Key: customer.c_custkey
-> Nested Loop Left Join
-> Parallel Index Only Scan using customer_pkey on customer
-> Index Scan using idx_orders_custkey on orders
Index Cond: (o_custkey = customer.c_custkey)
Filter: ((o_comment)::text !~~ '%special%deposits%'::text)
(11 rows)
Hash Join
· Hash Join
PostgreSQL 11及以前版本,每個workers進程都構建自己的哈希表。結果,4+workers進程無法提高績效。
直到PostgreSQL 12,新實現使用共享哈希表。每個工人都可以利用WORK-MEM來構建哈希表。
TPC-H的查詢12很好地說明了并行散列連接。每個工作進程幫助構建一個共享哈希表。
-- Query 12 from TPC-H
· Tpch 12.sql執行計劃
每個worker幫助構建一個共享的hash表

Merge Join
· Merge Join
由于merge-join的性質,不可能使其并行執行。不要擔心,在查詢執行的最后一個階段,我們仍然可以看到帶有合并聯接的查詢的并行執行。
-- Query 2 from TPC-H
-> Merge Join
Merge Cond: (part.p_partkey = partsupp.ps_partkey)
Join Filter: (partsupp.ps_supplycost = (SubPlan 1))
-> Gather Merge
Workers Planned: 4
-> Parallel Index Scan using part_pkey on part
Partition-wise join
· Partition-wise join
如果連接表的分區鍵之間存在相等連接條件,那么兩個類似分區表之間的連接可以分解為它們的匹配分區之間的連接。分區鍵之間的等連接意味著一個分區表的給定分區中給定行的所有連接伙伴必須在另一個分區表的相應分區中。因此,分區表之間的連接可以分解為匹配分區之間的連接,這時候就會使用并行查詢,然后比對,提高速度。這種將分區表之間的連接分解為分區之間的連接的技術稱為partition-wise join。

PostgreSQL 12默認禁用分區連接功能。分區連接的規劃成本很高。類似分區表的連接可以按匹配的分區進行。這允許postgres使用更小的哈希表。每個分區連接操作都可以并行執行。
tpch=# set enable_partitionwise_join=t;
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
由于并行查詢成本估計,可能該查詢不會用到并行,可以改變成本估算設置:
tpch=# set parallel_setup_cost = 1; --默認值為1000
tpch=# set parallel_tuple_cost = 0.01; --默認值為0.1
tpch=# explain (costs off) select * from prt1 t1, prt2 t2
where t1.a = t2.b and t1.b = 0 and t2.b between 0 and 10000;
QUERY PLAN
-----------------------------------------------------------
Gather
Workers Planned: 2
-> Parallel Append
-> Parallel Hash Join
Hash Cond: (t2_1.b = t1_1.a)
-> Parallel Seq Scan on prt2_p2 t2_1 --prt2_p2 與prt1_p2 兩個分區連接
Filter: ((b >= 0) AND (b <= 10000))
-> Parallel Hash
-> Parallel Seq Scan on prt1_p2 t1_1
Filter: (b = 0)
-> Parallel Hash Join
Hash Cond: (t2.b = t1.a)
-> Parallel Seq Scan on prt2_p1 t2 --prt2_p1 與prt1_p1 兩個分區連接
Filter: ((b >= 0) AND (b <= 10000))
-> Parallel Hash
-> Parallel Seq Scan on prt1_p1 t1
Filter: (b = 0)
(17 rows)