DBT ที่ควรรู้ (ปสกสต.) EP: 02

จาก Blog ครั้งก่อน DBT ที่ควรรู้ (ปสกสต.) ผมพูดถึงการใช้งานต่างๆที่ผมเคยใช้
ครั้งนี้ก็จะเป็นท่าต่างๆที่เคยใช้มาเหมือนกัน สำหรับใครก็ตามที่สนใจ DBT หรือใช้งานอยู่ก็ลองๆดูกันได้ครับ Let’s Gooooooo~~~~~.
Topic ครั้งนี้
- 📄 How to run DBT without error
compiled code not found for this model
- 🕒 When does DBT
create tmp
- 🤔 How to store an SQL script as a
variable/parameter
- ✋🏻 If set
on_schema_change='append_new_columns'
, how do handletmp
?
1. 📄 How to run DBT without error compiled code not found for this model
DBT docs สามารถดูข้อมูล เช่น Source, Model, Test หรือ Lineage ต่างๆ ที่มีการเขียนขึ้นใน Project นั้นๆได้ ผ่านการรัน
dbt docs generate; #ทำการ generate ข้อมูลต่างๆที่อยู่ใน project ของเราผ่านรูปแบบของไฟล์
dbt docs serve; #ทำการอ่านไฟล์ที่ generate และนำขึ้น website: http://localhost:8080
ซึ่งถ้าเราเขียน Script ต่างๆในรูปแบบของ DBT Jinja functions เช่น {{ source('x', 'y') }}, {{ model('xy') }}
คนอื่นก็จะเข้าใจได้ยาก เวลามาอ่าน Script ผ่าน VScode หรือ IDE ต่างๆ
แต่ DBT docs มี Tab นึงที่ชื่อ Compiled
ที่จะเปลี่ยน Script ให้อยู่ในรูปที่อ่านเข้าใจง่าย (👏🏻 แปะๆ)

แต่ปัญหาที่พบเจอคือ เมื่อมีการใช้คำสั่ง dbt run --select model
แล้วเรากลับมาดูที่ DBT docs จะพบว่า Compiled ไม่สามารถดูได้แล้ว compiled code not found for this model

ซึ่งวิธีแก้ปัญหานี้ ให้เพิ่ม --no-write-json
ใน dbt run ก่อนเสมอ
⭐️ dbt --no-write-json run --select model
เท่านี้ก็จะสามารถ run model พร้อมกับ ดู docs ได้ตลอดเวลาแล้ว เย้ๆๆๆ 🎉
ปล. ทุกครั้งที่มีการเปลี่ยน Code ให้รัน dbt docs generate
ใหม่ด้วยน้า
2. 🕒 When does DBT create tmp
ทุกคนเคยสงสัยกันไหม? ทำไมเวลาเรารัน DBT บางครั้งมันก็ Create tmp
ก่อน แล้วค่อย Merge เข้า Table หรือบางครั้งมันก็ Merge เข้าไปเลยโดยไม่ Create tmp
ซึ่งผมได้ทำการทดลองแล้วได้ข้อสรุปอยู่ 2 วิธีในตอนที่มัน Create tmp (ถ้าใครเจอวิธีไหนเพิ่มเติม คอมเม้นบอกกันหน่อยน้าาา~~~)
⭐️ incremental_strategy='insert_overwrite'
/*
Create a temporary table from the model SQL
*/
create temporary table {{ model_name }}__dbt_tmp as (
{{ model_sql }}
);
/*
If applicable, determine the partitions to overwrite by
querying the temp table.
*/
declare dbt_partitions_for_replacement array<date>;
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct date(max_tstamp))
from `my_project`.`my_dataset`.{{ model_name }}__dbt_tmp
);
/*
Overwrite partitions in the destination table which match
the partitions in the temporary table
*/
merge into {{ destination_table }} DEST
using {{ model_name }}__dbt_tmp SRC
on FALSE
when not matched by source and {{ partition_column }} in unnest(dbt_partitions_for_replacement)
then delete
when not matched then insert ...
⭐️ on_schema_change='append_new_columns'
or on_schema_change='sync_all_columns'
create or replace table {{ model_name }}__dbt_tmp
partition by {{ partition_column }}
cluster by {{ cluster_column }}
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 12 hour)
)
as (
{{ model_sql }}
);
ซึ่งถ้าถามว่าเราจะรู้ไปทำไม? เราจะใช้ในกรณีที่ Model ไหนเราอยากรันแบบ Parallel
เช่น ถ้าเราอยากจะซ่อม Data ของแต่ละวัน แล้วเรารู้ว่ามีกรณีไหนบ้างที่ Create tmp ก่อน เราก็จะสามารถรัน Parallel ใน Model นั้นๆ ได้
แต่ถ้าไม่รู้แล้วเผลอรัน Parallel ไป Data ก็จะได้แค่ Data ล่าสุดเท่านั้น เพราะ Model create tmp ทับไปแล้ว
3. 🤔 How to store an SQL script as a variable/parameter
ใน Bigquery ถ้าเราอยากใช้ max(partititon_date)
ของ table นั้นๆ ทุกคนทำยังไงกันบ้าง?
SELECT *
FROM model
WHERE partition_date = (SELECT MAX(partititon_date) FROM model)
แต่ๆ ลองดู Bytes processed รึยัง ซึ่งถ้าลองเช็คดูจะเห็นว่า การ Where วันที่ตรงๆ จะถูกกว่า การใช้ SELECT MAX(partititon_date) FROM model
-- ท่านี้ถูกกว่าน้าาา
SELECT *
FROM model
WHERE partition_date = "2024-06-15"
⭐️ ซึ่งสิ่งที่เราต้องทำคือ การเอา MAX(partititon_date)
เก็บไว้ในตัวแปรซักตัวก่อน แล้วค่อยเรียกใช้
{{
config(
...
)
}}
{% set query %}
SELECT max(partition_date) AS max_partition_date FROM {{ model('model') }}
{% endset %}
{% set results = run_query(query) %}
{% if execute %} {% set max_partition_date = results.columns[0].values() %} {% endif %}
SELECT *
FROM {{ model('model') }}
WHERE partition_date = "{{ max_partition_date|join('","') }}"
เท่านี้ Bytes processed ของเราก็จะถูกลงแล้ว (เย้ๆๆๆ)
ปล. ถ้ารันใน SQL ปกติ สามารถใช้ Script ข้างล่างได้ โดยจะได้ผลลัพธ์เหมือนกับข้างบน
DECLARE max_partition_date DATE;
SET max_partition_date = (SELECT MAX(partition_date) FROM model);
SELECT count(*) FROM model WHERE partition_date = max_partition_date;
4. ✋🏻 If set on_schema_change='append_new_columns'
, how do handle tmp
?
ถ้าใครมีการใช้ on_schema_change='append_new_columns'
หรือ on_schema_change='sync_all_columns'
อยู่ จะพบปัญหาที่ DBT มีการ Create tmp เยอะแยะเต็มไปหมดของทุกๆ Model
ซึ่งแน่แหล่ะว่าเดี๋ยว Tmp เหล่านั้นมันก็หายไปเอง (อาจจะ 12 ชม.) แต่ถ้าเราอยากให้มันหายไปเลย เราสามารถเพิ่ม post_hook ให้มัน Drop tmp ได้
{{
config(
...
post_hook='{% if is_incremental() %} DROP TABLE {{ this|replace("`","") }}__dbt_tmp {% endif %}',
on_schema_change='append_new_columns'
)
}}
ถ้าใครอยากให้แชร์เกี่ยวกับ DBT อีก ฝากกด Clap ให้หน่อยนะครับ :) แล้วพบกันใหม่ในการแชร์ครั้งหน้า สาหวัดลีค้าบบบบ