Intégration des données Snowflake avec Amazon Forecast : 2e partie

Intégration des données Snowflake avec Amazon Forecast : 2e partie


Dans un post précédentnous avons discuté des raisons pour lesquelles les modèles et les moteurs de prévision sont très demandés et de la manière dont les technologies d’IA et d’apprentissage automatique (ML) répondent aux besoins des organisations qui souhaitent tirer parti de la nouvelle économie des données.

Maintenant avec le service Amazon Forecast (annoncé novembre 2018), les entreprises peuvent facilement tirer parti du ML pour combiner des données de séries chronologiques avec des variables supplémentaires pour établir des prévisions et estimer des métriques opérationnelles, des métriques commerciales, des besoins en ressources, etc.

À partir de l’exemple que nous avons décrit dans notre article précédent, nous allons maintenant passer en revue le script Python simple que vous pouvez déployer pour exécuter l’intégration entre Snowflake et Amazon Forecast. Pour récapituler rapidement, voici comment fonctionne l’intégration :

Étapes d’intégration :

  1. Extraire les séries chronologiques : L’utilisateur isole un ensemble de données d’entraînement de série chronologique de la base de données Snowflake de l’utilisateur et l’enregistre sur Amazon S3.
  2. Prévoir: L’utilisateur exécute les données via Amazon Forecast à l’aide d’un script Python, reçoit une prévision de référence, puis recharge les données dans Snowflake.
  3. Connectez-vous à un partage : L’utilisateur se connecte aux données d’un autre utilisateur Snowflake via Snowflake Secure Data Sharing. Le premier utilisateur isole ensuite un ou plusieurs ensembles de données de séries chronologiques de l’autre utilisateur et les enregistre dans Amazon S3.
  4. Re-prévision avec des données enrichies :Pour recevoir une prévision de série chronologique améliorée, l’utilisateur exécute les ensembles de données de série chronologique d’origine et partagés via Amazon Forecast à l’aide d’un script Python. L’utilisateur fournit également le temps partagé en tant qu’entrées supplémentaires à Amazon Forecast en tant que série chronologique associée. Amazon Forecast utilise ensuite les entrées pour améliorer la précision des prévisions. L’utilisateur charge ensuite la prévision résultante dans Snowflake.

Le script d’intégration

Tout d’abord, nous nous connectons au compte Amazon, puis nous nous connectons au compte Snowflake, en définissant le propriétaire des données et le consommateur des données. Le propriétaire est le compte qui partagera les données, et le consommateur est le compte qui lira les données partagées (nommé le compte du processeur dans notre exemple de code).

# coding=utf-8
import snowflake.connector
import boto3
import subprocess
from time import sleep


#DEFINE PROCESSOR PARAMETERS
PROCESSOR_ACCOUNT =  '<ACCOUNT_NAME>'
PROCESSOR_USER =  '<USER_NAME>'
PROCESSOR_PASSWORD = '<PASSWORD>'
PROCESSOR_WAREHOUSE = 'FORECAST_WH'
PROCESSOR_DATABASE =  'FORECAST_RESULTS'
PROCESSOR_SCHEMA = 'FORECAST_DATA'
PROCESSOR_TABLE = 'FORECAST_RESULTS'
PROCESSOR_STAGE = 'FORECAST_STAGE'
PROCESSOR_S3 = '<S3_PATH>'
PROCESSOR_S3_PUBLIC_KEY = '<OWNER_PUBLIC_KEY>'
PROCESSOR_S3_PRIVATE_KEY = '<OWNER_PRIVATE_KEY>'


#DEFINE OWNER PARAMETERS
OWNER_ACCOUNT = '<ACCOUNT_NAME>'
OWNER_USER = '<USER_NAME>'
OWNER_PASSWORD = '<PASSWORD>'
OWNER_WAREHOUSE = 'FORECAST_WH'
OWNER_DATABASE = 'FORECAST_DATA'
OWNER_SCHEMA = 'FORECAST_DATA'
OWNER_TABLE = 'FORECAST_DATA'
OWNER_STAGE = 'FORECAST_STAGE'
OWNER_S3 = '<S3_PATH>'
OWNER_S3_PUBLIC_KEY = '<OWNER_PUBLIC_KEY>'
OWNER_S3_PRIVATE_KEY = '<OWNER_PRIVATE_KEY>'

#DEFINE FORECAST VARIABLES 
S3_BUCKET = PROCESSOR_S3 
FILENAME = 'data_0_0_0.csv.gz' 

DATASETNAME = 'snowflake_ds_1' 
DATASETGROUPNAME = 'snowflake_dsg_1' 
PREDICTORNAME = 'snowflake_f_1' 
RAW_FILEPATH = PROCESSOR_S3 + '/raw/' + FILENAME

def snowflake_connect(ACCOUNT, USER, PASSWORD, WAREHOUSE): 
       con = snowflake.connector.connect( 
       account = ACCOUNT, 
       user = USER, 
       password = PASSWORD, 
       ) 

cur = con.cursor() 
cur.execute('USE ROLE ACCOUNTADMIN') 
cur.execute('USE WAREHOUSE ' + WAREHOUSE) 

return con.cursor()

Nous allons maintenant créer et configurer la nouvelle base de données et les nouvelles tables, puis charger l’ensemble de données dans Snowflake.

def setup_owner():
       #SETUP INITIAL DATASET
       cursor = snowflake_connect(OWNER_ACCOUNT, OWNER_USER, 
OWNER_PASSWORD, OWNER_WAREHOUSE)
       cursor.execute('CREATE OR REPLACE DATABASE ' + OWNER_DATABASE)
       cursor.execute('CREATE OR REPLACE SCHEMA ' + OWNER_SCHEMA)
       cursor.execute('CREATE OR REPLACE STAGE ' + OWNER_STAGE + ' 
url="" + OWNER_S3 + '' credentials=(aws_key_id='' + 
OWNER_S3_PUBLIC_KEY + '' aws_secret_key='' + OWNER_S3_PRIVATE_KEY + '')')
       cursor.execute('CREATE OR REPLACE TABLE ' + OWNER_TABLE + ' 
(ts timestamp, demand float, id string)')
       cursor.execute('COPY INTO ' + OWNER_TABLE + ' FROM @' + 
OWNER_STAGE + '/item-demand-time.csv')

L’étape suivante consiste à créer un partage (c’est-à-dire un objet partageable) dans Snowflake et à monter la base de données.

Il est important de noter que Snowflake ne copie pas les données dans une autre zone pour les partager ; tout le partage est effectué via la couche de services unique et le magasin de métadonnées de Snowflake, ce qui signifie qu’au lieu de créer des copies en double, nous utilisons un pointeur vers la même copie originale des données.

La fonction de partage sécurisé des données de Snowflake garantit que vous disposez toujours d’une source unique de vérité et évite les processus ETL traditionnels et fastidieux requis lors du partage de données entre les SGBDR traditionnels.

Lorsque nous créons l’objet de base de données partagé, nous accordons également des privilèges d’accès spécifiques (instructions GRANT USAGE et GRANT SELECT). Ces privilèges peuvent être définis pour partager des objets spécifiques tels que des tables, des vues sécurisées et des fonctions sécurisées définies par l’utilisateur. Nous accordons ensuite l’accès au compte du sous-traitant à ce nouveau partage.

#CREATE SHARE
       cursor.execute('CREATE OR REPLACE SHARE FORECAST_SHARE')
       cursor.execute('GRANT USAGE ON DATABASE ' + OWNER_DATABASE + ' TO 
SHARE FORECAST_SHARE')
       cursor.execute('GRANT USAGE ON SCHEMA ' + OWNER_SCHEMA + ' TO SHARE 
FORECAST_SHARE')
       cursor.execute('GRANT SELECT ON TABLE ' + OWNER_TABLE + ' TO SHARE 
FORECAST_SHARE')
       cursor.execute('ALTER SHARE FORECAST_SHARE ADD ACCOUNTS = ' + PROCESSOR_ACCOUNT)

def setup_processor():
       #CREATE INITIAL SHARE
       cursor = snowflake_connect(PROCESSOR_ACCOUNT, PROCESSOR_USER, 
PROCESSOR_PASSWORD, PROCESSOR_WAREHOUSE)
       cursor.execute('CREATE OR REPLACE DATABASE ' + OWNER_DATABASE + ' FROM SHARE ' 
+ OWNER_ACCOUNT + '.FORECAST_SHARE')

Nous devons maintenant créer la table pour héberger les résultats d’Amazon Forecast. Lorsque nous créons la table résultante, nous accordons également l’accès aux objets nécessaires (base de données, schéma, table) et accordons au compte propriétaire l’accès à la base de données.

#CREATE RESULT SHARE
       cursor.execute('CREATE OR REPLACE DATABASE ' + 
PROCESSOR_DATABASE)
       cursor.execute('CREATE OR REPLACE SCHEMA ' + 
PROCESSOR_SCHEMA)
       cursor.execute('CREATE OR REPLACE STAGE ' + PROCESSOR_STAGE + 
' url="" + PROCESSOR_S3 + '' credentials=(aws_key_id='' + 
PROCESSOR_S3_PUBLIC_KEY + '' aws_secret_key='' + 
PROCESSOR_S3_PRIVATE_KEY + '')')
       cursor.execute('CREATE OR REPLACE TABLE ' + PROCESSOR_TABLE + ' 
(date datetime, first_observation_date datetime, item_id string, 
last_observation_date datetime, mean float, p10 
float, p50 float, p90 float)')
       cursor.execute('CREATE OR REPLACE SHARE FORECAST_RESULT_SHARE')
       cursor.execute('GRANT USAGE ON DATABASE ' + PROCESSOR_DATABASE + ' TO SHARE 
FORECAST_RESULT_SHARE')
       cursor.execute('GRANT USAGE ON SCHEMA ' + PROCESSOR_SCHEMA + ' TO SHARE 
FORECAST_RESULT_SHARE')
       cursor.execute('GRANT SELECT ON TABLE ' + PROCESSOR_TABLE + ' TO SHARE 
FORECAST_RESULT_SHARE')
      cursor.execute('ALTER SHARE FORECAST_RESULT_SHARE ADD ACCOUNTS = ' + OWNER_ACCOUNT)


#Instantiate Forecast Session
session = boto3.Session(region_name="us-west-2")
forecast = session.client(service_name="forecast")
forecastquery = session.client(service_name="forecastquery")

s3 = session.client('s3')
accountId = boto3.client('sts').get_caller_identity().get('Account')
ROLE_ARN = 'arn:aws:iam::%s:role/amazonforecast'%accountId


Ensuite, nous configurons l’ensemble de données et le schéma à lire via Amazon S3, en déchargeant les données de Snowflake et en les chargeant dans Amazon Forecast.

#Unload Data From Snowflake to S3

def unload_data_snowflake():
       cursor = snowflake_connect(PROCESSOR_ACCOUNT, PROCESSOR_USER, PROCESSOR_PASSWORD, 
PROCESSOR_WAREHOUSE)
       cursor.execute('USE DATABASE ' + PROCESSOR_DATABASE)
       cursor.execute('USE SCHEMA ' + PROCESSOR_SCHEMA)
       cursor.execute('COPY INTO @' + PROCESSOR_STAGE + '/raw/ FROM (select * FROM ' + 
OWNER_DATABASE + '.' + OWNER_SCHEMA +'.' + OWNER_TABLE + ')')

#Create Dataset 
def create_dataset():
        #forecast.delete_dataset(DatasetName=DATASETNAME)
       schema ={
          "Attributes":[
             {
                "AttributeName":"timestamp",
                "AttributeType":"timestamp"
             },
             {
                "AttributeName":"target_value",
                "AttributeType":"float"
             },
             {
                "AttributeName":"item_id",
                "AttributeType":"string"
             }
          ]
       }

    response = forecast.create_dataset( Domain="CUSTOM", DatasetType="TARGET_TIME_SERIES", 
DataFormat="CSV", DatasetName=DATASETNAME, DataFrequency="H", TimeStampFormat="yyyy-MM-dd 
hh:mm:ss", Schema=schema)

#Create dataset_group
def create_dataset_group():
    forecast.create_dataset_group(DatasetGroupName=DATASETGROUPNAME, RoleArn=ROLE_ARN, 
DatasetNames=[DATASETNAME])

Ensuite, nous préparons l’ensemble de données à envoyer à Amazon Forecast et gérons les conflits potentiels. Ici, nous créons également le prédicteur, qui sera utilisé pour générer la prévision et la recette utilisée pour former le modèle. (Notez que plusieurs recettes sont disponibles en fonction de vos besoins ; pour plus d’informations, consultez le guide du développeur Amazon Forecast.)

#Import DataSet
def import_dataset():
       ds_import_job_response=forecast.create_dataset_import_job(DatasetName=DATASETNAME, 
Delimiter=",", DatasetGroupName=DATASETGROUPNAME, S3Uri=RAW_FILEPATH)
       ds_versionId=ds_import_job_response['VersionId']

       #Wait for File To Finish Loading
       while True:
               dataImportStatus = 
forecast.describe_dataset_import_job(DatasetName=DATASETNAME, 
VersionId=ds_versionId)['Status']
               if (dataImportStatus != 'ACTIVE') and dataImportStatus != 
'FAILED':
                       sleep(30)
               else:
                      break
                       

#Create Recipe
def create_predictor():
        createPredictorResponse = 
forecast.create_predictor(RecipeName="forecast_MQRNN", 
DatasetGroupName=DATASETGROUPNAME, PredictorName=PREDICTORNAME, ForecastHorizo
= 24)
       predictorVersionId=createPredictorResponse['VersionId']

 #Wait for Predictor To Be Created
 while True:
           predictorStatus = 
forecast.describe_predictor(PredictorName=PREDICTORNAME, 
         VersionId=predictorVersionId)['Status']
         if predictorStatus != 'ACTIVE' and predictorStatus != 'FAILED':
                       sleep(30)
               else:
                       break

Et maintenant, nous déployons le Predictor pour gérer l’opération de prévision, après quoi nous pourrons interroger avec l’API d’Amazon pour récupérer les résultats.

                      #Deploy Predictor
def deploy_predictor():
        forecast.deploy_predictor(PredictorName=PREDICTORNAME)

 #Wait for Predictor To Be Deployed
       while True:
             deployedPredictorStatus = 
forecast.describe_deployed_predictor(PredictorName=PREDICTORNAME)['Status']
               if deployedPredictorStatus != 'ACTIVE' 
and deployedPredictorStatus != 'FAILED':
                      sleep(30)
               else:
                      break


Nous pouvons maintenant accéder et obtenir les prévisions, en spécifiant un filtre pour renvoyer notre ensemble de données spécifique.

def get_forecast():
       forecastReponse = forecastquery.get_forecast(
               PredictorName=PREDICTORNAME,
               Interval="hour",
               Filters={"item_id":"client_12"}
       )


Et enfin, nous récupérons l’ensemble de données résultant d’Amazon Forecast vers Amazon S3 et dans Snowflake à l’aide d’une tâche d’exportation de prévisions d’Amazon. Ensuite, les données du partage Snowflake seront disponibles pour que le consommateur puisse les lire et les traiter.

def export_data():
        forecastInfoList = forecast.list_forecasts(PredictorName=PREDICTORNAME)
['ForecastInfoList']
        forecastId = forecastInfoList[0]['ForecastId']

 #Drop Data Back In S3
       outputPath=S3_BUCKET + "/output"
       forecastExportResponse = 
forecast.create_forecast_export_job(ForecastId=forecastId, OutputPath={"S3Uri": 
outputPath, "RoleArn":ROLE_ARN})
      forecastExportJobId = forecastExportResponse['ForecastExportJobId']

 #Wait for Forecast to be Unloaded
       while True:
             forecastExportStatus = 
forecast.describe_forecast_export_job(ForecastExportJobId=forecastExportJobId)
['Status']
               if forecastExportStatus != 'ACTIVE' and forecastExportStatus != 
'FAILED':
                      sleep(30)
               else:
                      break


def load_data_snowflake():
        cursor = snowflake_connect(PROCESSOR_ACCOUNT, PROCESSOR_USER, PROCESSOR_PASSWORD, 
PROCESSOR_WAREHOUSE)
       cursor.execute('USE DATABASE ' + PROCESSOR_DATABASE)
       cursor.execute('USE SCHEMA ' + PROCESSOR_SCHEMA)
       cursor.execute('COPY INTO ' + PROCESSOR_TABLE + ' FROM @' + 
PROCESSOR_STAGE + '/output file_format=(skip_header=1)')

setup_owner()
setup_processor()
unload_data_snowflake()
create_dataset()
create_dataset_group()
import_dataset()
create_predictor()
deploy_predictor()
get_forecast()
export_data()
load_data_snowflake()

En résumé, avec ce script Python, vous pouvez exécuter un ensemble de données via Amazon Forecast, recevoir une prévision de référence, puis recharger les données dans Snowflake. Le processus est simple car Snowflake ne nécessite aucune préparation avant de créer et de partager des objets de base de données tout en accordant des contrôles d’accès granulaires, en protégeant les données d’origine et en vous permettant de lire des ensembles de données spécifiques pour les opérations de prévision et le traitement. Vous pouvez trouver le script Python complet pour Amazon Forecast sur GitHub.

Ensuite, nous finaliserons cette série en passant par l’étape finale de re-prévision avec des données enrichies et en la comparant à la prévision d’origine.

Obtenez le script Python complet ici

Laisser un commentaire

Votre adresse e-mail ne sera pas publiée.