Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Funciones de recompensa personalizadas en su entorno AWS
Las funciones de recompensa personalizadas de su AWS entorno solo admiten la RFT de un solo turno. Esto capacita a los modelos en tareas en las que una sola solicitud recibe una sola respuesta, que se evalúa de forma independiente. El modelo recibe un mensaje y genera una respuesta, que luego se puntúa según la función de recompensa (no back-and-forth hay conversación). Esto contrasta con la RFT de varios turnos, en la que el modelo interactúa en varias rondas con un entorno o un usuario antes de recibir la recompensa final.
Temas
Información general de la arquitectura
La arquitectura consta de dos componentes principales:
VPC de formación:
-
Implementación: carga el conjunto de datos y el modelo, envía las implementaciones a la función de recompensas y recibe las recompensas
-
Entrenador: recibe los despliegues del componente Rollout, realiza pases hacia adelante y hacia atrás y actualiza los pesos del modelo
VPC del cliente:
-
Reward Lambda: función de recompensa implementada por el cliente que evalúa las respuestas del modelo y devuelve las puntuaciones de recompensa
Flujo de trabajo:
-
El despliegue carga el conjunto de datos y el modelo
-
La implementación genera respuestas modelo y solicita recompensas a Lambda
-
Lambda devuelve las puntuaciones de recompensa
-
Rollout envía las implementaciones a Trainer
-
El entrenador actualiza las ponderaciones de la política en función de las recompensas
Configuración de recetas
Usa esta receta cuando la función de recompensa termine de procesarse en 15 minutos.
## Nova Lite RLVR Training (PEFT) run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "lora" # Enable LoRA for PEFT lora_tuning: alpha: 32 lora_plus_lr_ratio: 64.0 # LoRA+ learning rate scaling factor (0.0–100.0)
## Nova Lite RLVR Training run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "null" # Disable LoRA for PEFT
Parámetros de la receta
-
max_steps: Número de actualizaciones de gradiente en el modelo. Cada actualización utilizaglobal_batch_size × refit_freqmuestras. Cada muestra corresponde a una generación de modelos. Muestras de entrenamiento totales =max_steps × global_batch_size. -
max_seq_length: Longitud máxima del contexto (en fichas) que el modelo procesa durante el entrenamiento. Debe adaptarse a la longitud del mensaje de entrada más la longitud de la respuesta generada. Una configuración demasiado corta provoca errores de entrenamiento; una configuración demasiado grande desperdicia memoria de la GPU y ralentiza el entrenamiento. Presets disponibles: 8K (predeterminado), 16K, 32K. -
global_batch_size: Número de muestras por actualización de gradiente del modelo. Los valores más altos proporcionan gradientes más estables, pero requieren más memoria. Tenga en cuenta que cada muestra corresponde a una generación del modelo y no a un mensaje. Se utiliza un único indicador para crearnumber_generationmuestras. Recomendado: 64-4096 en potencias de 2. -
refit_freq: Frecuencia con la que se actualiza el peso del modelo. El número de muestras de cada actualización es derefit_freq * global_batch_size. Controla la frecuencia con la que se actualizan los modelos de generación. Los valores más altos aumentan el tamaño efectivo del lote y conducen a un aprendizaje más estable. Los valores más bajos aumentan la velocidad de entrenamiento, pero aumentan la varianza. Si se aumenta el número de refit_freq, se incrementan los datos de «off_policy». Recomendado: 4 (mínimo: 1, máximo: 4). -
rollout_strategy.off_policy_async: Permite que las actualizaciones del modelo estén «fuera de la política», es decir, las generaciones que se utilizan para calcular la pérdida pueden provenir de versiones del modelo anteriores a las del modelo actual. Permitirlo fuera de la política conduce a una formación más rápida, pero puede resultar inestable si la tasaage_tolerancees alta. Recomendado: Verdadero (verdadero, falso). -
rollout_strategy.age_tolerance: Solo funciona cuandooff_policy_asyncestá activado. Conserve únicamente los datos de la versión del modelo que sea inferior a laage_toleranceanterior del modelo actual. Los valores más bajos descartan los datos, los valores más altos contienen más datos de versiones anteriores del modelo. Recomendado: 2 (mínimo: 1, máximo: 20). -
clip_ratio_high: Los recortes ayudan a evitar grandes actualizaciones de políticas que podrían desestabilizar la formación. Los valores más altos fomentan las actualizaciones que corrigen los errores del modelo, pero pueden desestabilizar el entrenamiento. Los valores más pequeños conducen a un menor aprendizaje. Recomendado: 0,3 (0,1, 10). -
ent_coeff: Abreviatura de «coeficiente de entropía», este parámetro fomenta la exploración durante el entrenamiento al añadir una bonificación de entropía a la función de pérdida. Los valores más altos promueven un mayor diverse/exploratory comportamiento, mientras que los valores más bajos se centran en explotar los conocimientos actuales. Recomendado: 0.0 (mínimo: 0, máximo: 0.1).
Selección del modo de razonamiento
Elija entre tres niveles de esfuerzo de razonamiento en función de la complejidad de la tarea:
Esfuerzo de razonamiento |
Caso de uso |
Costo/latencia |
Recomendada para |
|---|---|---|---|
Omita el campo (sin razonamiento) |
Consultas fácticas simples, clasificaciones |
Bajo |
Optimización de velocidad y costes |
low |
Complejidad moderada que requiere cierto razonamiento |
Medio |
Rendimiento y eficiencia equilibrados |
high |
Tareas analíticas complejas, problemas de varios pasos |
Alto |
Máxima capacidad de razonamiento |
Comportamiento predeterminado: cuando reasoning_effort se especifica sin un valor, el valor predeterminado es. high
Pautas:
-
Úselo
highpara tareas analíticas complejas en las que step-by-step pensar agrega valor (matemáticas, lógica, depuración de código) -
Úselo
lowpara tareas de complejidad moderada que requieren un poco de razonamiento -
Omita el campo por completo para consultas objetivas directas, clasificaciones simples y al optimizar la velocidad y el costo
importante
Los modos de razonamiento superior mejoran el rendimiento de las tareas que requieren un análisis lógico y un razonamiento complejo, pero aumentan los costes y la latencia tanto durante el entrenamiento como durante el despliegue. No son útiles para consultas fácticas simples como «¿Cuál es la capital de Francia?»
Implementación de la función de recompensa
La función de recompensa (también llamada «puntuador» o «calificador») es el componente principal que evalúa las respuestas del modelo y proporciona señales de retroalimentación para el entrenamiento. Debe implementarse como una función Lambda que acepte las respuestas del modelo y devuelva las puntuaciones de recompensa.
Requisitos previos
Asegúrese de que las funciones de Lambda y las colas de SQS sigan el formato de nomenclatura requerido y que su función de ejecución cuente con los permisos necesarios.
Denominación del ARN de Lambda:
El ARN de Lambda debe seguir este formato de nomenclatura:
arn:aws:lambda:*:*:function:*SageMaker*
Denominación SQS (necesaria solo para las funciones de recompensa remotas en su propio AWS entorno):
-
Asegúrese de tener permisos de SQS en la función de ejecución creada para el clúster HyperPod
-
El ARN de SQS debe coincidir con uno de estos formatos de nomenclatura:
arn:aws:sqs:*:*:*SageMaker* arn:aws:sqs:*:*:*Sagemaker* arn:aws:sqs:*:*:*sagemaker* -
En el cliente SQS, utilice la anulación del punto final:
--endpoint https://sqs.us-west-2.amazonaws.comya que en VPCE, el punto final del servicio SQS antiguo no está disponible
Política de IAM para la función de ejecución:
{ "Action": "lambda:InvokeFunction", "Resource": [ "arn:aws:lambda:*:*:function:*SageMaker*" ], "Effect": "Allow" }, { "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": [ "arn:aws:sqs:*:*:*SageMaker*" ], "Effect": "Allow" }
Punto de conexión de VPC:
Para que el HyperPod clúster invoque funciones Lambda, debe:
-
Cree un punto de enlace de VPC para el servicio Lambda en la VPC de su clúster HyperPod
-
Asocie el punto final al grupo de seguridad del clúster
-
Asegúrese de que la política de puntos finales de la VPC permita la acción lambda: InvokeFunction
Compruebe que ve un punto final lambda en la VPC conectada al EKS.
Formato de interfaz
Tu función de recompensas debe aceptar y devolver datos en el siguiente formato.
Ejemplo de información para la formación:
[{ "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]
Ejemplo de carga útil para la Lambda de recompensa:
El sistema añade el turno del asistente (respuesta generada) al último turno del messages campo y añade un único: id
[{ "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Amazon, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]
Contrato de Reward Lambda:
def lambda_handler(event, context): return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Args: samples: List of dictionaries in OpenAI format Example input (List of such sample): { "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Company, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } } Returns: List of dictionaries with reward scores: { "id": str, # Same id as input sample "aggregate_reward_score": float, # Overall score for the sample "metrics_list": [ # OPTIONAL: Component scores { "name": str, # Name of the component score "value": float, # Value of the component score "type": str # "Reward" or "Metric" } ] } """
Campos de entrada:
Campo |
Description (Descripción) |
Notas adicionales |
|---|---|---|
id |
Identificador único de la muestra |
Se repitió en la salida. Formato de cadena |
Mensajes |
Historial de chat ordenado en formato OpenAI |
Matriz de objetos de mensajes |
mensajes [] .role |
Orador del mensaje |
Valores comunes: «usuario», «asistente», «sistema» |
mensajes [] .contenido |
Contenido textual del mensaje |
Cadena sin formato |
metadatos |
Información en formato libre para facilitar la calificación |
Objeto; campos opcionales transferidos de los datos de entrenamiento |
Campos de salida:
Campo |
Description (Descripción) |
Notas adicionales |
|---|---|---|
id |
El mismo identificador que la muestra de entrada |
Debe coincidir con la entrada |
agregate_reward_score |
Puntuación general de la muestra |
Flotación (p. ej., 0,0—1,0 o rango definido por la tarea) |
Lista_de_métricas |
Puntuaciones de los componentes que componen el agregado |
Matriz de objetos métricos |
metrics_list [] .name |
Nombre del componente métrica/recompensa |
Cadena (por ejemplo, «precisión», «policy_reward») |
lista_métricas [] .valor |
Valor del componente métrica/recompensa |
Flotante |
metrics_list [] .type |
Categoría del componente |
Cadena: «Recompensa» o «Métrica» |
Restricciones técnicas
-
Límite de tiempo de espera: 15 minutos de tiempo máximo de ejecución por invocación a Lambda
-
Simultaneidad: debe gestionar solicitudes simultáneas
rollout_worker_replicas × 64 -
Fiabilidad: debe implementar un manejo de errores adecuado y devolver puntajes válidos de manera consistente
-
Rendimiento: optimice para una ejecución rápida (segundos, no minutos) para permitir un entrenamiento eficiente
Prácticas recomendadas:
-
Minimice las llamadas a la API externa
-
Utilice algoritmos y estructuras de datos eficientes
-
Implemente la lógica de reintento para los errores transitorios
-
Almacene en caché los cálculos reutilizables
-
Realice pruebas exhaustivas antes del entrenamiento para garantizar una ejecución sin errores
Uso de funciones de recompensa personalizadas
Implemente funciones de recompensa personalizadas cuando tenga criterios de evaluación específicos para cada tarea:
-
Defina los criterios de evaluación: determine qué es lo que constituye una buena respuesta para su tarea
-
Implementar la función Lambda: cree una función Lambda siguiendo el formato de la interfaz
-
Pruebe localmente: valide que su función devuelva las puntuaciones correctas para las entradas de muestra
-
Implemente en AWS: Implemente su Lambda y anote el ARN
-
Configure la receta: añada el ARN de Lambda al campo de la receta
reward_lambda_arn -
Pruebe con un conjunto de datos pequeño: ejecute la RFT con un mínimo de datos para verificar la integración
Ejemplo de función de Lambda
En este ejemplo, se valida el formato de entrada y se compara la salida del modelo con las respuestas de referencia. Sustituya la lógica de puntuación por sus criterios de evaluación reales.
from typing import List import json from dataclasses import asdict, dataclass @dataclass class RewardOutput: """Reward service output.""" id: str aggregate_reward_score: float def lambda_handler(event, context): """ Main lambda handler """ return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Core grader function """ scores: List[RewardOutput] = [] for sample in samples: # Extract components idx = sample["id"] ground_truth = sample.get("metadata", {}).get("reference_answer") if "messages" not in sample: print(f"Messages is None/empty for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) # Get model's response (last turn is assistant turn) last_message = sample["messages"][-1] assert last_message["role"] == "assistant", "Last message must be from assistant" model_text = last_message["content"] ground_truth_text = _extract_ground_truth_text(ground_truth) if model_text.lower() == ground_truth_text.lower(): score = 1.0 else: score = 0.0 ro = RewardOutput(id=idx, aggregate_reward_score=score) scores.append(ro) # Convert to dict format for JSON serialization return [asdict(score) for score in scores] def _extract_ground_truth_text(ground_truth) -> str: """ Turn the `ground_truth` field into a plain string. """ if isinstance(ground_truth, str): return ground_truth if isinstance(ground_truth, dict): # Common patterns: { "explanation": "...", "answer": "..." } if "explanation" in ground_truth and isinstance(ground_truth["explanation"], str): return ground_truth["explanation"] if "answer" in ground_truth and isinstance(ground_truth["answer"], str): return ground_truth["answer"] # Fallback: stringify the whole dict return json.dumps(ground_truth, ensure_ascii=False) # Fallback: stringify anything else return str(ground_truth)
Utilizar a LLM como juez para las funciones de recompensa
Los modelos de lenguaje grande (LLMs) se utilizan cada vez más como jueces en los flujos de trabajo de ajuste fino por refuerzo (RFT), ya que proporcionan señales de recompensa automatizadas que guían la optimización de los modelos. Con este enfoque, un máster en máster evalúa los resultados del modelo en función de criterios específicos (ya sea la corrección, la calidad, la adherencia al estilo o la equivalencia semántica) y asigna recompensas que impulsan el proceso de aprendizaje por refuerzo.
Esto es particularmente valioso para tareas en las que las funciones de recompensa tradicionales son difíciles de definir mediante programación, como determinar si diferentes representaciones (como «1/3», «0,333» y «un tercio») son semánticamente equivalentes, o evaluar cualidades matizadas como la coherencia y la relevancia. Al utilizar jueces basados en LLM como funciones de recompensa, puede escalar la RFT a dominios complejos sin necesidad de una amplia anotación humana, lo que permite una iteración rápida y una mejora continua de sus modelos en diversos casos de uso, más allá de los problemas de alineación tradicionales.
Antes de ponerlo LLM-as-a-Judge en producción, compruebe que las evaluaciones del modelo de jueces se ajustan al criterio humano. Esto implica medir las tasas de acuerdo entre el juez del LLM y los evaluadores humanos en muestras representativas de su tarea, lo ideal es garantizar que el acuerdo del LLM con las personas alcance o supere las tasas de acuerdo interhumanas. Este paso de validación ayuda a identificar posibles sesgos, garantiza que la señal de recompensa oriente el modelo en la dirección deseada y genera confianza en que el proceso de evaluación automatizada producirá modelos que cumplan con sus criterios de calidad de producción.
Using LLM-as-a-Judge es una simple extensión del uso de funciones Lambda para el aprendizaje por refuerzo con recompensas verificables (RLVR). Dentro de la función Lambda, puede realizar una llamada a uno de los modelos alojados en Amazon Bedrock. Para garantizar que la formación y la evaluación funcionen bien con el modelo de jueces, asegúrese de que su cuota de rendimiento para el modelo Amazon Bedrock utilizado sea suficiente.
Configure la función Lambda para que el tiempo de espera sea prolongado, hasta un máximo de 15 minutos. La configuración predeterminada de Lambda es de 3 segundos, y cambiar el tiempo de espera en la configuración de Lambda es esencial para tener en cuenta los tiempos de respuesta más prolongados de los modelos de Amazon Bedrock en comparación con las funciones de recompensa basadas en la lógica. La Lambda también se invoca en paralelo durante el entrenamiento, así que aumente la simultaneidad para maximizar al máximo el rendimiento disponible. Tenga en cuenta que el límite de simultaneidad debe establecerse tanto en la configuración de Lambda como en la receta del trabajo de formación.
Ejemplo de receta de formación:
display_name: "Nova Lite V2 LoRA RLVR SMTJ training on GPU" version: "1.0" instance_types: ["ml.p5.48xlarge", "ml.p5en.48xlarge"] run: name: <experiment_name> model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: "nova-lite-2/prod" data_s3_path: s3://<path>/<training_data>.jsonl replicas: 4 reward_lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> ## SMTJ RFT Training specific configs training_config: max_length: 1200 # Context window (tokens) for inputs+prompt global_batch_size: 64 # Total samples per optimizer step across all replicas (16/32/64/128/256) reasoning_effort: high # Enables reasoning mode High / Low / or null for non-reasoning test_freq: 10 rollout: # How responses are generated for GRPO/advantage calc advantage_strategy: number_generation: 4 # N samples per prompt to estimate advantages (variance vs cost) generator: max_new_tokens: 1024 # Cap on tokens generated per sample set_random_seed: true # Seed generation for reproducibility across runs temperature: 1 # Softmax temperature top_k: 1 # Sample only from top-K logits rewards: preset_reward_function: null # Usage of reward functions built into Verl [exact_match, code_executions, math_answers] api_endpoint: lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> lambda_concurrency_limit: 12 # Max concurrent Lambda invocations (throughput vs. throttling) trainer: max_steps: 100 # Steps to train for. One Step = global_batch_size save_steps: 20 test_freq:10 # RL parameters ent_coeff: 0.0 # A bonus added to the policy loss that rewards higher-output entropy kl_loss_coef: 0.0 # Weight on the KL penalty between the actor (trainable policy) and a frozen reference model optim_config: # Optimizer settings lr: 1e-6 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95
Lambda de ejemplo:
Esta función Lambda implementa un sistema de puntuación de LLM-as-a-Judge recompensas para ajustar los refuerzos. Procesa lotes de respuestas generadas por modelos extrayéndolas de resultados bien formateados (buscando la \boxed{} notación) y, a continuación, utiliza Claude Haiku como modelo de juez para evaluar la similitud semántica entre la respuesta extraída y la respuesta de referencia basada en la verdad fundamental en una escala de 0,0-1,0. El juez compara las respuestas para determinar si son semánticamente equivalentes (incluso si se representan de manera diferente, como «1/3» frente a «0,333»), y se ocupa de los casos en los que las respuestas pueden tener varios formatos. La función incluye una lógica de reintento para limitar la velocidad, valida la estructura de los mensajes y devuelve una lista de puntuaciones de recompensa que se pueden utilizar como señales de entrenamiento en el proceso de aprendizaje por refuerzo. Se asignan puntuaciones de 0,0 cuando las respuestas no se pueden extraer o la validación falla.
import json import random from dataclasses import asdict, dataclass import re from typing import Dict, Optional, Any, List import boto3 from botocore.exceptions import ClientError from copy import deepcopy import time import base64 def extract_solution_nova(solution_str: str) -> Optional[str]: """ Extract solution from Nova-formatted response. Args: solution_str: The solution text from Nova model method: "strict" or "flexible" extraction method Returns: Extracted numerical answer or None """ boxed_matches = re.findall(r'\\boxed\{([^}]+)\}', solution_str) if boxed_matches: final_answer = boxed_matches[-1].replace(",", "").replace("$", "") return final_answer return 0.0 bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1') JUDGE_MODEL_ID = "global.anthropic.claude-haiku-4-5-20251001-v1:0" SYSTEM_PROMPT = "You must output ONLY a number between 0.0 and 1.0. No explanations, no text, just the number." JUDGE_PROMPT_TEMPLATE = """Compare the following two responses and rate how similar they are on a scale of 0.0 to 1.0, where: - 1.0 means the responses are semantically equivalent (same meaning, even if worded differently) - 0.5 means the responses are partially similar - 0.0 means the responses are completely different or contradictory Response A: {response_a} Response B: {response_b} Output ONLY a number between 0.0 and 1.0. No explanations.""" def lambda_graded(id: str, response_a: str, response_b: str, max_retries: int = 50) -> float: """Call Bedrock to compare responses and return similarity score.""" prompt = JUDGE_PROMPT_TEMPLATE.format(response_a=response_a, response_b=response_b) print(f"Calling judge: {JUDGE_MODEL_ID}") for attempt in range(max_retries): try: print(f"Attempt: {attempt}") response = bedrock_runtime.converse( modelId=JUDGE_MODEL_ID, messages=[{"role": "user", "content": [{"text": prompt}]}], system=[{"text": SYSTEM_PROMPT}], inferenceConfig={"temperature": 0.0, "maxTokens": 10} ) print(f"Bedrock call successful: {response}") output = response['output']['message']['content'][0]['text'].strip() score = float(output) print(f"Score parsed: {score}") return max(0.0, min(1.0, score)) except Exception as e: if "ThrottlingException" in str(e) and attempt < max_retries - 1: time.sleep(2 ** attempt) print(f"Throttling {id}") else: print(f"Bedrock call failed: {e}") return 0.0 print("Max retries reached. Unable to complete the request.") return 0.0 def compute_score(id: str, solution_str: str, ground_truth: str, method: str = "strict", format_score: float = 0.0, score: float = 1.0, data_source: str ='dataset_name', extra_info: Optional[dict] = None) -> float: """ The scoring function for PandaLM with Nova format. Args: solution_str: The solution text from Nova model ground_truth: JSON string containing the ground truth answer method: The method to extract the solution, choices are 'strict' and 'flexible' format_score: The score for format compliance score: The score for correct answer data_source: Should match the data_source in the given dataset extra_info: Optional dict with additional fields. Required in function signature. Returns: Score between 0 and 1 """ import json answer = extract_solution_nova(solution_str=solution_str, method=method) if answer is None: return 0.0 print(f"Answer: {str(answer)}, Reference: {str(ground_truth)}") # Clean both answers for comparison clean_answer = str(answer) clean_ground_truth = str(ground_truth) score = lambda_graded(id, response_a=clean_answer, response_b=clean_ground_truth) print(f"Raw score: {score}") return score @dataclass class RewardOutput: """Reward service.""" id: str aggregate_reward_score: float def lambda_handler(event, context): scores: List[RewardOutput] = [] samples = event print(len(samples)) for sample in samples: # Extract the ground truth key. In the current dataset it's answer print("Sample: ", json.dumps(sample, indent=2)) ground_truth = sample["reference_answer"] idx = "no id" # print(sample) if not "id" in sample: print(f"ID is None/empty for sample: {sample}") else: idx = sample["id"] ro = RewardOutput(id=idx, aggregate_reward_score=0.0) if not "messages" in sample: print(f"Messages is None/empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Extract answer from ground truth dict if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Get completion from last message (assistant message) last_message = sample["messages"][-1] completion_text = last_message["content"] if last_message["role"] not in ["assistant", "nova_assistant"]: print(f"Last message is not from assistant for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue if not "content" in last_message: print(f"Completion text is empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue random_score = compute_score(id=id, solution_str=completion_text, ground_truth=ground_truth) ro = RewardOutput(id=idx, aggregate_reward_score=random_score) print(f"Response for id: {idx} is {ro}") scores.append(ro) return [asdict(score) for score in scores]