Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erste Schritte mit CDC-Streams
Wichtig
Diese Funktion wird als AWS Vorversion bereitgestellt und kann sich ändern. Weitere Informationen finden Sie in Abschnitt 2, Betas und Vorschauen, in den AWS Servicebedingungen
Vor der allgemeinen Verfügbarkeit werden wir Ihrer Stream-Payload neue Operationstypen ("op": "u"für Updates) hinzufügen. Um sicherzustellen, dass Ihre Anwendung diese Änderungen unverändert verarbeitet, behandeln Sie jeden unbekannten op Wert als Fehler, indem Sie die Payload übernehmen. after Details dazu finden Sie unter CDC-Datensätze verstehen.
Dieser Leitfaden führt Sie durch jeden Schritt, der erforderlich ist, um mit dem Streaming festgeschriebener Änderungen auf Zeilenebene von einem Aurora DSQL-Cluster in einen Amazon Kinesis Kinesis-Datenstream zu beginnen. Am Ende dieses Handbuchs haben Sie eine funktionierende CDC-Pipeline und ein Python-Skript erstellt, das Änderungsdatensätze liest und druckt.
Voraussetzungen
Bevor Sie beginnen, überprüfen Sie Folgendes:
-
Sie haben einen Aurora DSQL-Cluster im
ACTIVEStatus erstellt. Wenn sich Ihr Cluster im Leerlauf befindet, stellen Sie mit einem beliebigen PostgreSQL-compatible Client eine Verbindung zu ihm her, um ihn aufzuwecken, bevor Sie einen CDC-Stream erstellen.CreateStreamgibt einen Validierungsfehler zurück, wenn sich der Cluster nicht imACTIVEStatus befindet. -
Aurora DSQL erfordert, dass sich alle CDC-Ressourcen — der Cluster, der Amazon Kinesis-Datenstream, die IAM-Servicerolle und der aufrufende Principal — im selben Konto befinden. AWS
-
Ihr Amazon Kinesis Kinesis-Datenstream befindet sich in derselben AWS Region wie Ihr Aurora DSQL-Cluster.
-
Sie haben die AWS CLI mit Anmeldeinformationen installiert und konfiguriert, die berechtigt sind, IAM-Rollen und Amazon Kinesis Kinesis-Datenstreams zu erstellen.
Schritt 1: Erstellen Sie einen Amazon Kinesis Kinesis-Datenstream
Erstellen Sie einen Kinesis-Datenstream in demselben AWS Konto und derselben Region wie Ihr Aurora DSQL-Cluster. CDC-Datensätze sind größer als die entsprechende Aurora DSQL-Zeile, da das JSON-Format Spaltennamen, Metadaten und Kodierungsaufwand beinhaltet.
Dimensionierung des Kinesis-Datenstroms
Aurora DSQL CDC liefert bei jeder Änderung die vollständige Zeile. Eine Aktualisierung, die sich auf eine einzelne Spalte bezieht, erzeugt einen Datensatz, der alle Spalten in der Zeile enthält. Löschdatensätze sind die Ausnahme — sie enthalten nur die Primärschlüsselspalten.
Schätzen Sie die durchschnittliche Datensatzgröße
Messen Sie die durchschnittliche Zeilengröße auf der Festplatte, um das Volumen zu ermitteln, das CDC produzieren wird, und um zu große Datensätze zu antizipieren. Die folgende Abfrage gibt die durchschnittliche Tupelgröße in Byte für eine Tabelle zurück:
SELECT avg(pg_column_size(t.*)) FROMyour_tablet;
Der CDC-Datensatzumschlag fügt zusätzlich zur Zeilengröße Spaltennamen, Metadaten und Kodierungsaufwand hinzu. Das genaue Datensatzformat finden Sie unterNutzlast aufzeichnen. Informationen darüber, wie Aurora DSQL mit Datensätzen umgeht, die die Kinesis-Datensatzgrößenbeschränkung überschreiten, finden Sie unter. Umgang mit übergroßen Datensätzen Den vollständigen Satz der Kinesis-Servicebeschränkungen finden Sie unter Amazon Kinesis Data Streams-Kontingente und -Limits im Amazon Kinesis Data Streams Developer Guide.
Wichtig
Wenn Sie den Kinesis-Datenstream erstellen, stellen Sie Folgendes ein:
-
MaxRecordSizeInKiBbis10240(10 MiB). Das standardmäßige Kinesis-Maximum von 1 MiB ist nicht immer groß genug für Aurora DSQL-CDC-Datensätze. Jeder Datensatz, der die konfigurierte Kinesis-Datensatzgröße überschreitet, führt zu einer Beeinträchtigung des CDC-Streams mit.KINESIS_OVERSIZE_RECORDAurora DSQL teilt übergroße Datensätze in Fragmente auf, die sich jeweils bis zu 10 MiB nähern können, sodass der Kinesis-Datenstrom Datensätze dieser Größe akzeptieren muss. Details hierzu finden Sie unter Umgang mit übergroßen Datensätzen. -
StreamModezuON_DEMAND. On-demand Der Modus skaliert die Shard-Kapazität automatisch und schützt Sie vor unzureichender Bereitstellung bei unerwarteten Leistungsspitzen. Kinesis kann immer nochWriteProvisionedThroughputExceededbei kurzen Ausbrüchen im Sekundenbereich zurückkehren, wenn die Kapazität steigt. Planen Sie kurze Drosselungen ein.
Erstellen Sie CloudWatch Alarme im IncomingBytes und WriteProvisionedThroughputExceeded im Namespace. AWS/Kinesis Kinesis-Drosselung verlangsamt die CDC-Übertragung und erhöht die Replikationsverzögerung. DSQL-sideAurora-Metriken und Hinweise zu Alarmen finden Sie unterÜberwachung bewährter Verfahren.
Das folgende Beispiel verwendet die AWS CLI. Wenn Ihre AWS CLI Version den --max-record-size-in-ki-b Parameter nicht unterstützt, verwenden Sie ein AWS SDK, um die CreateStreamKinesis-Operation aufzurufen.
aws kinesis create-stream \ --stream-namemy-cdc-stream\ --stream-mode-details StreamMode=ON_DEMAND \ --max-record-size-in-ki-b 10240 \ --regionregion
Warten Sie, bis der Stream aktiv wird:
aws kinesis describe-stream-summary \ --stream-namemy-cdc-stream\ --regionregion\ --query 'StreamDescriptionSummary.StreamStatus'
Der Befehl kehrt zurück"ACTIVE", wenn der Stream bereit ist.
Nehmen Sie den Stream-ARN von der Ausgabe auf. Sie benötigen ihn in den folgenden Schritten. Der ARN hat das Formatarn:aws:kinesis:.region:account-id:stream/my-cdc-stream
Schritt 2: Erstellen Sie eine IAM-Rolle für Aurora DSQL
Aurora DSQL übernimmt eine IAM-Rolle, um CDC-Datensätze in Ihren Kinesis-Datenstrom zu schreiben. In diesem Schritt erstellen Sie die Rolle mit einer Vertrauensrichtlinie und fügen eine Berechtigungsrichtlinie hinzu. Eine vollständige Erläuterung der einzelnen Richtlinienelemente finden Sie unterIAM konfigurieren.
Erstellen Sie die Vertrauensrichtlinien-Datei
Speichern Sie den folgenden JSON-Code untertrust-policy.json. Ersetzen Sie your-account-idregion, und cluster-id durch Ihre Werte.
{ "Version": "2012-10-17", "Statement": [ { "Sid": "DSQLAccess", "Effect": "Allow", "Principal": { "Service": "dsql.amazonaws.com" }, "Action": "sts:AssumeRole", "Condition": { "StringEquals": { "aws:SourceAccount": "your-account-id" }, "ArnLike": { "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*" } } } ] }
Erstellen der Rolle
Führen Sie den folgenden --Befehl aus, um die IAM-Rolle zu erstellen:
aws iam create-role \ --role-namedsql-cdc-role\ --assume-role-policy-document file://trust-policy.json
Erstellen Sie die Datei mit den Berechtigungsrichtlinien
Speichern Sie den folgenden JSON-Code unterpermissions-policy.json. Ersetzen Sie die Platzhalterwerte durch Ihren Kinesis-Datenstream-ARN. Die KMSAccess Anweisung ist nur erforderlich, wenn Ihr Kinesis-Datenstream einen vom AWS KMS Kunden verwalteten Schlüssel verwendet. Sie können ihn jedoch präventiv einbeziehen, sodass das Hinzufügen eines vom Kunden verwalteten Schlüssels zu einem späteren Zeitpunkt Ihren CDC-Stream nicht unterbricht. Eine vollständige Erläuterung der einzelnen Bedingungen finden Sie unter. Richtlinie für Berechtigungen für Servicerollen
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KinesisAccess", "Effect": "Allow", "Action": [ "kinesis:PutRecord", "kinesis:PutRecords", "kinesis:DescribeStreamSummary", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream" }, { "Sid": "KMSAccess", "Effect": "Allow", "Action": [ "kms:GenerateDataKey" ], "Resource": "arn:aws:kms:*:*:key/*", "Condition": { "StringEquals": { "kms:ViaService": "kinesis.region.amazonaws.com", "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream", "aws:ResourceAccount": "${aws:PrincipalAccount}" } } } ] }
Hängen Sie die Berechtigungsrichtlinie an
Führen Sie den folgenden Befehl aus:
aws iam put-role-policy \ --role-namedsql-cdc-role\ --policy-name dsql-cdc-kinesis-access \ --policy-document file://permissions-policy.json
Notieren Sie den Rollen-ARN aus der create-role Ausgabe. Der ARN hat das Formatarn:aws:iam::.your-account-id:role/dsql-cdc-role
Schritt 3: Erstellen Sie den CDC-Stream
Verwenden Sie den AWS CLI , um einen CDC-Stream zu erstellen, der Ihren Aurora DSQL-Cluster mit dem Kinesis-Datenstream verbindet. Ersetzen Sie die Platzhalterwerte durch den Kinesis-Stream-ARN aus Schritt 1, den IAM-Rollen-ARN aus Schritt 2 und Ihre Cluster-ID.
aws dsql create-stream \ --cluster-identifiercluster-id\ --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \ --ordering UNORDERED \ --format JSON \ --tags '{"Name":"my-cdc-stream"}' \ --regionregion
Die Antwort enthält eine Stream-ID und den Status. CREATING Die Erstellung eines Streams dauert in der Regel ein bis drei Minuten.
Warten Sie, bis der Stream aktiv wird
Fragen Sie den Stream-Status ab, bis er folgende Werte erreichtACTIVE:
aws dsql get-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion\ --query 'status'
Sie können auch den StreamActive Kellner in den AWS SDKs verwenden, um automatisch eine Umfrage durchzuführen.
Sobald der Stream erreicht istACTIVE, beginnt Aurora DSQL mit der Übertragung festgeschriebener Änderungen auf Zeilenebene an Ihrem Kinesis-Datenstream.
Anmerkung
Jeder Aurora DSQL-Cluster hat eine maximale Anzahl von CDC-Streams. Wenn Sie dieses Limit erreichen, wird a CreateStream zurückgegeben. ServiceQuotaExceededException Informationen zum Standardlimit finden Sie unter Kontingente und Limits.
Schritt 4: Stellen Sie sicher, dass die Datensätze fließen
Fügen Sie eine Zeile in eine Tabelle auf Ihrem Aurora DSQL-Cluster ein. Beispiel:
CREATE TABLE IF NOT EXISTS test_cdc ( id INT PRIMARY KEY, message TEXT ); INSERT INTO test_cdc VALUES (1, 'hello cdc');
Lesen Sie aus dem Kinesis-Datenstrom, um zu überprüfen, ob der CDC-Datensatz eingetroffen ist:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \ --stream-namemy-cdc-stream\ --shard-id shardId-000000000000 \ --shard-iterator-type TRIM_HORIZON \ --regionregion\ --query 'ShardIterator' --output text) aws kinesis get-records \ --shard-iterator "$SHARD_ITERATOR" \ --regionregion
Das Data Feld jedes Datensatzes enthält eine JSON-Nutzlast. Wenn Sie die verwenden AWS CLI, ist die Nutzlast Base64-encoded in der Antwort enthalten. Wenn Sie das boto3 SDK verwenden, dekodiert das SDK es automatisch. Das dekodierte JSON sieht wie folgt aus:
{ "type": "full", "op": "c", "before": null, "after": {"id": 1, "message": "hello cdc"}, "source": { "version": "1.0", "ts_ms": 1705318200000, "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu", "schema": "public", "table": "test_cdc", "db": "postgres", "cluster": "cluster-id" }, "ts_ms": 1705318200125, "ts_ns": 1705318200125483291 }
Eine vollständige Beschreibung der einzelnen Felder finden Sie unterCDC-Datensätze verstehen.
Schritt 5: Datensätze mit einem Python-Skript konsumieren
Das folgende Python-Skript liest CDC-Datensätze aus einem Kinesis-Datenstream und druckt jedes Änderungsereignis. Das Skript verwendet den boto3 Amazon Kinesis Kinesis-Client, um über Shards zu iterieren und jeden Datensatz zu dekodieren. Da Aurora DSQL CDC mindestens einmal zugestellt wird, druckt das Skript denselben Datensatz möglicherweise mehrmals.
""" Read CDC records from an Amazon Kinesis data stream. Usage: pip install boto3 python consume_cdc.py --stream-name my-cdc-stream --region us-east-1 """ from __future__ import annotations import argparse import json import boto3 def consume_cdc(stream_name: str, region: str) -> None: kinesis = boto3.client("kinesis", region_name=region) # List all shards (paginate if the stream has many shards) shard_ids: list[str] = [] paginator = kinesis.get_paginator("list_shards") for page in paginator.paginate(StreamName=stream_name): shard_ids.extend(s["ShardId"] for s in page["Shards"]) print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))") for shard_id in shard_ids: iterator_response = kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = iterator_response["ShardIterator"] while shard_iterator: records_response = kinesis.get_records( ShardIterator=shard_iterator, Limit=100 ) shard_iterator = records_response.get("NextShardIterator") for record in records_response["Records"]: # boto3 decodes Base64 automatically; record["Data"] is bytes. payload = json.loads(record["Data"]) # A record's "type" field identifies its structure. # "full": inlined record with before/after values. # "chunked": main record that references fragments for a split image. # "fragment": one piece of a chunked image; reassemble in production code. # For details, see cdc-record-format.html#cdc-oversized-records. record_type = payload.get("type", "full") if record_type == "fragment": print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}") continue source = payload["source"] op = payload["op"] ts_ns = source["ts_ns"] tx_id = source["txId"] table = f"{source['schema']}.{source['table']}" # Aurora DSQL currently emits "c" for both inserts and updates. A subsequent # release will emit "u" for updates, and "c" for inserts. Design your # consumer to handle all three values; this map stays correct across the # transition. op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"} print( f"[{op_labels.get(op, op)}] {table} " f"txId={tx_id} ts_ns={ts_ns} type={record_type}" ) if payload.get("after"): print(f" after: {json.dumps(payload['after'])}") if payload.get("before"): print(f" before: {json.dumps(payload['before'])}") if record_type == "chunked": print(f" chunked: {json.dumps(payload['chunked'])}") if not records_response["Records"]: break # No more records in this shard if __name__ == "__main__": parser = argparse.ArgumentParser( description="Consume DSQL CDC records from Kinesis" ) parser.add_argument("--stream-name", required=True, help="Kinesis stream name") parser.add_argument("--region", required=True, help="AWS Region") args = parser.parse_args() consume_cdc(args.stream_name, args.region)
Führen Sie das Skript aus:
pip install boto3 python consume_cdc.py \ --stream-namemy-cdc-stream\ --regionregion
Das Skript druckt jedes Änderungsereignis, sobald es eingeht. Die Ausgabe entspricht weitgehend der folgenden:
Reading from my-cdc-stream (4 shard(s)) [INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full after: {"id": 1, "message": "hello cdc"}
Last-Writer-Wins-Deduplizierung hinzufügen
Da Aurora DSQL CDC mindestens einmal geliefert wird, sollten Produktions-Apps Datensätze deduplizieren und ordnen. Das folgende Codebeispiel zeigt einen Ansatz mit hohen Wasserwerten: Für jeden Primärschlüssel wird der höchste Wert erfasst, der bisher source.ts_ns gesehen wurde, und alle Datensätze mit einem gleichen oder einem früheren Zeitstempel werden verworfen. Geben Sie PK_COLUMNS die Primärschlüsselspaltennamen der Tabelle ein, die Sie verarbeiten. Strategien, die mehrere Tabellen oder Löschungen verarbeiten, finden Sie unterStrategien für Verbraucher.
# Set PK_COLUMNS to the primary key column(s) of your table. PK_COLUMNS = ["id"] # Maps each primary key value to the highest ts_ns seen for that key. high_water: dict[tuple, int] = {} def process_record(payload: dict) -> bool: """Return True if the record is new, False if it's a duplicate or stale. Skip fragment records; reassemble them into a full image before calling this. """ if payload.get("type") == "fragment": return False # Fragments are reassembled upstream, not deduplicated here. source = payload["source"] ts_ns = source["ts_ns"] op = payload["op"] # For inserts/updates the row is in "after"; for deletes it's in "before". row = payload.get("after") or payload.get("before") or {} pk = tuple(row.get(col) for col in PK_COLUMNS) prev_ts = high_water.get(pk, -1) if ts_ns <= prev_ts: return False # Duplicate or out-of-order record high_water[pk] = ts_ns return True
CDC-Streams verwalten
Streams auflisten
Verwenden Sie den folgenden ListStreams Vorgang, um alle CDC-Streams für einen Cluster aufzulisten:
aws dsql list-streams \ --cluster-identifiercluster-id\ --regionregion
Löschen eines Streams
Führen Sie den folgenden Befehl aus, um einen CDC-Stream zu löschen:
aws dsql delete-stream \ --cluster-identifiercluster-id\ --stream-identifierstream-id\ --regionregion
Sie können den StreamNotExists Kellner für die Abfrage verwenden, GetStream bis a zurückgegeben ResourceNotFoundException wird, was darauf hinweist, dass Aurora DSQL den Stream vollständig gelöscht hat.