Esempi e casi d'uso - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi e casi d'uso

Le funzioni durevoli Lambda consentono di creare applicazioni con tolleranza ai guasti e in più fasi utilizzando operazioni durevoli come passaggi e attese. Grazie al checkpoint automatico e al modello checkpoint-replay, in cui l'esecuzione riparte dall'inizio dopo un errore ma salta i checkpoint completati, le funzioni possono riprendersi dagli errori e riprendere l'esecuzione senza perdere i progressi.

Processi tolleranti ai guasti di breve durata

Utilizza funzioni durevoli per creare operazioni affidabili che in genere vengono completate in pochi minuti. Sebbene questi processi siano più brevi rispetto ai flussi di lavoro a lunga durata, traggono comunque vantaggio dal checkpoint automatico e dalla tolleranza ai guasti nei sistemi distribuiti. Le funzioni durevoli garantiscono il corretto completamento dei processi in più fasi anche quando le singole chiamate di servizio falliscono, senza richiedere una complessa gestione degli errori o un codice di gestione dello stato.

Gli scenari più comuni includono i sistemi di prenotazione alberghiera, le piattaforme di prenotazione di ristoranti, le richieste di viaggi in ride-sharing, l'acquisto di biglietti per eventi e gli upgrade degli abbonamenti SaaS. Questi scenari condividono caratteristiche comuni: più chiamate di servizio che devono essere completate contemporaneamente, la necessità di riprovare automaticamente in caso di guasti transitori e l'esigenza di mantenere lo stato coerente tra i sistemi distribuiti.

Transazioni distribuite tra microservizi

Coordina i pagamenti, l'inventario e la spedizione su più servizi con ripristino automatico in caso di guasti. Ogni operazione di servizio è conclusa in una fase, in modo da garantire che la transazione possa essere ripristinata da qualsiasi momento in caso di guasto del servizio.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, amount, items } = event; // Reserve inventory across multiple warehouses const inventory = await context.step("reserve-inventory", async () => { return await inventoryService.reserve(items); }); // Process payment const payment = await context.step("process-payment", async () => { return await paymentService.charge(amount); }); // Create shipment const shipment = await context.step("create-shipment", async () => { return await shippingService.createShipment(orderId, inventory); }); return { orderId, status: 'completed', shipment }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] amount = event['amount'] items = event['items'] # Reserve inventory across multiple warehouses inventory = context.step( lambda _: inventory_service.reserve(items), name='reserve-inventory' ) # Process payment payment = context.step( lambda _: payment_service.charge(amount), name='process-payment' ) # Create shipment shipment = context.step( lambda _: shipping_service.create_shipment(order_id, inventory), name='create-shipment' ) return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}

Se un passaggio fallisce, la funzione riprova automaticamente dall'ultimo checkpoint riuscito. La prenotazione dell'inventario persiste anche se l'elaborazione del pagamento fallisce temporaneamente. Quando la funzione riprova, salta la fase di inventario completata e procede direttamente all'elaborazione del pagamento. In questo modo si eliminano le prenotazioni duplicate e si garantisce lo stato coerente in tutto il sistema distribuito.

Elaborazione degli ordini con più passaggi

Elabora gli ordini tramite convalida, autorizzazione al pagamento, allocazione dell'inventario ed evasione con tentativi e ripristino automatici. Ogni passaggio è controllato, in modo da garantire che l'ordine proceda anche se i singoli passaggi falliscono e si riprova.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId, items } = event; // Validate order details const validation = await context.step("validate-order", async () => { const customer = await customerService.validate(customerId); const itemsValid = await inventoryService.validateItems(items); return { customer, itemsValid }; }); if (!validation.itemsValid) { return { orderId, status: 'rejected', reason: 'invalid_items' }; } // Authorize payment const authorization = await context.step("authorize-payment", async () => { return await paymentService.authorize( validation.customer.paymentMethod, calculateTotal(items) ); }); // Allocate inventory const allocation = await context.step("allocate-inventory", async () => { return await inventoryService.allocate(items); }); // Fulfill order const fulfillment = await context.step("fulfill-order", async () => { return await fulfillmentService.createShipment({ orderId, items: allocation.allocatedItems, address: validation.customer.shippingAddress }); }); return { orderId, status: 'completed', trackingNumber: fulfillment.trackingNumber }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] items = event['items'] # Validate order details def validate_order(_): customer = customer_service.validate(customer_id) items_valid = inventory_service.validate_items(items) return {'customer': customer, 'itemsValid': items_valid} validation = context.step(validate_order, name='validate-order') if not validation['itemsValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'} # Authorize payment authorization = context.step( lambda _: payment_service.authorize( validation['customer']['paymentMethod'], calculate_total(items) ), name='authorize-payment' ) # Allocate inventory allocation = context.step( lambda _: inventory_service.allocate(items), name='allocate-inventory' ) # Fulfill order fulfillment = context.step( lambda _: fulfillment_service.create_shipment({ 'orderId': order_id, 'items': allocation['allocatedItems'], 'address': validation['customer']['shippingAddress'] }), name='fulfill-order' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': fulfillment['trackingNumber'] }

Questo schema assicura che gli ordini non rimangano mai bloccati in stati intermedi. Se la convalida fallisce, l'ordine viene rifiutato prima dell'autorizzazione del pagamento. Se l'autorizzazione al pagamento fallisce, l'inventario non viene allocato. Ogni passaggio si basa su quello precedente con riprova e ripristino automatici.

Nota

Il controllo condizionale if (!validation.itemsValid) non rientra in una fase e verrà rieseguito durante la riproduzione. Questo è sicuro perché è deterministico: produce sempre lo stesso risultato con lo stesso oggetto di convalida.

Processi a esecuzione prolungata

Utilizza funzioni durevoli per processi che durano ore, giorni o settimane. Le operazioni di attesa sospendono l'esecuzione senza incorrere in costi di elaborazione, rendendo convenienti i processi di lunga durata. Durante i periodi di attesa, la funzione smette di funzionare e Lambda ricicla l'ambiente di esecuzione. Quando è il momento di riprendere, Lambda richiama nuovamente la funzione e la riproduce dall'ultimo checkpoint.

Questo modello di esecuzione rende le funzioni durevoli ideali per i processi che devono essere sospesi per lunghi periodi, che si tratti di attese di decisioni umane, risposte esterne del sistema, finestre di elaborazione pianificate o ritardi basati sul tempo. Paghi solo per il tempo di elaborazione attivo, non per l'attesa.

Gli scenari più comuni includono i processi di approvazione dei documenti, l'elaborazione programmata in batch, i processi di onboarding per più giorni, i processi di prova in abbonamento e i sistemi di notifica ritardata. Questi scenari condividono caratteristiche comuni: lunghi periodi di attesa misurati in ore o giorni, la necessità di mantenere lo stato di esecuzione per tutte le attese e requisiti attenti ai costi in base ai quali è proibitivo pagare per i tempi di elaborazione inattivi.

Human-in-the-loop approvazioni

Sospendi l'esecuzione per le revisioni, le approvazioni o le decisioni dei documenti mantenendo lo stato di esecuzione. La funzione attende i callback esterni senza consumare risorse e riprende automaticamente quando viene ricevuta l'approvazione.

Questo modello è essenziale per i processi che richiedono il giudizio umano o la convalida esterna. La funzione si sospende al punto di callback e non comporta alcun addebito di calcolo durante l'attesa. Quando qualcuno invia la propria decisione tramite API, Lambda richiama nuovamente la funzione e la riproduce dal checkpoint, continuando con il risultato dell'approvazione.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { documentId, reviewers } = event; // Step 1: Prepare document for review const prepared = await context.step("prepare-document", async () => { return await documentService.prepare(documentId); }); // Step 2: Request approval with callback const approval = await context.waitForCallback( "approval-callback", async (callbackId) => { await notificationService.sendApprovalRequest({ documentId, reviewers, callbackId, expiresIn: 86400 }); }, { timeout: { seconds: 86400 } } ); // Function resumes here when approval is received if (approval?.approved) { const finalized = await context.step("finalize-document", async () => { return await documentService.finalize(documentId, approval.comments); }); return { status: 'approved', documentId, finalizedAt: finalized.timestamp }; } // Handle rejection await context.step("archive-rejected", async () => { await documentService.archive(documentId, approval?.reason); }); return { status: 'rejected', documentId, reason: approval?.reason }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig @durable_execution def lambda_handler(event, context: DurableContext): document_id = event['documentId'] reviewers = event['reviewers'] # Step 1: Prepare document for review prepared = context.step( lambda _: document_service.prepare(document_id), name='prepare-document' ) # Step 2: Request approval with callback def send_approval_request(callback_id): notification_service.send_approval_request({ 'documentId': document_id, 'reviewers': reviewers, 'callbackId': callback_id, 'expiresIn': 86400 }) approval = context.wait_for_callback( send_approval_request, name='approval-callback', config=WaitConfig(timeout=86400) ) # Function resumes here when approval is received if approval and approval.get('approved'): finalized = context.step( lambda _: document_service.finalize(document_id, approval.get('comments')), name='finalize-document' ) return { 'status': 'approved', 'documentId': document_id, 'finalizedAt': finalized['timestamp'] } # Handle rejection context.step( lambda _: document_service.archive(document_id, approval.get('reason') if approval else None), name='archive-rejected' ) return { 'status': 'rejected', 'documentId': document_id, 'reason': approval.get('reason') if approval else None }

Quando la richiamata viene ricevuta e la funzione riprende, viene riprodotta dall'inizio. La fase di preparazione del documento restituisce immediatamente il risultato del checkpoint. Inoltre, l' waitForCallback operazione ritorna istantaneamente con il risultato dell'approvazione memorizzato anziché attendere nuovamente. L'esecuzione prosegue quindi fino alla fase di finalizzazione o archiviazione.

Pipeline di dati a più stadi

Elabora set di dati di grandi dimensioni attraverso fasi di estrazione, trasformazione e caricamento con punti di controllo tra le fasi. Il completamento di ogni fase può richiedere ore e i checkpoint assicurano che la pipeline possa riprendere da qualsiasi fase in caso di interruzione.

Questo modello è ideale per flussi di lavoro ETL, migrazioni di dati o processi di elaborazione in batch in cui è necessario elaborare i dati in fasi con punti di ripristino tra di loro. Se una fase fallisce, la pipeline riprende dall'ultima fase completata anziché ricominciare dall'inizio. È inoltre possibile utilizzare le operazioni di attesa per effettuare una pausa tra le fasi, rispettare i limiti di velocità, attendere che i sistemi a valle siano pronti o pianificare l'elaborazione durante le ore non di punta.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize } = event; // Stage 1: Extract data from source const extracted = await context.step("extract-data", async () => { const records = await sourceDatabase.extractRecords(datasetId); return { recordCount: records.length, records }; }); // Wait 5 minutes to respect source system rate limits await context.wait({ seconds: 300 }); // Stage 2: Transform data in batches const transformed = await context.step("transform-data", async () => { const batches = chunkArray(extracted.records, batchSize); const results = []; for (const batch of batches) { const transformed = await transformService.processBatch(batch); results.push(transformed); } return { batchCount: batches.length, results }; }); // Wait until off-peak hours (e.g., 2 AM) const now = new Date(); const targetHour = 2; const msUntilTarget = calculateMsUntilHour(now, targetHour); await context.wait({ seconds: Math.floor(msUntilTarget / 1000) }); // Stage 3: Load data to destination const loaded = await context.step("load-data", async () => { let loadedCount = 0; for (const result of transformed.results) { await destinationDatabase.loadBatch(result); loadedCount += result.length; } return { loadedCount }; }); // Stage 4: Verify and finalize const verified = await context.step("verify-pipeline", async () => { const verification = await destinationDatabase.verifyRecords(datasetId); await pipelineService.markComplete(datasetId, verification); return verification; }); return { datasetId, recordsProcessed: extracted.recordCount, batchesProcessed: transformed.batchCount, recordsLoaded: loaded.loadedCount, verified: verified.success }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event['batchSize'] # Stage 1: Extract data from source def extract_data(_): records = source_database.extract_records(dataset_id) return {'recordCount': len(records), 'records': records} extracted = context.step(extract_data, name='extract-data') # Wait 5 minutes to respect source system rate limits context.wait(300) # Stage 2: Transform data in batches def transform_data(_): batches = chunk_array(extracted['records'], batch_size) results = [] for batch in batches: transformed = transform_service.process_batch(batch) results.append(transformed) return {'batchCount': len(batches), 'results': results} transformed = context.step(transform_data, name='transform-data') # Wait until off-peak hours (e.g., 2 AM) now = datetime.now() target_hour = 2 ms_until_target = calculate_ms_until_hour(now, target_hour) context.wait(ms_until_target // 1000) # Stage 3: Load data to destination def load_data(_): loaded_count = 0 for result in transformed['results']: destination_database.load_batch(result) loaded_count += len(result) return {'loadedCount': loaded_count} loaded = context.step(load_data, name='load-data') # Stage 4: Verify and finalize def verify_pipeline(_): verification = destination_database.verify_records(dataset_id) pipeline_service.mark_complete(dataset_id, verification) return verification verified = context.step(verify_pipeline, name='verify-pipeline') return { 'datasetId': dataset_id, 'recordsProcessed': extracted['recordCount'], 'batchesProcessed': transformed['batchCount'], 'recordsLoaded': loaded['loadedCount'], 'verified': verified['success'] }

Ogni fase è racchiusa in una fase, che crea un checkpoint che consente alla pipeline di riprendere da qualsiasi fase in caso di interruzione. L'attesa di 5 minuti tra l'estrazione e la trasformazione rispetta i limiti di velocità del sistema di origine senza consumare risorse di elaborazione, mentre l'attesa fino alle 2 del mattino pianifica le costose operazioni di caricamento durante le ore non di punta.

Nota

La new Date() chiamata e la calculateMsUntilHour() funzione non rientrano nelle fasi iniziali e verranno rieseguite durante la riproduzione. Per le operazioni basate sul tempo che devono essere coerenti tra i replay, calcola il timestamp all'interno di un passaggio o utilizzalo solo per le durate di attesa (che sono punti di controllo).

Richiamazioni concatenate tra le funzioni

Richiama altre funzioni Lambda dall'interno di una funzione durevole utilizzando. context.invoke() La funzione chiamante si sospende in attesa del completamento della funzione richiamata, creando un checkpoint che preserva il risultato. Se la funzione chiamante viene interrotta dopo il completamento della funzione richiamata, riprende con il risultato memorizzato senza richiamare nuovamente la funzione.

Utilizzate questo modello quando avete funzioni specializzate che gestiscono domini specifici (convalida dei clienti, elaborazione dei pagamenti, gestione dell'inventario) e avete bisogno di coordinarle in un flusso di lavoro. Ogni funzione mantiene la propria logica e può essere richiamata da più funzioni di orchestrazione, evitando la duplicazione del codice.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; // Main orchestrator function export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { orderId, customerId } = event; // Step 1: Validate customer by invoking customer service function const customer = await context.invoke( "validate-customer", "arn:aws:lambda:us-east-1:123456789012:function:customer-service:1", { customerId } ); if (!customer.isValid) { return { orderId, status: "rejected", reason: "invalid_customer" }; } // Step 2: Check inventory by invoking inventory service function const inventory = await context.invoke( "check-inventory", "arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1", { orderId, items: event.items } ); if (!inventory.available) { return { orderId, status: "rejected", reason: "insufficient_inventory" }; } // Step 3: Process payment by invoking payment service function const payment = await context.invoke( "process-payment", "arn:aws:lambda:us-east-1:123456789012:function:payment-service:1", { customerId, amount: inventory.totalAmount, paymentMethod: customer.paymentMethod } ); // Step 4: Create shipment by invoking fulfillment service function const shipment = await context.invoke( "create-shipment", "arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1", { orderId, items: inventory.allocatedItems, address: customer.shippingAddress } ); return { orderId, status: "completed", trackingNumber: shipment.trackingNumber, estimatedDelivery: shipment.estimatedDelivery }; } );
Python
from aws_durable_execution_sdk_python import DurableContext, durable_execution # Main orchestrator function @durable_execution def lambda_handler(event, context: DurableContext): order_id = event['orderId'] customer_id = event['customerId'] # Step 1: Validate customer by invoking customer service function customer = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1', {'customerId': customer_id}, name='validate-customer' ) if not customer['isValid']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'} # Step 2: Check inventory by invoking inventory service function inventory = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1', {'orderId': order_id, 'items': event['items']}, name='check-inventory' ) if not inventory['available']: return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'} # Step 3: Process payment by invoking payment service function payment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1', { 'customerId': customer_id, 'amount': inventory['totalAmount'], 'paymentMethod': customer['paymentMethod'] }, name='process-payment' ) # Step 4: Create shipment by invoking fulfillment service function shipment = context.invoke( 'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1', { 'orderId': order_id, 'items': inventory['allocatedItems'], 'address': customer['shippingAddress'] }, name='create-shipment' ) return { 'orderId': order_id, 'status': 'completed', 'trackingNumber': shipment['trackingNumber'], 'estimatedDelivery': shipment['estimatedDelivery'] }

Ogni chiamata crea un checkpoint nella funzione orchestrator. Se l'orchestrator viene interrotto dopo il completamento della convalida del cliente, riprende da quel checkpoint con i dati del cliente memorizzati, saltando la chiamata di convalida. In questo modo si evitano chiamate duplicate ai servizi downstream e si garantisce un'esecuzione coerente anche dopo le interruzioni.

Le funzioni richiamate possono essere funzioni Lambda durevoli o standard. Se si richiama una funzione durevole, questa può avere un proprio flusso di lavoro in più fasi con attese e checkpoint. L'orchestratore attende semplicemente che l'esecuzione completa e duratura finisca, ricevendo il risultato finale.

Nota

Le chiamate tra account non sono supportate. Tutte le funzioni richiamate devono trovarsi nello stesso AWS account della funzione chiamante.

Schemi avanzati

Usa funzioni durevoli per creare applicazioni complesse in più fasi che combinano più operazioni durevoli, esecuzione parallela, elaborazione di array, logica condizionale e polling. Questi modelli consentono di creare applicazioni sofisticate che coordinano molte attività mantenendo la tolleranza agli errori e il ripristino automatico.

I pattern avanzati vanno oltre i semplici passaggi sequenziali. È possibile eseguire operazioni contemporaneamente conparallel(), elaborare array conmap(), attendere condizioni esterne e combinare queste primitive per creare applicazioni affidabili. waitForCondition() Ogni operazione durevole crea i propri checkpoint, in modo che l'applicazione possa essere ripristinata da qualsiasi punto in caso di interruzione.

Processi di onboarding degli utenti

Guida gli utenti attraverso la registrazione, la verifica delle e-mail, la configurazione del profilo e la configurazione iniziale con la gestione dei tentativi. Questo esempio combina passaggi sequenziali, callback e logica condizionale per creare un processo di onboarding completo.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { userId, email } = event; // Step 1: Create user account const user = await context.step("create-account", async () => { return await userService.createAccount(userId, email); }); // Step 2: Send verification email await context.step("send-verification", async () => { return await emailService.sendVerification(email); }); // Step 3: Wait for email verification (up to 48 hours) const verified = await context.waitForCallback( "email-verification", async (callbackId) => { await notificationService.sendVerificationLink({ email, callbackId, expiresIn: 172800 }); }, { timeout: { seconds: 172800 } } ); if (!verified) { await context.step("send-reminder", async () => { await emailService.sendReminder(email); }); return { status: "verification_timeout", userId, message: "Email verification not completed within 48 hours" }; } // Step 4: Initialize user profile in parallel const setupResults = await context.parallel("profile-setup", [ async (ctx: DurableContext) => { return await ctx.step("create-preferences", async () => { return await preferencesService.createDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("setup-notifications", async () => { return await notificationService.setupDefaults(userId); }); }, async (ctx: DurableContext) => { return await ctx.step("create-welcome-content", async () => { return await contentService.createWelcome(userId); }); } ]); // Step 5: Send welcome email await context.step("send-welcome", async () => { const [preferences, notifications, content] = setupResults.getResults(); return await emailService.sendWelcome({ email, preferences, notifications, content }); }); return { status: "onboarding_complete", userId, completedAt: new Date().toISOString() }; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig from datetime import datetime @durable_execution def lambda_handler(event, context: DurableContext): user_id = event['userId'] email = event['email'] # Step 1: Create user account user = context.step( lambda _: user_service.create_account(user_id, email), name='create-account' ) # Step 2: Send verification email context.step( lambda _: email_service.send_verification(email), name='send-verification' ) # Step 3: Wait for email verification (up to 48 hours) def send_verification_link(callback_id): notification_service.send_verification_link({ 'email': email, 'callbackId': callback_id, 'expiresIn': 172800 }) verified = context.wait_for_callback( send_verification_link, name='email-verification', config=WaitConfig(timeout=172800) ) if not verified: context.step( lambda _: email_service.send_reminder(email), name='send-reminder' ) return { 'status': 'verification_timeout', 'userId': user_id, 'message': 'Email verification not completed within 48 hours' } # Step 4: Initialize user profile in parallel def create_preferences(ctx: DurableContext): return ctx.step( lambda _: preferences_service.create_defaults(user_id), name='create-preferences' ) def setup_notifications(ctx: DurableContext): return ctx.step( lambda _: notification_service.setup_defaults(user_id), name='setup-notifications' ) def create_welcome_content(ctx: DurableContext): return ctx.step( lambda _: content_service.create_welcome(user_id), name='create-welcome-content' ) setup_results = context.parallel( [create_preferences, setup_notifications, create_welcome_content], name='profile-setup' ) # Step 5: Send welcome email def send_welcome(_): results = setup_results.get_results() preferences, notifications, content = results[0], results[1], results[2] return email_service.send_welcome({ 'email': email, 'preferences': preferences, 'notifications': notifications, 'content': content }) context.step(send_welcome, name='send-welcome') return { 'status': 'onboarding_complete', 'userId': user_id, 'completedAt': datetime.now().isoformat() }

Il processo combina passaggi sequenziali con punti di controllo per la creazione dell'account e l'invio di e-mail, quindi si interrompe per un massimo di 48 ore in attesa della verifica dell'e-mail senza consumare risorse. La logica condizionale gestisce percorsi diversi a seconda che la verifica sia completata o scaduta. Le attività di configurazione del profilo vengono eseguite contemporaneamente utilizzando operazioni parallele per ridurre il tempo totale di esecuzione e ogni passaggio riprova automaticamente in caso di errori transitori per garantire che l'onboarding venga completato in modo affidabile.

Elaborazione in batch con checkpoint

Elabora milioni di record con ripristino automatico dall'ultimo checkpoint riuscito dopo gli errori. Questo esempio dimostra come le funzioni durevoli combinino le map() operazioni con la suddivisione in blocchi e la limitazione della velocità per gestire l'elaborazione di dati su larga scala.

TypeScript
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js"; interface Batch { batchIndex: number; recordIds: string[]; } export const handler = withDurableExecution( async (event: any, context: DurableContext) => { const { datasetId, batchSize = 1000 } = event; // Step 1: Get all record IDs to process const recordIds = await context.step("fetch-record-ids", async () => { return await dataService.getRecordIds(datasetId); }); // Step 2: Split into batches const batches: Batch[] = []; for (let i = 0; i < recordIds.length; i += batchSize) { batches.push({ batchIndex: Math.floor(i / batchSize), recordIds: recordIds.slice(i, i + batchSize) }); } // Step 3: Process batches with controlled concurrency const batchResults = await context.map( "process-batches", batches, async (ctx: DurableContext, batch: Batch, index: number) => { const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => { const results = []; for (const recordId of batch.recordIds) { const result = await recordService.process(recordId); results.push(result); } return results; }); const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => { return await validationService.validateBatch(processed); }); return { batchIndex: batch.batchIndex, recordCount: batch.recordIds.length, successCount: validated.successCount, failureCount: validated.failureCount }; }, { maxConcurrency: 5 } ); // Step 4: Aggregate results const summary = await context.step("aggregate-results", async () => { const results = batchResults.getResults(); const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0); const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0); return { datasetId, totalRecords: recordIds.length, batchesProcessed: batches.length, successCount: totalSuccess, failureCount: totalFailure, completedAt: new Date().toISOString() }; }); return summary; } );
Python
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig from datetime import datetime from typing import List, Dict @durable_execution def lambda_handler(event, context: DurableContext): dataset_id = event['datasetId'] batch_size = event.get('batchSize', 1000) # Step 1: Get all record IDs to process record_ids = context.step( lambda _: data_service.get_record_ids(dataset_id), name='fetch-record-ids' ) # Step 2: Split into batches batches = [] for i in range(0, len(record_ids), batch_size): batches.append({ 'batchIndex': i // batch_size, 'recordIds': record_ids[i:i + batch_size] }) # Step 3: Process batches with controlled concurrency def process_batch(ctx: DurableContext, batch: Dict, index: int): batch_index = batch['batchIndex'] def process_records(_): results = [] for record_id in batch['recordIds']: result = record_service.process(record_id) results.append(result) return results processed = ctx.step(process_records, name=f'batch-{batch_index}') validated = ctx.step( lambda _: validation_service.validate_batch(processed), name=f'validate-{batch_index}' ) return { 'batchIndex': batch_index, 'recordCount': len(batch['recordIds']), 'successCount': validated['successCount'], 'failureCount': validated['failureCount'] } batch_results = context.map( process_batch, batches, name='process-batches', config=MapConfig(max_concurrency=5) ) # Step 4: Aggregate results def aggregate_results(_): results = batch_results.get_results() total_success = sum(r['successCount'] for r in results) total_failure = sum(r['failureCount'] for r in results) return { 'datasetId': dataset_id, 'totalRecords': len(record_ids), 'batchesProcessed': len(batches), 'successCount': total_success, 'failureCount': total_failure, 'completedAt': datetime.now().isoformat() } summary = context.step(aggregate_results, name='aggregate-results') return summary

I record vengono suddivisi in batch gestibili per evitare di sovraccaricare la memoria o i servizi downstream, quindi vengono elaborati più batch contemporaneamente con il controllo del parallelismo. maxConcurrency Ogni batch ha il proprio checkpoint, quindi in caso di errore si limita a riprovare a rielaborare il batch fallito anziché rielaborare tutti i record. Questo modello è ideale per lavori ETL, migrazioni di dati o operazioni di massa in cui l'elaborazione può richiedere ore.

Fasi successive