Snowflake Connector for Spark Version 2.6 Turbocharges lit avec Apache Arrow

Snowflake Connector for Spark Version 2.6 Turbocharges lit avec Apache Arrow


Le connecteur Snowflake pour Spark (« Connecteur Spark ») utilise désormais le Flèche Apache format de résultat en colonnes pour améliorer considérablement les performances de lecture des requêtes.

Auparavant, le connecteur Spark exécutait d’abord une requête et copiait le jeu de résultats dans une étape au format CSV ou JSON avant de lire les données de Snowflake et de les charger dans un Spark DataFrame. En règle générale, le téléchargement et la désérialisation des données CSV ou JSON consommaient la majeure partie du temps de traitement de bout en bout lorsque les données étaient lues à partir d’une source de données Snowflake Cloud Data Platform.

Avec cette version 2.6.0, le connecteur Snowflake Spark exécute la requête directement via JDBC et (dé)sérialise les données à l’aide de Arrow, Le nouveau format de résultat client de Snowflake. Cela permet de gagner du temps dans les lectures de données et permet également l’utilisation des résultats de requête mis en cache.

Résultats de référence : lectures rapides et pouvant être mises en cache avec Apache Arrow

Dans ce benchmark, nous avons exécuté une tâche Spark qui lit les données de la table LINEITEM, qui a une taille compressée de 16,3 Go dans Snowflake. La table est une norme PTC-H table LINEITEM. Nous avons d’abord capturé l’augmentation du débit grâce au format de données binaires en colonnes plus efficace en effectuant une nouvelle lecture brute à partir de la table Snowflake.

Nous avons constaté une amélioration immédiate par 4 des performances de bout en bout de ce travail Spark. Cette amélioration est due à une amélioration des performances par 10 du temps passé par le connecteur Spark pour récupérer et traiter les résultats de la requête Snowflake.

De plus, Snowflake a un cache de résultat de requête pour les requêtes répétées qui fonctionnent sur des données inchangées. En stockant les résultats qui peuvent être réutilisés, la base de données peut éviter le recalcul et diriger simplement le pilote client pour lire à partir du cache de résultats déjà calculé. Dans les versions précédentes du connecteur Spark, ce cache de résultats de requête n’était pas utilisable. À partir de la version 2.6.0, le connecteur Spark émettra des tâches de refoulement vers Snowflake à l’aide de requêtes directes ; cela signifie que le connecteur Spark est capable de tirer pleinement parti du cache des résultats de la requête. Nous avons également constaté cet avantage dans nos résultats de référence, qui sont présentés ci-dessous. Avec les lectures mises en cache, les performances de bout en bout pour la tâche Spark décrite ci-dessus sont 14 fois plus rapides que lors de l’utilisation de lectures au format CSV non mises en cache dans les versions précédentes du connecteur Spark.

Nous avons exécuté un cluster Spark à quatre travailleurs avec des machines AWS EC2 c4.2xlarge, Apache Spark 2.4.5 et Scala 2.11. La taille de l’entrepôt Snowflake était 4X-Large. Le cloud du déploiement Snowflake et le déploiement du cluster Spark se trouvaient dans la même région cloud : US-West-2 (Oregon). Le graphique suivant montre les résultats :

image1

Code Scala utilisé pour le test de référence

L’extrait suivant montre le code utilisé pour le test de référence avec Arrow. Configurer « use_copy_unload » sur « true » peut tester les performances sans Arrow.

import net.snowflake.spark.snowflake._

// Snowflake Spark Connector options
// The default value of "use_cached_result" is "false".
// It is "true" means to disable this feature.
val sfOptions: Map[String, String] = Map(
"sfSSL" -> "on",
"sfUser" -> "<snowflake_user_name>",
"pem_private_key" -> "<private_key>",
"use_copy_unload" -> "false",
"use_cached_result" -> "true",
"sfDatabase" -> "<snowflake_database>",
"sfURL" -> "<snowflake_account>.<snowflake_cloud_domain>",
"sfWarehouse" -> "snowflake_warehouse",
"partition_size_in_mb" -> "60"
)

// Test table is TPCH LINEITEM which has 600M rows.
// Its compressed size in Snowflake is 16.3GB.
val sourceTableName = "LINEITEM"
val sourceSchema = "TPCH_SF100"

val df = sqlContext.read
.format("net.snowflake.spark.snowflake")
.options(sfOptions)
.option("dbtable", sourceTableName)
.option("sfSchema", sourceSchema)
.load()

// Clear cache before any test
sqlContext.clearCache()

// Execute cache() and show(1) to read all data into cache
// and show one row. The time to show one row can be ignored.
// The DataFrame execution time is regarded as the reading time.
val startTime = System.currentTimeMillis()
df.cache().show(1)
val endTime = System.currentTimeMillis()

val result_msg = s"read time: ${(endTime - startTime).toDouble/1000.0} s"
println(result_msg)

PRISE EN MAIN AVEC SUPPORT FLECHE EN SNOWFLAKE CONNECTEUR POUR SPARK

Le format Arrow est disponible avec Snowflake Connector for Spark version 2.6.0 et supérieure et il est activé par défaut. Pour plus de détails, consultez le Connecteur Snowflake pour documentation Spark.

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.