Construire une dimension à changement lent de type 2 dans Snowflake à l’aide de flux et de tâches : partie 2

Construire une dimension à changement lent de type 2 dans Snowflake à l’aide de flux et de tâches : partie 2


Partie 1 de cet article en deux parties a montré comment créer une dimension à évolution lente (SCD) de type 2 à l’aide de la fonctionnalité de flux de Snowflake pour configurer un flux et insérer des données. Maintenant, automatisons le flux et faisons-le fonctionner selon un calendrier. Tout d’abord, vous allez mettre à jour certaines données, puis les traiter manuellement. Ensuite, vous supprimerez les données et configurerez le traitement automatique.

Mise à jour des données dans une table

Vous avez vu comment fonctionnent les inserts. Maintenant, essayons les mises à jour.

Pour commencer, exécutez la transaction suivante pour mettre à jour deux enregistrements :

begin;
update nation
set n_comment="New comment for Brazil", update_timestamp = current_timestamp()::timestamp_ntz
where n_nationkey = 2;

update nation
set n_comment="New comment for Canada", update_timestamp = current_timestamp()::timestamp_ntz
where n_nationkey = 3;
commit;

Affichez les données de la table NATION en exécutant la commande suivante :

select * from nation where n_nationkey in (1, 2,3);

Dans la figure suivante, notez les valeurs mises à jour dans les colonnes N_COMMENT et UPDATE_TIMESTAMP pour N_NATIONKEY 2 et N_NATIONKEY 3.

View data Nation Table 1

Ensuite, regardons ce que le flux a capturé. Les mises à jour génèrent deux lignes dans un flux.

Exécutez la commande suivante :

select * from nation_table_changes;

Dans la figure suivante, notez que la colonne METADATA$ISUPDATE contient TRUE. Vous avez mis à jour deux enregistrements dans la table NATION, il y a donc quatre lignes dans le flux.

Lets View Stream 2

Ensuite, fusionnez les données du flux dans la table NATION_HISTORY en exécutant à nouveau l’instruction MERGE :

-- MERGE statement that uses the CHANGE_DATA view to load data into the NATION_HISTORY table
merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- CHANGE_DATA is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
   on nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
   and nh.start_time = m.start_time
when matched and m.dml_type="U" then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type="D" then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type="I" then insert -- Inserting a new n_nationkey and updating an existing one both result in an Insert
           (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, 
m.country_code, m.start_time, m.end_time, m.current_flag);

L’instruction MERGE a mis à jour deux enregistrements dans la table NATION_HISTORY et en a inséré deux autres, comme illustré dans la figure suivante :

Merge Statement Uses Change Data 3

Si vous affichez les données de la table NATION_HISTORY, vous verrez désormais 27 enregistrements au lieu de 25, comme illustré dans la figure suivante :

select * from nation_history;
New Data History Nation 4ΩΩ

Comme illustré dans la figure suivante, le filtrage des seules valeurs N_NATIONKEY que vous avez mises à jour montre comment les enregistrements sont horodatés pour START_TIME, END_TIME et CURRENT_TIME.

select * from nation_history where n_nationkey in (2,3) order by n_nationkey, start_time;

View Nation Values Updated 5

Encore une fois, comme vous avez exécuté une instruction DML à l’aide du flux sur la table NATION, le flux a été purgé pour vous assurer de ne pas traiter deux fois les mêmes données modifiées. Pour vérifier cela, exécutez la commande suivante :

select * from nation_table_changes;

La figure suivante montre que le flux a été purgé :

No Data Added 6

Création d’une tâche

L’exécution manuelle de la commande MERGE commence à devenir fastidieuse. Heureusement, vous n’avez pas à l’exécuter manuellement. C’est là que les tâches entrent en jeu.

À l’aide d’une tâche, vous pouvez planifier l’exécution de l’instruction MERGE de manière récurrente et l’exécuter uniquement s’il existe des données dans le flux NATION_TABLE_CHANGES.

Si vous ne l’avez pas déjà fait, voici les étapes que vous pouvez suivre pour créer un rôle TASKADMIN. Comme mentionné dans la partie 1, je l’exécute en tant que SYSADMIN, donc le rôle TASKADMIN est accordé à SYSADMIN ci-dessous.

--Set up TASKADMIN role
use role securityadmin;
create role taskadmin;
-- Set the active role to ACCOUNTADMIN before granting the EXECUTE TASK privilege to TASKADMIN
use role accountadmin;
grant execute task on account to role taskadmin;

-- Set the active role to SECURITYADMIN to show that this role can grant a role to another role 
use role securityadmin;
grant role taskadmin to role sysadmin;

Une fois que SYSADMIN a obtenu le rôle TASKADMIN, vous pouvez créer une tâche pour automatiser la MERGE.

Les tâches nécessitent un entrepôt pour s’exécuter, créons donc un entrepôt de tâches s’il n’en existe pas :

create warehouse if not exists task_warehouse with warehouse_size="XSMALL" auto_suspend = 120;

Enfin, exécutez la commande SQL suivante pour créer la tâche. Vous ne devriez recevoir aucune erreur. Cette tâche s’exécutera toutes les minutes et ne s’exécutera que si le flux NATION_TABLE_CHANGES contient des données.

-- Create a task to schedule the MERGE statement
create or replace task populate_nation_history warehouse = task_warehouse schedule="1 minute" when system$stream_has_data('nation_table_changes')
as   
merge into nation_history nh
using nation_change_data m
   on nh.n_nationkey = m.n_nationkey
   and nh.start_time = m.start_time
when matched and m.dml_type="U" then update
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type="D" then update
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type="I" then insert
           (n_nationkey, n_name, n_regionkey, n_comment, 
country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, 
m.country_code, m.start_time, m.end_time, m.current_flag);

Après avoir créé la tâche, vous pouvez vérifier son statut à l’aide de la commande suivante :

show tasks;

La figure suivante montre que la tâche est suspendue. Par défaut, une tâche est suspendue lors de sa création.

View Current Tasks 7

Exécutez la commande suivante pour modifier la tâche afin de la reprendre :

-- Resume the task
alter task populate_nation_history resume;
show tasks;

La figure suivante montre que la tâche a été démarrée.

Resume Tasks 8

Vous pouvez interroger pour savoir quand la tâche sera exécutée la prochaine fois :

select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state 
from table(information_schema.task_history()) where state="SCHEDULED" order by completed_time desc;

La figure suivante montre que la tâche s’exécutera à nouveau dans 32 secondes. Vous verrez probablement un nombre différent car les résultats sont spécifiques à votre tâche. Le résultat peut même être un nombre négatif si la tâche a commencé à s’exécuter.

When Will Next Task Run 32 Secs

Suppression de données dans un tableau

Jusqu’à présent, vous avez inséré et mis à jour des données. Vous avez également créé une tâche pour automatiser le processus. Ensuite, supprimez les données de la table NATION en exécutant la commande suivante :

-- Delete data
delete from nation
where n_nationkey in (3,7);

La figure suivante montre que deux lignes ont été supprimées.

Delate Data

En regardant la table NATION, vous pouvez voir qu’il n’y a que 23 enregistrements. N_NATIONKEY 3 et N_NATIONKEY 7 ne figurent plus dans le tableau, comme illustré dans la figure suivante.

View Data Nation Table 11

Exécutez la commande suivante pour consulter le flux NATION_TABLE_CHANGES :

select * from nation_table_changes;

La figure suivante montre que deux enregistrements ont été supprimés :

Stream After Update 12

Exécutez la commande suivante pour voir quand la tâche d’exécution de MERGE s’exécutera à nouveau :

select timestampdiff(second, current_timestamp, scheduled_time) as next_run, scheduled_time, current_timestamp, name, state 
from table(information_schema.task_history()) where state="SCHEDULED" order by completed_time desc;

La figure suivante montre que la tâche s’exécutera dans 24 secondes. Attendons donc qu’elle s’exécute pour vérifier la table NATION_HISTORY.

When Will Next Task Run 9

Lorsque le temps est écoulé, interrogez le flux NATION_TABLE_CHANGES pour vérifier que les données qu’il contient ont été traitées :

select * from nation_table_changes;

La figure suivante montre que le flux ne contient aucune donnée, indiquant que les données ont été traitées :

No Data Added 6

Exécutez la commande suivante pour interroger la table NATION_HISTORY afin de voir l’état de N_NATIONKEY 3 et N_NATIONKEY 7, que vous avez précédemment supprimé de la table NATION :

select * from nation_history where n_nationkey in (3,7) order by n_nationkey, start_time;

La figure suivante montre que même si vous avez supprimé N_NATIONKEY 3 et N_NATIONKEY 7 de la table NATION et que l’enregistrement n’est plus actuel (indiqué par la colonne CURRENT_FLAG contenant 0), l’enregistrement n’a pas été supprimé de l’historique. La colonne END_TIME indique quand l’enregistrement a été marqué comme supprimé.

View Nation History Table

Insertion, mise à jour et suppression simultanées de données

Jusqu’à présent, vous avez effectué des actions DML individuelles sur la table NATION. Ensuite, exécutez la commande suivante pour insérer la Colombie (N_NATIONKEY 26), mettre à jour l’Indonésie (N_NATIONKEY 9) et supprimer l’Arabie saoudite (N_NATIONKEY 20) en même temps :

-- Insert, update, delete in one pass
begin;
insert into nation values(26, 'COLOMBIA', 1, 'New country', 'CO', current_timestamp()::timestamp_ntz);

update nation
set n_comment="New comment for Indonesia", update_timestamp = 
current_timestamp()::timestamp_ntz
where n_nationkey = 9;

delete from nation
where n_nationkey in (20);
commit;

Exécutez la commande suivante pour afficher toutes les modifications apportées au flux NATION_TABLE_CHANGES :

select * from nation_table_changes;

La figure suivante montre les changements :

View After Update 13

Ensuite, exécutez la commande suivante pour afficher les données de la table NATION pour N_NATIONKEY 26, N_NATIONKEY 9 et N_NATIONKEY 20 :

select * from nation where n_nationkey in (26,9,20);

La figure suivante montre les données de la table NATION pour N_NATIONKEY 9 et N_NATIONKEY 26, mais il n’y a pas de données pour N_NATIONKEY 20, car vous avez supprimé l’Arabie saoudite :

Select From Nation Key 14

Attendez que la tâche exécute le flux, puis exécutez la commande suivante pour consulter la table NATION_HISTORY :

select * from nation_history where n_nationkey in (26,9,20);

La figure suivante montre l’historique dans la table NATION_HISTORY pour N_NATIONKEY 9, N_NATIONKEY 20 et N_NATIONKEY 26 :

View Data Nation History 15

Emballer

C’est tout ce qu’il faut pour configurer le processus de création d’un SCD de type 2 dans Snowflake. Vous pouvez également suivre les mêmes étapes pour construire un SCD de type 2, puis le modifier pour répondre à vos besoins spécifiques.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.