Funzioni di ricompensa personalizzate nel tuo ambiente AWS - Amazon SageMaker AI

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Funzioni di ricompensa personalizzate nel tuo ambiente AWS

Le funzioni di ricompensa personalizzate nel tuo AWS ambiente supportano solo RFT a turno singolo. Questo addestra i modelli sulle attività in cui un singolo prompt riceve un'unica risposta, valutata in modo indipendente. Il modello riceve un prompt e genera una risposta, che viene poi valutata dalla funzione di ricompensa: non c'è conversazione. back-and-forth Ciò è in contrasto con la RFT a più turni, in cui il modello si impegna in più round di interazione con un ambiente o un utente prima di ricevere una ricompensa finale.

Panoramica dell’architettura

L'architettura è composta da due componenti principali:

Formazione VPC:

  • Implementazione: carica il set di dati e il modello, invia le implementazioni alla funzione di ricompensa e riceve ricompense

  • Trainer: riceve le implementazioni dal componente Rollout, esegue passaggi in avanti e indietro e aggiorna i pesi dei modelli

VPC del cliente:

  • Reward Lambda: funzione di ricompensa implementata dal cliente che valuta le risposte del modello e restituisce punteggi di ricompensa

Flusso di lavoro:

  1. Il rollout carica il set di dati e il modello

  2. Rollout genera risposte al modello e chiama Lambda per i premi

  3. Lambda restituisce punteggi di ricompensa

  4. Rollout invia le implementazioni a Trainer

  5. Trainer aggiorna i pesi delle politiche in base alle ricompense

Configurazione della ricetta

Usa questa ricetta quando la funzione di ricompensa completa l'elaborazione entro 15 minuti.

## Nova Lite RLVR Training (PEFT) run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "lora" # Enable LoRA for PEFT lora_tuning: alpha: 32 lora_plus_lr_ratio: 64.0 # LoRA+ learning rate scaling factor (0.0–100.0)
## Nova Lite RLVR Training run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "null" # Disable LoRA for PEFT

Parametri della ricetta

  • max_steps: Numero di aggiornamenti del gradiente al modello. Ogni aggiornamento utilizza global_batch_size × refit_freq esempi. Ogni campione corrisponde a una generazione di modelli. Campioni di formazione totali =max_steps × global_batch_size.

  • max_seq_length: Lunghezza massima del contesto (in token) che il modello elabora durante l'addestramento. Dovrebbe contenere la lunghezza del prompt di input più la lunghezza della risposta generata. Un'impostazione troppo breve causa errori di allenamento; un'impostazione troppo grande spreca memoria della GPU e rallenta l'allenamento. Preimpostazioni disponibili: 8K (impostazione predefinita), 16K, 32K.

  • global_batch_size: Numero di campioni per aggiornamento del gradiente del modello. Valori più grandi forniscono gradienti più stabili ma richiedono più memoria. Notate che ogni campione corrisponde a una generazione del modello e non a un prompt. Viene utilizzato un solo prompt per creare number_generation campioni. Consigliato: 64-4096 in potenze di 2.

  • refit_freq: Frequenza di aggiornamento del peso del modello. Il numero di campioni in ogni aggiornamento èrefit_freq * global_batch_size. Controlla la frequenza con cui i modelli di generazione vengono aggiornati. Valori più alti aumentano la dimensione effettiva del batch e portano a un apprendimento più stabile. Valori più bassi aumentano la velocità di allenamento, ma aumentano la varianza. Aumento di refit_freq, aumento dei dati «off_policy». Consigliato: 4 (min: 1, max: 4).

  • rollout_strategy.off_policy_async: consente agli aggiornamenti del modello di essere «non conformi alle regole», vale a dire che le generazioni utilizzate per calcolare la perdita possono provenire da versioni precedenti del modello rispetto al modello attuale. L'attivazione non conforme alle regole comporta una formazione più rapida, ma può essere instabile se il livello è elevato. age_tolerance Consigliato: Vero (Vero, Falso).

  • rollout_strategy.age_tolerance: funziona solo quando off_policy_async è abilitato. Conserva solo i dati della versione del modello meno recente rispetto age_tolerance a quella del modello corrente. I valori più bassi eliminano i dati, i valori più alti contengono più dati delle versioni precedenti del modello. Consigliato: 2 (min: 1, max: 20).

  • clip_ratio_high: Clipping aiuta a prevenire grandi aggiornamenti delle politiche che potrebbero destabilizzare la formazione. Valori più elevati incoraggiano gli aggiornamenti che correggono gli errori del modello ma possono destabilizzare la formazione. Valori più bassi portano a un minore apprendimento. Consigliato: 0,3 (0,1, 10).

  • ent_coeff: Abbreviazione di «coefficiente di entropia», questo parametro incoraggia l'esplorazione durante l'allenamento aggiungendo un bonus di entropia alla funzione di perdita. I valori più alti promuovono un diverse/exploratory comportamento migliore, mentre i valori più bassi si concentrano sullo sfruttamento delle conoscenze attuali. Consigliato: 0,0 (min: 0, max: 0,1).

Selezione della modalità di ragionamento

Scegli tra tre livelli di sforzo di ragionamento in base alla complessità del tuo compito:

Sforzo di ragionamento

Caso d'uso

Costo/latenza

Ideale per

ometti il campo (nessun ragionamento)

Semplici domande fattuali, classificazioni

Bassa

Ottimizzazione della velocità e dei costi

low

Complessità moderata che richiede qualche ragionamento

Media

Prestazioni ed efficienza bilanciate

high

Attività analitiche complesse, problemi in più fasi

Elevata

Massima capacità di ragionamento

Comportamento predefinito: quando reasoning_effort viene specificato senza un valore, il valore predefinito è. high

Linee guida:

  • Da utilizzare high per attività analitiche complesse in cui il step-by-step pensiero aggiunge valore (matematica, logica, debug del codice)

  • Da utilizzare low per attività di complessità moderata che richiedono un certo ragionamento

  • Ometti completamente il campo per domande dirette basate su fatti, classificazioni semplici e per ottimizzare velocità e costi

Importante

Le modalità di ragionamento più avanzate migliorano le prestazioni per le attività che richiedono analisi logiche e ragionamenti complessi, ma aumentano i costi e la latenza durante la formazione e l'implementazione. Non sono utili per semplici domande fattuali come «Qual è la capitale della Francia?»

implementazione della funzione di ricompensa

La funzione di ricompensa (chiamata anche scorer o grader) è il componente principale che valuta le risposte del modello e fornisce segnali di feedback per l'allenamento. Deve essere implementata come una funzione Lambda che accetta le risposte del modello e restituisce punteggi di ricompensa.

Prerequisiti

Assicurati che le funzioni Lambda e le code SQS seguano il formato di denominazione richiesto e che il tuo ruolo di esecuzione disponga delle autorizzazioni necessarie.

Denominazione Lambda ARN:

Lambda ARN deve seguire questo formato di denominazione:

arn:aws:lambda:*:*:function:*SageMaker*

Denominazione SQS (richiesta solo per le funzioni di ricompensa remote nel proprio ambiente): AWS

  • Garantisci le autorizzazioni SQS nel ruolo di esecuzione creato per il cluster HyperPod

  • SQS ARN deve corrispondere a uno dei seguenti formati di denominazione:

    arn:aws:sqs:*:*:*SageMaker* arn:aws:sqs:*:*:*Sagemaker* arn:aws:sqs:*:*:*sagemaker*
  • Nel client SQS, usa endpoint override: --endpoint https://sqs.us-west-2.amazonaws.com perché in VPCE, la versione precedente di SQS Service Endpoint non è disponibile

Politica IAM per il ruolo di esecuzione:

{ "Action": "lambda:InvokeFunction", "Resource": [ "arn:aws:lambda:*:*:function:*SageMaker*" ], "Effect": "Allow" }, { "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": [ "arn:aws:sqs:*:*:*SageMaker*" ], "Effect": "Allow" }

Endpoint VPC:

Affinché il HyperPod cluster richiami le funzioni Lambda, devi:

  • Crea un endpoint VPC per il servizio Lambda nel VPC del tuo cluster HyperPod

  • Associa l'endpoint al gruppo di sicurezza del cluster

  • Assicurati che la policy degli endpoint VPC consenta lambda: azione InvokeFunction

Verifica di vedere un endpoint lambda nel VPC collegato a EKS.

Formato dell'interfaccia

La tua funzione di ricompensa deve accettare e restituire dati nel seguente formato.

Esempio di input per la formazione:

[{ "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]

Esempio di payload per la ricompensa Lambda:

Il sistema aggiunge il turno dell'assistente (risposta generata) all'ultimo turno del messages campo e aggiunge un unico: id

[{ "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Amazon, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]

Contratto Reward Lambda:

def lambda_handler(event, context): return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Args: samples: List of dictionaries in OpenAI format Example input (List of such sample): { "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Company, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } } Returns: List of dictionaries with reward scores: { "id": str, # Same id as input sample "aggregate_reward_score": float, # Overall score for the sample "metrics_list": [ # OPTIONAL: Component scores { "name": str, # Name of the component score "value": float, # Value of the component score "type": str # "Reward" or "Metric" } ] } """

Campi di input:

Campo

Description

Note aggiuntive

id

Identificatore univoco per il campione

Ripreso in uscita. Formato delle stringhe

messages

Cronologia chat ordinata in formato OpenAI

Matrice di oggetti di messaggio

messages [] .role

Relatore del messaggio

Valori comuni: «utente», «assistente», «sistema»

messaggi [] .content

Contenuto testuale del messaggio

Stringa normale

metadati

Informazioni in formato libero per facilitare la valutazione

Oggetto; campi opzionali trasmessi dai dati di addestramento

Campi di output:

Campo

Description

Note aggiuntive

id

Stesso identificatore del campione di input

Deve corrispondere all'input

aggregate_reward_score

Punteggio complessivo del campione

Float (ad esempio, 0,0—1,0 o intervallo definito dall'attività)

metrics_list

Punteggi dei componenti che compongono l'aggregato

Matrice di oggetti metrici

metrics_list [] .nome

Nome del componente metric/reward

Stringa (ad esempio, «accuracy», «policy_reward»)

metrics_list [] .value

Valore della metrica/ricompensa del componente

Float

metrics_list [] .type

Categoria di componente

Stringa: «Ricompensa» o «Metrica»

Vincoli tecnici

  • Limite di timeout: tempo di esecuzione massimo di 15 minuti per chiamata Lambda

  • Concorrenza: deve gestire le richieste simultanee rollout_worker_replicas × 64

  • Affidabilità: deve implementare una corretta gestione degli errori e restituire punteggi validi in modo coerente

  • Prestazioni: ottimizzate per un'esecuzione rapida (secondi, non minuti) per consentire un allenamento efficiente

Migliori pratiche:

  • Riduci al minimo le chiamate API esterne

  • Utilizza algoritmi e strutture dati efficienti

  • Implementa la logica di ripetizione dei tentativi per guasti transitori

  • Memorizza nella cache i calcoli riutilizzabili

  • Esegui test approfonditi prima dell'addestramento per garantire un'esecuzione priva di bug

Utilizzo di funzioni di ricompensa personalizzate

Implementa funzioni di ricompensa personalizzate quando disponi di criteri di valutazione specifici per attività:

  1. Definisci i criteri di valutazione: stabilisci cosa rappresenta una buona risposta per il tuo compito

  2. Implementa la funzione Lambda: crea una funzione Lambda seguendo il formato dell'interfaccia

  3. Esegui il test in locale: convalida la funzione e restituisce punteggi corretti per gli input di esempio

  4. Implementa su AWS: distribuisci la tua Lambda e annota l'ARN

  5. Configura la ricetta: aggiungi l'ARN Lambda al campo della tua ricetta reward_lambda_arn

  6. Esegui test con set di dati di piccole dimensioni: esegui RFT con un numero minimo di dati per verificare l'integrazione

Funzione Lambda di esempio

Questo esempio convalida il formato di input e confronta l'output del modello con le risposte di riferimento. Sostituisci la logica di punteggio con i tuoi criteri di valutazione effettivi.

from typing import List import json from dataclasses import asdict, dataclass @dataclass class RewardOutput: """Reward service output.""" id: str aggregate_reward_score: float def lambda_handler(event, context): """ Main lambda handler """ return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Core grader function """ scores: List[RewardOutput] = [] for sample in samples: # Extract components idx = sample["id"] ground_truth = sample.get("metadata", {}).get("reference_answer") if "messages" not in sample: print(f"Messages is None/empty for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) # Get model's response (last turn is assistant turn) last_message = sample["messages"][-1] assert last_message["role"] == "assistant", "Last message must be from assistant" model_text = last_message["content"] ground_truth_text = _extract_ground_truth_text(ground_truth) if model_text.lower() == ground_truth_text.lower(): score = 1.0 else: score = 0.0 ro = RewardOutput(id=idx, aggregate_reward_score=score) scores.append(ro) # Convert to dict format for JSON serialization return [asdict(score) for score in scores] def _extract_ground_truth_text(ground_truth) -> str: """ Turn the `ground_truth` field into a plain string. """ if isinstance(ground_truth, str): return ground_truth if isinstance(ground_truth, dict): # Common patterns: { "explanation": "...", "answer": "..." } if "explanation" in ground_truth and isinstance(ground_truth["explanation"], str): return ground_truth["explanation"] if "answer" in ground_truth and isinstance(ground_truth["answer"], str): return ground_truth["answer"] # Fallback: stringify the whole dict return json.dumps(ground_truth, ensure_ascii=False) # Fallback: stringify anything else return str(ground_truth)

Utilizzo di LLM come giudice per le funzioni di ricompensa

I Large Language Models (LLMs) vengono sempre più utilizzati come giudici nei flussi di lavoro RFT (Reinforcement Fine-Tuning), che forniscono segnali di ricompensa automatici che guidano l'ottimizzazione dei modelli. In questo approccio, un LLM valuta i risultati del modello in base a criteri specifici, che si tratti di valutare la correttezza, la qualità, l'aderenza allo stile o l'equivalenza semantica, e assegna premi che guidano il processo di reinforcement learning.

Ciò è particolarmente utile per le attività in cui le funzioni di ricompensa tradizionali sono difficili da definire programmaticamente, come determinare se rappresentazioni diverse (come «1/3", «0,333" e «un terzo») sono semanticamente equivalenti o valutare qualità sfumate come coerenza e pertinenza. Sfruttando i giudici basati su LLM come funzioni di ricompensa, puoi scalare RFT su domini complessi senza richiedere ampie annotazioni umane, consentendo una rapida iterazione e il miglioramento continuo dei tuoi modelli in diversi casi d'uso oltre ai tradizionali problemi di allineamento.

Prima di implementarne uno LLM-as-a-Judge in produzione, verificate che le valutazioni del modello arbitrale siano in linea con il giudizio umano. Ciò comporta la misurazione dei tassi di concordanza tra il giudice del LLM e i valutatori umani su campioni rappresentativi del compito svolto, assicurando idealmente che l'accordo del LLM con gli esseri umani soddisfi o superi i tassi di accordo interumani. Questa fase di convalida aiuta a identificare potenziali distorsioni, assicura che il segnale di ricompensa guidi il modello nella direzione prevista e aumenta la fiducia che il processo di valutazione automatizzato produrrà modelli che soddisfano i criteri di qualità della produzione.

Using LLM-as-a-Judge è una semplice estensione dell'utilizzo delle funzioni Lambda per il Reinforcement Learning with Verifiable Rewards (RLVR). All'interno della funzione Lambda, effettui una chiamata a uno dei modelli ospitati in Amazon Bedrock. Per garantire che la formazione e la valutazione funzionino correttamente con il modello di arbitrato, assicurati che la quota di throughput per il modello Amazon Bedrock utilizzato sia sufficiente.

Configura la tua funzione Lambda in modo che il timeout sia lungo, fino a un massimo di 15 minuti. L'impostazione predefinita per Lambda è di 3 secondi e la modifica del timeout nella configurazione Lambda è essenziale per tenere conto dei tempi di risposta più lunghi dei modelli Amazon Bedrock rispetto alle funzioni di ricompensa basate sulla logica. La Lambda viene inoltre richiamata in parallelo durante l'addestramento, quindi aumenta la concorrenza per massimizzare al massimo la velocità effettiva disponibile. Tieni presente che il limite di concorrenza deve essere impostato sia nella configurazione Lambda che nella ricetta del lavoro di formazione.

Esempio di ricetta di allenamento:

display_name: "Nova Lite V2 LoRA RLVR SMTJ training on GPU" version: "1.0" instance_types: ["ml.p5.48xlarge", "ml.p5en.48xlarge"] run: name: <experiment_name> model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: "nova-lite-2/prod" data_s3_path: s3://<path>/<training_data>.jsonl replicas: 4 reward_lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> ## SMTJ RFT Training specific configs training_config: max_length: 1200 # Context window (tokens) for inputs+prompt global_batch_size: 64 # Total samples per optimizer step across all replicas (16/32/64/128/256) reasoning_effort: high # Enables reasoning mode High / Low / or null for non-reasoning test_freq: 10 rollout: # How responses are generated for GRPO/advantage calc advantage_strategy: number_generation: 4 # N samples per prompt to estimate advantages (variance vs cost) generator: max_new_tokens: 1024 # Cap on tokens generated per sample set_random_seed: true # Seed generation for reproducibility across runs temperature: 1 # Softmax temperature top_k: 1 # Sample only from top-K logits rewards: preset_reward_function: null # Usage of reward functions built into Verl [exact_match, code_executions, math_answers] api_endpoint: lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> lambda_concurrency_limit: 12 # Max concurrent Lambda invocations (throughput vs. throttling) trainer: max_steps: 100 # Steps to train for. One Step = global_batch_size save_steps: 20 test_freq:10 # RL parameters ent_coeff: 0.0 # A bonus added to the policy loss that rewards higher-output entropy kl_loss_coef: 0.0 # Weight on the KL penalty between the actor (trainable policy) and a frozen reference model optim_config: # Optimizer settings lr: 1e-6 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95

Esempio Lambda:

Questa funzione Lambda implementa un sistema di punteggio delle LLM-as-a-Judge ricompense per la messa a punto dei rinforzi. Elabora batch di risposte generate dal modello estraendo le risposte da output ben formattati (cercando la \boxed{} notazione), quindi utilizza Claude Haiku come modello di valutazione per valutare la somiglianza semantica tra la risposta estratta e la risposta di riferimento alla verità fondamentale su una scala 0,0-1,0. Il giudice confronta le risposte per determinare se sono semanticamente equivalenti (anche se rappresentate in modo diverso, come «1/3" vs «0,333"), gestendo casi in cui le risposte possono essere formattate in vari modi. La funzione include la logica di riprova per la limitazione, convalida la struttura dei messaggi e restituisce un elenco di punteggi di ricompensa che possono essere utilizzati come segnali di addestramento nel processo di apprendimento per rinforzo, con punteggi pari a 0,0 assegnati quando non è possibile estrarre le risposte o la convalida fallisce.

import json import random from dataclasses import asdict, dataclass import re from typing import Dict, Optional, Any, List import boto3 from botocore.exceptions import ClientError from copy import deepcopy import time import base64 def extract_solution_nova(solution_str: str) -> Optional[str]: """ Extract solution from Nova-formatted response. Args: solution_str: The solution text from Nova model method: "strict" or "flexible" extraction method Returns: Extracted numerical answer or None """ boxed_matches = re.findall(r'\\boxed\{([^}]+)\}', solution_str) if boxed_matches: final_answer = boxed_matches[-1].replace(",", "").replace("$", "") return final_answer return 0.0 bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1') JUDGE_MODEL_ID = "global.anthropic.claude-haiku-4-5-20251001-v1:0" SYSTEM_PROMPT = "You must output ONLY a number between 0.0 and 1.0. No explanations, no text, just the number." JUDGE_PROMPT_TEMPLATE = """Compare the following two responses and rate how similar they are on a scale of 0.0 to 1.0, where: - 1.0 means the responses are semantically equivalent (same meaning, even if worded differently) - 0.5 means the responses are partially similar - 0.0 means the responses are completely different or contradictory Response A: {response_a} Response B: {response_b} Output ONLY a number between 0.0 and 1.0. No explanations.""" def lambda_graded(id: str, response_a: str, response_b: str, max_retries: int = 50) -> float: """Call Bedrock to compare responses and return similarity score.""" prompt = JUDGE_PROMPT_TEMPLATE.format(response_a=response_a, response_b=response_b) print(f"Calling judge: {JUDGE_MODEL_ID}") for attempt in range(max_retries): try: print(f"Attempt: {attempt}") response = bedrock_runtime.converse( modelId=JUDGE_MODEL_ID, messages=[{"role": "user", "content": [{"text": prompt}]}], system=[{"text": SYSTEM_PROMPT}], inferenceConfig={"temperature": 0.0, "maxTokens": 10} ) print(f"Bedrock call successful: {response}") output = response['output']['message']['content'][0]['text'].strip() score = float(output) print(f"Score parsed: {score}") return max(0.0, min(1.0, score)) except Exception as e: if "ThrottlingException" in str(e) and attempt < max_retries - 1: time.sleep(2 ** attempt) print(f"Throttling {id}") else: print(f"Bedrock call failed: {e}") return 0.0 print("Max retries reached. Unable to complete the request.") return 0.0 def compute_score(id: str, solution_str: str, ground_truth: str, method: str = "strict", format_score: float = 0.0, score: float = 1.0, data_source: str ='dataset_name', extra_info: Optional[dict] = None) -> float: """ The scoring function for PandaLM with Nova format. Args: solution_str: The solution text from Nova model ground_truth: JSON string containing the ground truth answer method: The method to extract the solution, choices are 'strict' and 'flexible' format_score: The score for format compliance score: The score for correct answer data_source: Should match the data_source in the given dataset extra_info: Optional dict with additional fields. Required in function signature. Returns: Score between 0 and 1 """ import json answer = extract_solution_nova(solution_str=solution_str, method=method) if answer is None: return 0.0 print(f"Answer: {str(answer)}, Reference: {str(ground_truth)}") # Clean both answers for comparison clean_answer = str(answer) clean_ground_truth = str(ground_truth) score = lambda_graded(id, response_a=clean_answer, response_b=clean_ground_truth) print(f"Raw score: {score}") return score @dataclass class RewardOutput: """Reward service.""" id: str aggregate_reward_score: float def lambda_handler(event, context): scores: List[RewardOutput] = [] samples = event print(len(samples)) for sample in samples: # Extract the ground truth key. In the current dataset it's answer print("Sample: ", json.dumps(sample, indent=2)) ground_truth = sample["reference_answer"] idx = "no id" # print(sample) if not "id" in sample: print(f"ID is None/empty for sample: {sample}") else: idx = sample["id"] ro = RewardOutput(id=idx, aggregate_reward_score=0.0) if not "messages" in sample: print(f"Messages is None/empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Extract answer from ground truth dict if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Get completion from last message (assistant message) last_message = sample["messages"][-1] completion_text = last_message["content"] if last_message["role"] not in ["assistant", "nova_assistant"]: print(f"Last message is not from assistant for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue if not "content" in last_message: print(f"Completion text is empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue random_score = compute_score(id=id, solution_str=completion_text, ground_truth=ground_truth) ro = RewardOutput(id=idx, aggregate_reward_score=random_score) print(f"Response for id: {idx} is {ro}") scores.append(ro) return [asdict(score) for score in scores]