Utiliser un sujet Kafka comme destination en cas d'échec - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser un sujet Kafka comme destination en cas d'échec

Vous pouvez configurer un sujet Kafka comme destination en cas d'échec pour vos mappages de sources d'événements Kafka. Lorsque Lambda ne parvient pas à traiter les enregistrements après avoir épuisé toutes les tentatives ou lorsque les enregistrements dépassent l'âge maximum, Lambda envoie les enregistrements ayant échoué à la rubrique Kafka spécifiée pour un traitement ultérieur.

Comment fonctionne une destination Kafka en cas de défaillance

Lorsque vous configurez un sujet Kafka comme destination en cas d'échec, Lambda agit en tant que producteur Kafka et écrit les enregistrements ayant échoué dans le sujet de destination. Cela crée un modèle de sujet mort (DLT) au sein de votre infrastructure Kafka.

  • Même exigence de cluster — Le sujet de destination doit exister dans le même cluster Kafka que vos sujets source.

  • Contenu réel des enregistrements : les destinations Kafka reçoivent les enregistrements défaillants réels ainsi que les métadonnées relatives aux échecs.

  • Prévention de la récursivité : Lambda empêche les boucles infinies en bloquant les configurations dans lesquelles les sujets source et destination sont identiques.

Configuration d'une destination Kafka en cas de panne

Vous pouvez configurer un sujet Kafka comme destination en cas d'échec lors de la création ou de la mise à jour d'un mappage de source d'événements Kafka.

Configuration d'une destination Kafka (console)

Pour configurer un sujet Kafka comme destination en cas d'échec (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom de votre fonction.

  3. Effectuez l’une des actions suivantes :

    • Pour ajouter un nouveau déclencheur Kafka, sous Vue d'ensemble des fonctions, choisissez Ajouter un déclencheur.

    • Pour modifier un déclencheur Kafka existant, choisissez-le, puis choisissez Modifier.

  4. Sous Paramètres supplémentaires, pour Destination en cas d'échec, choisissez le sujet Kafka.

  5. Dans le champ Nom du sujet, entrez le nom du sujet Kafka auquel vous souhaitez envoyer les enregistrements ayant échoué.

  6. Choisissez Ajouter ou Enregistrer.

Configuration d'une destination Kafka ()AWS CLI

Utilisez le kafka:// préfixe pour spécifier un sujet Kafka comme destination en cas d'échec.

Création d'un mappage des sources d'événements avec la destination Kafka

L'exemple suivant crée un mappage de source d'événement Amazon MSK avec un sujet Kafka comme destination en cas d'échec :

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Pour Kafka autogéré, utilisez la même syntaxe :

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Mettre à jour une destination Kafka

Utilisez la update-event-source-mapping commande pour ajouter ou modifier une destination Kafka :

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Format d'enregistrement pour une destination Kafka

Lorsque Lambda envoie des enregistrements ayant échoué à un sujet Kafka, chaque message contient à la fois des métadonnées relatives à l'échec et le contenu réel de l'enregistrement.

Métadonnées relatives aux défaillances

Les métadonnées incluent des informations sur les raisons de l'échec de l'enregistrement et des détails sur le lot d'origine :

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

Comportement des clés de partition

Lambda utilise la même clé de partition que celle de l'enregistrement d'origine lors de la production vers le sujet de destination. Si l'enregistrement d'origine ne contenait aucune clé, Lambda utilise le partitionnement circulaire par défaut de Kafka entre toutes les partitions disponibles dans le sujet de destination.

Exigences et limitations

  • Mode provisionné requis — Une destination Kafka en cas de défaillance n'est disponible que pour les mappages de sources d'événements avec le mode provisionné activé.

  • Même cluster uniquement — Le sujet de destination doit exister dans le même cluster Kafka que vos sujets source.

  • Autorisations de sujet : le mappage de la source de votre événement doit disposer d'autorisations d'écriture sur le sujet de destination. Exemple :

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Aucune récursivité : le nom du sujet de destination ne peut pas être le même que celui de votre sujet source.