Construire une dimension à changement lent de type 2 dans Snowflake à l’aide de flux et de tâches : partie 1

Construire une dimension à changement lent de type 2 dans Snowflake à l’aide de flux et de tâches : partie 1


Introduction

Il s’agit de la partie 1 d’un article en deux parties qui explique comment créer une dimension à évolution lente (SCD) de type 2 à l’aide de la fonctionnalité Stream de Snowflake. La deuxième partie expliquera comment automatiser le processus à l’aide de la fonctionnalité Task de Snowflake.

Les SCD sont une technique de modélisation de base de données courante utilisée pour capturer des données dans une table et montrer leur évolution dans le temps. Bien que les SCD soient le plus souvent associés à des dimensions, vous pouvez appliquer la méthodologie SCD décrite dans ce blog à n’importe quelle table d’une base de données.

Conceptuellement, la création d’un SCD est simple, mais avec des bases de données traditionnelles, la mise en œuvre d’un SCD peut être difficile. Par exemple, la configuration des déclencheurs peut être difficile et gourmande en ressources, et certaines bases de données autres que Snowflake vous obligent à exploiter les journaux de base de données propriétaires avec des outils propriétaires coûteux.

Si l’utilisation de déclencheurs ou de journaux d’extraction n’est pas possible, l’exécution de comparaisons de tables complètes est une autre option. Cela peut être extrêmement coûteux car il nécessite des analyses complètes de plusieurs tables et n’est pas toujours une solution pratique.

Construire un SCD dans Snowflake est extrêmement simple grâce aux fonctionnalités Streams et Tasks que Snowflake a récemment annoncées lors du Snowflake Summit.

Flux et tâches

Un flux est un nouveau type d’objet Snowflake qui fournit des capacités de capture de données modifiées (CDC) pour suivre le delta des modifications dans une table, y compris les insertions et les modifications du langage de manipulation de données (DML), afin que des mesures puissent être prises à l’aide des données modifiées. Un flux de table vous permet d’interroger une table et de consommer un ensemble de modifications apportées à une table, au niveau de la ligne, entre deux points transactionnels dans le temps.

Une tâche est un nouveau type d’objet Snowflake qui définit une planification récurrente pour l’exécution d’instructions SQL, y compris des instructions qui appellent des procédures stockées. Vous pouvez enchaîner des tâches pour une exécution successive afin de prendre en charge un traitement périodique plus complexe.

Dans un pipeline de données continu, les tâches peuvent éventuellement utiliser des flux pour fournir un moyen pratique de traiter en continu des données nouvelles ou modifiées. Une tâche peut vérifier si un flux contient des données modifiées pour une table et consommer les données modifiées ou ignorer l’exécution en cours si aucune donnée modifiée n’existe.

Configurer un flux

Dans l’exemple suivant, je montre tout le code requis pour créer un SCD de type 2 dans Snowflake, et je fournis une explication de ce que fait chaque étape.

Vous devez utiliser un rôle qui a la capacité de créer des bases de données, des flux et des tâches. J’ai utilisé le rôle SYSADMIN.

use role sysadmin;

Maintenant, configurez une base de données et un schéma dans lesquels travailler :

create database streams_and_tasks;
use database streams_and_tasks;
create schema scd;
use schema scd;

Pour cet article, vous allez créer une table appelée NATION. Ce tableau peut être modifié dans le cadre d’un processus ETL. Les modifications peuvent inclure l’insertion, la mise à jour ou la suppression de données. La table NATION a toujours la vue actuelle des données et la update_timestamp le champ est mis à jour pour chaque changement de ligne.

create or replace table nation (
     n_nationkey number,
     n_name varchar(25),
     n_regionkey number,
     n_comment varchar(152),
     country_code varchar(2),
     update_timestamp timestamp_ntz);

La table NATION_HISTORY conservera un historique des modifications apportées à la table NATION. Chaque enregistrement a un Heure de début champ et un heure de fin champ indiquant quand l’enregistrement était valide. De plus, chaque enregistrement a un drapeau_actuel champ indiquant si l’enregistrement est l’enregistrement en cours.

La table NATION_HISTORY sera chargée en fonction des modifications apportées à la table NATION.

create or replace table nation_history (
    n_nationkey number,
    n_name varchar(25),
    n_regionkey number,
    n_comment varchar(152),
    country_code varchar(2),
    start_time timestamp_ntz,
    end_time timestamp_ntz,
    current_flag int);

Ensuite, créez un flux appelé NATION_TABLE_CHANGES sur la table NATION. La création d’un flux active essentiellement CDC pour la table. Les modifications de données dans la table NATION seront alors disponibles pour un traitement ultérieur à l’aide de ce flux.

create or replace stream nation_table_changes on table nation;

Vous pouvez afficher les flux en exécutant la commande suivante :

show streams;

Après avoir exécuté la commande, l’interface graphique affiche les détails de chaque flux existant, tels que le nom du flux, le nom de la base de données, le nom du schéma, le propriétaire et le nom de la table. La capture d’écran suivante affiche les détails du flux NATION_TABLE_CHANGES.

Show Streams

Vous pouvez interroger les données dans le flux. À ce stade, le flux doit être vide car vous n’avez modifié aucune donnée dans la table NATION. Notez dans la capture d’écran ci-dessus que les colonnes du flux sont les mêmes que les colonnes de la table NATION source sur laquelle le flux a été créé.

De plus, il existe trois nouvelles colonnes que vous pouvez utiliser pour savoir quel type d’opérations DML a modifié les données d’une table source : METADATA$ACTION, METADATA$ISUPDATE et METADATA$ROW_ID.

Vous pouvez trouver des définitions de colonne ici.

L’instruction ci-dessous interroge le flux NATION_TABLE_CHANGES.

select * from nation_table_changes;

Il n’y a eu aucune modification de données dans la table NATION, le flux est donc vide. Remarquez les trois colonnes METADATA à la fin du tableau.

No Data Added

Avant de créer la table NATION_HISTORY, examinons d’abord comment ces trois colonnes de flux changent en fonction de l’opération DML (insertion, mise à jour ou suppression) que vous exécutez sur la table NATION. Les différences affichées dans les trois colonnes vous permettent de construire la logique requise pour charger la table NATION_HISTORY.

Examinons ce que contiennent ces trois colonnes de flux lorsque vous insérez un enregistrement, mettez à jour cet enregistrement et supprimez cet enregistrement.

Une opération d’insertion génère une seule ligne dans le flux. En fonction des valeurs des colonnes METADATA$ACTION et METADATA$ISUPDATE indiquées ci-dessous, vous pouvez voir qu’un enregistrement a été inséré et qu’aucune mise à jour n’a eu lieu.

Insert False 943

Une opération de mise à jour génère deux lignes dans le flux : la colonne METADATA$ACTION affiche deux entrées (INSERT et DELETE) et les deux entrées de la colonne METADATA$ISUPDATE sont toutes deux TRUE. Notez que l’entrée dans la colonne METADATA$ROW_ID est la même pour les deux lignes.

Vous devez vous préoccuper d’un seul de ces enregistrements, alors utilisons celui où l’entrée METADATA$ACTION est INSERT et l’entrée METADATA$ISUPDATE est TRUE.

Insert True Delete True 943

Une opération de suppression génère une seule ligne dans le flux. L’entrée dans la colonne METADATA$ACTION est DELETE, similaire à ce que vous avez vu avec l’instruction INSERT ci-dessus, mais l’entrée dans la colonne METADATA$ISUPDATE est FALSE. Vous savez donc que l’enregistrement a été supprimé, pas mis à jour.

Delete False 943

Maintenant que vous savez ce qui se passe pour chaque type d’opération DML, vous pouvez utiliser ces informations pour écrire une instruction SQL afin de charger la table NATION_HISTORY avec des données reflétant les modifications apportées à la table NATION.

Vous devrez également utiliser une instruction MERGE, car vous devez effectuer une opération d’insertion ou de mise à jour, ou les deux, dans la table NATION_HISTORY chaque fois que les lignes de la table NATION changent.

La syntaxe de l’instruction MERGE ressemble à la figure suivante. (Je montrerai le code un peu plus tard mais je veux d’abord montrer la structure.)

Merge Into Nation History

Notez que pour chaque type de DML, une action différente a lieu.

Dans le utilisant section, la vue NATION_CHANGE_DATA gère la logique pour déterminer ce qui doit être chargé dans la table NATION_HISTORY. Il s’appuie sur les données du flux NATION_TABLE_CHANGES.

Exécutez l’instruction CREATE illustrée ci-dessous. Cela ressemble à beaucoup de code SQL, mais il génère simplement des données à charger dans la table NATION_HISTORY.

create or replace view nation_change_data as
-- This subquery figures out what to do when data is inserted into the NATION table
-- An insert to the NATION table results in an INSERT to the NATION_HISTORY table
select n_nationkey, n_name, n_regionkey, n_comment, 
country_code, start_time, end_time, current_flag, 'I' as dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
             update_timestamp as start_time,
             lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
             case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
             case when end_time_raw is null then 1 else 0 end as current_flag
      from (select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp
            from nation_table_changes
            where metadata$action = 'INSERT'
            and metadata$isupdate="FALSE"))
union
-- This subquery figures out what to do when data is updated in the NATION table
-- An update to the NATION table results in an update AND an insert to the NATION_HISTORY table
-- The subquery below generates two records, each with a different dml_type
select n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag, dml_type
from (select n_nationkey, n_name, n_regionkey, n_comment, country_code,
             update_timestamp as start_time,
             lag(update_timestamp) over (partition by n_nationkey order by update_timestamp desc) as end_time_raw,
             case when end_time_raw is null then '9999-12-31'::timestamp_ntz else end_time_raw end as end_time,
             case when end_time_raw is null then 1 else 0 end as current_flag, 
             dml_type
      from (-- Identify data to insert into nation_history table
            select n_nationkey, n_name, n_regionkey, n_comment, country_code, update_timestamp, 'I' as dml_type
            from nation_table_changes
            where metadata$action = 'INSERT'
            and metadata$isupdate="TRUE"
            union
            -- Identify data in NATION_HISTORY table that needs to be updated
            select n_nationkey, null, null, null, null, start_time, 'U' as dml_type
            from nation_history
            where n_nationkey in (select distinct n_nationkey 
                                  from nation_table_changes
                                  where metadata$action = 'INSERT'
                                  and metadata$isupdate="TRUE")
     and current_flag = 1))
union
-- This subquery figures out what to do when data is deleted from the NATION table
-- A deletion from the NATION table results in an update to the NATION_HISTORY table
select nms.n_nationkey, null, null, null, null, nh.start_time, current_timestamp()::timestamp_ntz, null, 'D'
from nation_history nh
inner join nation_table_changes nms
   on nh.n_nationkey = nms.n_nationkey
where nms.metadata$action = 'DELETE'
and   nms.metadata$isupdate="FALSE"
and   nh.current_flag = 1;

Exécutez maintenant l’instruction SQL suivante, qui devrait s’exécuter sans erreur mais ne renvoyer aucune donnée car le flux n’a encore rien capturé.

select * from nation_change_data;

Maintenant que vous avez créé la vue NATION_CHANGE_DATA, vous pouvez l’inclure dans l’instruction MERGE qui a été affichée précédemment :

merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- nation_change_data is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
   on  nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
   and nh.start_time = m.start_time
when matched and m.dml_type="U" then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type="D" then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when not matched and m.dml_type="I" then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
           (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment, m.country_code, m.start_time, m.end_time, m.current_flag);

Exécutez maintenant l’instruction MERGE. Il doit s’exécuter mais ne pas charger de données dans la table NATION_HISTORY car il n’y a pas de données dans le flux.

Tout ce dont vous avez besoin pour construire un SCD de type II est maintenant en place. Commençons donc à insérer, mettre à jour et supprimer des données pour voir comment cela fonctionne.

Insertion de données dans une table

Pour commencer, insérons 25 lignes de données dans la table NATION. L’exemple suivant définit une variable ($update_timestamp) égal à l’horodatage actuel et fait référence à cette variable dans les instructions INSERT. Cependant, vous pouvez choisir comment définir l’horodatage. Peut-être provient-il d’un système source ou est-il défini par un outil ETL chargeant la table.

J’insère également les données à l’aide d’une transaction mais j’utilise le commencer; … fin; syntaxe. Au total, vous devriez voir s’exécuter 28 instructions SQL lorsque vous exécutez le code suivant :

set update_timestamp = current_timestamp()::timestamp_ntz;
begin;
insert into nation values(0,'ALGERIA',0,' haggle. carefully final deposits detect slyly agai','DZ',$update_timestamp);
insert into nation values(1,'ARGENTINA',1,'al foxes promise slyly according to the regular accounts. bold requests alon','AR',$update_timestamp);
insert into nation values(2,'BRAZIL',1,'y alongside of the pending deposits. carefully special packages are about the ironic forges. slyly special ','BR',$update_timestamp);
insert into nation values(3,'CANADA',1,'eas hang ironic silent packages. slyly regular packages are furiously over the tithes. fluffily bold','CA',$update_timestamp);
insert into nation values(4,'EGYPT',4,'y above the carefully unusual theodolites. final dugouts are quickly across the furiously regular d','EG',$update_timestamp);
insert into nation values(5,'ETHIOPIA',0,'ven packages wake quickly. regu','ET',$update_timestamp);
insert into nation values(6,'FRANCE',3,'refully final requests. regular ironi','FR',$update_timestamp);
insert into nation values(7,'GERMANY',3,'l platelets. regular accounts x-ray: unusual regular acco','DE',$update_timestamp);
insert into nation values(8,'INDIA',2,'ss excuses cajole slyly across the packages. deposits print aroun','IN',$update_timestamp);
insert into nation values(9,'INDONESIA',2,' slyly express asymptotes. regular deposits haggle slyly. carefully ironic hockey players sleep blithely. carefull','ID',$update_timestamp);
insert into nation values(10,'IRAN',4,'efully alongside of the slyly final dependencies. ','IR',$update_timestamp);
insert into nation values(11,'IRAQ',4,'nic deposits boost atop the quickly final requests? quickly regula','IQ',$update_timestamp);
insert into nation values(12,'JAPAN',2,'ously. final express gifts cajole a','JP',$update_timestamp);
insert into nation values(13,'JORDAN',4,'ic deposits are blithely about the carefully regular pa','JO',$update_timestamp);
insert into nation values(14,'KENYA',0,' pending excuses haggle furiously deposits. pending express pinto beans wake fluffily past t','KE',$update_timestamp);
insert into nation values(15,'MOROCCO',0,'rns. blithely bold courts among the closely regular packages use furiously bold platelets?','MA',$update_timestamp);
insert into nation values(16,'MOZAMBIQUE',0,'s. ironic unusual asymptotes wake blithely r','MZ',$update_timestamp);
insert into nation values(17,'PERU',1,'platelets. blithely pending dependencies use fluffily across the even pinto beans. carefully silent accoun','PE',$update_timestamp);
insert into nation values(18,'CHINA',2,'c dependencies. furiously express notornis sleep slyly regular accounts. ideas sleep. depos','CN',$update_timestamp);
insert into nation values(19,'ROMANIA',3,'ular asymptotes are about the furious multipliers. express dependencies nag above the ironically ironic account','RO',$update_timestamp);
insert into nation values(20,'SAUDI ARABIA',4,'ts. silent requests haggle. closely express packages sleep across the blithely','SA',$update_timestamp);
insert into nation values(21,'VIETNAM',2,'hely enticingly express accounts. even final ','VN',$update_timestamp);
insert into nation values(22,'RUSSIA',3,' requests against the platelets use never according to the quickly regular pint','RU',$update_timestamp);
insert into nation values(23,'UNITED KINGDOM',3,'eans boost carefully special requests. accounts are. carefull','GB',$update_timestamp);
insert into nation values(24,'UNITED STATES',1,'y final packages. slow foxes cajole quickly. quickly silent platelets breach ironic accounts. unusual pinto be','US',$update_timestamp);
commit;


Vous pouvez désormais utiliser des commandes pour afficher les données de la table NATION et du flux NATION_TABLE_CHANGES que vous avez créé.

Tout d’abord, exécutez cette commande pour afficher les données de la table NATION :

select * from nation;

La figure suivante montre les données de la table NATION :

View Data Nation Table Nation

Exécutez ensuite cette commande pour afficher les données dans le flux NATION_TABLE_CHANGES :

select * from nation_table_changes;

La figure suivante montre les données du flux NATION_TABLE_CHANGES :

Select Nation Table Changes False

Notez les valeurs dans les colonnes METADATA, qui indiquent que les modifications étaient des opérations d’insertion.

Maintenant que le flux contient des données, exécutez l’instruction MERGE suivante pour charger les données dans la table NATION_HISTORY :


-- MERGE statement that uses the CHANGE_DATA view to load data 
into the NATION_HISTORY table
merge into nation_history nh -- Target table to merge changes from NATION into
using nation_change_data m -- NATION_CHANGE_DATA is a view that holds the logic that determines what to insert/update into the NATION_HISTORY table.
   on nh.n_nationkey = m.n_nationkey -- n_nationkey and start_time determine whether there is a unique record in the NATION_HISTORY table
   and nh.start_time = m.start_time
when matched and m.dml_type="U" then update -- Indicates the record has been updated and is no longer current and the end_time needs to be stamped
    set nh.end_time = m.end_time,
        nh.current_flag = 0
when matched and m.dml_type="D" then update -- Deletes are essentially logical deletes. The record is stamped and no newer version is inserted
   set nh.end_time = m.end_time,
       nh.current_flag = 0
when not matched and m.dml_type="I" then insert -- Inserting a new n_nationkey and updating an existing one both result in an insert
          (n_nationkey, n_name, n_regionkey, n_comment, country_code, start_time, end_time, current_flag)
    values (m.n_nationkey, m.n_name, m.n_regionkey, m.n_comment,m.country_code, m.start_time, m.end_time, m.current_flag);


Dans la figure suivante, vous pouvez voir que 25 lignes ont été insérées dans la table NATION_HISTORY :

Run Merge Statement

Exécutez maintenant la commande suivante :

select * from nation_history;

Remarquez dans la figure suivante que les champs START_TIME, END_TIME et CURRENT_FLAG sont maintenant remplis de données :

Query history nation table

Étant donné que vous avez exécuté une instruction DML à l’aide du flux sur la table NATION, le flux a été purgé pour vous assurer de ne pas traiter deux fois les mêmes données modifiées. Pour vérifier cela, exécutez la commande suivante :

select * from nation_table_changes;

La figure suivante montre que le flux a été purgé :

No Data Added Stream empty

Et après?

Dans la partie 2 de cet article, vous verrez comment mettre à jour des données, supprimer des données et automatiser le traitement de flux à l’aide d’une tâche.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.