Connettore Apache Kafka di Amazon Athena - Amazon Athena

Connettore Apache Kafka di Amazon Athena

Il connettore Amazon Athena per Apache Kafka consente ad Amazon Athena di eseguire query SQL sui tuoi argomenti di Apache Kafka. Utilizza questo connettore per visualizzare gli argomenti di Apache Kafka come tabelle e i messaggi come righe in Athena.

Questo connettore non utilizza Connessioni Glue per centralizzare le proprietà di configurazione in Glue. La configurazione della connessione viene effettuata tramite Lambda.

Prerequisiti

Implementa il connettore sul tuo Account AWS utilizzando la console Athena o AWS Serverless Application Repository. Per ulteriori informazioni, consulta Crea una connessione di un'origine dati o Utilizzare AWS Serverless Application Repository per distribuire un connettore origine dati.

Limitazioni

  • Le operazioni di scrittura DDL non sono supportate.

  • Eventuali limiti Lambda pertinenti. Per ulteriori informazioni, consulta la sezione Quote Lambda nella Guida per gli sviluppatori di AWS Lambda.

  • Nelle condizioni di filtro, è necessario impostare i tipi di dati date e timestamp sul tipo di dati appropriato.

  • I tipi di dati data e ora non sono supportati per il tipo di file CSV e vengono trattati come valori varchar.

  • La mappatura in campi JSON annidati non è supportata. Il connettore mappa solo i campi di primo livello.

  • Il connettore non supporta tipi complessi. I tipi complessi vengono interpretati come stringhe.

  • Per estrarre o lavorare con valori JSON complessi, utilizza le funzioni relative a JSON disponibili in Athena. Per ulteriori informazioni, consulta Estrarre dati JSON da stringhe.

  • Il connettore non supporta l'accesso ai metadati dei messaggi Kafka.

Termini

  • Gestore dei metadati: un gestore Lambda che recupera i metadati dall'istanza del database.

  • Gestore dei record: un gestore Lambda che recupera i record di dati dall'istanza del database.

  • Gestore composito: un gestore Lambda che recupera sia i metadati sia i record di dati dall'istanza del database.

  • Endpoint Kafka: una stringa di testo che stabilisce una connessione a un'istanza Kafka.

Compatibilità dei cluster

Il connettore Kafka può essere utilizzato con i seguenti tipi di cluster.

Connessione a Confluent

Per la connessione a Confluent sono necessari i seguenti passaggi:

  1. Genera una chiave API da Confluent.

  2. Memorizza il nome utente e la password per la chiave API di Confluent in Gestione dei segreti AWS.

  3. Fornisci il nome del segreto per la variabile di ambiente secrets_manager_secret nel connettore Kafka.

  4. Segui la procedura nella sezione Configurazione del connettore Kafka di questo documento.

Metodi di autenticazione di supportati

Il connettore supporta i seguenti metodi di autenticazione.

  • SSL

  • SASL/SCRAM

  • SASL/PLAIN

  • SASL/PLAINTEXT

  • NO_AUTH

  • Kafka e Confluent Platform autogestiti: SSL, SASL/SCRAM, SASL/PLAINTEXT, NO_AUTH

  • Kafka e Confluent Cloud autogestiti: SASL/PLAIN

Per ulteriori informazioni, consulta Configurazione dell'autenticazione del connettore Kafka di Athena.

Formati di dati di input supportati

Il connettore supporta i seguenti formati di dati di input.

  • JSON

  • CSV

  • AVRO

  • PROTOBUF (BUFFER DI PROTOCOLLO)

Parametri

Utilizzare i parametri illustrati in questa sezione per configurare il connettore Kafka di Athena.

  • auth_type: specifica il tipo di autenticazione del cluster. Il connettore supporta i seguenti tipi di autenticazione:

    • NO_AUTH: connettiti direttamente a Kafka (ad esempio, a un cluster Kafka implementato su un'istanza EC2 che non utilizza l'autenticazione).

    • SASL_SSL_PLAIN: questo metodo utilizza il protocollo di sicurezza SASL_SSL e il meccanismo SASL PLAIN. Per ulteriori informazioni, consulta Configurazione SASL nella documentazione Kafka di Apache.

    • SASL_PLAINTEXT_PLAIN: questo metodo utilizza il protocollo di sicurezza SASL_PLAINTEXT e il meccanismo SASL PLAIN. Per ulteriori informazioni, consulta Configurazione SASL nella documentazione Kafka di Apache.

    • SASL_SSL_SCRAM_SHA512: puoi utilizzare questo tipo di autenticazione per controllare l'accesso ai cluster Amazon Kafka. Questo metodo consente di archiviare il nome utente e la password su Gestione dei segreti AWS. Il segreto deve essere associato al cluster Kafka. Per ulteriori informazioni, consulta Autenticazione tramite SASL/SCRAM nella documentazione di Apache Kafka.

    • SASL_PLAINTEXT_SCRAM_SHA512: questo metodo utilizza il protocollo di sicurezza SASL_PLAINTEXT e il meccanismo SCRAM_SHA512 SASL. Questo metodo utilizza il nome utente e la password memorizzati in Gestione dei segreti AWS. Per ulteriori informazioni, consulta la sezione Configurazione SASL nella documentazione Apache Kafka.

    • SSL: l'autenticazione SSL utilizza i file archivio delle chiavi e di archivio di trust per connettersi al cluster Apache Kafka. Devi generare i file del trust store e del key store, caricarli in un bucket Amazon S3 e fornire il riferimento ad Amazon S3 quando implementi il connettore. Il key store, il trust store e la chiave SSL sono archiviati in Gestione dei segreti AWS. Il client deve fornire la chiave segreta AWS quando viene implementato il connettore. Per ulteriori informazioni, consulta Crittografia e autenticazione tramite SSL nella documentazione di Apache Kafka.

      Per ulteriori informazioni, consulta Configurazione dell'autenticazione del connettore Kafka di Athena.

  • certificates_s3_reference: la posizione Amazon S3 che contiene i certificati (i file key store e trust store).

  • disable_spill_encryption: (facoltativo) se impostato su True, disabilita la crittografia dello spill. L'impostazione predefinita è False: in questo modo, i dati riversati su S3 vengono crittografati utilizzando AES-GCM tramite una chiave generata casualmente o una chiave generata mediante KMS. La disabilitazione della crittografia dello spill può migliorare le prestazioni, soprattutto se la posizione dello spill utilizza la crittografia lato server.

  • kafka_endpoint: i dettagli dell'endpoint da fornire a Kafka.

  • schema_registry_url: l'indirizzo URL per il registro dello schema (ad esempio, http://schema-registry.example.org:8081). Si applica ai formati di dati AVRO e PROTOBUF. Athena supporta solo il registro degli schemi Confluent.

  • secrets_manager_secret: il nome del segreto AWS in cui vengono salvate le credenziali.

  • Parametri di spill: le funzioni Lambda archiviano temporaneamente ("riversano") i dati che non rientrano nella memoria di Amazon S3. Tutte le istanze del database a cui accede la stessa funzione Lambda riversano i dati nella stessa posizione. Utilizza i parametri nella tabella seguente per specificare la posizione di spill.

    Parametro Descrizione
    spill_bucket Obbligatorio. Il nome del bucket Amazon S3 in cui la funzione Lambda può riversare i dati.
    spill_prefix Obbligatorio. Il prefisso all'interno del bucket di spill in cui la funzione Lambda può riversare dati.
    spill_put_request_headers (Facoltativo) Una mappa codificata in JSON delle intestazioni e dei valori della richiesta per la richiesta putObject di Amazon S3 utilizzata per lo spill (ad esempio, {"x-amz-server-side-encryption" : "AES256"}). Per altre possibili intestazioni, consulta l'argomento relativo a PutObject nella Documentazione di riferimento dell'API di Amazon Simple Storage Service.
  • ID di sottoreti: uno o più ID di sottorete corrispondenti alla sottorete che la funzione Lambda può utilizzare per accedere all'origine dati.

    • Cluster di Kafka pubblico o cluster standard di Confluent Cloud: associa il connettore a una sottorete privata dotata di un gateway NAT.

    • Cluster Confluent Cloud con connettività privata: associa il connettore a una sottorete privata che abbia un percorso diretto al cluster Confluent Cloud.

      • Per AWS Transit Gateway, le sottoreti devono trovarsi in un VPC collegato allo stesso gateway di transito utilizzato da Confluent Cloud.

      • Per il Peering VPC, le sottoreti devono trovarsi in un VPC in peering con Confluent Cloud VPC.

      • Per AWS PrivateLink, le sottoreti devono trovarsi in un VPC con un percorso verso gli endpoint VPC che si connettono a Confluent Cloud.

Nota

Se si implementa il connettore in un VPC per accedere a risorse private e si desidera connettersi anche a un servizio accessibile al pubblico come Confluent, è necessario associare il connettore a una sottorete privata con un gateway NAT. Per ulteriori informazioni, consulta Gateway NAT nella Guida per l'utente di Amazon VPC.

Supporto dei tipi di dati

Nella tabella seguente vengono illustrati i tipi di dati corrispondenti supportati per Kafka e Apache Arrow.

Kafka Arrow
CHAR VARCHAR
VARCHAR VARCHAR
TIMESTAMP MILLISECOND
DATE DAY
BOOLEAN BOOL
SMALLINT SMALLINT
INTEGER INT
BIGINT BIGINT
DECIMAL FLOAT8
DOUBLE FLOAT8

Partizioni e suddivisioni

Gli argomenti di Kafka sono suddivisi in partizioni. Ogni partizione è ordinata. Ogni messaggio in una partizione ha un ID incrementale chiamato offset. Ogni partizione Kafka è ulteriormente suddivisa in più suddivisioni per l'elaborazione in parallelo. I dati sono disponibili per il periodo di conservazione configurato nei cluster Kafka.

Best practice

Come procedura consigliata, utilizza il pushdown del predicato quando esegui query su Athena, come negli esempi seguenti.

SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE integercol = 2147483647
SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'

Configurazione del connettore Kafka

Prima di poter utilizzare il connettore, è necessario configurare il cluster Apache Kafka, utilizzare il registro degli schemi di AWS Glue per definire lo schema e configurare l'autenticazione del connettore.

Quando lavori con il registro degli schemi di AWS Glue, tieni in considerazione i seguenti punti:

  • Assicurati che il testo nel campo Description (Descrizione) del registro degli schemi di AWS Glue includa la stringa {AthenaFederationKafka}. Questa stringa di marcatura è obbligatoria per i registri AWS Glue utilizzati con il connettore Kafka di Amazon Athena.

  • Per prestazioni ottimali, utilizza solo lettere minuscole per i nomi dei database e delle tabelle. L'utilizzo di caratteri misti tra maiuscole e minuscole fa sì che il connettore esegua una ricerca senza distinzione tra maiuscole e minuscole, più impegnativa dal punto di vista computazionale.

Come configurare l'ambiente Amazon Kafka e AWS Glue Schema Registry
  1. Configurazione dell'ambiente Apache Kafka.

  2. Carica il file di descrizione dell'argomento di Kafka (ossia il relativo schema) in formato JSON nel registro degli schemi di AWS Glue. Per ulteriori informazioni, consulta la pagina Integrazione con il registro dello schema AWS Glue nella Guida per gli sviluppatori di AWS Glue.

  3. Per utilizzare il formato di dati AVRO o PROTOBUF quando definisci lo schema nel registro degli schemi AWS Glue:

    • Per Schema name (Nome schema), inserire il nome dell'argomento Kafka nella stesso casing dell'originale.

    • Per il Formato dei dati, scegli Apache Avro o Protocol Buffers.

    Per schemi di esempio, consulta la sezione seguente.

Utilizza il formato degli esempi in questa sezione quando carichi lo schema nel registro degli schemi di AWS Glue.

Esempio di schema di tipo JSON

Nell'esempio seguente, lo schema da creare nel registro degli schemi di AWS Glue specifica json come valore di dataFormat e utilizza datatypejson per topicName.

Nota

Il valore di topicName deve utilizzare le stesse maiuscole e minuscole del nome dell'argomento in Kafka.

{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }

Esempio di schema di tipo CSV

Nell'esempio seguente, lo schema da creare nel registro degli schemi AWS Glue specifica csv come valore di dataFormat e utilizza datatypecsvbulk per topicName. Il valore di topicName deve utilizzare le stesse maiuscole e minuscole del nome dell'argomento in Kafka.

{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }

Esempio di schema di tipo AVRO

L'esempio seguente viene utilizzato per creare uno schema basato su AVRO nel registro degli schemi AWS Glue. Quando si definisce lo schema nel registro degli schemi AWS Glue, in Schema name (nome schema) si immette il nome dell'argomento Kafka nello stesso casing dell'originale e per Data format (Formato dati) si sceglie Apache Avro. Poiché queste informazioni vengono specificate direttamente nel registro, i campi dataformat e topicName non sono obbligatori.

{ "type": "record", "name": "avrotest", "namespace": "example.com", "fields": [{ "name": "id", "type": "int" }, { "name": "name", "type": "string" } ] }

Esempio di schema di tipo PROTOBUF

L'esempio seguente viene utilizzato per creare uno schema basato su PROTOBUF nel registro degli schemi AWS Glue. Quando si definisce lo schema nel registro degli schemi AWS Glue, in Schema name (Nome schema) si immette il nome dell'argomento Kafka nello stesso casing dell'originale e per Data format (Formato dati) si sceglie Protocol Buffers. Poiché queste informazioni vengono specificate direttamente nel registro, i campi dataformat e topicName non sono obbligatori. La prima riga definisce lo schema come PROTOBUF.

syntax = "proto3"; message protobuftest { string name = 1; int64 calories = 2; string colour = 3; }

Per ulteriori informazioni sull'aggiunta di un registro e di schemi nel registro degli schemi AWS Glue, vedere Guida introduttiva al Registro degli schemi nella documentazione AWS Glue.

Configurazione dell'autenticazione del connettore Kafka di Athena

Puoi utilizzare diversi metodi per l'autenticazione al cluster Apache Kafka, tra cui SSL, SASL/SCRAM, SASL/PLAIN e SASL/PLAINTEXT.

La tabella seguente mostra i tipi di autenticazione per il connettore, il protocollo di sicurezza e il meccanismo SASL per ciascuno di questi metodi. Per ulteriori informazioni, consulta la sezione Sicurezza nella documentazione di Apache Kafka.

auth_type security.protocol sasl.mechanism Compatibilità con tipo di cluster
SASL_SSL_PLAIN SASL_SSL PLAIN
  • Kafka autogestito

  • Confluent Platform

  • Confluent Cloud

SASL_PLAINTEXT_PLAIN SASL_PLAINTEXT PLAIN
  • Kafka autogestito

  • Confluent Platform

SASL_SSL_SCRAM_SHA512 SASL_SSL SCRAM-SHA-512
  • Kafka autogestito

  • Confluent Platform

SASL_PLAINTEXT_SCRAM_SHA512 SASL_PLAINTEXT SCRAM-SHA-512
  • Kafka autogestito

  • Confluent Platform

SSL SSL N/D
  • Kafka autogestito

  • Confluent Platform

SSL

Se il cluster è autenticato tramite SSL, devi generare i file del trust store e del key store e caricarli nel bucket Amazon S3. È necessario fornire questo riferimento Amazon S3 quando implementi il connettore. Il key store, il trust store e la chiave SSL sono archiviati in Gestione dei segreti AWS. Fornisci la chiave segreta AWS quando implementi il connettore.

Per informazioni sulla creazione di un segreto in Secrets Manager, consulta la pagina Crea un segreto Gestione dei segreti AWS.

Per utilizzare questo tipo di autenticazione, è necessario impostare le variabili di ambiente come illustrato nella tabella seguente.

Parametro Valore
auth_type SSL
certificates_s3_reference La posizione di Amazon S3 contenente i certificati.
secrets_manager_secret Il nome della tua chiave segreta AWS.

Dopo avere creato un segreto in Gestione dei segreti, puoi visualizzarlo nella console Gestione dei segreti.

Visualizzazione del segreto in Gestione dei segreti
  1. Apri la console di Secrets Manager all'indirizzo https://console.aws.amazon.com/secretsmanager/.

  2. Nel riquadro di navigazione, scegli Secrets (Segreti).

  3. Nella pagina Secrets (Segreti), scegli il collegamento al tuo segreto.

  4. Nella pagina dei dettagli del segreto, scegli Retrieve secret value (Recupera il valore di un segreto).

    L'immagine seguente mostra un esempio di segreto con tre coppie chiave/valore: keystore_password, truststore_password e ssl_key_password.

    Recupero di un segreto SSL in Gestione dei segreti

Per ulteriori informazioni sull'utilizzo di SSL con Kafka, consulta Crittografia e autenticazione tramite SSL nella documentazione di Apache Kafka.

SASL/SCRAM

Se il cluster utilizza l'autenticazione SCRAM, fornisci la chiave Gestione dei segreti associata al cluster quando implementi il connettore. Le credenziali AWS dell'utente (chiave segreta e chiave di accesso) vengono utilizzate per autenticarsi al cluster.

Imposta le variabili di ambiente come illustrato nella tabella seguente.

Parametro Valore
auth_type SASL_SSL_SCRAM_SHA512
secrets_manager_secret Il nome della tua chiave segreta AWS.

L'immagine seguente mostra un esempio di segreto nella console Gestione dei segreti con due coppie chiave/valore: una per username e una per password.

Recupero di un segreto SCRAM in Gestione dei segreti

Per ulteriori informazioni sull'utilizzo di SASL/SCRAM con Kafka, consulta Autenticazione tramite SASL/SCRAM nella documentazione di Apache Kafka.

Informazioni sulla licenza

Utilizzando questo connettore, riconosci l'inclusione di componenti di terze parti, un elenco dei quali è disponibile nel file pom.xml di questo connettore, e accetti i termini delle rispettive licenze di terze parti fornite nel file LICENSE.txt su GitHub.com.

Risorse aggiuntive

Per ulteriori informazioni su questo connettore, visita il sito corrispondente su GitHub.com.