

# Introducción a los flujos de CDC
<a name="cdc-setup"></a>

**importante**  
Esta característica se ofrece como versión preliminar de AWS y está sujeta a cambios. Para obtener más información, consulte la sección 2, Betas y versiones preliminares, del documento [Términos de servicio de AWS](https://aws.amazon.com/service-terms/). Para obtener más información acerca de los precios de los flujos de CDC, visite la [página Precios de Amazon Aurora DSQL](https://aws.amazon.com/rds/aurora/dsql/pricing/).  
Antes de la disponibilidad general, añadiremos nuevos tipos de operaciones (`"op": "u"` para actualizaciones) a la carga útil de su flujo. Para garantizar que su aplicación gestione estos cambios sin necesidad de modificaciones, trate cualquier valor `op` no reconocido como una operación upsert aplicando la carga útil `after`. Para obtener más información, consulte [Descripción de los registros de CDC](cdc-record-format.md).

Esta guía explica todos los pasos necesarios para comenzar a transmitir los cambios confirmados en las filas desde un clúster de Aurora DSQL a un flujo de datos de Amazon Kinesis. Al finalizar esta guía, habrá creado una canalización de CDC operativa y un script de Python que lee y muestra los registros de cambios.

## Requisitos previos
<a name="cdc-prerequisites"></a>

Antes de comenzar, confirme lo siguiente:
+ Ha creado un clúster de Aurora DSQL en estado `ACTIVE`. Si el clúster está inactivo, conéctese a él con cualquier cliente compatible con PostgreSQL para activarlo antes de crear un flujo de CDC. `CreateStream` devuelve un error de validación si el clúster no se encuentra en estado `ACTIVE`.
+ Aurora DSQL requiere que todos los recursos de CDC —el clúster, el flujo de datos de Amazon Kinesis, el rol de servicio de IAM y la entidad principal de llamada— se encuentren en la misma cuenta de AWS.
+ El flujo de datos de Amazon Kinesis se encuentra en la misma región de AWS que el clúster de Aurora DSQL.
+ Ha instalado y configurado la AWS CLI con credenciales que tienen permiso para crear roles de IAM y flujos de datos de Amazon Kinesis.

## Paso 1: Creación de un flujo de datos de Amazon Kinesis
<a name="cdc-step1-kinesis"></a>

El flujo de datos de Amazon Kinesis se encuentra en la misma cuenta y región de AWS que el clúster de Aurora DSQL. Los registros de CDC son más grandes que la fila de Aurora DSQL correspondiente porque el formato JSON incluye nombres de columnas, metadatos y sobrecarga de codificación.

### Tamaño del flujo de datos de Kinesis
<a name="cdc-sizing"></a>

La CDC de Aurora DSQL entrega la fila completa en cada cambio. Una actualización que afecta a una sola columna genera un registro que contiene todas las columnas de la fila. Los registros de eliminación son la excepción: solo incluyen las columnas de clave principal.

**Cálculo del tamaño medio de los registros**  
Mida el tamaño medio de las filas en disco para conocer el volumen que generará el CDC y anticipar registros de un tamaño excesivo. La siguiente consulta devuelve el tamaño medio de las tuplas en bytes para una tabla:

```
SELECT avg(pg_column_size(t.*)) FROM {{your_table}} t;
```

El sobre del registro de CDC añade los nombres de las columnas, los metadatos y la sobrecarga de codificación además al tamaño de la fila. Para conocer el formato de registro exacto, consulte [Carga útil de registro](cdc-record-format.md#cdc-record-payload). Para saber cómo gestiona Aurora DSQL los registros que superan el límite de tamaño de los registros de Kinesis, consulte [Gestión de los registros sobredimensionados](cdc-record-format.md#cdc-oversized-records). Para conocer el conjunto completo de límites del servicio Kinesis, consulte [Cuotas y límites de Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) en la *Guía para desarrolladores de Amazon Kinesis Data Streams*.

**importante**  
Al crear el flujo de datos de Kinesis, establezca lo siguiente:  
`MaxRecordSizeInKiB` en `10240` (10 MiB). El tamaño máximo predeterminado de Kinesis de 1 MiB no siempre es suficiente para los registros de CDC de Aurora DSQL. Cualquier registro que supere el tamaño de registro de Kinesis configurado provoca que el flujo de CDC se vea afectado por el error `KINESIS_OVERSIZE_RECORD`. Aurora DSQL divide los registros de gran tamaño en fragmentos que pueden alcanzar los 10 MiB cada uno, por lo que el flujo de datos de Kinesis debe aceptar registros de ese tamaño. Para obtener más información, consulte [Gestión de los registros sobredimensionados](cdc-record-format.md#cdc-oversized-records).
`StreamMode` a `ON_DEMAND`. El modo bajo demanda escala la capacidad de las particiones automáticamente y le protege contra un aprovisionamiento insuficiente durante picos inesperados. Kinesis puede seguir devolviendo `WriteProvisionedThroughputExceeded` durante ráfagas bruscas de segundos a medida que la capacidad se escala verticalmente. Planifique eventos de limitación breves.

Cree alarmas de CloudWatch para `IncomingBytes` y `WriteProvisionedThroughputExceeded` en el espacio de nombres de `AWS/Kinesis`. La limitación de Kinesis ralentiza la entrega de CDC y aumenta el retardo de la replicación. Para obtener información sobre métricas y alarmas de Aurora DSQL, consulte [Prácticas recomendadas de supervisión](cdc-monitoring.md#cdc-monitoring-best-practices).

El siguiente ejemplo utiliza AWS CLI. Si su versión de la AWS CLI no admite el parámetro `--max-record-size-in-ki-b`, utilice un AWS SDK para llamar a la operación [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html) de Kinesis.

```
aws kinesis create-stream \
  --stream-name {{my-cdc-stream}} \
  --stream-mode-details StreamMode=ON_DEMAND \
  --max-record-size-in-ki-b 10240 \
  --region {{region}}
```

Espere a que un flujo se active:

```
aws kinesis describe-stream-summary \
  --stream-name {{my-cdc-stream}} \
  --region {{region}} \
  --query 'StreamDescriptionSummary.StreamStatus'
```

El comando devuelve `"ACTIVE"` cuando el flujo está listo.

Anote el ARN del flujo que aparece en la salida. Lo necesitará en los siguientes pasos. El ARN tiene el formato `arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{my-cdc-stream}}`.

## Paso 2: Creación de un rol de IAM para Aurora DSQL
<a name="cdc-step2-iam"></a>

Aurora DSQL asume el rol de IAM para escribir registros de CDC en su flujo de datos de Kinesis. En este paso, creará el rol con una política de confianza y le asociará una política de permisos. Para obtener una explicación completa de cada elemento de la política, consulte [Configuración de IAM](cdc-iam.md).

**Creación de un archivo de política de confianza**  
Guarde el siguiente JSON como `trust-policy.json`. Sustituya los valores de {{your-account-id}}, {{region}} y {{cluster-id}} por los suyos.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "DSQLAccess",
            "Effect": "Allow",
            "Principal": {
                "Service": "dsql.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "{{your-account-id}}"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:dsql:{{region}}:{{your-account-id}}:cluster/{{cluster-id}}/stream/*"
                }
            }
        }
    ]
}
```

**Crear el rol**  
Ejecute el siguiente comando de para crear el rol de IAM:

```
aws iam create-role \
  --role-name {{dsql-cdc-role}} \
  --assume-role-policy-document file://trust-policy.json
```

**Creación de un archivo de política de permisos**  
Guarde el siguiente JSON como `permissions-policy.json`. Sustituya los valores de marcador de posición por su ARN de flujo de datos de Kinesis. La instrucción `KMSAccess` solo es necesaria si el flujo de datos de Kinesis utiliza una clave administrada por el cliente de AWS KMS, pero puede incluirla de forma preventiva para que, si añade una clave administrada por el cliente más adelante, no se vea afectado su flujo de CDC. Para obtener una explicación completa de cada condición, consulte [Política de permisos de roles de servicio](cdc-iam.md#cdc-iam-permissions-policy).

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KinesisAccess",
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}"
        },
        {
            "Sid": "KMSAccess",
            "Effect": "Allow",
            "Action": [
                "kms:GenerateDataKey"
            ],
            "Resource": "arn:aws:kms:*:*:key/*",
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "kinesis.{{region}}.amazonaws.com",
                    "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:{{region}}:{{your-account-id}}:stream/{{my-cdc-stream}}",
                    "aws:ResourceAccount": "${aws:PrincipalAccount}"
                }
            }
        }
    ]
}
```

**Asociación de la política de permisos**  
Use el siguiente comando:

```
aws iam put-role-policy \
  --role-name {{dsql-cdc-role}} \
  --policy-name dsql-cdc-kinesis-access \
  --policy-document file://permissions-policy.json
```

Anote el ARN de rol que aparece en la salida de `create-role`. El ARN tiene el formato `arn:aws:iam::{{your-account-id}}:role/{{dsql-cdc-role}}`.

## Paso 3: Creación del flujo de CDC
<a name="cdc-step3-create-stream"></a>

Utilice la AWS CLI para crear un flujo de CDC que conecte su clúster de Aurora DSQL con el flujo de datos de Kinesis. Sustituya los valores de los marcadores de posición por el ARN del flujo de Kinesis del paso 1, el ARN del rol de IAM del paso 2 y el identificador de su clúster.

```
aws dsql create-stream \
  --cluster-identifier {{cluster-id}} \
  --target-definition '{"kinesis":{"streamArn":"{{kinesis-stream-arn}}","roleArn":"{{role-arn}}"}}' \
  --ordering UNORDERED \
  --format JSON \
  --tags '{"Name":"{{my-cdc-stream}}"}' \
  --region {{region}}
```

La respuesta incluye un identificador de flujo y un estado de `CREATING`. La creación del flujo suele tardar de uno a tres minutos.

**Espera a la activación del flujo**  
Sondee el estado del flujo hasta que sea `ACTIVE`:

```
aws dsql get-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}} \
  --query 'status'
```

También puede utilizar el esperador `StreamActive` de los AWS SDK para realizar el sondeo de forma automática.

Una vez que el flujo alcance el estado `ACTIVE`, Aurora DSQL comenzará a enviar los cambios confirmados en las fila al flujo de datos de Kinesis.

**nota**  
Cada clúster de Aurora DSQL tiene un número máximo de flujos de CDC. Si alcanza este límite, `CreateStream` devuelve una `ServiceQuotaExceededException`. Para conocer el límite predeterminado, consulte [Cuotas y límites](https://docs.aws.amazon.com/aurora-dsql/latest/userguide/CHAP_quotas.html).

## Paso 4: Comprobación del flujo de registros
<a name="cdc-step4-verify"></a>

Inserte una fila en una tabla del clúster de Aurora DSQL. Por ejemplo:

```
CREATE TABLE IF NOT EXISTS test_cdc (
    id INT PRIMARY KEY,
    message TEXT
);

INSERT INTO test_cdc VALUES (1, 'hello cdc');
```

Lea el flujo de datos de Kinesis para comprobar que ha llegado el registro de CDC:

```
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
  --stream-name {{my-cdc-stream}} \
  --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON \
  --region {{region}} \
  --query 'ShardIterator' --output text)

aws kinesis get-records \
  --shard-iterator "$SHARD_ITERATOR" \
  --region {{region}}
```

El campo `Data` de cada registro contiene una carga útil JSON. Cuando se utiliza la AWS CLI, la carga útil está codificada en Base64 en la respuesta. Cuando usa el `boto3` SDK, el SDK lo descodifica automáticamente. El JSON descodificado tiene el siguiente aspecto:

```
{
    "type": "full",
    "op": "c",
    "before": null,
    "after": {"id": 1, "message": "hello cdc"},
    "source": {
        "version": "1.0",
        "ts_ms": 1705318200000,
        "ts_ns": 1705318200000000000,
        "txId": "ffthunp5stx6ffs2vyfqoatmfu",
        "schema": "public",
        "table": "test_cdc",
        "db": "postgres",
        "cluster": "{{cluster-id}}"
    },
    "ts_ms": 1705318200125,
    "ts_ns": 1705318200125483291
}
```

Para obtener una descripción completa de cada uno de los campos, consulte [Descripción de los registros de CDC](cdc-record-format.md).

## Paso 5: Consumición de registros con un script de Python
<a name="cdc-step5-consume"></a>

El siguiente script de Python lee los registros de CDC de un flujo de datos de Kinesis e imprime cada evento de cambio. El script utiliza el cliente `boto3` de Amazon Kinesis para iterar en las particiones y descodificar cada registro. Dado que la CDC de Aurora DSQL utiliza la entrega al menos una vez, es posible que el script muestre el mismo registro más de una vez.

```
"""
Read CDC records from an Amazon Kinesis data stream.

Usage:
    pip install boto3
    python consume_cdc.py --stream-name my-cdc-stream --region us-east-1
"""
from __future__ import annotations

import argparse
import json

import boto3


def consume_cdc(stream_name: str, region: str) -> None:
    kinesis = boto3.client("kinesis", region_name=region)

    # List all shards (paginate if the stream has many shards)
    shard_ids: list[str] = []
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])
    print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))")

    for shard_id in shard_ids:
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
        shard_iterator = iterator_response["ShardIterator"]

        while shard_iterator:
            records_response = kinesis.get_records(
                ShardIterator=shard_iterator, Limit=100
            )
            shard_iterator = records_response.get("NextShardIterator")

            for record in records_response["Records"]:
                # boto3 decodes Base64 automatically; record["Data"] is bytes.
                payload = json.loads(record["Data"])

                # A record's "type" field identifies its structure.
                # "full": inlined record with before/after values.
                # "chunked": main record that references fragments for a split image.
                # "fragment": one piece of a chunked image; reassemble in production code.
                # For details, see cdc-record-format.html#cdc-oversized-records.
                record_type = payload.get("type", "full")
                if record_type == "fragment":
                    print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}")
                    continue

                source = payload["source"]
                op = payload["op"]
                ts_ns = source["ts_ns"]
                tx_id = source["txId"]
                table = f"{source['schema']}.{source['table']}"

                # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent
                # release will emit "u" for updates, and "c" for inserts. Design your
                # consumer to handle all three values; this map stays correct across the
                # transition.
                op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"}
                print(
                    f"[{op_labels.get(op, op)}] {table} "
                    f"txId={tx_id} ts_ns={ts_ns} type={record_type}"
                )
                if payload.get("after"):
                    print(f"  after:  {json.dumps(payload['after'])}")
                if payload.get("before"):
                    print(f"  before: {json.dumps(payload['before'])}")
                if record_type == "chunked":
                    print(f"  chunked: {json.dumps(payload['chunked'])}")

            if not records_response["Records"]:
                break  # No more records in this shard


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume DSQL CDC records from Kinesis"
    )
    parser.add_argument("--stream-name", required=True, help="Kinesis stream name")
    parser.add_argument("--region", required=True, help="AWS Region")
    args = parser.parse_args()
    consume_cdc(args.stream_name, args.region)
```

Ejecute el script :

```
pip install boto3
python consume_cdc.py \
  --stream-name {{my-cdc-stream}} \
  --region {{region}}
```

El script imprime cada evento de cambio a medida que se recibe. Se muestra una salida similar a la siguiente:

```
Reading from my-cdc-stream (4 shard(s))
[INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full
  after:  {"id": 1, "message": "hello cdc"}
```

**Incorporación de la desduplicación del último escritor gana**  
Dado que la CDC de Aurora DSQL utiliza la entrega al menos una vez, las aplicaciones de producción deben desduplicar y ordenar los registros. En el siguiente ejemplo de código se muestra un enfoque de marca de agua superior: para cada clave principal, realiza un seguimiento del valor más alto de `source.ts_ns` observado hasta el momento y descarta cualquier registro con una marca de tiempo igual o anterior. Establezca `PK_COLUMNS` con los nombres de las columnas de clave principal de la tabla que está procesando. Para ver las estrategias que permiten gestionar varias tablas o eliminaciones, consulte [Estrategias de consumidor](cdc-streams.md#cdc-consumer-strategies).

```
# Set PK_COLUMNS to the primary key column(s) of your table.
PK_COLUMNS = ["id"]

# Maps each primary key value to the highest ts_ns seen for that key.
high_water: dict[tuple, int] = {}

def process_record(payload: dict) -> bool:
    """Return True if the record is new, False if it's a duplicate or stale.

    Skip fragment records; reassemble them into a full image before calling this.
    """
    if payload.get("type") == "fragment":
        return False  # Fragments are reassembled upstream, not deduplicated here.

    source = payload["source"]
    ts_ns = source["ts_ns"]
    op = payload["op"]

    # For inserts/updates the row is in "after"; for deletes it's in "before".
    row = payload.get("after") or payload.get("before") or {}
    pk = tuple(row.get(col) for col in PK_COLUMNS)

    prev_ts = high_water.get(pk, -1)
    if ts_ns <= prev_ts:
        return False  # Duplicate or out-of-order record

    high_water[pk] = ts_ns
    return True
```

## Administración de flujos de CDC
<a name="cdc-manage-streams"></a>

**Lista de flujos**  
Para ver una lista todos los flujos de CDC de un clúster, utilice la siguiente operación `ListStreams`:

```
aws dsql list-streams \
  --cluster-identifier {{cluster-id}} \
  --region {{region}}
```

**Eliminación de un flujo**  
Para eliminar un flujo de CDC, ejecute el siguiente comando:

```
aws dsql delete-stream \
  --cluster-identifier {{cluster-id}} \
  --stream-identifier {{stream-id}} \
  --region {{region}}
```

Puede utilizar el esperador `StreamNotExists` para sondear `GetStream` hasta que se devuelva una `ResourceNotFoundException`, lo que indica que Aurora DSQL ha eliminado por completo el flujo.