Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Fonctions de récompense personnalisées dans votre AWS environnement
Les fonctions de récompense personnalisées de votre AWS environnement ne prennent en charge que le RFT à tour unique. Cela entraîne des modèles sur des tâches pour lesquelles une seule invite reçoit une réponse unique, évaluée indépendamment. Le modèle reçoit une invite et génère une réponse, qui est ensuite notée par votre fonction de récompense : il n'y a pas back-and-forth de conversation. Cela contraste avec le RFT à tours multiples où le modèle effectue plusieurs cycles d'interaction avec un environnement ou un utilisateur avant de recevoir une récompense finale.
Rubriques
Présentation de l’architecture
L'architecture se compose de deux composants principaux :
VPC d'entraînement :
-
Déploiement : charge le jeu de données et le modèle, envoie les déploiements à la fonction de récompense et reçoit des récompenses
-
Entraîneur : reçoit les déploiements depuis le composant de déploiement, effectue des passes vers l'avant et vers l'arrière et met à jour les poids des modèles
VPC du client :
-
Reward Lambda : fonction de récompense mise en œuvre par le client qui évalue les réponses du modèle et renvoie les scores de récompense
Flux de travail :
-
Le déploiement charge le jeu de données et le modèle
-
Le déploiement génère des réponses aux modèles et appelle Lambda pour obtenir des récompenses
-
Lambda affiche les scores de récompense
-
Rollout envoie les déploiements à Trainer
-
Le formateur met à jour les pondérations politiques en fonction des récompenses
Configuration de la recette
Utilisez cette recette lorsque le traitement de votre fonction de récompense est terminé dans les 15 minutes.
## Nova Lite RLVR Training (PEFT) run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "lora" # Enable LoRA for PEFT lora_tuning: alpha: 32 lora_plus_lr_ratio: 64.0 # LoRA+ learning rate scaling factor (0.0–100.0)
## Nova Lite RLVR Training run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "null" # Disable LoRA for PEFT
Paramètres de la recette
-
max_steps: nombre de mises à jour du dégradé apportées au modèle. Chaque mise à jour utiliseglobal_batch_size × refit_freqdes exemples. Chaque échantillon correspond à une génération de modèle. Nombre total d'échantillons d'entraînement =max_steps × global_batch_size. -
max_seq_length: longueur maximale du contexte (en jetons) que le modèle traite pendant l'entraînement. Doit tenir compte de la longueur de l'invite d'entrée et de la longueur de la réponse générée. Un réglage trop court entraîne des erreurs d'entraînement ; un réglage trop élevé gaspille la mémoire du GPU et ralentit l'entraînement. Préréglages disponibles : 8K (par défaut), 16K, 32K. -
global_batch_size: nombre d'échantillons par mise à jour du gradient du modèle. Des valeurs plus élevées fournissent des dégradés plus stables mais nécessitent plus de mémoire. Notez que chaque échantillon correspond à une génération par le modèle, et non à une invite. Une seule invite est utilisée pour créer desnumber_generationéchantillons. Recommandé : 64-4096 en puissances de 2. -
refit_freq: Fréquence à laquelle le poids du modèle est mis à jour. Le nombre d'échantillons dans chaque mise à jour est derefit_freq * global_batch_size. Contrôle la fréquence à laquelle les modèles de génération sont mis à jour. Des valeurs plus élevées augmentent la taille effective du lot et permettent un apprentissage plus stable. Des valeurs faibles augmentent la vitesse d'entraînement, mais augmentent la variance. L'augmentation de refit_freq augmente les données « off_policy ». Recommandé : 4 (min : 1, max : 4). -
rollout_strategy.off_policy_async: permet aux mises à jour du modèle d'être « hors politique », c'est-à-dire que les générations utilisées pour calculer la perte peuvent provenir d'une version antérieure du modèle à celle du modèle actuel. L'activation d'une approche hors politique permet d'accélérer la formation, mais peut être instable si la valeurage_toleranceest élevée. Recommandé : Vrai (Vrai, Faux). -
rollout_strategy.age_tolerance: ne fonctionne que lorsqu'iloff_policy_asyncest activé. Conservez uniquement les données de la version du modèle inférieureage_toleranceà celle du modèle actuel. Les valeurs faibles suppriment les données, tandis que les valeurs élevées contiennent davantage de données issues des versions précédentes du modèle. Recommandé : 2 (min : 1, max : 20). -
clip_ratio_high: Le découpage permet d'éviter les mises à jour importantes des politiques susceptibles de déstabiliser la formation. Des valeurs plus élevées encouragent les mises à jour qui corrigent les erreurs de modèle mais peuvent déstabiliser l'entraînement. Des valeurs plus faibles entraînent moins d'apprentissage. Recommandé : 0,3 (0,1, 10). -
ent_coeff: Abréviation de « coefficient d'entropie », ce paramètre encourage l'exploration pendant l'entraînement en ajoutant un bonus d'entropie à la fonction de perte. Les valeurs les plus élevées favorisent diverse/exploratory le comportement, tandis que les valeurs faibles visent à exploiter les connaissances actuelles. Recommandé : 0,0 (min : 0, max : 0,1).
Sélection du mode de raisonnement
Choisissez parmi trois niveaux d'effort de raisonnement en fonction de la complexité de votre tâche :
Effort de raisonnement |
Cas d’utilisation |
Coût/Latence |
Idéal pour |
|---|---|---|---|
omettre le champ (sans raisonnement) |
Requêtes factuelles simples, classifications |
Faible |
Optimisation de la vitesse et des coûts |
bas |
Complexité modérée nécessitant un certain raisonnement |
Moyenne |
Équilibre entre performance et efficacité |
haut |
Tâches analytiques complexes, problèmes en plusieurs étapes |
Élevée |
Capacité de raisonnement maximale |
Comportement par défaut : lorsqu'il reasoning_effort est spécifié sans valeur, sa valeur par défaut est. high
Directives :
-
highÀ utiliser pour des tâches analytiques complexes où la step-by-step réflexion apporte une valeur ajoutée (mathématiques, logique, débogage de code) -
lowÀ utiliser pour les tâches de complexité modérée nécessitant un certain raisonnement -
Omettez entièrement le champ pour les requêtes factuelles directes, les classifications simples et lors de l'optimisation de la vitesse et des coûts
Important
Les modes de raisonnement avancés améliorent les performances pour les tâches nécessitant une analyse logique et un raisonnement complexe, mais augmentent les coûts et la latence pendant la formation et le déploiement. Ils ne sont pas utiles pour de simples questions factuelles telles que « Quelle est la capitale de la France ? »
Implantation de la fonction de récompense
La fonction de récompense (également appelée scorer ou grader) est l'élément central qui évalue les réponses du modèle et fournit des signaux de feedback pour l'entraînement. Elle doit être implémentée en tant que fonction Lambda qui accepte les réponses du modèle et renvoie des scores de récompense.
Conditions préalables
Assurez-vous que vos fonctions Lambda et vos files d'attente SQS respectent le format de dénomination requis et que votre rôle d'exécution dispose des autorisations nécessaires.
Dénomination des ARN Lambda :
L'ARN Lambda doit respecter le format de dénomination suivant :
arn:aws:lambda:*:*:function:*SageMaker*
Dénomination SQS (requise uniquement pour les fonctions de récompense à distance dans votre propre AWS environnement) :
-
Garantir les autorisations SQS dans le rôle d'exécution créé pour le cluster HyperPod
-
L'ARN SQS doit correspondre à l'un des formats de dénomination suivants :
arn:aws:sqs:*:*:*SageMaker* arn:aws:sqs:*:*:*Sagemaker* arn:aws:sqs:*:*:*sagemaker* -
Dans le client SQS, utilisez le remplacement du point de terminaison :
--endpoint https://sqs.us-west-2.amazonaws.comcar dans VPCE, l'ancien point de terminaison de service SQS n'est pas disponible
Politique IAM pour le rôle d'exécution :
{ "Action": "lambda:InvokeFunction", "Resource": [ "arn:aws:lambda:*:*:function:*SageMaker*" ], "Effect": "Allow" }, { "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": [ "arn:aws:sqs:*:*:*SageMaker*" ], "Effect": "Allow" }
Point de terminaison VPC :
Pour que le HyperPod cluster puisse invoquer des fonctions Lambda, vous devez :
-
Créez un point de terminaison VPC pour le service Lambda dans le VPC de votre cluster HyperPod
-
Associer le point de terminaison au groupe de sécurité du cluster
-
Assurez-vous que la politique de point de terminaison du VPC autorise l'action lambda : InvokeFunction
Vérifiez que vous voyez un point de terminaison Lambda dans le VPC connecté à l'EKS.
Format d'interface
Votre fonction de récompense doit accepter et renvoyer les données dans le format suivant.
Exemple de contribution à la formation :
[{ "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]
Exemple de charge utile pour la récompense Lambda :
Le système ajoute le tour de l'assistant (réponse générée) au dernier tour du messages terrain et ajoute une valeur unique id :
[{ "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Amazon, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]
Contrat Reward Lambda :
def lambda_handler(event, context): return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Args: samples: List of dictionaries in OpenAI format Example input (List of such sample): { "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Company, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } } Returns: List of dictionaries with reward scores: { "id": str, # Same id as input sample "aggregate_reward_score": float, # Overall score for the sample "metrics_list": [ # OPTIONAL: Component scores { "name": str, # Name of the component score "value": float, # Value of the component score "type": str # "Reward" or "Metric" } ] } """
Champs de saisie :
Champ |
Description |
Remarques supplémentaires |
|---|---|---|
id |
Identifiant unique pour l'échantillon |
Retourné dans la sortie. Format de chaîne |
messages |
Historique des discussions ordonné au format OpenAI |
Tableau d'objets de message |
messages [] .role |
Intervenant du message |
Valeurs communes : « utilisateur », « assistant », « système » |
messages [] .content |
Contenu textuel du message |
Chaîne de texte brut |
metadata |
Informations sous forme libre pour faciliter le classement |
Objet ; champs facultatifs transmis à partir des données d'entraînement |
Champs de sortie :
Champ |
Description |
Remarques supplémentaires |
|---|---|---|
id |
Identifiant identique à celui de l'échantillon d'entrée |
Doit correspondre à l'entrée |
score de récompense agrégé |
Note globale pour l'échantillon |
Valeur flottante (par exemple, 0,0-1,0 ou plage définie par la tâche) |
liste_métriques |
Scores des composants qui constituent l'agrégat |
Tableau d'objets métriques |
metrics_list [] .nom |
Nom de la métrique ou de la récompense du composant |
Chaîne (par exemple, « accuracy », « policy_reward ») |
metrics_list [] .valeur |
Valeur de la métrique ou de la récompense du composant |
Float |
liste_métriques [] .type |
Catégorie de composant |
Chaîne : « Récompense » ou « Métrique » |
Contraintes techniques
-
Limite de délai d'exécution : 15 minutes maximum par appel Lambda
-
Concurrence : doit gérer les demandes
rollout_worker_replicas × 64simultanées -
Fiabilité : doit mettre en œuvre une gestion appropriée des erreurs et renvoyer des scores valides de manière cohérente
-
Performances : optimisez pour une exécution rapide (quelques secondes, et non quelques minutes) afin de permettre un entraînement efficace
Bonnes pratiques :
-
Minimiser les appels d'API externes
-
Utiliser des algorithmes et des structures de données efficaces
-
Implémenter une logique de nouvelle tentative pour les échecs transitoires
-
Mettre en cache les calculs réutilisables
-
Effectuez des tests approfondis avant l'entraînement pour garantir une exécution sans bogue
Utilisation de fonctions de récompense personnalisées
Implémentez des fonctions de récompense personnalisées lorsque vous avez des critères d'évaluation spécifiques à une tâche :
-
Définissez les critères d'évaluation : déterminez ce qui constitue une bonne réponse pour votre tâche
-
Implémenter la fonction Lambda : créer une fonction Lambda selon le format de l'interface
-
Testez localement : validez que votre fonction renvoie des scores corrects pour les échantillons d'entrées
-
Déployer vers AWS : déployez votre Lambda et notez l'ARN
-
Configurer la recette : ajoutez l'ARN Lambda au champ de votre recette
reward_lambda_arn -
Test avec un petit ensemble de données : exécutez RFT avec un minimum de données pour vérifier l'intégration
Exemple de fonction Lambda
Cet exemple valide le format d'entrée et compare le résultat du modèle aux réponses de référence. Remplacez la logique de notation par vos critères d'évaluation réels.
from typing import List import json from dataclasses import asdict, dataclass @dataclass class RewardOutput: """Reward service output.""" id: str aggregate_reward_score: float def lambda_handler(event, context): """ Main lambda handler """ return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Core grader function """ scores: List[RewardOutput] = [] for sample in samples: # Extract components idx = sample["id"] ground_truth = sample.get("metadata", {}).get("reference_answer") if "messages" not in sample: print(f"Messages is None/empty for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) # Get model's response (last turn is assistant turn) last_message = sample["messages"][-1] assert last_message["role"] == "assistant", "Last message must be from assistant" model_text = last_message["content"] ground_truth_text = _extract_ground_truth_text(ground_truth) if model_text.lower() == ground_truth_text.lower(): score = 1.0 else: score = 0.0 ro = RewardOutput(id=idx, aggregate_reward_score=score) scores.append(ro) # Convert to dict format for JSON serialization return [asdict(score) for score in scores] def _extract_ground_truth_text(ground_truth) -> str: """ Turn the `ground_truth` field into a plain string. """ if isinstance(ground_truth, str): return ground_truth if isinstance(ground_truth, dict): # Common patterns: { "explanation": "...", "answer": "..." } if "explanation" in ground_truth and isinstance(ground_truth["explanation"], str): return ground_truth["explanation"] if "answer" in ground_truth and isinstance(ground_truth["answer"], str): return ground_truth["answer"] # Fallback: stringify the whole dict return json.dumps(ground_truth, ensure_ascii=False) # Fallback: stringify anything else return str(ground_truth)
Utiliser le LLM comme juge pour les fonctions de récompense
Les grands modèles linguistiques (LLMs) sont de plus en plus utilisés comme juges dans les flux de travail de réglage fin par renforcement (RFT), fournissant des signaux de récompense automatisés qui guident l'optimisation des modèles. Dans cette approche, un LLM évalue les résultats du modèle par rapport à des critères spécifiques, qu'il s'agisse d'évaluer l'exactitude, la qualité, le respect du style ou l'équivalence sémantique, et attribue des récompenses qui stimulent le processus d'apprentissage par renforcement.
Cela est particulièrement utile pour les tâches où les fonctions de récompense traditionnelles sont difficiles à définir par programmation, par exemple pour déterminer si différentes représentations (comme « 1/3 », « 0,333 » et « un tiers ») sont sémantiquement équivalentes, ou pour évaluer des qualités nuancées telles que la cohérence et la pertinence. En utilisant des juges basés sur le LLM comme fonctions de récompense, vous pouvez adapter RFT à des domaines complexes sans nécessiter d'annotations humaines approfondies, ce qui permet une itération rapide et une amélioration continue de vos modèles dans divers cas d'utilisation, au-delà des problèmes d'alignement traditionnels.
Avant de déployer un modèle LLM-as-a-Judge en production, vérifiez que les évaluations du modèle de juge correspondent au jugement humain. Cela implique de mesurer les taux d'accord entre le juge du LLM et les évaluateurs humains sur des échantillons représentatifs de votre tâche, afin de garantir idéalement que l'accord du LLM avec les humains atteint ou dépasse les taux de concordance interhumains. Cette étape de validation permet d'identifier les biais potentiels, de garantir que le signal de récompense oriente votre modèle dans la direction prévue et de garantir que le processus d'évaluation automatisé produira des modèles répondant à vos critères de qualité de production.
Using LLM-as-a-Judge est une extension simple de l'utilisation des fonctions Lambda pour l'apprentissage par renforcement avec récompenses vérifiables (RLVR). Dans la fonction Lambda, vous appelez l'un des modèles hébergés sur Amazon Bedrock. Pour garantir que la formation et l'évaluation fonctionnent correctement avec le modèle Judge, assurez-vous que votre quota de débit pour le modèle Amazon Bedrock utilisé est suffisant.
Configurez votre fonction Lambda de manière à ce que le délai d'expiration soit long, jusqu'à un maximum de 15 minutes. Le paramètre par défaut pour Lambda est de 3 secondes, et il est essentiel de modifier le délai d'expiration dans la configuration Lambda pour tenir compte des temps de réponse plus longs des modèles Amazon Bedrock par rapport aux fonctions de récompense basées sur la logique. Le Lambda est également invoqué en parallèle pendant l'entraînement. Augmentez donc la simultanéité pour maximiser le débit disponible. Notez que la limite de simultanéité doit être définie à la fois dans la configuration Lambda et dans la recette de la tâche de formation.
Exemple de recette de formation :
display_name: "Nova Lite V2 LoRA RLVR SMTJ training on GPU" version: "1.0" instance_types: ["ml.p5.48xlarge", "ml.p5en.48xlarge"] run: name: <experiment_name> model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: "nova-lite-2/prod" data_s3_path: s3://<path>/<training_data>.jsonl replicas: 4 reward_lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> ## SMTJ RFT Training specific configs training_config: max_length: 1200 # Context window (tokens) for inputs+prompt global_batch_size: 64 # Total samples per optimizer step across all replicas (16/32/64/128/256) reasoning_effort: high # Enables reasoning mode High / Low / or null for non-reasoning test_freq: 10 rollout: # How responses are generated for GRPO/advantage calc advantage_strategy: number_generation: 4 # N samples per prompt to estimate advantages (variance vs cost) generator: max_new_tokens: 1024 # Cap on tokens generated per sample set_random_seed: true # Seed generation for reproducibility across runs temperature: 1 # Softmax temperature top_k: 1 # Sample only from top-K logits rewards: preset_reward_function: null # Usage of reward functions built into Verl [exact_match, code_executions, math_answers] api_endpoint: lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> lambda_concurrency_limit: 12 # Max concurrent Lambda invocations (throughput vs. throttling) trainer: max_steps: 100 # Steps to train for. One Step = global_batch_size save_steps: 20 test_freq:10 # RL parameters ent_coeff: 0.0 # A bonus added to the policy loss that rewards higher-output entropy kl_loss_coef: 0.0 # Weight on the KL penalty between the actor (trainable policy) and a frozen reference model optim_config: # Optimizer settings lr: 1e-6 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95
Exemple Lambda :
Cette fonction Lambda implémente un système de notation des LLM-as-a-Judge récompenses pour affiner le renforcement. Il traite des lots de réponses générées par le modèle en extrayant les réponses à partir de sorties bien formatées (en recherchant une \boxed{} notation), puis utilise Claude Haiku comme modèle de jugement pour évaluer la similitude sémantique entre la réponse extraite et la réponse de référence de base sur une échelle de 0,0-1,0. Le juge compare les réponses pour déterminer si elles sont sémantiquement équivalentes (même si elles sont représentées différemment, comme « 1/3 » ou « 0,333 »), en traitant les cas où les réponses peuvent être formatées de différentes manières. La fonction inclut une logique de réessai pour la limitation, valide la structure des messages et renvoie une liste de scores de récompense qui peuvent être utilisés comme signaux d'entraînement dans le processus d'apprentissage par renforcement, avec des scores de 0,0 attribués lorsque les réponses ne peuvent pas être extraites ou que la validation échoue.
import json import random from dataclasses import asdict, dataclass import re from typing import Dict, Optional, Any, List import boto3 from botocore.exceptions import ClientError from copy import deepcopy import time import base64 def extract_solution_nova(solution_str: str) -> Optional[str]: """ Extract solution from Nova-formatted response. Args: solution_str: The solution text from Nova model method: "strict" or "flexible" extraction method Returns: Extracted numerical answer or None """ boxed_matches = re.findall(r'\\boxed\{([^}]+)\}', solution_str) if boxed_matches: final_answer = boxed_matches[-1].replace(",", "").replace("$", "") return final_answer return 0.0 bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1') JUDGE_MODEL_ID = "global.anthropic.claude-haiku-4-5-20251001-v1:0" SYSTEM_PROMPT = "You must output ONLY a number between 0.0 and 1.0. No explanations, no text, just the number." JUDGE_PROMPT_TEMPLATE = """Compare the following two responses and rate how similar they are on a scale of 0.0 to 1.0, where: - 1.0 means the responses are semantically equivalent (same meaning, even if worded differently) - 0.5 means the responses are partially similar - 0.0 means the responses are completely different or contradictory Response A: {response_a} Response B: {response_b} Output ONLY a number between 0.0 and 1.0. No explanations.""" def lambda_graded(id: str, response_a: str, response_b: str, max_retries: int = 50) -> float: """Call Bedrock to compare responses and return similarity score.""" prompt = JUDGE_PROMPT_TEMPLATE.format(response_a=response_a, response_b=response_b) print(f"Calling judge: {JUDGE_MODEL_ID}") for attempt in range(max_retries): try: print(f"Attempt: {attempt}") response = bedrock_runtime.converse( modelId=JUDGE_MODEL_ID, messages=[{"role": "user", "content": [{"text": prompt}]}], system=[{"text": SYSTEM_PROMPT}], inferenceConfig={"temperature": 0.0, "maxTokens": 10} ) print(f"Bedrock call successful: {response}") output = response['output']['message']['content'][0]['text'].strip() score = float(output) print(f"Score parsed: {score}") return max(0.0, min(1.0, score)) except Exception as e: if "ThrottlingException" in str(e) and attempt < max_retries - 1: time.sleep(2 ** attempt) print(f"Throttling {id}") else: print(f"Bedrock call failed: {e}") return 0.0 print("Max retries reached. Unable to complete the request.") return 0.0 def compute_score(id: str, solution_str: str, ground_truth: str, method: str = "strict", format_score: float = 0.0, score: float = 1.0, data_source: str ='dataset_name', extra_info: Optional[dict] = None) -> float: """ The scoring function for PandaLM with Nova format. Args: solution_str: The solution text from Nova model ground_truth: JSON string containing the ground truth answer method: The method to extract the solution, choices are 'strict' and 'flexible' format_score: The score for format compliance score: The score for correct answer data_source: Should match the data_source in the given dataset extra_info: Optional dict with additional fields. Required in function signature. Returns: Score between 0 and 1 """ import json answer = extract_solution_nova(solution_str=solution_str, method=method) if answer is None: return 0.0 print(f"Answer: {str(answer)}, Reference: {str(ground_truth)}") # Clean both answers for comparison clean_answer = str(answer) clean_ground_truth = str(ground_truth) score = lambda_graded(id, response_a=clean_answer, response_b=clean_ground_truth) print(f"Raw score: {score}") return score @dataclass class RewardOutput: """Reward service.""" id: str aggregate_reward_score: float def lambda_handler(event, context): scores: List[RewardOutput] = [] samples = event print(len(samples)) for sample in samples: # Extract the ground truth key. In the current dataset it's answer print("Sample: ", json.dumps(sample, indent=2)) ground_truth = sample["reference_answer"] idx = "no id" # print(sample) if not "id" in sample: print(f"ID is None/empty for sample: {sample}") else: idx = sample["id"] ro = RewardOutput(id=idx, aggregate_reward_score=0.0) if not "messages" in sample: print(f"Messages is None/empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Extract answer from ground truth dict if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Get completion from last message (assistant message) last_message = sample["messages"][-1] completion_text = last_message["content"] if last_message["role"] not in ["assistant", "nova_assistant"]: print(f"Last message is not from assistant for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue if not "content" in last_message: print(f"Completion text is empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue random_score = compute_score(id=id, solution_str=completion_text, ground_truth=ground_truth) ro = RewardOutput(id=idx, aggregate_reward_score=random_score) print(f"Response for id: {idx} is {ro}") scores.append(ro) return [asdict(score) for score in scores]