Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Guida introduttiva agli stream CDC
Importante
Questa funzionalità viene fornita come AWS anteprima ed è soggetta a modifiche. Per ulteriori informazioni, consulta la sezione 2, Beta e anteprime, nei Termini di AWS servizio
Prima della disponibilità generale, aggiungeremo nuovi tipi di operazioni ("op": "u"per gli aggiornamenti) al payload dello stream. Per garantire che l'applicazione gestisca queste modifiche senza modifiche, considera qualsiasi op valore non riconosciuto come un problema, applicando il payload. after Per informazioni dettagliate, vedi Comprensione dei record CDC.
Questa guida illustra ogni passaggio necessario per iniziare lo streaming delle modifiche impegnate a livello di riga da un cluster Aurora DSQL a un flusso di dati Amazon Kinesis. Alla fine di questa guida, hai creato una pipeline CDC funzionante e uno script Python che legge e stampa i record delle modifiche.
Prerequisiti
Prima di iniziare, conferma quanto segue:
-
Hai creato un cluster Aurora DSQL in stato.
ACTIVESe il cluster è inattivo, connettiti ad esso con qualsiasi PostgreSQL-compatible client per riattivarlo prima di creare uno stream CDC.CreateStreamrestituisce un errore di convalida se il cluster non è in stato.ACTIVE -
Aurora DSQL richiede che tutte le risorse CDC (cluster, flusso di dati Amazon Kinesis, ruolo di servizio IAM e calling principal) si trovino nello stesso account. AWS
-
Il tuo flusso di dati Amazon Kinesis si trova nella stessa AWS regione del cluster Aurora DSQL.
-
L'hai installato e configurato AWS CLI con credenziali autorizzate a creare ruoli IAM e flussi di dati Amazon Kinesis.
Fase 1: creare un flusso di dati Amazon Kinesis
Crea un flusso di dati Kinesis nello stesso AWS account e nella stessa regione del cluster Aurora DSQL. I record CDC sono più grandi della riga Aurora DSQL corrispondente perché il formato JSON include nomi di colonna, metadati e sovraccarico di codifica.
Dimensionamento del flusso di dati Kinesis
Aurora DSQL CDC offre l'intera riga su ogni modifica. Un aggiornamento che riguarda una singola colonna produce un record che contiene tutte le colonne della riga. I record di eliminazione sono l'eccezione: includono solo le colonne chiave primarie.
Stima della dimensione media dei record
Misura la dimensione media delle righe su disco per comprendere il volume che CDC produrrà e per prevedere record sovradimensionati. La seguente query restituisce la dimensione media delle tuple in byte per una tabella:
SELECT avg(pg_column_size(t.*)) FROMyour_tablet;
La busta dei record CDC aggiunge i nomi delle colonne, i metadati e il sovraccarico di codifica in aggiunta alla dimensione delle righe. Per il formato di registrazione esatto, vedi. Registra il payload Per informazioni su come Aurora DSQL gestisce i record che superano il limite di dimensione dei record Kinesis, vedere. Gestione di dischi di grandi dimensioni Per il set completo dei limiti del servizio Kinesis, consulta le quote e i limiti di Amazon Kinesis Data Streams nella Amazon Kinesis Data Streams Developer Guide.
Importante
Quando crei il flusso di dati Kinesis, imposta quanto segue:
-
MaxRecordSizeInKiBa10240(10 MiB). Il valore massimo predefinito di 1 MiB per Kinesis non è sempre sufficiente per i record Aurora DSQL CDC. Qualsiasi record che supera la dimensione del record Kinesis configurato causa il danneggiamento del flusso CDC.KINESIS_OVERSIZE_RECORDAurora DSQL divide i record di grandi dimensioni in frammenti che possono avvicinarsi a 10 MiB ciascuno, quindi il flusso di dati Kinesis deve accettare record di quelle dimensioni. Per informazioni dettagliate, vedi Gestione di dischi di grandi dimensioni. -
StreamModeaON_DEMAND. On-demand mode ridimensiona automaticamente la capacità dello shard e ti protegge dall'under-provisioning in caso di picchi imprevisti. Kinesis può continuare a funzionare anche in caso diWriteProvisionedThroughputExceededbrusche sequenze di secondi man mano che la capacità aumenta. Pianifica brevi eventi di limitazione.
Crea CloudWatch allarmi su IncomingBytes e WriteProvisionedThroughputExceeded nel namespace. AWS/Kinesis Il throttling di Kinesis rallenta l'erogazione del CDC e aumenta il ritardo di replica. Per le DSQL-side metriche di Aurora e le indicazioni sugli allarmi, vedi. Le migliori pratiche di monitoraggio
Gli esempi seguenti utilizzano AWS CLI. Se la tua AWS CLI versione non supporta il --max-record-size-in-ki-b parametro, usa un AWS SDK per chiamare l'operazione CreateStreamKinesis.
aws kinesis create-stream \ --stream-namemy-cdc-stream\ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --regionregion
Attendi che lo stream diventi attivo:
aws kinesis describe-stream-summary \ --stream-namemy-cdc-stream\ --regionregion\ --query 'StreamDescriptionSummary.StreamStatus'
Il comando ritorna "ACTIVE" quando lo stream è pronto.
Registra lo stream ARN dall'output. Ne hai bisogno nei seguenti passaggi. L'ARN ha il formato. arn:aws:kinesis:region:account-id:stream/my-cdc-stream
Fase 2: Creare un ruolo IAM per Aurora DSQL
Aurora DSQL assume un ruolo IAM per scrivere record CDC nel flusso di dati Kinesis. In questo passaggio, crei il ruolo con una politica di fiducia e alleghi una politica di autorizzazioni. Per una spiegazione completa di ogni elemento della policy, vediConfigurazione di IAM.
Creare il file delle politiche di attendibilità
Salva il seguente codice JSON con nome. trust-policy.json Sostituisci your-account-id region e cluster-id con i tuoi valori.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
Creare il ruolo
Per creare il ruolo IAM, eseguire il seguente comando :
aws iam create-role \ --role-namedsql-cdc-role\ --assume-role-policy-document file://trust-policy.json
Crea il file dei criteri di autorizzazione
Salva il seguente file JSON con nome. permissions-policy.json Sostituisci i valori segnaposto con il tuo ARN del flusso di dati Kinesis. L'KMSAccessistruzione è richiesta solo se il flusso di dati Kinesis utilizza una chiave gestita AWS KMS dal cliente, ma è possibile includerla preventivamente in modo che l'aggiunta di una chiave gestita dal cliente in un secondo momento non interrompa il flusso CDC. Per una spiegazione completa di ogni condizione, vedi. Politica di autorizzazione dei ruoli di servizio
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
Allega la politica delle autorizzazioni
Esegui il comando seguente:
aws iam put-role-policy \ --role-namedsql-cdc-role\ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json
Registra l'ARN del ruolo dall'create-roleoutput. L'ARN ha il formato. arn:aws:iam::your-account-id:role/dsql-cdc-role
Fase 3: Creare lo stream CDC
Usa il AWS CLI per creare un flusso CDC che colleghi il tuo cluster Aurora DSQL al flusso di dati Kinesis. Sostituisci i valori segnaposto con l'ARN del flusso Kinesis della Fase 1, l'ARN del ruolo IAM della Fase 2 e l'identificatore del cluster.
aws dsql create-stream \ --cluster-identifiercluster-id\ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --regionregion
La risposta include un identificatore di flusso e uno stato di. CREATING La creazione dello stream richiede in genere da uno a tre minuti.
Attendi che lo stream diventi attivo
Esamina lo stato dello stream fino a raggiungereACTIVE:
aws dsql get-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion\ --query 'status'
Puoi anche utilizzare il StreamActive cameriere negli AWS SDK per effettuare sondaggi automaticamente.
Una volta raggiunto lo streamACTIVE, Aurora DSQL inizia a fornire modifiche impegnate a livello di riga al flusso di dati Kinesis.
Nota
Ogni cluster Aurora DSQL ha un numero massimo di stream CDC. Se raggiungi questo limite, restituisce un. CreateStream ServiceQuotaExceededException Per il limite predefinito, vedi Quote e limiti.
Passaggio 4: Verifica che i record scorrano
Inserisci una riga in una tabella sul tuo cluster Aurora DSQL. Esempio:
CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');
Leggi dal flusso di dati Kinesis per verificare che il record CDC sia arrivato:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-namemy-cdc-stream\ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --regionregion\ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --regionregion
Il Data campo di ogni record contiene un payload JSON. Quando si utilizza il AWS CLI, il payload è Base64-encoded nella risposta. Quando si utilizza l'boto3SDK, l'SDK lo decodifica automaticamente. Il codice JSON decodificato ha il seguente aspetto:
{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }
Per una descrizione completa di ogni campo, vedi. Comprensione dei record CDC
Passaggio 5: consumare i record con uno script Python
Il seguente script Python legge i record CDC da un flusso di dati Kinesis e stampa ogni evento di modifica. Lo script utilizza il client boto3 Amazon Kinesis per eseguire iterazioni sugli shard e decodificare ogni record. Poiché Aurora DSQL CDC utilizza almeno una consegna, lo script potrebbe stampare lo stesso record più di una volta.
""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)
Esegui lo script :
pip install boto3 python consume_cdc.py \ --stream-namemy-cdc-stream\ --regionregion
Lo script stampa ogni evento di modifica non appena arriva. L'output visualizzato è simile al seguente:
Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
L'aggiunta della deduplicazione Last-Writer-Wins
Poiché Aurora DSQL CDC utilizza almeno una consegna, le app di produzione devono deduplicare e ordinare i record. Il seguente esempio di codice mostra un approccio basato sull'elevato numero di marcature: per ogni chiave primaria, tiene traccia del valore più alto source.ts_ns visto finora e scarta qualsiasi record con un timestamp uguale o precedente. Imposta PK_COLUMNS i nomi delle colonne della chiave principale della tabella che stai elaborando. Per le strategie che gestiscono più tabelle o eliminazioni, consultaStrategie di consumo.
# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True
Gestione degli stream CDC
Elenco degli stream
Per elencare tutti i flussi CDC per un cluster, usa l'operazione: ListStreams
aws dsql list-streams \ --cluster-identifiercluster-id\ --regionregion
Eliminazione di uno stream
Per eliminare uno stream CDC, esegui il seguente comando:
aws dsql delete-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion
Puoi usare il StreamNotExists cameriere per eseguire un sondaggio GetStream finché non ResourceNotFoundException viene restituito un, a indicare che Aurora DSQL ha eliminato completamente lo stream.