Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Configuration des contrôles de gestion des erreurs pour les sources d'événements Kafka
Vous pouvez configurer la façon dont Lambda gère les erreurs et les nouvelles tentatives pour vos mappages de sources d'événements Kafka. Ces configurations vous aident à contrôler la manière dont Lambda traite les enregistrements ayant échoué et gère le comportement des nouvelles tentatives.
Configurations de nouvelle tentative disponibles
Les configurations de nouvelle tentative suivantes sont disponibles pour Amazon MSK et les sources d'événements Kafka autogérées :
-
Nombre maximal de tentatives : nombre maximal de tentatives de Lambda lorsque votre fonction renvoie une erreur. Cela ne compte pas la tentative d'invocation initiale. La valeur par défaut est -1 (infini).
-
Age maximum d'enregistrement : âge maximum d'un enregistrement que Lambda envoie à votre fonction. La valeur par défaut est -1 (infini).
-
Fractionner le lot en cas d'erreur : lorsque votre fonction renvoie une erreur, divisez le lot en deux lots plus petits et réessayez chacun séparément. Cela permet d'isoler les enregistrements problématiques.
-
Réponse partielle par lot : autorisez votre fonction à renvoyer des informations sur les enregistrements d'un lot dont le traitement a échoué, afin que Lambda puisse réessayer uniquement les enregistrements ayant échoué.
Configuration des contrôles de gestion des erreurs (console)
Vous pouvez configurer le comportement des nouvelles tentatives lors de la création ou de la mise à jour d'un mappage de source d'événements Kafka dans la console Lambda.
Pour configurer le comportement des nouvelles tentatives pour une source d'événements Kafka (console)
-
Ouvrez la page Functions
(Fonctions) de la console Lambda. -
Choisissez le nom de votre fonction.
-
Effectuez l’une des actions suivantes :
-
Pour ajouter un nouveau déclencheur Kafka, sous Vue d'ensemble des fonctions, choisissez Ajouter un déclencheur.
-
Pour modifier un déclencheur Kafka existant, choisissez-le, puis choisissez Modifier.
-
-
Sous Configuration du sondeur d'événements, sélectionnez le mode provisionné pour configurer les contrôles de gestion des erreurs :
-
Pour Réessayer, entrez le nombre maximum de tentatives (0 à 10 000, ou -1 pour un nombre infini).
-
Pour Age maximum d'enregistrement, entrez l'âge maximum en secondes (60-604800, ou -1 pour un nombre infini).
-
Pour activer le fractionnement par lots en cas d'erreur, sélectionnez Fractionner le lot en cas d'erreur.
-
Pour activer la réponse partielle par lots, sélectionnez ReportBatchItemFailures.
-
-
Choisissez Ajouter ou Enregistrer.
Configuration du comportement des nouvelles tentatives ()AWS CLI
Utilisez les AWS CLI commandes suivantes pour configurer le comportement des nouvelles tentatives pour vos mappages de sources d'événements Kafka.
Création d'un mappage de sources d'événements avec des configurations de nouvelle tentative
L'exemple suivant crée un mappage de source d'événements Kafka autogéré avec des contrôles de gestion des erreurs :
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Pour les sources d'événements Amazon MSK :
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Mise à jour des configurations de nouvelle tentative
Utilisez la update-event-source-mapping commande pour modifier les configurations de nouvelle tentative pour un mappage de source d'événement existant :
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
PartialBatchResponse
La réponse partielle par lots, également connue sous le nom de ReportBatchItemFailures, est une fonctionnalité clé pour la gestion des erreurs dans l'intégration de Lambda aux sources Kafka. Sans cette fonctionnalité, lorsqu'une erreur survient dans l'un des éléments d'un lot, cela entraîne le retraitement de tous les messages de ce lot. Lorsque la réponse par lots partielle est activée et implémentée, le gestionnaire renvoie des identifiants uniquement pour les messages ayant échoué, ce qui permet à Lambda de réessayer uniquement ces éléments spécifiques. Cela permet de mieux contrôler le traitement des lots contenant des messages d'échec.
Pour signaler des erreurs par lots, vous allez utiliser le schéma JSON suivant :
{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
Important
Si vous renvoyez un fichier JSON valide vide ou nul, le mappage de la source d'événements considérera qu'un lot a été traité avec succès. Tout topic-partition_number ou offset non valide renvoyé qui n'était pas présent dans l'événement invoqué sera considéré comme un échec et le lot entier sera réessayé.
Les exemples de code suivants montrent comment implémenter une réponse par lots partielle pour les fonctions Lambda qui reçoivent des événements provenant de sources Kafka. La fonction signale les défaillances échecs d’articles par lots dans la réponse, en indiquant à Lambda de réessayer ces messages ultérieurement.
Voici une implémentation du gestionnaire Lambda en Python qui illustre cette approche :
import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass
Voici une version de Node.js :
const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };