Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exemples et cas d'utilisation
Les fonctions durables de Lambda vous permettent de créer des applications en plusieurs étapes tolérantes aux pannes en utilisant des opérations durables telles que les étapes et les attentes. Grâce au point de contrôle automatique et à un modèle de réexécution des points de contrôle, dans lequel l'exécution redémarre depuis le début après un échec mais ignore les points de contrôle terminés, vos fonctions peuvent se rétablir après un échec et reprendre l'exécution sans perdre leur progression.
Procédés de courte durée tolérants aux pannes
Utilisez des fonctions durables pour créer des opérations fiables qui se terminent généralement en quelques minutes. Bien que ces processus soient plus courts que les flux de travail de longue durée, ils bénéficient tout de même du point de contrôle automatique et de la tolérance aux pannes dans les systèmes distribués. Des fonctions durables garantissent le succès de vos processus en plusieurs étapes, même en cas d'échec d'appels de service individuels, sans nécessiter de gestion des erreurs complexe ni de code de gestion des états.
Les scénarios courants incluent les systèmes de réservation d'hôtels, les plateformes de réservation de restaurants, les demandes de voyages en covoiturage, les achats de billets pour des événements et les mises à niveau d'abonnements SaaS. Ces scénarios présentent des caractéristiques communes : plusieurs appels de service qui doivent être exécutés simultanément, la nécessité d'une nouvelle tentative automatique en cas d'échec transitoire et la nécessité de maintenir un état cohérent sur l'ensemble des systèmes distribués.
Transactions distribuées sur des microservices
Coordonnez les paiements, les stocks et les expéditions entre plusieurs services grâce à l'annulation automatique en cas de panne. Chaque opération de service est encapsulée en une étape, ce qui garantit que la transaction peut être rétablie à tout moment en cas de défaillance d'un service.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
Si une étape échoue, la fonction réessaie automatiquement à partir du dernier point de contrôle réussi. La réservation d'inventaire est maintenue même si le traitement du paiement échoue temporairement. Lorsque la fonction essaie à nouveau, elle ignore l'étape d'inventaire terminée et passe directement au traitement des paiements. Cela élimine les réservations dupliquées et garantit un état cohérent dans l'ensemble de votre système distribué.
Traitement des commandes en plusieurs étapes
Traitez les commandes par le biais de la validation, de l'autorisation de paiement, de l'allocation des stocks et de l'exécution avec une nouvelle tentative et une reprise automatiques. Chaque étape fait l'objet d'un point de contrôle, ce qui garantit que la commande progresse même si les étapes individuelles échouent et que vous réessayez.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
Ce modèle garantit que les commandes ne restent jamais bloquées dans des états intermédiaires. Si la validation échoue, la commande est rejetée avant l'autorisation de paiement. Si l'autorisation de paiement échoue, le stock n'est pas alloué. Chaque étape s'appuie sur la précédente avec une nouvelle tentative et une restauration automatiques.
Remarque
La vérification conditionnelle if (!validation.itemsValid) se situe en dehors d'une étape et sera réexécutée pendant la rediffusion. C'est sûr car c'est déterministe : cela produit toujours le même résultat pour le même objet de validation.
Procédés de longue durée
Utilisez des fonctions durables pour les processus qui s'étendent sur des heures, des jours ou des semaines. Les opérations d'attente suspendent l'exécution sans frais de calcul, ce qui permet de rentabiliser les processus de longue durée. Pendant les périodes d'attente, votre fonction cesse de fonctionner et Lambda recycle l'environnement d'exécution. Au moment de reprendre, Lambda invoque à nouveau votre fonction et la rejoue depuis le dernier point de contrôle.
Ce modèle d'exécution rend les fonctions durables idéales pour les processus qui doivent être interrompus pendant de longues périodes, qu'il s'agisse d'attendre des décisions humaines, des réponses du système externe, des fenêtres de traitement planifiées ou des retards temporels. Vous ne payez que pour le temps de calcul actif, et non pour le temps d'attente.
Les scénarios courants incluent les processus d'approbation des documents, le traitement par lots planifié, les processus d'intégration sur plusieurs jours, les processus d'essai d'abonnement et les systèmes de notification différée. Ces scénarios présentent des caractéristiques communes : des délais d'attente prolongés mesurés en heures ou en jours, la nécessité de maintenir l'état d'exécution pendant ces temps d'attente et des exigences liées aux coûts dans lesquelles le paiement pour le temps de calcul inactif est prohibitif.
Human-in-the-loop approbations
Suspendez l'exécution pour les révisions de documents, les approbations ou les décisions tout en maintenant l'état d'exécution. La fonction attend les rappels externes sans consommer de ressources et reprend automatiquement lorsque l'approbation est reçue.
Ce modèle est essentiel pour les processus qui nécessitent un jugement humain ou une validation externe. La fonction est suspendue au point de rappel, sans frais de calcul pendant l'attente. Lorsque quelqu'un soumet sa décision via l'API, Lambda invoque à nouveau votre fonction et rejoue depuis le point de contrôle, en continuant avec le résultat de l'approbation.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
Lorsque le rappel est reçu et que votre fonction reprend, elle est rediffusée depuis le début. L'étape de préparation du document renvoie instantanément le résultat du point de contrôle. L' waitForCallback opération renvoie également instantanément le résultat d'approbation enregistré au lieu d'attendre à nouveau. L'exécution se poursuit ensuite jusqu'aux étapes de finalisation ou d'archivage.
Pipelines de données en plusieurs étapes
Traitez de grands ensembles de données par le biais de phases d'extraction, de transformation et de chargement avec des points de contrôle entre les étapes. Chaque étape peut prendre des heures, et les points de contrôle garantissent que le pipeline peut reprendre à n'importe quelle étape en cas d'interruption.
Ce modèle est idéal pour les flux de travail ETL, les migrations de données ou les tâches de traitement par lots où vous devez traiter les données par étapes, avec des points de récupération entre eux. Si une étape échoue, le pipeline reprend à partir de la dernière étape terminée plutôt que de recommencer depuis le début. Vous pouvez également utiliser les opérations d'attente pour faire une pause entre les étapes, en respectant les limites de débit, en attendant que les systèmes en aval soient prêts ou en planifiant le traitement en dehors des heures de pointe.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
Chaque étape est encapsulée dans une étape, ce qui crée un point de contrôle qui permet au pipeline de reprendre à partir de n'importe quelle étape en cas d'interruption. L'attente de 5 minutes entre l'extraction et la transformation respecte les limites de débit du système source sans consommer de ressources informatiques, tandis que l'attente jusqu'à 2 heures du matin permet de planifier l'opération de chargement coûteuse pendant les heures creuses.
Remarque
L'new Date()appel et la calculateMsUntilHour() fonction sont des étapes extérieures et seront réexécutés pendant la rediffusion. Pour les opérations basées sur le temps qui doivent être cohérentes entre les rediffusions, calculez l'horodatage d'une étape ou utilisez-le uniquement pour les durées d'attente (qui sont des points de contrôle).
Invocations enchaînées entre les fonctions
Appelez d'autres fonctions Lambda depuis une fonction durable en utilisant. context.invoke() La fonction appelante est suspendue en attendant que la fonction invoquée soit terminée, créant ainsi un point de contrôle qui préserve le résultat. Si la fonction appelante est interrompue une fois la fonction invoquée terminée, elle reprend avec le résultat enregistré sans avoir à réinvoquer la fonction.
Utilisez ce modèle lorsque vous avez des fonctions spécialisées qui gèrent des domaines spécifiques (validation des clients, traitement des paiements, gestion des stocks) et que vous devez les coordonner dans un flux de travail. Chaque fonction conserve sa propre logique et peut être invoquée par plusieurs fonctions d'orchestrateur, évitant ainsi la duplication de code.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
Chaque invocation crée un point de contrôle dans la fonction d'orchestrateur. Si l'orchestrateur est interrompu une fois la validation du client terminée, il reprend à partir de ce point de contrôle avec les données client stockées, en ignorant l'appel de validation. Cela permet d'éviter les appels dupliqués vers les services en aval et de garantir une exécution cohérente malgré les interruptions.
Les fonctions invoquées peuvent être des fonctions Lambda durables ou standard. Si vous invoquez une fonction durable, elle peut avoir son propre flux de travail en plusieurs étapes avec des temps d'attente et des points de contrôle. L'orchestrateur attend simplement la fin de l'exécution durable complète pour obtenir le résultat final.
Les appels entre comptes ne sont pas pris en charge. Toutes les fonctions invoquées doivent être dans le même AWS compte que la fonction appelante.
Modèles avancés
Utilisez des fonctions durables pour créer des applications complexes en plusieurs étapes qui combinent plusieurs opérations durables, l'exécution parallèle, le traitement de tableaux, la logique conditionnelle et le sondage. Ces modèles vous permettent de créer des applications sophistiquées qui coordonnent de nombreuses tâches tout en maintenant la tolérance aux pannes et la restauration automatique.
Les modèles avancés vont au-delà de simples étapes séquentielles. Vous pouvez exécuter des opérations simultanémentparallel(), traiter des tableaux avec ces primitivesmap(), attendre les conditions extérieures et combiner ces primitives pour créer des applications fiables. waitForCondition() Chaque opération durable crée ses propres points de contrôle, afin que votre application puisse se rétablir à tout moment en cas d'interruption.
Processus d'intégration des utilisateurs
Guidez les utilisateurs lors de l'enregistrement, de la vérification des e-mails, de la configuration du profil et de la configuration initiale avec gestion des nouvelles tentatives. Cet exemple combine des étapes séquentielles, des rappels et une logique conditionnelle pour créer un processus d'intégration complet.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
Le processus combine des étapes séquentielles avec des points de contrôle pour la création de comptes et l'envoi d'e-mails, puis s'interrompt pendant 48 heures en attendant la vérification des e-mails sans consommer de ressources. La logique conditionnelle gère différents chemins selon que la vérification est terminée ou expirée. Les tâches de configuration du profil s'exécutent simultanément à l'aide d'opérations parallèles afin de réduire le temps d'exécution total, et chaque étape est réessayée automatiquement en cas d'échec transitoire afin de garantir la fiabilité de l'intégration.
Traitement par lots avec points de contrôle
Traitez des millions d'enregistrements grâce à la restauration automatique depuis le dernier point de contrôle réussi après un échec. Cet exemple montre comment les fonctions durables combinent les map() opérations avec le découpage et la limitation de débit pour gérer le traitement de données à grande échelle.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
Les enregistrements sont divisés en lots gérables pour éviter de surcharger la mémoire ou les services en aval, puis plusieurs lots sont traités simultanément avec maxConcurrency le contrôle du parallélisme. Chaque lot possède son propre point de contrôle, de sorte que les échecs ne font que réessayer le lot défaillant au lieu de retraiter tous les enregistrements. Ce modèle est idéal pour les tâches ETL, les migrations de données ou les opérations en masse dont le traitement peut prendre des heures.
Étapes suivantes