Jupyter à Spark via Snowflake Partie 4

Jupyter à Spark via Snowflake Partie 4


Dans le troisième partie de cette série, nous avons appris à connecter Sagemaker à Snowflake à l’aide du connecteur Python. Dans ce quatrième et dernier article, nous verrons comment connecter Sagemaker à Snowflake avec le Connecteur d’étincelle. Si vous n’avez pas encore téléchargé les Notebooks Jupyter, vous pouvez les trouver ici.

Vous pouvez consulter toute la série de blogs ici : Pun art > Deuxième partie > Partie trois > Quatrième partie.

Connecteur Spark – Spark local

Nous allons commencer par construire un carnet qui utilise une instance Spark locale. Le pilote Snowflake jdbc et le connecteur Spark doivent tous deux être installés sur votre ordinateur local. L’installation des pilotes s’effectue automatiquement dans Jupyter Notebook, vous n’avez donc pas besoin de télécharger manuellement les fichiers. Cependant, à titre de référence, les pilotes peuvent être téléchargés ici.

Tout d’abord, passons en revue le processus d’installation. L’étape décrite ci-dessous gère le téléchargement de tous les fichiers nécessaires ainsi que l’installation et la configuration. Vous pouvez lancer cette étape en effectuant les actions suivantes :

  • Créer un répertoire pour les fichiers jar flocon de neige
  • Définir les pilotes à télécharger
  • Identifier la dernière version du pilote
  • Télécharger le pilote
%bash
SFC_DIR=/home/ec2-user/snowflake
[ ! -d "$SFC_DIR" ] && mkdir $SFC_DIR 
cd $SFC_DIR
PRODUCTS='snowflake-jdbc spark-snowflake_2.11'
for PRODUCT in $PRODUCTS
do
   wget "https://repo1.maven.org/maven2/net/snowflake/$PRODUCT/maven-metadata.xml" 2> /dev/null
   VERSION=$(grep latest maven-metadata.xml | awk -F">" '{ print $2 }' | awk -F"<" '{ print $1 }')
   DRIVER=$PRODUCT-$VERSION.jar
   if [[ ! -e $DRIVER ]]
   then
      rm $PRODUCT* 2>/dev/null
      wget "https://repo1.maven.org/maven2/net/snowflake/$PRODUCT/$VERSION/$DRIVER" 2> /dev/null
   fi
   [ -e maven-metadata.xml ] && rm maven-metadata.xml
done

Une fois les deux pilotes jdbc installés, vous êtes prêt à créer le SparkContext. Mais d’abord, examinons comment l’étape ci-dessous accomplit cette tâche.

Pour construire avec succès le SparkContext, vous devez ajouter les bibliothèques nouvellement installées au CLASSPATH. Vous pouvez commencer par exécuter une commande shell pour répertorier le contenu du répertoire d’installation, ainsi que pour ajouter le résultat au CLASSPATH. Avec la configuration Spark pointant vers toutes les bibliothèques requises, vous êtes maintenant prêt à créer à la fois le contexte Spark et SQL.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext,SparkSession
from pyspark.sql.types import *
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import KMeansSageMakerEstimator

sfc_jars=!ls -d /home/ec2-user/snowflake/*.jar

conf = (SparkConf()
        .set("spark.driver.extraClassPath", (":".join(classpath_jars())+":"+":".join(sfc_jars)))
        .setMaster('local')
        .setAppName('local-spark-test'))
sc=SparkContext(conf=conf)

spark = SQLContext(sc)
sc

pic1

Avec le SparkContext maintenant créé, vous êtes prêt à charger vos informations d’identification. Vous pouvez effectuer cette étape en suivant les mêmes instructions décrites dans partie trois de cette série.

import boto3

params=['/SNOWFLAKE/URL','/SNOWFLAKE/ACCOUNT_ID'
        ,'/SNOWFLAKE/USER_ID','/SNOWFLAKE/PASSWORD'
        ,'/SNOWFLAKE/DATABASE','/SNOWFLAKE/SCHEMA'
        ,'/SNOWFLAKE/WAREHOUSE','/SNOWFLAKE/BUCKET'
        ,'/SNOWFLAKE/PREFIX']

region='us-east-1'

def get_credentials(params):
   ssm = boto3.client('ssm',region)
   response = ssm.get_parameters(
      Names=params,
      WithDecryption=True
   )
   #Build dict of credentials
   param_values={k['Name']:k['Value'] for k in  response['Parameters']}
   return param_values

param_values=get_credentials(params)

Une fois que le SparkContext est opérationnel, vous êtes prêt à commencer à lire les données de Snowflake via la méthode spark.read. Pour cet exemple, nous allons lire 50 millions de lignes.

sfOptions = {
  "sfURL" : param_values['/SNOWFLAKE/URL'],
  "sfAccount" : param_values['/SNOWFLAKE/ACCOUNT_ID'],
  "sfUser" : param_values['/SNOWFLAKE/USER_ID'],
  "sfPassword" : param_values['/SNOWFLAKE/PASSWORD'],
  "sfDatabase" : param_values['/SNOWFLAKE/DATABASE'],
  "sfSchema" : param_values['/SNOWFLAKE/SCHEMA'],
  "sfWarehouse" : param_values['/SNOWFLAKE/WAREHOUSE'],
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) 
  .options(**sfOptions) 
  .option("query", 
"select (V:main.temp_max - 273.15) * 1.8000 + 32.00 as temp_max_far, " +
"       (V:main.temp_min - 273.15) * 1.8000 + 32.00 as temp_min_far, " +
"       cast(V:time as timestamp) time, " +
"       V:city.coord.lat lat, " +
"       V:city.coord.lon lon " +
"from snowflake_sample_data.weather.weather_14_total limit 5000000").load()

pic2

Ici, vous verrez que j’exécute une instance Spark sur une seule machine (c’est-à-dire le serveur d’instance de bloc-notes). Sur mon carnet Par exemple, il a fallu environ 2 minutes pour lire d’abord 50 millions de lignes de Snowflake et calculer les informations statistiques.

La lecture de l’ensemble de données complet (225 millions de lignes) peut rendre le carnet instance ne répond pas. Cela est probablement dû à un manque de mémoire. Pour atténuer ce problème, vous pouvez soit créer un plus grand carnet instance en choisissant un type d’instance différent ou en exécutant Spark sur un cluster EMR. La première option est généralement appelée mise à l’échelle, tandis que la seconde est appelée mise à l’échelle. La mise à l’échelle est plus complexe, mais elle vous offre également plus de flexibilité. En tant que tel, nous verrons comment exécuter le carnet instance contre un cluster Spark.

Utiliser le connecteur Spark pour créer un cluster EMR

Exploiter la puissance de Spark nécessite de se connecter à un cluster Spark plutôt qu’à une instance Spark locale. La création d’un cluster Spark accessible par le Sagemaker Jupyter Notebook nécessite les étapes suivantes :

  • Le serveur Sagemaker doit être construit dans un VPC et donc dans un sous-réseau
  • Créez un nouveau groupe de sécurité pour autoriser les requêtes entrantes du sous-réseau Sagemaker via le port 8998 (API Livy) et SSH (port 22) à partir de votre propre machine (Remarque : Ceci est à des fins de test)
  • Options avancées
    • Utilisez le lien Options avancées pour configurer toutes les options nécessaires
  • Logiciel et étapes
    • Choisissez Hadoop et Spark
    • En option, vous pouvez sélectionner Zeppelin et Ganglia
  • Matériel
    • Validez le VPC (Réseau). Remarque : L’hôte Sagemaker doit être créé dans le même VPC que le cluster EMR
    • En option, vous pouvez également modifier les types d’instance et indiquer si vous souhaitez ou non utiliser la tarification au comptant
  • Paramètres généraux du cluster
    • Définir le nom du cluster
    • Gardez la journalisation pour résoudre les problèmes
  • Sécurité
    • Choisissez une paire de clés EC2 (créez-en une si vous n’en avez pas déjà). Sans la paire de clés, vous ne pourrez pas accéder au nœud maître via ssh pour finaliser la configuration.
    • Créer un groupe de sécurité supplémentaire pour permettre l’accès via SSH et Livy
  • Sur le nœud maître EMR, installez les packages pip sagemaker_pyspark, boto3 et sagemaker pour python 2.7 et 3.4
  • Installez le pilote Snowflake Spark et JDBC
  • Mettre à jour le chemin de classe supplémentaire du pilote et de l’exécuteur pour inclure les fichiers jar du pilote Snowflake

Passons en revue ce processus suivant étape par étape. Dans la console AWS, recherchez le service EMR, cliquez sur « Créer un cluster » puis cliquez sur « Options avancées »

La création d’un cluster Spark est un processus en quatre étapes. La première étape nécessite la sélection de la configuration logicielle pour votre cluster EMR. (Remarque : Décochez tous les autres packages, puis cochez Hadoop, Livy et Spark uniquement).

Create Cluster Hadoop

La deuxième étape spécifie le matériel (c’est-à-dire les types de machines virtuelles que vous souhaitez provisionner). Pour un cluster EMR de test, je sélectionne généralement une tarification au comptant. Au moment de la rédaction de cet article, une instance M4.LARGE EC2 à la demande coûte 0,10 $ de l’heure. Je peux généralement obtenir la même machine pour 0,04 $, qui comprend un disque SSD de 32 Go.

Create Cluster Advanced

La troisième étape définit les paramètres généraux du cluster. Assurez-vous de cocher « Logging » afin de pouvoir résoudre les problèmes si votre cluster Spark ne démarre pas. Ensuite, configurez une action d’amorçage personnalisée (vous pouvez télécharger le fichier ici).

Create Cluster Bootstrap

Le script effectue les étapes suivantes :

  1. Installation des packages python sagemaker_pyspark, boto3 et sagemaker pour python 2.7 et 3.4
  2. Installation des pilotes Snowflake JDBC et Spark. Au moment de la rédaction de cet article, les versions les plus récentes sont 3.5.3 (jdbc) et 2.3.1 (spark 2.11)
  3. Création d’un script pour mettre à jour l’extraClassPath pour les propriétés spark.driver et spark.executor
  4. Création d’un script de démarrage pour appeler le script listé ci-dessus

L’étape D peut ne pas sembler familière à certains d’entre vous ; cependant, c’est nécessaire car lorsqu’AWS crée les serveurs EMR, il démarre également l’action d’amorçage. A ce stade, les fichiers de configuration de Spark ne sont pas encore installés ; par conséquent, les propriétés CLASSPATH supplémentaires ne peuvent pas être mises à jour. L’étape D démarre un script qui attendra la fin de la construction de l’EMR, puis exécutera le script nécessaire à la mise à jour de la configuration.

La dernière étape requise pour créer le cluster Spark se concentre sur la sécurité.

Create Cluster Security

Pour activer les autorisations nécessaires pour déchiffrer les informations d’identification configurées dans le bloc-notes Jupyter, vous devez d’abord accorder aux nœuds EMR l’accès à Systems Manager. Dans la partie 3 de cette série de blogs, le déchiffrement des informations d’identification était géré par un processus exécuté avec le contexte de votre compte, alors qu’ici, dans la partie 4, le déchiffrement est géré par un processus exécuté dans le contexte EMR. En tant que tel, le contexte du processus EMR a besoin des mêmes autorisations de gestionnaire de système accordées par la stratégie créée dans la partie 3, qui est SagemakerCredentialsPolicy.

Ensuite, cliquez sur « EMR_EC2_DefaultRole » et « Joindre la politique », puis recherchez SagemakerCredentialsPolicy.

Permissions attach policy

À ce stade, vous devez accorder des autorisations à l’instance Sagemaker Notebook afin qu’elle puisse communiquer avec le cluster EMR. Commencez par créer un nouveau groupe de sécurité. (J’ai nommé le mien SagemakerEMR). Dans le groupe de sécurité SagemakerEMR, vous devez également créer deux règles entrantes.

La première règle (SSH) vous permet d’établir une session SSH depuis la machine cliente (par exemple votre ordinateur portable) vers le maître EMR. Bien que cette étape ne soit pas nécessaire, elle facilite grandement le dépannage.

La deuxième règle (TCP personnalisé) concerne le port 8998, qui est l’API Livy. Cette règle permet à l’instance Sagemaker Notebook de communiquer avec le cluster EMR via l’API Livy. Pour ce faire, le moyen le plus simple consiste à créer l’instance Sagemaker Notebook dans le VPC par défaut, puis à sélectionner le groupe de sécurité VPC par défaut en tant que source.e pour le trafic entrant via le port 8998.

Edit info fields

Après avoir créé le nouveau groupe de sécurité, sélectionnez-le en tant que « groupe de sécurité supplémentaire » pour le maître EMR.

Ensuite, cliquez sur « Créer un cluster » pour lancer le processus d’environ 10 minutes. Lorsque le cluster est prêt, il s’affiche comme « en attente ».

Vous avez maintenant votre cluster EMR. Maintenant, vous devez trouver l’adresse IP locale du nœud maître EMR, car le nœud maître EMR héberge l’API Livy, qui est, à son tour, utilisée par l’instance Sagemaker Notebook pour communiquer avec le cluster Spark. Pour trouver l’API locale, sélectionnez votre cluster, l’onglet matériel et votre maître EMR. Ensuite, faites défiler jusqu’à trouver l’adresse IP privée et notez-la car vous en aurez besoin pour la configuration de Sagemaker.

Construire l’instance Sagemaker Notebook

Pour utiliser le cluster EMR, vous devez d’abord créer un nouveau Sagemaker Carnet instance dans un VPC. Pour minimiser le réseau inter-AZ, je colocalise généralement l’instance de bloc-notes sur le même sous-réseau que j’utilise pour le cluster EMR. Enfin, choisissez le groupe de sécurité par défaut des VPC comme groupe de sécurité pour le Carnet de sages instance (Remarque : pour des raisons de sécurité, l’accès direct à Internet doit être désactivé).

Notebook Instance Settings

Lorsque le processus de construction du Sagemaker Carnet instance est terminée, téléchargez le Jupyter Cahier Spark-EMR-Flocon de neige sur votre ordinateur local, puis téléchargez-le sur votre Sagemaker Carnet exemple.

Passez ensuite en revue la première tâche du Carnet de sages et mettez à jour la variable d’environnement EMR_MASTER_INTERNAL_IP avec l’adresse IP interne du cluster EMR et exécutez l’étape (Remarque : dans l’exemple ci-dessus, elle apparaît sous la forme ip-172-31-61-244.ec2.internal).

Si le fichier de configuration Sparkmagic n’existe pas, cette étape téléchargera automatiquement le fichier de configuration Sparkmagic, puis le mettra à jour afin qu’il pointe vers le cluster EMR plutôt que vers l’hôte local. Pour appliquer la modification, redémarrez le noyau.

%bash
EMR_MASTER_INTERNAL_IP=ip-172-31-58-190.ec2.internal
CONF=/home/ec2-user/.sparkmagic/config.json
if [[ ! -e $CONF.bk ]]
then
   wget "https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/sparkmagic/example_config.json" 
-P /home/ec2-user/.sparkmagic -O /home/ec2-user/.sparkmagic/config.json.bk 2>/dev/null
fi
cat $CONF.bk | sed "s/localhost/$EMR_MASTER_INTERNAL_IP/" > $CONF.new
if [[ $(diff $CONF.new $CONF) ]]
then
   echo "Configuration has changed; Restart Kernel"
fi
cp $CONF.new $CONF

Après avoir redémarré le noyau, l’étape suivante vérifie la configuration pour s’assurer qu’elle pointe vers le maître EMR correct. S’il est correct, le processus continue sans mettre à jour la configuration.

Lors de l’exécution de la première étape sur le cluster Spark, le Pyspark le noyau démarre automatiquement un SparkContext.

import boto3

params=['/SNOWFLAKE/URL','/SNOWFLAKE/ACCOUNT_ID'
        ,'/SNOWFLAKE/USER_ID','/SNOWFLAKE/PASSWORD'
        ,'/SNOWFLAKE/DATABASE','/SNOWFLAKE/SCHEMA'
        ,'/SNOWFLAKE/WAREHOUSE','/SNOWFLAKE/BUCKET'
        ,'/SNOWFLAKE/PREFIX']

region='us-east-1'

def get_credentials(params):
   ssm = boto3.client('ssm',region)
   response = ssm.get_parameters(
      Names=params,
      WithDecryption=True
   )
   #Build dict of credentials
   param_values={k['Name']:k['Value'] for k in  response['Parameters']}
   return param_values

param_values=get_credentials(params)

pic3 2

Toutes nos félicitations! Vous avez maintenant configuré avec succès Sagemaker et EMR. Vous êtes maintenant prêt à lire l’ensemble de données de Snowflake. Cette fois, cependant, il n’est pas nécessaire de limiter le nombre ou les résultats et, comme vous le verrez, vous avez maintenant ingéré 225 millions de lignes.

sfOptions = {
  "sfURL" : param_values['/SNOWFLAKE/URL'],
  "sfAccount" : param_values['/SNOWFLAKE/ACCOUNT_ID'],
  "sfUser" : param_values['/SNOWFLAKE/USER_ID'],
  "sfPassword" : param_values['/SNOWFLAKE/PASSWORD'],
  "sfDatabase" : param_values['/SNOWFLAKE/DATABASE'],
  "sfSchema" : param_values['/SNOWFLAKE/SCHEMA'],
  "sfWarehouse" : param_values['/SNOWFLAKE/WAREHOUSE'],
}

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) 
  .options(**sfOptions) 
  .option("query", 
"select (V:main.temp_max - 273.15) * 1.8000 + 32.00 as temp_max_far, " +
"       (V:main.temp_min - 273.15) * 1.8000 + 32.00 as temp_min_far, " +
"       cast(V:time as timestamp) time, " +
"       V:city.coord.lat lat, " +
"       V:city.coord.lon lon " +
"from snowflake_sample_data.weather.weather_14_total").load()
df.describe().show()

pic4

Conclusion

Les solutions SaaS basées sur le cloud ont considérablement simplifié la création et la configuration de solutions d’apprentissage automatique (ML) de bout en bout et ont rendu le ML accessible même aux plus petites entreprises. Ce qui demandait autrefois beaucoup de temps, d’argent et d’efforts peut maintenant être accompli avec une fraction des ressources.

Pour plus d’informations sur le travail avec Spark, veuillez consulter l’excellent article en deux parties de Torsten Grabs et Edward Ma. La première partie, Pourquoi Spark, explique les avantages de l’utilisation de Spark et comment utiliser le shell Spark avec un cluster EMR pour traiter les données dans Snowflake. La seconde partie, Pousser le traitement des requêtes Spark vers Snowflake, fournit une excellente explication de la façon dont Spark avec le refoulement des requêtes offre une amélioration significative des performances par rapport au traitement Spark normal.

Une configuration Sagemaker / Snowflake rend le ML accessible même au plus petit budget. Cela ne laisse qu’une question. Que ferez-vous de vos données ?

Vous pouvez consulter toute la série de blogs ici : Pun art > Deuxième partie > Partie trois > Quatrième partie.

Abonnez-vous au blog Snowflake

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.