Ein Kafka-Thema als Ziel für den Fall eines Fehlers verwenden - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Ein Kafka-Thema als Ziel für den Fall eines Fehlers verwenden

Sie können ein Kafka-Thema als Ziel für den Fall eines Fehlers für Ihre Quellenzuordnungen für Kafka-Ereignisse konfigurieren. Wenn Lambda Datensätze nach anstrengenden Wiederholungsversuchen nicht verarbeiten kann oder wenn Datensätze das Höchstalter überschreiten, sendet Lambda die fehlgeschlagenen Datensätze zur späteren Verarbeitung an das angegebene Kafka-Thema.

So funktioniert ein Kafka-Ziel für den Fall eines Fehlers

Wenn Sie ein Kafka-Thema als Ziel für den Fall eines Fehlers konfigurieren, fungiert Lambda als Kafka-Producer und schreibt fehlgeschlagene Datensätze in das Zielthema. Auf diese Weise entsteht in Ihrer Kafka-Infrastruktur ein Muster mit veralteten Themen (DLT).

  • Gleiche Clusteranforderung — Das Zielthema muss im selben Kafka-Cluster wie Ihre Quellthemen vorhanden sein.

  • Tatsächlicher Datensatzinhalt — Kafka-Ziele erhalten die tatsächlich fehlgeschlagenen Datensätze zusammen mit den Fehlermetadaten.

  • Rekursionsprävention — Lambda verhindert Endlosschleifen, indem es Konfigurationen blockiert, bei denen Quell- und Zielthemen identisch sind.

Konfiguration eines Kafka-Ziels für den Fall eines Fehlers

Sie können ein Kafka-Thema als Ziel für den Fall eines Fehlers konfigurieren, wenn Sie eine Kafka-Ereignisquellenzuordnung erstellen oder aktualisieren.

Konfiguration eines Kafka-Ziels (Konsole)

Um ein Kafka-Thema als Ziel für den Fall eines Fehlers zu konfigurieren (Konsole)
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie den Namen Ihrer Funktion.

  3. Führen Sie eine der folgenden Aktionen aus:

    • Um einen neuen Kafka-Trigger hinzuzufügen, wählen Sie unter Funktionsübersicht die Option Trigger hinzufügen aus.

    • Um einen vorhandenen Kafka-Trigger zu ändern, wählen Sie den Auslöser aus und klicken Sie dann auf Bearbeiten.

  4. Wählen Sie unter Zusätzliche Einstellungen für Ziel bei einem Fehler die Option Kafka-Thema aus.

  5. Geben Sie als Themenname den Namen des Kafka-Themas ein, an das Sie fehlerhafte Datensätze senden möchten.

  6. Wählen Sie Hinzufügen oder Speichern.

Konfiguration eines Kafka-Ziels ()AWS CLI

Verwenden Sie das kafka:// Präfix, um ein Kafka-Thema als Ziel für den Fall eines Fehlers anzugeben.

Erstellen einer Ereignisquellenzuordnung mit einem Kafka-Ziel

Im folgenden Beispiel wird eine Amazon MSK-Ereignisquellenzuordnung mit einem Kafka-Thema als Ziel für den Fall eines Fehlers erstellt:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Verwenden Sie für selbstverwaltetes Kafka dieselbe Syntax:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Aktualisierung einer Kafka-Destination

Verwenden Sie den update-event-source-mapping Befehl, um ein Kafka-Ziel hinzuzufügen oder zu ändern:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Datensatzformat für ein Kafka-Ziel

Wenn Lambda fehlgeschlagene Datensätze an ein Kafka-Thema sendet, enthält jede Nachricht sowohl Metadaten über den Fehler als auch den eigentlichen Datensatzinhalt.

Metadaten für den Fehler

Die Metadaten enthalten Informationen darüber, warum der Datensatz fehlgeschlagen ist, und Details zum ursprünglichen Batch:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

Verhalten der Partitionsschlüssel

Lambda verwendet beim Produzieren zum Zielthema denselben Partitionsschlüssel wie im Originaldatensatz. Wenn der ursprüngliche Datensatz keinen Schlüssel hatte, verwendet Lambda die standardmäßige Round-Robin-Partitionierung von Kafka für alle verfügbaren Partitionen im Zielthema.

Anforderungen und Einschränkungen

  • Bereitgestellter Modus erforderlich — Ein Kafka-Ziel für den Fall eines Fehlers ist nur für Zuordnungen von Ereignisquellen verfügbar, bei denen der Bereitstellungsmodus aktiviert ist.

  • Nur derselbe Cluster — Das Zielthema muss im selben Kafka-Cluster wie Ihre Quellthemen vorhanden sein.

  • Themenberechtigungen — Ihre Zuordnung zur Ereignisquelle muss über Schreibberechtigungen für das Zielthema verfügen. Beispiel:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Keine Rekursion — Der Name des Zielthemas darf nicht mit einem Ihrer Quellthemennamen identisch sein.