Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Esempi e casi d'uso
Le funzioni durevoli Lambda consentono di creare applicazioni con tolleranza ai guasti e in più fasi utilizzando operazioni durevoli come passaggi e attese. Grazie al checkpoint automatico e al modello checkpoint-replay, in cui l'esecuzione riparte dall'inizio dopo un errore ma salta i checkpoint completati, le funzioni possono riprendersi dagli errori e riprendere l'esecuzione senza perdere i progressi.
Processi tolleranti ai guasti di breve durata
Utilizza funzioni durevoli per creare operazioni affidabili che in genere vengono completate in pochi minuti. Sebbene questi processi siano più brevi rispetto ai flussi di lavoro a lunga durata, traggono comunque vantaggio dal checkpoint automatico e dalla tolleranza ai guasti nei sistemi distribuiti. Le funzioni durevoli garantiscono il corretto completamento dei processi in più fasi anche quando le singole chiamate di servizio falliscono, senza richiedere una complessa gestione degli errori o un codice di gestione dello stato.
Gli scenari più comuni includono i sistemi di prenotazione alberghiera, le piattaforme di prenotazione di ristoranti, le richieste di viaggi in ride-sharing, l'acquisto di biglietti per eventi e gli upgrade degli abbonamenti SaaS. Questi scenari condividono caratteristiche comuni: più chiamate di servizio che devono essere completate contemporaneamente, la necessità di riprovare automaticamente in caso di guasti transitori e l'esigenza di mantenere lo stato coerente tra i sistemi distribuiti.
Transazioni distribuite tra microservizi
Coordina i pagamenti, l'inventario e la spedizione su più servizi con ripristino automatico in caso di guasti. Ogni operazione di servizio è conclusa in una fase, in modo da garantire che la transazione possa essere ripristinata da qualsiasi momento in caso di guasto del servizio.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
Se un passaggio fallisce, la funzione riprova automaticamente dall'ultimo checkpoint riuscito. La prenotazione dell'inventario persiste anche se l'elaborazione del pagamento fallisce temporaneamente. Quando la funzione riprova, salta la fase di inventario completata e procede direttamente all'elaborazione del pagamento. In questo modo si eliminano le prenotazioni duplicate e si garantisce lo stato coerente in tutto il sistema distribuito.
Elaborazione degli ordini con più passaggi
Elabora gli ordini tramite convalida, autorizzazione al pagamento, allocazione dell'inventario ed evasione con tentativi e ripristino automatici. Ogni passaggio è controllato, in modo da garantire che l'ordine proceda anche se i singoli passaggi falliscono e si riprova.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
Questo schema assicura che gli ordini non rimangano mai bloccati in stati intermedi. Se la convalida fallisce, l'ordine viene rifiutato prima dell'autorizzazione del pagamento. Se l'autorizzazione al pagamento fallisce, l'inventario non viene allocato. Ogni passaggio si basa su quello precedente con riprova e ripristino automatici.
Nota
Il controllo condizionale if (!validation.itemsValid) non rientra in una fase e verrà rieseguito durante la riproduzione. Questo è sicuro perché è deterministico: produce sempre lo stesso risultato con lo stesso oggetto di convalida.
Processi a esecuzione prolungata
Utilizza funzioni durevoli per processi che durano ore, giorni o settimane. Le operazioni di attesa sospendono l'esecuzione senza incorrere in costi di elaborazione, rendendo convenienti i processi di lunga durata. Durante i periodi di attesa, la funzione smette di funzionare e Lambda ricicla l'ambiente di esecuzione. Quando è il momento di riprendere, Lambda richiama nuovamente la funzione e la riproduce dall'ultimo checkpoint.
Questo modello di esecuzione rende le funzioni durevoli ideali per i processi che devono essere sospesi per lunghi periodi, che si tratti di attese di decisioni umane, risposte esterne del sistema, finestre di elaborazione pianificate o ritardi basati sul tempo. Paghi solo per il tempo di elaborazione attivo, non per l'attesa.
Gli scenari più comuni includono i processi di approvazione dei documenti, l'elaborazione programmata in batch, i processi di onboarding per più giorni, i processi di prova in abbonamento e i sistemi di notifica ritardata. Questi scenari condividono caratteristiche comuni: lunghi periodi di attesa misurati in ore o giorni, la necessità di mantenere lo stato di esecuzione per tutte le attese e requisiti attenti ai costi in base ai quali è proibitivo pagare per i tempi di elaborazione inattivi.
Human-in-the-loop approvazioni
Sospendi l'esecuzione per le revisioni, le approvazioni o le decisioni dei documenti mantenendo lo stato di esecuzione. La funzione attende i callback esterni senza consumare risorse e riprende automaticamente quando viene ricevuta l'approvazione.
Questo modello è essenziale per i processi che richiedono il giudizio umano o la convalida esterna. La funzione si sospende al punto di callback e non comporta alcun addebito di calcolo durante l'attesa. Quando qualcuno invia la propria decisione tramite API, Lambda richiama nuovamente la funzione e la riproduce dal checkpoint, continuando con il risultato dell'approvazione.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
Quando la richiamata viene ricevuta e la funzione riprende, viene riprodotta dall'inizio. La fase di preparazione del documento restituisce immediatamente il risultato del checkpoint. Inoltre, l' waitForCallback operazione ritorna istantaneamente con il risultato dell'approvazione memorizzato anziché attendere nuovamente. L'esecuzione prosegue quindi fino alla fase di finalizzazione o archiviazione.
Pipeline di dati a più stadi
Elabora set di dati di grandi dimensioni attraverso fasi di estrazione, trasformazione e caricamento con punti di controllo tra le fasi. Il completamento di ogni fase può richiedere ore e i checkpoint assicurano che la pipeline possa riprendere da qualsiasi fase in caso di interruzione.
Questo modello è ideale per flussi di lavoro ETL, migrazioni di dati o processi di elaborazione in batch in cui è necessario elaborare i dati in fasi con punti di ripristino tra di loro. Se una fase fallisce, la pipeline riprende dall'ultima fase completata anziché ricominciare dall'inizio. È inoltre possibile utilizzare le operazioni di attesa per effettuare una pausa tra le fasi, rispettare i limiti di velocità, attendere che i sistemi a valle siano pronti o pianificare l'elaborazione durante le ore non di punta.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
Ogni fase è racchiusa in una fase, che crea un checkpoint che consente alla pipeline di riprendere da qualsiasi fase in caso di interruzione. L'attesa di 5 minuti tra l'estrazione e la trasformazione rispetta i limiti di velocità del sistema di origine senza consumare risorse di elaborazione, mentre l'attesa fino alle 2 del mattino pianifica le costose operazioni di caricamento durante le ore non di punta.
Nota
La new Date() chiamata e la calculateMsUntilHour() funzione non rientrano nelle fasi iniziali e verranno rieseguite durante la riproduzione. Per le operazioni basate sul tempo che devono essere coerenti tra i replay, calcola il timestamp all'interno di un passaggio o utilizzalo solo per le durate di attesa (che sono punti di controllo).
Richiamazioni concatenate tra le funzioni
Richiama altre funzioni Lambda dall'interno di una funzione durevole utilizzando. context.invoke() La funzione chiamante si sospende in attesa del completamento della funzione richiamata, creando un checkpoint che preserva il risultato. Se la funzione chiamante viene interrotta dopo il completamento della funzione richiamata, riprende con il risultato memorizzato senza richiamare nuovamente la funzione.
Utilizzate questo modello quando avete funzioni specializzate che gestiscono domini specifici (convalida dei clienti, elaborazione dei pagamenti, gestione dell'inventario) e avete bisogno di coordinarle in un flusso di lavoro. Ogni funzione mantiene la propria logica e può essere richiamata da più funzioni di orchestrazione, evitando la duplicazione del codice.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
Ogni chiamata crea un checkpoint nella funzione orchestrator. Se l'orchestrator viene interrotto dopo il completamento della convalida del cliente, riprende da quel checkpoint con i dati del cliente memorizzati, saltando la chiamata di convalida. In questo modo si evitano chiamate duplicate ai servizi downstream e si garantisce un'esecuzione coerente anche dopo le interruzioni.
Le funzioni richiamate possono essere funzioni Lambda durevoli o standard. Se si richiama una funzione durevole, questa può avere un proprio flusso di lavoro in più fasi con attese e checkpoint. L'orchestratore attende semplicemente che l'esecuzione completa e duratura finisca, ricevendo il risultato finale.
Le chiamate tra account non sono supportate. Tutte le funzioni richiamate devono trovarsi nello stesso AWS account della funzione chiamante.
Schemi avanzati
Usa funzioni durevoli per creare applicazioni complesse in più fasi che combinano più operazioni durevoli, esecuzione parallela, elaborazione di array, logica condizionale e polling. Questi modelli consentono di creare applicazioni sofisticate che coordinano molte attività mantenendo la tolleranza agli errori e il ripristino automatico.
I pattern avanzati vanno oltre i semplici passaggi sequenziali. È possibile eseguire operazioni contemporaneamente conparallel(), elaborare array conmap(), attendere condizioni esterne e combinare queste primitive per creare applicazioni affidabili. waitForCondition() Ogni operazione durevole crea i propri checkpoint, in modo che l'applicazione possa essere ripristinata da qualsiasi punto in caso di interruzione.
Processi di onboarding degli utenti
Guida gli utenti attraverso la registrazione, la verifica delle e-mail, la configurazione del profilo e la configurazione iniziale con la gestione dei tentativi. Questo esempio combina passaggi sequenziali, callback e logica condizionale per creare un processo di onboarding completo.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
Il processo combina passaggi sequenziali con punti di controllo per la creazione dell'account e l'invio di e-mail, quindi si interrompe per un massimo di 48 ore in attesa della verifica dell'e-mail senza consumare risorse. La logica condizionale gestisce percorsi diversi a seconda che la verifica sia completata o scaduta. Le attività di configurazione del profilo vengono eseguite contemporaneamente utilizzando operazioni parallele per ridurre il tempo totale di esecuzione e ogni passaggio riprova automaticamente in caso di errori transitori per garantire che l'onboarding venga completato in modo affidabile.
Elaborazione in batch con checkpoint
Elabora milioni di record con ripristino automatico dall'ultimo checkpoint riuscito dopo gli errori. Questo esempio dimostra come le funzioni durevoli combinino le map() operazioni con la suddivisione in blocchi e la limitazione della velocità per gestire l'elaborazione di dati su larga scala.
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
I record vengono suddivisi in batch gestibili per evitare di sovraccaricare la memoria o i servizi downstream, quindi vengono elaborati più batch contemporaneamente con il controllo del parallelismo. maxConcurrency Ogni batch ha il proprio checkpoint, quindi in caso di errore si limita a riprovare a rielaborare il batch fallito anziché rielaborare tutti i record. Questo modello è ideale per lavori ETL, migrazioni di dati o operazioni di massa in cui l'elaborazione può richiedere ore.
Fasi successive