Fonctions de récompense personnalisées dans votre AWS environnement - Amazon SageMaker AI

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Fonctions de récompense personnalisées dans votre AWS environnement

Les fonctions de récompense personnalisées de votre AWS environnement ne prennent en charge que le RFT à tour unique. Cela entraîne des modèles sur des tâches pour lesquelles une seule invite reçoit une réponse unique, évaluée indépendamment. Le modèle reçoit une invite et génère une réponse, qui est ensuite notée par votre fonction de récompense : il n'y a pas back-and-forth de conversation. Cela contraste avec le RFT à tours multiples où le modèle effectue plusieurs cycles d'interaction avec un environnement ou un utilisateur avant de recevoir une récompense finale.

Présentation de l’architecture

L'architecture se compose de deux composants principaux :

VPC d'entraînement :

  • Déploiement : charge le jeu de données et le modèle, envoie les déploiements à la fonction de récompense et reçoit des récompenses

  • Entraîneur : reçoit les déploiements depuis le composant de déploiement, effectue des passes vers l'avant et vers l'arrière et met à jour les poids des modèles

VPC du client :

  • Reward Lambda : fonction de récompense mise en œuvre par le client qui évalue les réponses du modèle et renvoie les scores de récompense

Flux de travail :

  1. Le déploiement charge le jeu de données et le modèle

  2. Le déploiement génère des réponses aux modèles et appelle Lambda pour obtenir des récompenses

  3. Lambda affiche les scores de récompense

  4. Rollout envoie les déploiements à Trainer

  5. Le formateur met à jour les pondérations politiques en fonction des récompenses

Configuration de la recette

Utilisez cette recette lorsque le traitement de votre fonction de récompense est terminé dans les 15 minutes.

## Nova Lite RLVR Training (PEFT) run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "lora" # Enable LoRA for PEFT lora_tuning: alpha: 32 lora_plus_lr_ratio: 64.0 # LoRA+ learning rate scaling factor (0.0–100.0)
## Nova Lite RLVR Training run: name: my-rft-run model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: nova-lite-2/prod data_s3_path: s3://example-bucket/train.jsonl output_s3_path: "" replicas: 2 # Number of compute instances for training. All supported values: {2, 4, 8, 16} generation_replicas: 2 # LLM inference replicas rollout_worker_replicas: 1 # Lambda functions for RFT reward_lambda_arn: "" ## Training config - essential fields for all services training_config: max_length: 10240 global_batch_size: 256 reasoning_effort: high data: shuffle: false rollout: rollout_strategy: type: off_policy_async age_tolerance: 2 advantage_strategy: number_generation: 8 generator: max_new_tokens: 8192 set_random_seed: true temperature: 1 top_k: 0 rewards: api_endpoint: lambda_arn: ${run.reward_lambda_arn} lambda_concurrency_limit: 100 # Lambda should be able to handle (rollout_worker_replicas * 64) requests # Training configuration trainer: max_steps: 100 save_steps: 5 save_top_k: 5 # RL parameters refit_freq: 4 clip_ratio_high: 0.2 ent_coeff: 0.001 loss_scale: 1 optim_config: # Optimizer settings lr: 7e-7 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95 peft: # Parameter-efficient fine-tuning (LoRA) peft_scheme: "null" # Disable LoRA for PEFT

Paramètres de la recette

  • max_steps: nombre de mises à jour du dégradé apportées au modèle. Chaque mise à jour utilise global_batch_size × refit_freq des exemples. Chaque échantillon correspond à une génération de modèle. Nombre total d'échantillons d'entraînement =max_steps × global_batch_size.

  • max_seq_length: longueur maximale du contexte (en jetons) que le modèle traite pendant l'entraînement. Doit tenir compte de la longueur de l'invite d'entrée et de la longueur de la réponse générée. Un réglage trop court entraîne des erreurs d'entraînement ; un réglage trop élevé gaspille la mémoire du GPU et ralentit l'entraînement. Préréglages disponibles : 8K (par défaut), 16K, 32K.

  • global_batch_size: nombre d'échantillons par mise à jour du gradient du modèle. Des valeurs plus élevées fournissent des dégradés plus stables mais nécessitent plus de mémoire. Notez que chaque échantillon correspond à une génération par le modèle, et non à une invite. Une seule invite est utilisée pour créer des number_generation échantillons. Recommandé : 64-4096 en puissances de 2.

  • refit_freq: Fréquence à laquelle le poids du modèle est mis à jour. Le nombre d'échantillons dans chaque mise à jour est derefit_freq * global_batch_size. Contrôle la fréquence à laquelle les modèles de génération sont mis à jour. Des valeurs plus élevées augmentent la taille effective du lot et permettent un apprentissage plus stable. Des valeurs faibles augmentent la vitesse d'entraînement, mais augmentent la variance. L'augmentation de refit_freq augmente les données « off_policy ». Recommandé : 4 (min : 1, max : 4).

  • rollout_strategy.off_policy_async: permet aux mises à jour du modèle d'être « hors politique », c'est-à-dire que les générations utilisées pour calculer la perte peuvent provenir d'une version antérieure du modèle à celle du modèle actuel. L'activation d'une approche hors politique permet d'accélérer la formation, mais peut être instable si la valeur age_tolerance est élevée. Recommandé : Vrai (Vrai, Faux).

  • rollout_strategy.age_tolerance: ne fonctionne que lorsqu'il off_policy_async est activé. Conservez uniquement les données de la version du modèle inférieure age_tolerance à celle du modèle actuel. Les valeurs faibles suppriment les données, tandis que les valeurs élevées contiennent davantage de données issues des versions précédentes du modèle. Recommandé : 2 (min : 1, max : 20).

  • clip_ratio_high: Le découpage permet d'éviter les mises à jour importantes des politiques susceptibles de déstabiliser la formation. Des valeurs plus élevées encouragent les mises à jour qui corrigent les erreurs de modèle mais peuvent déstabiliser l'entraînement. Des valeurs plus faibles entraînent moins d'apprentissage. Recommandé : 0,3 (0,1, 10).

  • ent_coeff: Abréviation de « coefficient d'entropie », ce paramètre encourage l'exploration pendant l'entraînement en ajoutant un bonus d'entropie à la fonction de perte. Les valeurs les plus élevées favorisent diverse/exploratory le comportement, tandis que les valeurs faibles visent à exploiter les connaissances actuelles. Recommandé : 0,0 (min : 0, max : 0,1).

Sélection du mode de raisonnement

Choisissez parmi trois niveaux d'effort de raisonnement en fonction de la complexité de votre tâche :

Effort de raisonnement

Cas d’utilisation

Coût/Latence

Idéal pour

omettre le champ (sans raisonnement)

Requêtes factuelles simples, classifications

Faible

Optimisation de la vitesse et des coûts

bas

Complexité modérée nécessitant un certain raisonnement

Moyenne

Équilibre entre performance et efficacité

haut

Tâches analytiques complexes, problèmes en plusieurs étapes

Élevée

Capacité de raisonnement maximale

Comportement par défaut : lorsqu'il reasoning_effort est spécifié sans valeur, sa valeur par défaut est. high

Directives :

  • highÀ utiliser pour des tâches analytiques complexes où la step-by-step réflexion apporte une valeur ajoutée (mathématiques, logique, débogage de code)

  • lowÀ utiliser pour les tâches de complexité modérée nécessitant un certain raisonnement

  • Omettez entièrement le champ pour les requêtes factuelles directes, les classifications simples et lors de l'optimisation de la vitesse et des coûts

Important

Les modes de raisonnement avancés améliorent les performances pour les tâches nécessitant une analyse logique et un raisonnement complexe, mais augmentent les coûts et la latence pendant la formation et le déploiement. Ils ne sont pas utiles pour de simples questions factuelles telles que « Quelle est la capitale de la France ? »

Implantation de la fonction de récompense

La fonction de récompense (également appelée scorer ou grader) est l'élément central qui évalue les réponses du modèle et fournit des signaux de feedback pour l'entraînement. Elle doit être implémentée en tant que fonction Lambda qui accepte les réponses du modèle et renvoie des scores de récompense.

Conditions préalables

Assurez-vous que vos fonctions Lambda et vos files d'attente SQS respectent le format de dénomination requis et que votre rôle d'exécution dispose des autorisations nécessaires.

Dénomination des ARN Lambda :

L'ARN Lambda doit respecter le format de dénomination suivant :

arn:aws:lambda:*:*:function:*SageMaker*

Dénomination SQS (requise uniquement pour les fonctions de récompense à distance dans votre propre AWS environnement) :

  • Garantir les autorisations SQS dans le rôle d'exécution créé pour le cluster HyperPod

  • L'ARN SQS doit correspondre à l'un des formats de dénomination suivants :

    arn:aws:sqs:*:*:*SageMaker* arn:aws:sqs:*:*:*Sagemaker* arn:aws:sqs:*:*:*sagemaker*
  • Dans le client SQS, utilisez le remplacement du point de terminaison : --endpoint https://sqs.us-west-2.amazonaws.com car dans VPCE, l'ancien point de terminaison de service SQS n'est pas disponible

Politique IAM pour le rôle d'exécution :

{ "Action": "lambda:InvokeFunction", "Resource": [ "arn:aws:lambda:*:*:function:*SageMaker*" ], "Effect": "Allow" }, { "Action": [ "sqs:DeleteMessage", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": [ "arn:aws:sqs:*:*:*SageMaker*" ], "Effect": "Allow" }

Point de terminaison VPC :

Pour que le HyperPod cluster puisse invoquer des fonctions Lambda, vous devez :

  • Créez un point de terminaison VPC pour le service Lambda dans le VPC de votre cluster HyperPod

  • Associer le point de terminaison au groupe de sécurité du cluster

  • Assurez-vous que la politique de point de terminaison du VPC autorise l'action lambda : InvokeFunction

Vérifiez que vous voyez un point de terminaison Lambda dans le VPC connecté à l'EKS.

Format d'interface

Votre fonction de récompense doit accepter et renvoyer les données dans le format suivant.

Exemple de contribution à la formation :

[{ "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]

Exemple de charge utile pour la récompense Lambda :

Le système ajoute le tour de l'assistant (réponse générée) au dernier tour du messages terrain et ajoute une valeur unique id :

[{ "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Amazon, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } }]

Contrat Reward Lambda :

def lambda_handler(event, context): return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Args: samples: List of dictionaries in OpenAI format Example input (List of such sample): { "id": "123", "messages": [ { "role": "user", "content": "Do you have a dedicated security team?" }, { "role": "assistant", "content": "As an AI developed by Company, I do not have a dedicated security team..." } ], "metadata": { "reference_answer": { "compliant": "No", "explanation": "As an AI developed by Company, I do not have a traditional security team..." }, "my_key": "sample-001" } } Returns: List of dictionaries with reward scores: { "id": str, # Same id as input sample "aggregate_reward_score": float, # Overall score for the sample "metrics_list": [ # OPTIONAL: Component scores { "name": str, # Name of the component score "value": float, # Value of the component score "type": str # "Reward" or "Metric" } ] } """

Champs de saisie :

Champ

Description

Remarques supplémentaires

id

Identifiant unique pour l'échantillon

Retourné dans la sortie. Format de chaîne

messages

Historique des discussions ordonné au format OpenAI

Tableau d'objets de message

messages [] .role

Intervenant du message

Valeurs communes : « utilisateur », « assistant », « système »

messages [] .content

Contenu textuel du message

Chaîne de texte brut

metadata

Informations sous forme libre pour faciliter le classement

Objet ; champs facultatifs transmis à partir des données d'entraînement

Champs de sortie :

Champ

Description

Remarques supplémentaires

id

Identifiant identique à celui de l'échantillon d'entrée

Doit correspondre à l'entrée

score de récompense agrégé

Note globale pour l'échantillon

Valeur flottante (par exemple, 0,0-1,0 ou plage définie par la tâche)

liste_métriques

Scores des composants qui constituent l'agrégat

Tableau d'objets métriques

metrics_list [] .nom

Nom de la métrique ou de la récompense du composant

Chaîne (par exemple, « accuracy », « policy_reward »)

metrics_list [] .valeur

Valeur de la métrique ou de la récompense du composant

Float

liste_métriques [] .type

Catégorie de composant

Chaîne : « Récompense » ou « Métrique »

Contraintes techniques

  • Limite de délai d'exécution : 15 minutes maximum par appel Lambda

  • Concurrence : doit gérer les demandes rollout_worker_replicas × 64 simultanées

  • Fiabilité : doit mettre en œuvre une gestion appropriée des erreurs et renvoyer des scores valides de manière cohérente

  • Performances : optimisez pour une exécution rapide (quelques secondes, et non quelques minutes) afin de permettre un entraînement efficace

Bonnes pratiques :

  • Minimiser les appels d'API externes

  • Utiliser des algorithmes et des structures de données efficaces

  • Implémenter une logique de nouvelle tentative pour les échecs transitoires

  • Mettre en cache les calculs réutilisables

  • Effectuez des tests approfondis avant l'entraînement pour garantir une exécution sans bogue

Utilisation de fonctions de récompense personnalisées

Implémentez des fonctions de récompense personnalisées lorsque vous avez des critères d'évaluation spécifiques à une tâche :

  1. Définissez les critères d'évaluation : déterminez ce qui constitue une bonne réponse pour votre tâche

  2. Implémenter la fonction Lambda : créer une fonction Lambda selon le format de l'interface

  3. Testez localement : validez que votre fonction renvoie des scores corrects pour les échantillons d'entrées

  4. Déployer vers AWS : déployez votre Lambda et notez l'ARN

  5. Configurer la recette : ajoutez l'ARN Lambda au champ de votre recette reward_lambda_arn

  6. Test avec un petit ensemble de données : exécutez RFT avec un minimum de données pour vérifier l'intégration

Exemple de fonction Lambda

Cet exemple valide le format d'entrée et compare le résultat du modèle aux réponses de référence. Remplacez la logique de notation par vos critères d'évaluation réels.

from typing import List import json from dataclasses import asdict, dataclass @dataclass class RewardOutput: """Reward service output.""" id: str aggregate_reward_score: float def lambda_handler(event, context): """ Main lambda handler """ return lambda_grader(event) def lambda_grader(samples: list[dict]) -> list[dict]: """ Core grader function """ scores: List[RewardOutput] = [] for sample in samples: # Extract components idx = sample["id"] ground_truth = sample.get("metadata", {}).get("reference_answer") if "messages" not in sample: print(f"Messages is None/empty for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") ro = RewardOutput(id=idx, aggregate_reward_score=0.0) scores.append(ro) # Get model's response (last turn is assistant turn) last_message = sample["messages"][-1] assert last_message["role"] == "assistant", "Last message must be from assistant" model_text = last_message["content"] ground_truth_text = _extract_ground_truth_text(ground_truth) if model_text.lower() == ground_truth_text.lower(): score = 1.0 else: score = 0.0 ro = RewardOutput(id=idx, aggregate_reward_score=score) scores.append(ro) # Convert to dict format for JSON serialization return [asdict(score) for score in scores] def _extract_ground_truth_text(ground_truth) -> str: """ Turn the `ground_truth` field into a plain string. """ if isinstance(ground_truth, str): return ground_truth if isinstance(ground_truth, dict): # Common patterns: { "explanation": "...", "answer": "..." } if "explanation" in ground_truth and isinstance(ground_truth["explanation"], str): return ground_truth["explanation"] if "answer" in ground_truth and isinstance(ground_truth["answer"], str): return ground_truth["answer"] # Fallback: stringify the whole dict return json.dumps(ground_truth, ensure_ascii=False) # Fallback: stringify anything else return str(ground_truth)

Utiliser le LLM comme juge pour les fonctions de récompense

Les grands modèles linguistiques (LLMs) sont de plus en plus utilisés comme juges dans les flux de travail de réglage fin par renforcement (RFT), fournissant des signaux de récompense automatisés qui guident l'optimisation des modèles. Dans cette approche, un LLM évalue les résultats du modèle par rapport à des critères spécifiques, qu'il s'agisse d'évaluer l'exactitude, la qualité, le respect du style ou l'équivalence sémantique, et attribue des récompenses qui stimulent le processus d'apprentissage par renforcement.

Cela est particulièrement utile pour les tâches où les fonctions de récompense traditionnelles sont difficiles à définir par programmation, par exemple pour déterminer si différentes représentations (comme « 1/3 », « 0,333 » et « un tiers ») sont sémantiquement équivalentes, ou pour évaluer des qualités nuancées telles que la cohérence et la pertinence. En utilisant des juges basés sur le LLM comme fonctions de récompense, vous pouvez adapter RFT à des domaines complexes sans nécessiter d'annotations humaines approfondies, ce qui permet une itération rapide et une amélioration continue de vos modèles dans divers cas d'utilisation, au-delà des problèmes d'alignement traditionnels.

Avant de déployer un modèle LLM-as-a-Judge en production, vérifiez que les évaluations du modèle de juge correspondent au jugement humain. Cela implique de mesurer les taux d'accord entre le juge du LLM et les évaluateurs humains sur des échantillons représentatifs de votre tâche, afin de garantir idéalement que l'accord du LLM avec les humains atteint ou dépasse les taux de concordance interhumains. Cette étape de validation permet d'identifier les biais potentiels, de garantir que le signal de récompense oriente votre modèle dans la direction prévue et de garantir que le processus d'évaluation automatisé produira des modèles répondant à vos critères de qualité de production.

Using LLM-as-a-Judge est une extension simple de l'utilisation des fonctions Lambda pour l'apprentissage par renforcement avec récompenses vérifiables (RLVR). Dans la fonction Lambda, vous appelez l'un des modèles hébergés sur Amazon Bedrock. Pour garantir que la formation et l'évaluation fonctionnent correctement avec le modèle Judge, assurez-vous que votre quota de débit pour le modèle Amazon Bedrock utilisé est suffisant.

Configurez votre fonction Lambda de manière à ce que le délai d'expiration soit long, jusqu'à un maximum de 15 minutes. Le paramètre par défaut pour Lambda est de 3 secondes, et il est essentiel de modifier le délai d'expiration dans la configuration Lambda pour tenir compte des temps de réponse plus longs des modèles Amazon Bedrock par rapport aux fonctions de récompense basées sur la logique. Le Lambda est également invoqué en parallèle pendant l'entraînement. Augmentez donc la simultanéité pour maximiser le débit disponible. Notez que la limite de simultanéité doit être définie à la fois dans la configuration Lambda et dans la recette de la tâche de formation.

Exemple de recette de formation :

display_name: "Nova Lite V2 LoRA RLVR SMTJ training on GPU" version: "1.0" instance_types: ["ml.p5.48xlarge", "ml.p5en.48xlarge"] run: name: <experiment_name> model_type: amazon.nova-2-lite-v1:0:256k model_name_or_path: "nova-lite-2/prod" data_s3_path: s3://<path>/<training_data>.jsonl replicas: 4 reward_lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> ## SMTJ RFT Training specific configs training_config: max_length: 1200 # Context window (tokens) for inputs+prompt global_batch_size: 64 # Total samples per optimizer step across all replicas (16/32/64/128/256) reasoning_effort: high # Enables reasoning mode High / Low / or null for non-reasoning test_freq: 10 rollout: # How responses are generated for GRPO/advantage calc advantage_strategy: number_generation: 4 # N samples per prompt to estimate advantages (variance vs cost) generator: max_new_tokens: 1024 # Cap on tokens generated per sample set_random_seed: true # Seed generation for reproducibility across runs temperature: 1 # Softmax temperature top_k: 1 # Sample only from top-K logits rewards: preset_reward_function: null # Usage of reward functions built into Verl [exact_match, code_executions, math_answers] api_endpoint: lambda_arn: arn:aws:lambda:<region>:<account>:function:<lambda-name> lambda_concurrency_limit: 12 # Max concurrent Lambda invocations (throughput vs. throttling) trainer: max_steps: 100 # Steps to train for. One Step = global_batch_size save_steps: 20 test_freq:10 # RL parameters ent_coeff: 0.0 # A bonus added to the policy loss that rewards higher-output entropy kl_loss_coef: 0.0 # Weight on the KL penalty between the actor (trainable policy) and a frozen reference model optim_config: # Optimizer settings lr: 1e-6 # Learning rate weight_decay: 0.0 # L2 regularization strength (0.0–1.0) adam_beta1: 0.9 adam_beta2: 0.95

Exemple Lambda :

Cette fonction Lambda implémente un système de notation des LLM-as-a-Judge récompenses pour affiner le renforcement. Il traite des lots de réponses générées par le modèle en extrayant les réponses à partir de sorties bien formatées (en recherchant une \boxed{} notation), puis utilise Claude Haiku comme modèle de jugement pour évaluer la similitude sémantique entre la réponse extraite et la réponse de référence de base sur une échelle de 0,0-1,0. Le juge compare les réponses pour déterminer si elles sont sémantiquement équivalentes (même si elles sont représentées différemment, comme « 1/3 » ou « 0,333 »), en traitant les cas où les réponses peuvent être formatées de différentes manières. La fonction inclut une logique de réessai pour la limitation, valide la structure des messages et renvoie une liste de scores de récompense qui peuvent être utilisés comme signaux d'entraînement dans le processus d'apprentissage par renforcement, avec des scores de 0,0 attribués lorsque les réponses ne peuvent pas être extraites ou que la validation échoue.

import json import random from dataclasses import asdict, dataclass import re from typing import Dict, Optional, Any, List import boto3 from botocore.exceptions import ClientError from copy import deepcopy import time import base64 def extract_solution_nova(solution_str: str) -> Optional[str]: """ Extract solution from Nova-formatted response. Args: solution_str: The solution text from Nova model method: "strict" or "flexible" extraction method Returns: Extracted numerical answer or None """ boxed_matches = re.findall(r'\\boxed\{([^}]+)\}', solution_str) if boxed_matches: final_answer = boxed_matches[-1].replace(",", "").replace("$", "") return final_answer return 0.0 bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1') JUDGE_MODEL_ID = "global.anthropic.claude-haiku-4-5-20251001-v1:0" SYSTEM_PROMPT = "You must output ONLY a number between 0.0 and 1.0. No explanations, no text, just the number." JUDGE_PROMPT_TEMPLATE = """Compare the following two responses and rate how similar they are on a scale of 0.0 to 1.0, where: - 1.0 means the responses are semantically equivalent (same meaning, even if worded differently) - 0.5 means the responses are partially similar - 0.0 means the responses are completely different or contradictory Response A: {response_a} Response B: {response_b} Output ONLY a number between 0.0 and 1.0. No explanations.""" def lambda_graded(id: str, response_a: str, response_b: str, max_retries: int = 50) -> float: """Call Bedrock to compare responses and return similarity score.""" prompt = JUDGE_PROMPT_TEMPLATE.format(response_a=response_a, response_b=response_b) print(f"Calling judge: {JUDGE_MODEL_ID}") for attempt in range(max_retries): try: print(f"Attempt: {attempt}") response = bedrock_runtime.converse( modelId=JUDGE_MODEL_ID, messages=[{"role": "user", "content": [{"text": prompt}]}], system=[{"text": SYSTEM_PROMPT}], inferenceConfig={"temperature": 0.0, "maxTokens": 10} ) print(f"Bedrock call successful: {response}") output = response['output']['message']['content'][0]['text'].strip() score = float(output) print(f"Score parsed: {score}") return max(0.0, min(1.0, score)) except Exception as e: if "ThrottlingException" in str(e) and attempt < max_retries - 1: time.sleep(2 ** attempt) print(f"Throttling {id}") else: print(f"Bedrock call failed: {e}") return 0.0 print("Max retries reached. Unable to complete the request.") return 0.0 def compute_score(id: str, solution_str: str, ground_truth: str, method: str = "strict", format_score: float = 0.0, score: float = 1.0, data_source: str ='dataset_name', extra_info: Optional[dict] = None) -> float: """ The scoring function for PandaLM with Nova format. Args: solution_str: The solution text from Nova model ground_truth: JSON string containing the ground truth answer method: The method to extract the solution, choices are 'strict' and 'flexible' format_score: The score for format compliance score: The score for correct answer data_source: Should match the data_source in the given dataset extra_info: Optional dict with additional fields. Required in function signature. Returns: Score between 0 and 1 """ import json answer = extract_solution_nova(solution_str=solution_str, method=method) if answer is None: return 0.0 print(f"Answer: {str(answer)}, Reference: {str(ground_truth)}") # Clean both answers for comparison clean_answer = str(answer) clean_ground_truth = str(ground_truth) score = lambda_graded(id, response_a=clean_answer, response_b=clean_ground_truth) print(f"Raw score: {score}") return score @dataclass class RewardOutput: """Reward service.""" id: str aggregate_reward_score: float def lambda_handler(event, context): scores: List[RewardOutput] = [] samples = event print(len(samples)) for sample in samples: # Extract the ground truth key. In the current dataset it's answer print("Sample: ", json.dumps(sample, indent=2)) ground_truth = sample["reference_answer"] idx = "no id" # print(sample) if not "id" in sample: print(f"ID is None/empty for sample: {sample}") else: idx = sample["id"] ro = RewardOutput(id=idx, aggregate_reward_score=0.0) if not "messages" in sample: print(f"Messages is None/empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Extract answer from ground truth dict if ground_truth is None: print(f"No answer found in ground truth for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue # Get completion from last message (assistant message) last_message = sample["messages"][-1] completion_text = last_message["content"] if last_message["role"] not in ["assistant", "nova_assistant"]: print(f"Last message is not from assistant for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue if not "content" in last_message: print(f"Completion text is empty for id: {idx}") scores.append(RewardOutput(id="0", aggregate_reward_score=0.0)) continue random_score = compute_score(id=id, solution_str=completion_text, ground_truth=ground_truth) ro = RewardOutput(id=idx, aggregate_reward_score=random_score) print(f"Response for id: {idx} is {ro}") scores.append(ro) return [asdict(score) for score in scores]