Ingestion de données : comment charger des téraoctets dans Snowflake

Ingestion de données : comment charger des téraoctets dans Snowflake


Nous recevons souvent ces questions de la part de clients confrontés à un chargement initial de données dans Snowflake ou, à une charge quotidienne à grande échelle. ingestion de données: « Quel est le moyen le plus rapide de charger des téraoctets de données ? » et : « Quel format de données entrantes recommandez-vous ? » Voici un exemple de chargement de données qui fournit des réponses à ces deux questions, et plus encore.

Nous avons récemment utilisé les données de l’ensemble de données 10 To TPCDS Benchmark pour explorer quelques alternatives. Ces données sont disponibles pour tous les clients Snowflake via la base de données nommée SNOWFLAKE_SAMPLE_DATA, schéma TPCDS_SF10TCL.

La plus grande table de cette base de données est STORE_SALES, contenant 28,8 milliards de lignes représentant 4,7 To de données non compressées, soit environ 164 octets par ligne. Snowflake comprime cela jusqu’à 1,3 To en interne. Le tableau contient cinq ans d’historique des transactions quotidiennes et 23 colonnes réparties entre les données entières et décimales.

Nous avons chargé trois formats de données source différents pour cette table :

  1. Fichiers CSV compressés
  2. Fichiers Parquet partitionnés par date (compressés rapidement)
  3. Fichiers ORC partitionnés par date (compressés rapidement)

Lors du chargement de Parquet et ORC dans Snowflake, vous avez le choix de stocker des lignes entières dans un Snowflake VARIANT ou d’extraire les colonnes individuelles dans un schéma structuré. Nous avons testé les deux approches pour les performances de charge. Mais pour les données avec un schéma fixe comme TPCDS, nous préférons les stocker structurées.

Parquet cloisonné et ORC sont intéressants à d’autres égards. Lors de l’utilisation du partitionnement HIVE pour ces formats dans un environnement de lac de données, la valeur de la colonne de données de partitionnement est généralement représentée par un partie du nom du fichier, plutôt que par une valeur à l’intérieur des données elles-mêmes. Cela signifie que lors du chargement des données, nous devons capturer et éventuellement manipuler le nom du fichier en référençant la propriété METADATA$FILENAME de Snowflake, lors de l’utilisation de la commande COPY.

Performances de charge

Examinons d’abord les performances brutes du chargement des données à l’aide d’un cluster Snowflake 2X-large :

Format source Disposition cible Temps de chargement (sec) To/h (non compressé)
CSV (Gzippé) Structuré 1104 15.4
Parquet (composition Snappy) Semi-structuré 3518 4.8
Parquet (composition Snappy) Structuré 3095 5.4
ORC (composition Snappy) Semi-structuré 3845 4.4
ORC (composition Snappy) Structuré 2820 6.0

Quelques points sautent aux yeux :

  • Le chargement à partir d’un CSV Gzippé est plusieurs fois plus rapide que le chargement depuis ORC et Parquet à un impressionnant 15 To/heure. Bien que 5 à 6 To/heure soient corrects si vos données sont à l’origine dans ORC ou Parquet, ne faites pas tout votre possible pour CRÉER des fichiers ORC ou Parquet à partir de CSV dans l’espoir qu’il chargera Snowflake plus rapidement.
  • Le chargement de données dans un schéma entièrement structuré (columnarisé) est environ 10 à 20 % plus rapide que de les placer dans une VARIANT.

Lorsque nous avons testé le chargement des mêmes données en utilisant différentes tailles d’entrepôt, nous avons constaté que la vitesse de chargement était inversement proportionnelle à l’échelle de l’entrepôt, comme prévu. Par exemple, un entrepôt 3X-large, qui est deux fois plus grand qu’un 2X-large, a chargé les mêmes données CSV à un débit de 28 To/heure. À l’inverse, un X-large chargé à ~7 To/heure et un large à un débit d’environ 3,5 To/h. Cela signifie que vous dépenserez environ le même nombre de crédits Snowflake pour charger un ensemble de données donné, quelle que soit la taille de cluster que vous utilisez, tant que vous suspendez l’entrepôt une fois terminé pour éviter les temps d’inactivité.

Les taux de chargement de vos propres fichiers de données peuvent différer en fonction d’un certain nombre de facteurs :

  • Emplacement de vos compartiments S3 – Pour notre test, notre déploiement Snowflake et nos compartiments S3 étaient situés dans us-west-2
  • Nombre et types de colonnes – Un plus grand nombre de colonnes peut nécessiter plus de temps par rapport au nombre d’octets dans les fichiers.
  • Efficacité de la compression Gzip – Plus de données lues à partir de S3 par octet non compressé peuvent entraîner des temps de chargement plus longs.

(Dans tous les cas, assurez-vous d’utiliser un nombre suffisant de fichiers de chargement pour occuper tous les threads de chargement. Pour un 2X-large, il y a 256 threads de ce type, et nous avions environ 2 000 fichiers de chargement pour couvrir les cinq années d’histoire.)

Meilleures pratiques pour Parquet et ORC

Pendant que nous envisageons Parquet et ORC, examinons la technique que nous avons utilisée pour remplir la version entièrement structurée de STORE_SALES à l’aide de données Parquet partitionnées.

Tout d’abord, considérez que les fichiers Parquet partitionnés par date résident dans un compartiment S3 avec les conventions de dénomination de préfixe suivantes, où le Souligné entier est l’une des valeurs des clés de partitionnement. Dans STORE_SALES, il s’agit d’une clé de substitution entière pour la colonne sold_date nommée ss_sold_date_sk :

  S3://<my_bucket>/10tb_parquet/store_sales/ss_sold_date_sk=2451132/

Si les fichiers de données ont été générés à l’origine par HIVE, il y aura également un préfixe représentant les données pour lesquelles la clé de partitionnement est NULL :

  S3://<my_bucket>/10tb_parquet/store_sales/ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__/

Enfin, HIVE créera une série de fichiers « tag » de 0 octet dans le compartiment qui doivent être ignorés lors de l’ingestion. Ces fichiers ont un format dans le modèle :

  S3://<my_bucket>/10tb_parquet/store_sales/ss_sold_date_sk=2451132_$folder$

Comme nous l’avons noté précédemment, les fichiers de données eux-mêmes ne contiennent pas de colonne ou de valeur pour ss_sold_date_sk même si cela fait partie de la définition de la table. Au lieu de cela, la valeur doit être dérivée du nom du préfixe dans S3.

Pour gérer l’ingestion à partir de ce bucket, nous avons d’abord défini une étape externe pour Snowflake comme suit :

  create or replace stage parquet_test
     url="s3://<my_bucket>/tpcds/10tb_parquet/"
     credentials = (aws_key_id=…,aws_secret_key=…)
     FILE_FORMAT = (TYPE = 'PARQUET');

Enfin, notre commande pour charger toutes les données Parquet dans la table STORE_SALES entièrement structurée ressemblera à ceci :

  copy into STORE_SALES from (
   select 
     NULLIF(
       regexp_replace (
       METADATA$FILENAME,
       '.*\=(.*)\/.*',
       '\1'), 
       '__HIVE_DEFAULT_PARTITION__'
     )                         as ss_sold_date_sk,
     $1:ss_sold_time_sk        as ss_sold_time_sk,
     $1:ss_item_sk             as ss_item_sk,
     $1:ss_customer_sk         as ss_customer_sk,
     $1:ss_cdemo_sk            as ss_cdemo_sk,
     $1:ss_hdemo_sk            as ss_hdemo_sk,
     $1:ss_addr_sk             as ss_addr_sk,
     $1:ss_store_sk            as ss_store_sk,
     $1:ss_promo_sk            as ss_promo_sk,
     $1:ss_ticket_number       as ss_ticket_number,
     $1:ss_quantity            as ss_quantity,
     $1:ss_wholesale_cost      as ss_wholesale_cost,
     $1:ss_list_price          as ss_list_price,
     $1:ss_sales_price         as ss_sales_price,
     $1:ss_ext_discount_amt    as ss_ext_discount_amt,
     $1:ss_ext_sales_price     as ss_ext_sales_price,
     $1:ss_ext_wholesale_cost  as ss_ext_wholesale_cost,
     $1:ss_ext_list_price      as ss_ext_list_price,
     $1:ss_ext_tax             as ss_ext_tax,
     $1:ss_coupon_amt          as ss_coupon_amt,
     $1:ss_net_paid            as ss_net_paid,
     $1:ss_net_paid_inc_tax    as ss_net_paid_inc_tax,
     $1:ss_net_profit          as ss_net_profit
   from @parquet_test/store_sales/)
   pattern= '.*/.*/.*/ss_sold_date_sk=.*/.*'
  ;   

Notez que nous utilisons la fonction « transformer » de la commande COPY pour analyser et manipuler le format Parquet semi-structuré. Le corps principal de COPY comprend l’extraction des champs étiquetés contenus dans les données Parquet, en les mappant directement à la colonne correspondante dans STORE_SALES. Par exemple dans l’expression :

     $1:ss_net_paid as ss_net_paid,

1 $ fait référence au contenu de la colonne unique représentant une ligne Parquet entière de données d’entrée sous la forme d’un ensemble de paires clé-valeur, et $1:ss_net_paid représente la valeur associée à la ss_net_paid clé dans cette ligne.

Regardons de plus près les deux Souligné expressions du script ci-dessus.

La première expression,

     NULLIF(
       regexp_replace (
       METADATA$FILENAME,
       '.*\=(.*)\/.*',
       '\1'), 
       '__HIVE_DEFAULT_PARTITION__'
      ) 

sert à remplir le ss_sold_date_sk colonne, qui est la valeur utilisée pour partitionner les données d’entrée. La REGEX_REPLACE La fonction transforme le nom de fichier S3 complet en juste la valeur entière représentant le date_key intégré dans le préfixe. Pour ce faire, il recherche les caractères dans le chemin du fichier après le signe ‘=’, jusqu’au ‘/’ suivant. NULLIF est utilisé pour remplacer les partitions nommées HIVE_DEFAULT_PARTITION avec la valeur NULL pour la clé de date.

L’expression finale

  pattern= '.*/.*/.*/ss_sold_date_sk=.*/.*'

sert de filtre sur les fichiers d’entrée, forçant COPY à ignorer les fichiers d’espace réservé de 0 octet dans le compartiment.

Le chargement d’ORC implique exactement le même processus, en modifiant uniquement la définition FORMAT dans la commande CREATE STAGE.

Conclusion

Le chargement de données dans Snowflake est rapide et flexible. Vous obtenez la plus grande vitesse lorsque vous travaillez avec des fichiers CSV, mais l’expressivité de Snowflake dans la gestion des données semi-structurées permet même aux schémas de partitionnement complexes pour les ensembles de données ORC et Parquet existants d’être facilement ingérés dans des tables Snowflake entièrement structurées.

Liens supplémentaires

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.