

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Utilizzo delle operazioni di streaming in sessioni interattive di AWS Glue
<a name="interactive-sessions-streaming"></a>

## Commutazione del tipo di sessione streaming
<a name="interactive-sessions-switching-streaming-session-type"></a>

 Utilizzo della configurazione magic sessioni interattive AWS Glue, `%streaming`, per definire il processo che si sta eseguendo e inizializzare una sessione interattiva in streaming. 

## Flusso di input di campionamento per lo sviluppo interattivo
<a name="w2aac29c29b7"></a>

 Uno strumento che abbiamo sviluppato per migliorare l'esperienza interattiva nelle sessioni AWS Glue interattive è l'aggiunta di un nuovo metodo `GlueContext` per ottenere un'istantanea di uno stream in modalità statica DynamicFrame. `GlueContext`consente di ispezionare, interagire e implementare il flusso di lavoro. 

 Con l'istanza di classe `GlueContext`, sarai in grado di localizzare il metodo `getSampleStreamingDynamicFrame`. Gli argomenti richiesti per questo metodo sono: 
+  `dataFrame`: Lo Spark Streaming DataFrame 
+  `options`: vedi le opzioni disponibili di seguito 

 Le opzioni disponibili includono: 
+  **windowSize**: viene chiamato anche durata del microbatch. Questo parametro determinerà la durata di attesa di una query di streaming dopo l'attivazione del batch precedente. Il valore del parametro deve essere inferiore a `pollingTimeInMs`. 
+  **pollingTimeInMs**: La durata totale dell'esecuzione del metodo. Esso spedirà almeno un micro batch per ottenere registri campione dal flusso di input. 
+  **recordPollingLimit**: Questo parametro consente di limitare il numero totale di record che verranno esaminati dallo stream. 
+  (Facoltativo) È possibile utilizzare anche `writeStreamFunction` per applicare questa funzione personalizzata a ogni funzione di campionamento del registro. Vedi di seguito alcuni esempi in Scala e Python. 

****  
  

```
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here}
val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}"""
val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction)
dynFrame.show()
```

```
def sample_batch_function(batch_df, batch_id):
       //Optional but you can replace your own forEachBatch function here
options = {
            "pollingTimeInMs": "10000",
            "windowSize": "5 seconds",
        }
glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
```

**Nota**  
 Se il `DynFrame` d'esempio è vuoto, le ragioni possono essere varie:   
 La fonte di streaming è impostata su "Più recente" e non sono stati inseriti nuovi dati durante il periodo di campionamento. 
 Il tempo del polling non è sufficiente per elaborare i registri importati. I dati non verranno visualizzati a meno che l'intero batch non sia stato elaborato. 

## Esecuzione di applicazioni di streaming in sessioni interattive
<a name="running-streaming-applications-interactive-sessions"></a>

 Nelle sessioni interattive AWS Glue, è possibile eseguire un'applicazione di streaming AWS Glue nello stesso modo in cui si creerebbe un'applicazione di streaming nella Console AWS Glue. Poiché le sessioni interattive sono basate su sessione, l'individuazione di eccezioni nel runtime non provoca l'interruzione della sessione. Ora abbiamo l'ulteriore vantaggio di sviluppare iterativamente la funzione batch. Ad esempio: 

```
def batch_function(data_frame, batch_id):
    log.info(data_frame.count())
    invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
```

 Nell'esempio precedente, abbiamo incluso un utilizzo non valido di un metodo e a differenza dei normali processi AWS Glue che usciranno dall'intera applicazione, le definizioni e il contesto di codifica dell'utente vengono conservati interamente e la sessione è ancora operativa. Non è necessario avviare un nuovo cluster e rieseguire tutte le trasformazioni precedenti. Ciò consente di concentrarsi sull'iterazione rapida delle implementazioni delle funzioni batch per ottenere risultati desiderati. 

 È importante notare che le sessioni interattive valutano ogni istruzione in modo bloccante per far sì che la sessione esegua una sola istruzione alla volta. Poiché le query di streaming sono continue e senza fine, le sessioni con query di streaming attive non saranno in grado di gestire alcuna istruzione di follow-up a meno che non vengano interrotte. Puoi emettere il comando di interruzione direttamente da Jupyter Notebook e il nostro kernel gestirà la cancellazione per te. 

 Prendi come esempio la seguente sequenza di istruzioni in attesa dell'esecuzione: 

```
Statement 1:
      val number = df.count() 
      #Spark Action with deterministic result
      Result: 5
      
Statement 2:
      streamingQuery.start().awaitTermination()
      #Spark Streaming Query that will be executing continously
      Result: Constantly updated with each microbatch
      
Statement 3:
      val number2 = df.count()
      #This will not be executed as previous statement will be running indefinitely
```