Introducción a los flujos de CDC
importante
Esta característica se ofrece como versión preliminar de AWS y está sujeta a cambios. Para obtener más información, consulte la sección 2, Betas y versiones preliminares, del documento Términos de servicio de AWS
Antes de la disponibilidad general, añadiremos nuevos tipos de operaciones ("op": "u" para actualizaciones) a la carga útil de su flujo. Para garantizar que su aplicación gestione estos cambios sin necesidad de modificaciones, trate cualquier valor op no reconocido como una operación upsert aplicando la carga útil after. Para obtener más información, consulte Descripción de los registros de CDC.
Esta guía explica todos los pasos necesarios para comenzar a transmitir los cambios confirmados en las filas desde un clúster de Aurora DSQL a un flujo de datos de Amazon Kinesis. Al finalizar esta guía, habrá creado una canalización de CDC operativa y un script de Python que lee y muestra los registros de cambios.
Requisitos previos
Antes de comenzar, confirme lo siguiente:
-
Ha creado un clúster de Aurora DSQL en estado
ACTIVE. Si el clúster está inactivo, conéctese a él con cualquier cliente compatible con PostgreSQL para activarlo antes de crear un flujo de CDC.CreateStreamdevuelve un error de validación si el clúster no se encuentra en estadoACTIVE. -
Aurora DSQL requiere que todos los recursos de CDC —el clúster, el flujo de datos de Amazon Kinesis, el rol de servicio de IAM y la entidad principal de llamada— se encuentren en la misma cuenta de AWS.
-
El flujo de datos de Amazon Kinesis se encuentra en la misma región de AWS que el clúster de Aurora DSQL.
-
Ha instalado y configurado la AWS CLI con credenciales que tienen permiso para crear roles de IAM y flujos de datos de Amazon Kinesis.
Paso 1: Creación de un flujo de datos de Amazon Kinesis
El flujo de datos de Amazon Kinesis se encuentra en la misma cuenta y región de AWS que el clúster de Aurora DSQL. Los registros de CDC son más grandes que la fila de Aurora DSQL correspondiente porque el formato JSON incluye nombres de columnas, metadatos y sobrecarga de codificación.
Tamaño del flujo de datos de Kinesis
La CDC de Aurora DSQL entrega la fila completa en cada cambio. Una actualización que afecta a una sola columna genera un registro que contiene todas las columnas de la fila. Los registros de eliminación son la excepción: solo incluyen las columnas de clave principal.
Cálculo del tamaño medio de los registros
Mida el tamaño medio de las filas en disco para conocer el volumen que generará el CDC y anticipar registros de un tamaño excesivo. La siguiente consulta devuelve el tamaño medio de las tuplas en bytes para una tabla:
SELECT avg(pg_column_size(t.*)) FROMyour_tablet;
El sobre del registro de CDC añade los nombres de las columnas, los metadatos y la sobrecarga de codificación además al tamaño de la fila. Para conocer el formato de registro exacto, consulte Carga útil de registro. Para saber cómo gestiona Aurora DSQL los registros que superan el límite de tamaño de los registros de Kinesis, consulte Gestión de los registros sobredimensionados. Para conocer el conjunto completo de límites del servicio Kinesis, consulte Cuotas y límites de Amazon Kinesis Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams.
importante
Al crear el flujo de datos de Kinesis, establezca lo siguiente:
-
MaxRecordSizeInKiBen10240(10 MiB). El tamaño máximo predeterminado de Kinesis de 1 MiB no siempre es suficiente para los registros de CDC de Aurora DSQL. Cualquier registro que supere el tamaño de registro de Kinesis configurado provoca que el flujo de CDC se vea afectado por el errorKINESIS_OVERSIZE_RECORD. Aurora DSQL divide los registros de gran tamaño en fragmentos que pueden alcanzar los 10 MiB cada uno, por lo que el flujo de datos de Kinesis debe aceptar registros de ese tamaño. Para obtener más información, consulte Gestión de los registros sobredimensionados. -
StreamModeaON_DEMAND. El modo bajo demanda escala la capacidad de las particiones automáticamente y le protege contra un aprovisionamiento insuficiente durante picos inesperados. Kinesis puede seguir devolviendoWriteProvisionedThroughputExceededdurante ráfagas bruscas de segundos a medida que la capacidad se escala verticalmente. Planifique eventos de limitación breves.
Cree alarmas de CloudWatch para IncomingBytes y WriteProvisionedThroughputExceeded en el espacio de nombres de AWS/Kinesis. La limitación de Kinesis ralentiza la entrega de CDC y aumenta el retardo de la replicación. Para obtener información sobre métricas y alarmas de Aurora DSQL, consulte Prácticas recomendadas de supervisión.
El siguiente ejemplo utiliza AWS CLI. Si su versión de la AWS CLI no admite el parámetro --max-record-size-in-ki-b, utilice un AWS SDK para llamar a la operación CreateStream de Kinesis.
aws kinesis create-stream \ --stream-namemy-cdc-stream\ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --regionregion
Espere a que un flujo se active:
aws kinesis describe-stream-summary \ --stream-namemy-cdc-stream\ --regionregion\ --query 'StreamDescriptionSummary.StreamStatus'
El comando devuelve "ACTIVE" cuando el flujo está listo.
Anote el ARN del flujo que aparece en la salida. Lo necesitará en los siguientes pasos. El ARN tiene el formato arn:aws:kinesis:.region:account-id:stream/my-cdc-stream
Paso 2: Creación de un rol de IAM para Aurora DSQL
Aurora DSQL asume el rol de IAM para escribir registros de CDC en su flujo de datos de Kinesis. En este paso, creará el rol con una política de confianza y le asociará una política de permisos. Para obtener una explicación completa de cada elemento de la política, consulte Configuración de IAM.
Creación de un archivo de política de confianza
Guarde el siguiente JSON como trust-policy.json. Sustituya los valores de your-account-id, region y cluster-id por los suyos.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
Crear el rol
Ejecute el siguiente comando de para crear el rol de IAM:
aws iam create-role \ --role-namedsql-cdc-role\ --assume-role-policy-document file://trust-policy.json
Creación de un archivo de política de permisos
Guarde el siguiente JSON como permissions-policy.json. Sustituya los valores de marcador de posición por su ARN de flujo de datos de Kinesis. La instrucción KMSAccess solo es necesaria si el flujo de datos de Kinesis utiliza una clave administrada por el cliente de AWS KMS, pero puede incluirla de forma preventiva para que, si añade una clave administrada por el cliente más adelante, no se vea afectado su flujo de CDC. Para obtener una explicación completa de cada condición, consulte Política de permisos de roles de servicio.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
Asociación de la política de permisos
Use el siguiente comando:
aws iam put-role-policy \ --role-namedsql-cdc-role\ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json
Anote el ARN de rol que aparece en la salida de create-role. El ARN tiene el formato arn:aws:iam::.your-account-id:role/dsql-cdc-role
Paso 3: Creación del flujo de CDC
Utilice la AWS CLI para crear un flujo de CDC que conecte su clúster de Aurora DSQL con el flujo de datos de Kinesis. Sustituya los valores de los marcadores de posición por el ARN del flujo de Kinesis del paso 1, el ARN del rol de IAM del paso 2 y el identificador de su clúster.
aws dsql create-stream \ --cluster-identifiercluster-id\ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --regionregion
La respuesta incluye un identificador de flujo y un estado de CREATING. La creación del flujo suele tardar de uno a tres minutos.
Espera a la activación del flujo
Sondee el estado del flujo hasta que sea ACTIVE:
aws dsql get-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion\ --query 'status'
También puede utilizar el esperador StreamActive de los AWS SDK para realizar el sondeo de forma automática.
Una vez que el flujo alcance el estado ACTIVE, Aurora DSQL comenzará a enviar los cambios confirmados en las fila al flujo de datos de Kinesis.
nota
Cada clúster de Aurora DSQL tiene un número máximo de flujos de CDC. Si alcanza este límite, CreateStream devuelve una ServiceQuotaExceededException. Para conocer el límite predeterminado, consulte Cuotas y límites.
Paso 4: Comprobación del flujo de registros
Inserte una fila en una tabla del clúster de Aurora DSQL. Por ejemplo:
CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');
Lea el flujo de datos de Kinesis para comprobar que ha llegado el registro de CDC:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-namemy-cdc-stream\ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --regionregion\ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --regionregion
El campo Data de cada registro contiene una carga útil JSON. Cuando se utiliza la AWS CLI, la carga útil está codificada en Base64 en la respuesta. Cuando usa el boto3 SDK, el SDK lo descodifica automáticamente. El JSON descodificado tiene el siguiente aspecto:
{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }
Para obtener una descripción completa de cada uno de los campos, consulte Descripción de los registros de CDC.
Paso 5: Consumición de registros con un script de Python
El siguiente script de Python lee los registros de CDC de un flujo de datos de Kinesis e imprime cada evento de cambio. El script utiliza el cliente boto3 de Amazon Kinesis para iterar en las particiones y descodificar cada registro. Dado que la CDC de Aurora DSQL utiliza la entrega al menos una vez, es posible que el script muestre el mismo registro más de una vez.
""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)
Ejecute el script :
pip install boto3 python consume_cdc.py \ --stream-namemy-cdc-stream\ --regionregion
El script imprime cada evento de cambio a medida que se recibe. Se muestra una salida similar a la siguiente:
Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
Incorporación de la desduplicación del último escritor gana
Dado que la CDC de Aurora DSQL utiliza la entrega al menos una vez, las aplicaciones de producción deben desduplicar y ordenar los registros. En el siguiente ejemplo de código se muestra un enfoque de marca de agua superior: para cada clave principal, realiza un seguimiento del valor más alto de source.ts_ns observado hasta el momento y descarta cualquier registro con una marca de tiempo igual o anterior. Establezca PK_COLUMNS con los nombres de las columnas de clave principal de la tabla que está procesando. Para ver las estrategias que permiten gestionar varias tablas o eliminaciones, consulte Estrategias de consumidor.
# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True
Administración de flujos de CDC
Lista de flujos
Para ver una lista todos los flujos de CDC de un clúster, utilice la siguiente operación ListStreams:
aws dsql list-streams \ --cluster-identifiercluster-id\ --regionregion
Eliminación de un flujo
Para eliminar un flujo de CDC, ejecute el siguiente comando:
aws dsql delete-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion
Puede utilizar el esperador StreamNotExists para sondear GetStream hasta que se devuelva una ResourceNotFoundException, lo que indica que Aurora DSQL ha eliminado por completo el flujo.