DBT ใช้ยังไง EP: 04

จากความเดิมตอน EP:03 และ EP ก่อนหน้าที่เราเกริ่นๆว่า DBT คืออะไร แล้วมีท่าแบบไหนบ้าง วันนี้เราจะมาคุยกันว่าแล้ว DBT มันใช้ยังไง
How to optimize DBT?
จากความรู้ที่เราได้มาทั้งหมด 3 ตอน รวมถึง Incremental ท่าไหนนะที่ถูกที่สุด??

ปล. นิยามคำว่า “ถูก” ของแต่ละคนอาจจะนิยามไม่เหมือนกัน ลองมาดูการใช้งานในแต่ละท่ากัน
Full refresh
ถูก = Data มาตรงกับต้นทางเสมอ
ผมแนะนำท่านี้สำหรับ คนที่ไม่อยากปวดหัวเรื่อง Data ต้นทาง != ปลายทาง เพราะท่าอื่นๆ มีเรื่องให้ปวดหัวเยอะมากกกกกก ตอนเริ่มแต่ท่านี้จะง่ายที่สุด เพราะมีการ Create Table ทุกครั้งที่รัน
Append
ถูก = Data ไม่มีการ Updates row เก่า
ผมแนะนำท่านี้สำหรับ คนที่ไม่มีการทำการ Updates row เก่าๆเลย Data มาแบบไหนจบวัน อนาคตรีรันก็ได้แบบนั้น ซึ่งท่านี้ไม่เสีย Cost สำหรับการ Scan Data ที่ปลายทางก่อนด้วย แต่ปัญหาเรื่องเดียวก็คือ ถ้าจะรันแก้ Data ณ วันนั้น
วิธีแก้ของผม คือ เพิ่ม Delete Data ก่อนที่จะรัน Insert ทุกครั้ง (prehook)
# prehook
DELETE FROM `project_id.dataset.table_name` WHERE partition_date = '2023-08-20';
# Incremental insert
INSERT INTO `project_id.dataset.table_name`
...
...
เท่านี้ก็จะสามารถรีรันซ่อม Data ณ วันที่เราอยากจะทำการ Insert ใหม่ได้แล้ว
Merge
ถูก = Data ครบ ถ้า unique ไม่มี null
ผมแนะนำท่านี้สำหรับ คนที่มีการทำการ Updates row เก่า วันที่เก่าๆ บางวัน โดยเราอ้างอิงจาก excution_date ของวันที่ที่เข้ามา ยกตัวอย่าง
มีคนซื้อของวันที่ 2023–08–28 แต่เขามีการนำของมาคืน (return) วันที่ 2023–08–31 แล้ว records นั้นของเรามีการอัพเดต (เปลี่ยน status เป็น cancel)
ซึ่งถ้าเราใช้ท่า Append
เราก็คงต้องกลับไปรีรันวันที่ 2023–08–28
หรือถ้าใช้ columns updated_at วันที่ 2023–08–31 แล้วเรารัน ณ วันที่มีการ updated_at data ก็มี 2 records (ของเก่า + ของใหม่ที่ cancel)
ซึ่งถ้าเรา merge มันจะไปหาว่า target มี unique_key พวกนี้รึยัง ถ้ามีให้ updates (แม้จะเหมือนเดิม) แต่ถ้าไม่มีให้ Insert
ซึ่งถ้าเราสร้าง Table แบบมี Cluster ด้วยก็จะทำให้เวลา Scan ถูกลงอีกด้วย
{{
config(
incremental_strategy='merge',
unique_key=['receipt_no', 'store_no', ...],
...
cluster_by=['store_no', ...],
...
)
}}
ข้อเสีย ตรงที่ unique_key ต้องห้ามมี null และ ถ้ามี data ที่ duplicated เข้าไปแล้ว ต้องจัดการเอง เช่น Target มี 2 records ที่มี Unique_key เหมือนกัน จะได้ดังภาพ
Table A (Source)
A | B | C
1 | "Aum" | "DS"
Tabel B (Target)
A | B | C
1 | "Aum" | "DE"
1 | "Aum" | "DA"
Table B merge w/ Table A
unique_key = ["A", "B"]
A | B | C
1 | "Aum" | "DS" -- ปรับตรงนี้
1 | "Aum" | "DS" -- ปรับตรงนี้
Delete+Insert
ถูก = Data ครบ ถ้า unique ไม่มี null
ท่านี้จะเหมือน Merge เลยแต่ต่างกันตรงที่มันจะเปลี่ยนจาก Merge+Insert
เป็น Delete+Insert
ซึ่งเป็นท่าที่ผมมองว่าดีมากกกกกก (แต่ BQ ไม่มีให้ใช้ 🥲) ซึ่งมันจะช่วยแก้ปัญหา “ถ้ามี data ที่ duplicated เข้าไปแล้ว ต้องจัดการเอง
” จาก Merge ได้
Table A (Source)
A | B | C
1 | "Aum" | "DS"
Tabel B (Target)
A | B | C
1 | "Aum" | "DE"
1 | "Aum" | "DA"
Table B delete+insert w/ Table A
unique_key = ["A", "B"]
A | B | C
1 | "Aum" | "DS" -- เพิ่มตรงนี้เข้ามา
Insert_overwrite (dynamic)
ถูก = Cost ถูกกว่า ถ้ามี Partition แล้วพร้อมปวดหัว
{{
config(
materialized = 'incremental',
unique_key = 'session_id',
partition_by = { 'field': 'session_date', 'data_type': 'date' }, -- เพิ่มตรงนี้เข้ามา
incremental_strategy = 'insert_overwrite' -- เพิ่มตรงนี้เข้ามา
)
}}
ถ้าที่ถูกมากกกกกกก (แต่ยังไม่ถึงขั้นสุด ยังแค่ขั้นกว่า) เพราะใช้งานรวมกับ Partition นั้นเองคือ การนำวันที่มา แล้ว replace ไปเลยทั้งหมดของวันที่นั้น ตัวอย่าง
Table A (Source)
A | B | C | D
1 | "Thanapol" | "DE" | "2023-08-27"
1 | "Thanapol" | "DE" | "2023-08-28"
Tabel B (Target)
A | B | C | D
1 | "Aum" | "DE" | "2023-08-28"
1 | "Aum" | "DE" | "2023-08-29"
1 | "Aum" | "DE" | "2023-08-30"
Table B with insert_overwrite w/ Table A
A | B | C | D
1 | "Thanapol" | "DE" | "2023-08-28" -- เพิ่มตรงนี้เข้ามา
1 | "Thanapol" | "DE" | "2023-08-27" -- เพิ่มตรงนี้เข้ามา
1 | "Aum" | "DE" | "2023-08-29"
1 | "Aum" | "DE" | "2023-08-30"
Data เข้ามาใหม่ (Table A) แล้วจะนำเข้าไปยัง Table B มันจะทำการหาก่อนว่า Table A มีวันที่อะไรบ้าง ในที่นี้ 2023–08–27, 2023–08–28 (Columns ที่เป็น parititon ของ Table B) แล้วทำการลบวันที่เหล่านั้นใร Table B แล้ว Insert ใหม่ของ Table A
ข้อเสีย ถ้าเรา Union join กันของ 2 Table โดยต่างวันที่กัน อันนี้จะ manage ค่อนข้างยาก เพราะ มันลบวันที่ทั้งหมดใน Table target ที่เราทำการใส่ไว้ แล้วค่อย Insert = ถ้าเรามีการใส่วันค่อมกัน หรืออะไรก็ตามนิดเดียว Data วันเก่าก็โดนลบด้วย
Table A (Source)
A | B | C | D
1 | "Thanapol" | "DE" | "2023-08-27"
1 | "Thanapol" | "DE" | "2023-08-28"
UNION ALL
A | B | C | D
X | "Thanapol" | "DE" | "2023-08-25"
X | "Thanapol" | "DE" | "2023-08-26"
Tabel B (Target)
A | B | C | D
1 | "Aum" | "DE" | "2023-08-25"
1 | "Aum" | "DE" | "2023-08-26"
1 | "Aum" | "DE" | "2023-08-27"
Table B with insert_overwrite w/ Table A
A | B | C | D
X | "Thanapol" | "DE" | "2023-08-25" -- เพิ่มตรงนี้เข้ามา
X | "Thanapol" | "DE" | "2023-08-26" -- เพิ่มตรงนี้เข้ามา
1 | "Thanapol" | "DE" | "2023-08-27"
1 | "Thanapol" | "DE" | "2023-08-28"
Insert_overwrite (static)
ถูก = Cost ถูกสุด ถ้ามี Partition แล้วพร้อมปวดหัว
ถ้าที่ถูกมากกกกกกกขั้นสุด แต่ความปวดหัวยังเท่าเดิม เพราะว่า Insert_overwrite (dynamic)
จะหาวันที่แล้วลบจาก Data ที่เราต้องการจะรัน เช่น ถ้า Script ที่เรารันมี Data 2 วันก็จะลบ แล้ว insert 2 วัน แต่ถ้า Data เรามี 100 วันก็จะลบ แล้ว Insert 100 วัน ซึ่งมันเปลืองมากกกกก เพราะมันต้องทำ tmp แล้วหาวันที่ที่เรามี (นี้ยังไม่รวมว่า partition ของเราเป็น date+time อีก)
แต่ท่านี้เราจะ Static ไปเลยว่า เราต้องการให้มันลบในกี่วัน เช่น ลบ ภายใน 2 วันนี้ วันอื่นไม่ทำ แต่ insert ตั้งแต่วันไหนก็ได้ = มันไม่ต้องไปสร้าง tmp -> group by partition_date ก็จะประหยัดตรงนี้ไป แต่แลกกับต้องมา Handle แต่ละ case
{% set partitions_to_replace = [
'current_date',
'date_sub(current_date, interval 1 day)'
] %}
{{config(
materialized = 'incremental',
partition_by = { 'field': 'session_date', 'data_type': 'date' },
incremental_strategy = 'insert_overwrite',
partitions = partitions_to_replace -- เพิ่มตรงนี้เข้ามา
)}}
Summary
เอาเป็นว่า Cost ตามนี้เลย โดย X axis = จำนวน GB ที่ใช้ใน bigquery หรือ Cost นั้นเอง และ Y axis = จำนวน Data ที่รัน

- Full-refresh จะใช้ Cost แพงที่สุด 198.17 GB
- Merge ธรรมดา = 91.15 GB
- Merge (cluster) = 57.21 GB
- Insert_overwrite (dynamic) = 53.32 GB
- Insert_overwrite (static) = 27.11 GB