

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Guida introduttiva agli stream CDC
<a name="cdc-setup"></a>

**Importante**  
Questa funzionalità viene fornita come AWS anteprima ed è soggetta a modifiche. Per ulteriori informazioni, consulta la sezione 2, Beta e anteprime, nei Termini di [AWS servizio](https://aws.amazon.com/service-terms/). Per ulteriori informazioni sui prezzi degli stream CDC, consulta la pagina dei prezzi di [Aurora DSQL](https://aws.amazon.com/rds/aurora/dsql/pricing/).  
Prima della disponibilità generale, aggiungeremo nuovi tipi di operazioni (`"op": "u"`per gli aggiornamenti) al payload dello stream. Per garantire che l'applicazione gestisca queste modifiche senza modifiche, considera qualsiasi `op` valore non riconosciuto come un problema, applicando il payload. `after` Per informazioni dettagliate, vedi [Comprensione dei record CDC](cdc-record-format.md).

Questa guida illustra ogni passaggio necessario per iniziare lo streaming delle modifiche impegnate a livello di riga da un cluster Aurora DSQL a un flusso di dati Amazon Kinesis. Alla fine di questa guida, hai creato una pipeline CDC funzionante e uno script Python che legge e stampa i record delle modifiche.

## Prerequisiti
<a name="cdc-prerequisites"></a>

Prima di iniziare, conferma quanto segue:
+ Hai creato un cluster Aurora DSQL in stato. `ACTIVE` Se il cluster è inattivo, connettiti ad esso con qualsiasi PostgreSQL-compatible client per riattivarlo prima di creare uno stream CDC. `CreateStream`restituisce un errore di convalida se il cluster non è in stato. `ACTIVE`
+ Aurora DSQL richiede che tutte le risorse CDC (cluster, flusso di dati Amazon Kinesis, ruolo di servizio IAM e calling principal) si trovino nello stesso account. AWS 
+ Il tuo flusso di dati Amazon Kinesis si trova nella stessa AWS regione del cluster Aurora DSQL.
+ L'hai installato e configurato AWS CLI con credenziali autorizzate a creare ruoli IAM e flussi di dati Amazon Kinesis.

## Fase 1: creare un flusso di dati Amazon Kinesis
<a name="cdc-step1-kinesis"></a>

Crea un flusso di dati Kinesis nello stesso AWS account e nella stessa regione del cluster Aurora DSQL. I record CDC sono più grandi della riga Aurora DSQL corrispondente perché il formato JSON include nomi di colonna, metadati e sovraccarico di codifica.

### Dimensionamento del flusso di dati Kinesis
<a name="cdc-sizing"></a>

Aurora DSQL CDC offre l'intera riga su ogni modifica. Un aggiornamento che riguarda una singola colonna produce un record che contiene tutte le colonne della riga. I record di eliminazione sono l'eccezione: includono solo le colonne chiave primarie.

**Stima della dimensione media dei record**  
Misura la dimensione media delle righe su disco per comprendere il volume che CDC produrrà e per prevedere record sovradimensionati. La seguente query restituisce la dimensione media delle tuple in byte per una tabella:

```
SELECT avg(pg_column_size(t.*)) FROM {{your_table}} t;
```

La busta dei record CDC aggiunge i nomi delle colonne, i metadati e il sovraccarico di codifica in aggiunta alla dimensione delle righe. Per il formato di registrazione esatto, vedi. [Registra il payload](cdc-record-format.md#cdc-record-payload) Per informazioni su come Aurora DSQL gestisce i record che superano il limite di dimensione dei record Kinesis, vedere. [Gestione di dischi di grandi dimensioni](cdc-record-format.md#cdc-oversized-records) *Per il set completo dei limiti del servizio Kinesis, consulta le [quote e i limiti di Amazon Kinesis Data Streams nella Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) Developer Guide.*

**Importante**  
Quando crei il flusso di dati Kinesis, imposta quanto segue:  
`MaxRecordSizeInKiB`a `10240` (10 MiB). Il valore massimo predefinito di 1 MiB per Kinesis non è sempre sufficiente per i record Aurora DSQL CDC. Qualsiasi record che supera la dimensione del record Kinesis configurato causa il danneggiamento del flusso CDC. `KINESIS_OVERSIZE_RECORD` Aurora DSQL divide i record di grandi dimensioni in frammenti che possono avvicinarsi a 10 MiB ciascuno, quindi il flusso di dati Kinesis deve accettare record di quelle dimensioni. Per informazioni dettagliate, vedi [Gestione di dischi di grandi dimensioni](cdc-record-format.md#cdc-oversized-records).
`StreamMode`a`ON_DEMAND`. On-demand mode ridimensiona automaticamente la capacità dello shard e ti protegge dall'under-provisioning in caso di picchi imprevisti. Kinesis può continuare a funzionare anche in caso di `WriteProvisionedThroughputExceeded` brusche sequenze di secondi man mano che la capacità aumenta. Pianifica brevi eventi di limitazione.

Crea CloudWatch allarmi su `IncomingBytes` e `WriteProvisionedThroughputExceeded` nel namespace. `AWS/Kinesis` Il throttling di Kinesis rallenta l'erogazione del CDC e aumenta il ritardo di replica. Per le DSQL-side metriche di Aurora e le indicazioni sugli allarmi, vedi. [Le migliori pratiche di monitoraggio](cdc-monitoring.md#cdc-monitoring-best-practices)

Gli esempi seguenti utilizzano AWS CLI. Se la tua AWS CLI versione non supporta il `--max-record-size-in-ki-b` parametro, usa un AWS SDK per chiamare l'operazione [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)Kinesis.

```
aws kinesis create-stream \
  --stream-name {{my-cdc-stream}} \
  --stream-mode-details StreamMode=ON_DEMAND \
  --max-record-size-in-ki-b 10240 \
  --region {{region}}
```

Attendi che lo stream diventi attivo:

```
aws kinesis describe-stream-summary \
  --stream-name {{my-cdc-stream}} \
  --region {{region}} \
  --query 'StreamDescriptionSummary.StreamStatus'
```

Il comando ritorna `"ACTIVE"` quando lo stream è pronto.

Registra lo stream ARN dall'output. Ne hai bisogno nei seguenti passaggi. L'ARN ha il formato. `arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{my-cdc-stream}}`

## Fase 2: Creare un ruolo IAM per Aurora DSQL
<a name="cdc-step2-iam"></a>

Aurora DSQL assume un ruolo IAM per scrivere record CDC nel flusso di dati Kinesis. In questo passaggio, crei il ruolo con una politica di fiducia e alleghi una politica di autorizzazioni. Per una spiegazione completa di ogni elemento della policy, vedi[Configurazione di IAM](cdc-iam.md).

**Creare il file delle politiche di attendibilità**  
Salva il seguente codice JSON con nome. `trust-policy.json` Sostituisci {{your-account-id}} {{region}} e {{cluster-id}} con i tuoi valori.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "DSQLAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "dsql.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "{{your-account-id}}"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:dsql:{{region}}:{{your-account-id}}:cluster/{{cluster-id}}/stream/*"
                }
            }
        }
    ]
}
```

**Creare il ruolo**  
Per creare il ruolo IAM, eseguire il seguente comando :

```
aws iam create-role \
  --role-name {{dsql-cdc-role}} \
  --assume-role-policy-document file://trust-policy.json
```

**Crea il file dei criteri di autorizzazione**  
Salva il seguente file JSON con nome. `permissions-policy.json` Sostituisci i valori segnaposto con il tuo ARN del flusso di dati Kinesis. L'`KMSAccess`istruzione è richiesta solo se il flusso di dati Kinesis utilizza una chiave gestita AWS KMS dal cliente, ma è possibile includerla preventivamente in modo che l'aggiunta di una chiave gestita dal cliente in un secondo momento non interrompa il flusso CDC. Per una spiegazione completa di ogni condizione, vedi. [Politica di autorizzazione dei ruoli di servizio](cdc-iam.md#cdc-iam-permissions-policy)

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisAccess",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}"
        },
        {
            "Sid": "KMSAccess",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": "arn:aws:kms:*:*:key/*",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.{{region}}.amazonaws.com",
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}",
                    "aws:ResourceAccount": "${aws:PrincipalAccount}"
                }
            }
        }
    ]
}
```

**Allega la politica delle autorizzazioni**  
Esegui il comando seguente:

```
aws iam put-role-policy \
  --role-name {{dsql-cdc-role}} \
  --policy-name dsql-cdc-kinesis-access \
  --policy-document file://permissions-policy.json
```

Registra l'ARN del ruolo dall'`create-role`output. L'ARN ha il formato. `arn:aws:iam::{{your-account-id}}:role/{{dsql-cdc-role}}`

## Fase 3: Creare lo stream CDC
<a name="cdc-step3-create-stream"></a>

Usa il AWS CLI per creare un flusso CDC che colleghi il tuo cluster Aurora DSQL al flusso di dati Kinesis. Sostituisci i valori segnaposto con l'ARN del flusso Kinesis della Fase 1, l'ARN del ruolo IAM della Fase 2 e l'identificatore del cluster.

```
aws dsql create-stream \
  --cluster-identifier {{cluster-id}} \
  --target-definition '{"kinesis":{"streamArn":"{{kinesis-stream-arn}}","roleArn":"{{role-arn}}"}}' \
  --ordering UNORDERED \
  --format JSON \
  --tags '{"Name":"{{my-cdc-stream}}"}' \
  --region {{region}}
```

La risposta include un identificatore di flusso e uno stato di. `CREATING` La creazione dello stream richiede in genere da uno a tre minuti.

**Attendi che lo stream diventi attivo**  
Esamina lo stato dello stream fino a raggiungere`ACTIVE`:

```
aws dsql get-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}} \
  --query 'status'
```

Puoi anche utilizzare il `StreamActive` cameriere negli AWS SDK per effettuare sondaggi automaticamente.

Una volta raggiunto lo stream`ACTIVE`, Aurora DSQL inizia a fornire modifiche impegnate a livello di riga al flusso di dati Kinesis.

**Nota**  
Ogni cluster Aurora DSQL ha un numero massimo di stream CDC. Se raggiungi questo limite, restituisce un. `CreateStream` `ServiceQuotaExceededException` Per il limite predefinito, vedi [Quote e limiti](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/CHAP_quotas.html).

## Passaggio 4: Verifica che i record scorrano
<a name="cdc-step4-verify"></a>

Inserisci una riga in una tabella sul tuo cluster Aurora DSQL. Esempio:

```
CREATE TABLE IF NOT EXISTS test_cdc (
    id INT PRIMARY KEY,
    message TEXT
);

INSERT INTO test_cdc VALUES (1, 'hello cdc');
```

Leggi dal flusso di dati Kinesis per verificare che il record CDC sia arrivato:

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
  --stream-name {{my-cdc-stream}} \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --region {{region}} \
  --query 'ShardIterator' --output text)

aws kinesis get-records \
  --shard-iterator "$SHARD_ITERATOR" \
  --region {{region}}
```

Il `Data` campo di ogni record contiene un payload JSON. Quando si utilizza il AWS CLI, il payload è Base64-encoded nella risposta. Quando si utilizza l'`boto3`SDK, l'SDK lo decodifica automaticamente. Il codice JSON decodificato ha il seguente aspetto:

```
{
    "type": "full",
    "op": "c",
    "before": null,
    "after": {"id": 1, "message": "hello cdc"},
    "source": {
        "version": "1.0",
        "ts_ms": 1705318200000,
        "ts_ns": 1705318200000000000,
        "txId": "ffthunp5stx6ffs2vyfqoatmfu",
        "schema": "public",
        "table": "test_cdc",
        "db": "postgres",
        "cluster": "{{cluster-id}}"
    },
    "ts_ms": 1705318200125,
    "ts_ns": 1705318200125483291
}
```

Per una descrizione completa di ogni campo, vedi. [Comprensione dei record CDC](cdc-record-format.md)

## Passaggio 5: consumare i record con uno script Python
<a name="cdc-step5-consume"></a>

Il seguente script Python legge i record CDC da un flusso di dati Kinesis e stampa ogni evento di modifica. Lo script utilizza il client `boto3` Amazon Kinesis per eseguire iterazioni sugli shard e decodificare ogni record. Poiché Aurora DSQL CDC utilizza almeno una consegna, lo script potrebbe stampare lo stesso record più di una volta.

```
"""
Read CDC records from an Amazon Kinesis data stream.

Usage:
    pip install boto3
    python consume_cdc.py --stream-name my-cdc-stream --region us-east-1
"""
from __future__ import annotations

import argparse
import json

import boto3


def consume_cdc(stream_name: str, region: str) -> None:
    kinesis = boto3.client("kinesis", region_name=region)

    # List all shards (paginate if the stream has many shards)
    shard_ids: list[str] = []
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))")

    for shard_id in shard_ids:
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
        shard_iterator = iterator_response["ShardIterator"]

        while shard_iterator:
            records_response = kinesis.get_records(
                ShardIterator=shard_iterator, Limit=100
            )
            shard_iterator = records_response.get("NextShardIterator")

            for record in records_response["Records"]:
                # boto3 decodes Base64 automatically; record["Data"] is bytes.
                payload = json.loads(record["Data"])

                # A record's "type" field identifies its structure.
                # "full": inlined record with before/after values.
                # "chunked": main record that references fragments for a split image.
                # "fragment": one piece of a chunked image; reassemble in production code.
                # For details, see cdc-record-format.html#cdc-oversized-records.
                record_type = payload.get("type", "full")
                if record_type == "fragment":
                    print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}")
                    continue

                source = payload["source"]
                op = payload["op"]
                ts_ns = source["ts_ns"]
                tx_id = source["txId"]
                table = f"{source['schema']}.{source['table']}"

                # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent
                # release will emit "u" for updates, and "c" for inserts. Design your
                # consumer to handle all three values; this map stays correct across the
                # transition.
                op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"}
                print(
                    f"[{op_labels.get(op, op)}] {table} "
                    f"txId={tx_id} ts_ns={ts_ns} type={record_type}"
                )
                if payload.get("after"):
                    print(f"  after:  {json.dumps(payload['after'])}")
                if payload.get("before"):
                    print(f"  before: {json.dumps(payload['before'])}")
                if record_type == "chunked":
                    print(f"  chunked: {json.dumps(payload['chunked'])}")

            if not records_response["Records"]:
                break  # No more records in this shard


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume DSQL CDC records from Kinesis"
    )
    parser.add_argument("--stream-name", required=True, help="Kinesis stream name")
    parser.add_argument("--region", required=True, help="AWS Region")
    args = parser.parse_args()
    consume_cdc(args.stream_name, args.region)
```

Esegui lo script :

```
pip install boto3
python consume_cdc.py \
  --stream-name {{my-cdc-stream}} \
  --region {{region}}
```

Lo script stampa ogni evento di modifica non appena arriva. L'output visualizzato è simile al seguente:

```
Reading from my-cdc-stream (4 shard(s))
[INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full
  after:  {"id": 1, "message": "hello cdc"}
```

**L'aggiunta della deduplicazione Last-Writer-Wins**  
Poiché Aurora DSQL CDC utilizza almeno una consegna, le app di produzione devono deduplicare e ordinare i record. Il seguente esempio di codice mostra un approccio basato sull'elevato numero di marcature: per ogni chiave primaria, tiene traccia del valore più alto `source.ts_ns` visto finora e scarta qualsiasi record con un timestamp uguale o precedente. Imposta `PK_COLUMNS` i nomi delle colonne della chiave principale della tabella che stai elaborando. Per le strategie che gestiscono più tabelle o eliminazioni, consulta[Strategie di consumo](cdc-streams.md#cdc-consumer-strategies).

```
# Set PK_COLUMNS to the primary key column(s) of your table.
PK_COLUMNS = ["id"]

# Maps each primary key value to the highest ts_ns seen for that key.
high_water: dict[tuple, int] = {}

def process_record(payload: dict) -> bool:
    """Return True if the record is new, False if it's a duplicate or stale.

    Skip fragment records; reassemble them into a full image before calling this.
    """
    if payload.get("type") == "fragment":
        return False  # Fragments are reassembled upstream, not deduplicated here.

    source = payload["source"]
    ts_ns = source["ts_ns"]
    op = payload["op"]

    # For inserts/updates the row is in "after"; for deletes it's in "before".
    row = payload.get("after") or payload.get("before") or {}
    pk = tuple(row.get(col) for col in PK_COLUMNS)

    prev_ts = high_water.get(pk, -1)
    if ts_ns <= prev_ts:
        return False  # Duplicate or out-of-order record

    high_water[pk] = ts_ns
    return True
```

## Gestione degli stream CDC
<a name="cdc-manage-streams"></a>

**Elenco degli stream**  
Per elencare tutti i flussi CDC per un cluster, usa l'operazione: `ListStreams`

```
aws dsql list-streams \
  --cluster-identifier {{cluster-id}} \
  --region {{region}}
```

**Eliminazione di uno stream**  
Per eliminare uno stream CDC, esegui il seguente comando:

```
aws dsql delete-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}}
```

Puoi usare il `StreamNotExists` cameriere per eseguire un sondaggio `GetStream` finché non `ResourceNotFoundException` viene restituito un, a indicare che Aurora DSQL ha eliminato completamente lo stream.