Uso de una canalización de OpenSearch Ingestion con machine learning e inferencia en lotes sin conexión - Amazon OpenSearch Service

Uso de una canalización de OpenSearch Ingestion con machine learning e inferencia en lotes sin conexión

Las canalizaciones de Amazon OpenSearch Ingestion (OSI) admiten el procesamiento de inferencia en lotes fuera de línea con machine learning (ML) para enriquecer de manera eficiente grandes volúmenes de datos a bajo costo. Utilice la inferencia en lotes sin conexión siempre que tenga conjuntos de datos grandes que puedan procesarse de forma asíncrona. La inferencia en lotes sin conexión funciona con los modelos de Amazon Bedrock y SageMaker. Esta característica está disponible en todas las Regiones de AWS compatibles con OpenSearch Ingestion con dominios de OpenSearch Service 2.17 y versiones posteriores.

nota

Para el procesamiento de inferencias en tiempo real, utilice Conectores de ML de Amazon OpenSearch Service para plataformas de terceros.

El procesamiento de inferencias en lotes sin conexión aprovecha una característica de OpenSearch llamada ML Commons. ML Commons proporciona algoritmos de ML mediante llamadas a la API de REST y de transporte. Esas llamadas eligen los nodos y los recursos correctos para cada solicitud de ML y supervisan las tareas de ML para garantizar el tiempo de actividad. De esta manera, ML Commons le permite aprovechar los algoritmos de ML de código abierto existentes y reducir el esfuerzo necesario para desarrollar nuevas características de ML. Para obtener más información sobre ML Commons, consulte Machine learning en la documentación de OpenSearch.org.

Funcionamiento

Para crear una canalización de inferencias en lotes sin conexión en OpenSearch Ingestion, puede agregar un procesador de inferencias de machine learning a una canalización. Este procesador permite que su canalización se conecte a servicios de IA como SageMaker para ejecutar tareas de inferencia en lotes. Puede configurar su procesador para que se conecte al servicio de IA que desee a través de los conectores de IA (compatibles con batch_predict) que se ejecutan en su dominio de destino.

OpenSearch Ingestion utiliza el procesador ml_inference con ML Commons para crear tareas de inferencia en lotes sin conexión. Luego, ML Commons utiliza la API batch_predict, que realiza inferencias sobre conjuntos de datos de gran tamaño en un modo asíncrono sin conexión mediante un modelo desplegando en servidores de modelos externos en Amazon Bedrock, Amazon SageMaker, Cohere y OpenAI. El siguiente diagrama muestra una canalización de OpenSearch Ingestion que organiza varios componentes para llevar a cabo este proceso de principio a fin:

Arquitectura de tres canales de procesamiento de inferencias mediante IA por lotes.

Los componentes de la canalización funcionan de la siguiente manera:

Canalización 1 (Preparación y transformación de datos)*:

  • Origen: los datos se analizan desde un origen externo con la ayuda de OpenSearch Ingestion.

  • Procesadores de datos: los datos sin procesar se procesan y transforman al formato correcto para su inferencia en lotes en el servicio de IA integrado.

  • S3 (receptor): los datos procesados se almacenan en un bucket de Amazon S3 y están listos para servir como entrada para ejecutar tareas de inferencia en lotes en el servicio de IA integrado.

Canalización 2 (Activar batch_inference de ML):

  • Origen: detección automática de eventos de S3 de nuevos archivos creados por la salida de la Canalización 1.

  • Procesador ml_inference: procesador que genera inferencias de ML mediante una tarea en lotes asíncrona. Se conecta a los servicios de IA a través del conector de IA configurado que se ejecuta en el dominio de destino.

  • ID de tarea: cada tarea en lotes está asociada a un ID de tarea en ml-commons para su seguimiento y administración.

  • OpenSearch ML Commons: ML Commons, que aloja el modelo de búsqueda neuronal en tiempo real, administra los conectores a los servidores remotos de IA y proporciona las API para la inferencia en lotes y la administración de tareas.

  • Servicios de IA: OpenSearch ML Commons interactúa con servicios de IA como Amazon Bedrock y Amazon SageMaker para realizar inferencias en lotes de los datos y generar predicciones o información. Los resultados se guardan de forma asíncrona en un archivo S3 independiente.

Canalización 3 (Ingesta masiva):

  • S3 (origen): los resultados de las tareas en lotes se almacenan en S3, que es el origen de esta canalización.

  • Procesadores de transformación de datos: el procesamiento y la transformación adicionales se aplican al resultado de la inferencia en lotes antes de la ingesta. Esto garantiza que los datos estén mapeados correctamente en el índice de OpenSearch.

  • Índice de OpenSearch (receptor): los resultados procesados se indexan en OpenSearch para almacenarlos, buscarlos y analizarlos más a fondo.

nota

* El proceso descrito en Canalización 1 es opcional. Si lo prefiere, puede omitir ese proceso y solo cargar los datos preparados en el receptor de S3 para crear tareas en lotes.

Acerca del procesador ml_inference

OpenSearch Ingestion utiliza una integración especializada entre el análisis de S3 y el procesador de inferencias de ML para el procesamiento en lotes. El S3 Scan funciona solo en modo de metadatos para recopilar de manera eficiente la información de los archivos S3 sin leer el contenido real del archivo. El procesador ml_inference utiliza las URL de los archivos de S3 a fin de coordinarlas con ML Commons para el procesamiento en lotes. Este diseño optimiza el flujo de trabajo de inferencia en lotes al minimizar la transferencia de datos innecesaria durante la fase de digitalización. El procesador ml_inference se define mediante parámetros. A continuación se muestra un ejemplo:

processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWStest-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWSTestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # AWS configuration settings aws: # Región de AWS where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml # Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"

Mejoras en el rendimiento de la ingesta mediante el procesador ml_inference

El procesador ml_inference de OpenSearch Inestion mejora de forma significativa el rendimiento de la ingesta de datos en las búsquedas habilitadas para ML. El procesador es ideal para casos de uso que requieren datos generados por modelos de machine learning, como la búsqueda semántica, la búsqueda multimodal, el enriquecimiento de documentos y la comprensión de consultas. En la búsqueda semántica, el procesador puede acelerar la creación e ingesta de vectores de gran volumen y alta dimensión en un orden de magnitud.

La capacidad de inferencia en lotes sin conexión del procesador ofrece claras ventajas en comparación con la invocación de modelos en tiempo real. Si bien el procesamiento en tiempo real requiere un servidor modelo en vivo con limitaciones de capacidad, la inferencia en lotes escala de forma dinámica los recursos de cómputo bajo demanda y procesa los datos en paralelo. Por ejemplo, cuando la canalización de OpenSearch Ingestion recibe mil millones de solicitudes de datos de origen, crea 100 archivos de S3 para la entrada de inferencias en lotes de ML. A continuación, el procesador ml_inference inicia una tarea en lotes de SageMaker con 100 instancias ml.m4.xlarge de Amazon Elastic Compute Cloud (Amazon EC2) y completa la vectorización de mil millones de solicitudes en 14 horas, una tarea que sería prácticamente imposible de realizar en tiempo real.

Configurar el procesador ml_inference de modo que ingiera las solicitudes de datos para una búsqueda semántica

En los siguientes procedimientos, se explica el proceso de instalación y configuración del procesador ml_inference de OpenSearch Ingestion para procesar mil millones de solicitudes de datos con fines de búsqueda semántica mediante un modelo de incrustación de texto.

Paso 1: Cree conectores y registre modelos en OpenSearch

Para el siguiente procedimiento, utilice batch_inference_sagemaker_connector_blueprint de ML Commons para crear un conector y un modelo en Amazon SageMaker. Si prefiere utilizar las plantillas de integración CloudFormation de OpenSearch, consulte (Procedimiento alternativo) Paso 1: Cree conectores y modelos mediante una plantilla de integración de CloudFormation más adelante en esta sección.

Para crear conectores y registrar modelos en OpenSearch
  1. Cree un modelo ML de Deep Java Library (DJL) en SageMaker para la transformación por lotes. Para ver otros modelos de DJL, consulte semantic_search_with_CFN_template_for_Sagemaker en GitHub:

    POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } }
  2. Cree un conector con batch_predict como el nuevo tipo action en el campo actions:

    POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] }
  3. Utilice el ID de conector devuelto para registrar el modelo de SageMaker:

    POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" }
  4. Invoque el modelo con el tipo de acción batch_predict:

    POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }

    La respuesta contiene un identificador de tarea para la tarea en lotes:

    { "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" }
  5. Compruebe el estado de la tarea en lotes llamando a la API Get Task con el ID de tarea:

    GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567

    La respuesta contiene el estado de la tarea:

    { "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }

(Procedimiento alternativo) Paso 1: Cree conectores y modelos mediante una plantilla de integración de CloudFormation

Si lo prefiere, puede utilizar AWS CloudFormation para crear automáticamente todos los conectores y modelos de Amazon SageMaker necesarios para la inferencia de ML. Este enfoque simplifica la configuración mediante el uso de una plantilla preconfigurada disponible en la consola de Amazon OpenSearch Service. Para obtener más información, consulte Uso de CloudFormation para configurar la inferencia remota para la búsqueda semántica.

Para implementar una pila de CloudFormation que cree todos los conectores y modelos de SageMaker necesarios
  1. Abra la consola de Amazon OpenSearch Service.

  2. En el panel de navegación, elija Integraciones.

  3. En el campo de búsqueda, introduzca y, a continuaciónSageMaker, seleccione Integración con modelos de incrustación de texto a través de Amazon SageMaker.

  4. Seleccione Configurar dominio y, a continuación, seleccione Configurar dominio de VPC o Configurar dominio público.

  5. Ingrese la información en los campos de la plantilla. En Habilitar la inferencia en lotes sin conexión, elija true para aprovisionar recursos para el procesamiento en lotes sin conexión.

  6. Elija Crear para crear la pila CloudFormation.

  7. Una vez creada la pila, abra la pestaña Salidas en la CloudFormation consola y busque el connector_id y el model_id. Necesitará estos valores más adelante cuando configure la canalización.

Paso 2: Crear una canalización de OpenSearch Ingestion para la inferencia en lotes de ML sin conexión

Utilice el siguiente ejemplo a fin de crear una canalización de OpenSearch Ingestion para la inferencia en lotes de ML sin conexión. Para obtener más información sobre la creación de una canalización para OpenSearch Ingestion, consulte Creación de canalizaciones de Amazon OpenSearch Ingestion.

Antes de empezar

En el siguiente ejemplo, se especifica un ARN de rol de IAM para el parámetro sts_role_arn. Utilice el siguiente procedimiento para comprobar que este rol está asignado al rol de backend que tiene acceso a ml-commons en OpenSearch.

  1. Desplácese hasta el complemento de OpenSearch Dashboards para ver su dominio de OpenSearch Service. Puede encontrar el punto de conexión de paneles en el panel del dominio de la consola de OpenSearch Service.

  2. En el menú principal, seleccione Seguridad, Roles y seleccione el rol ml_full_access.

  3. Seleccione Usuarios asignados, Administrar mapeo.

  4. En Roles de backend, escriba el ARN del rol de Lambda que necesita permiso para llamar a su dominio. Este es un ejemplo: arn:aws:iam::111122223333:role/lambda-role

  5. Seleccione Asignar y confirme que el usuario o el rol aparecen en Usuarios asignados.

Ejemplo para crear una canalización de OpenSearch Ingestion destinada la inferencia en lotes de ML sin conexión

version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name: name data_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name: name data_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region: name default_bucket_owner: account_ID codec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml - copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null

Paso 3: preparar los datos para la ingesta

A fin de preparar los datos para el procesamiento de inferencias en lotes de ML sin conexión, prepare los datos usted mismo con sus propias herramientas o procesos o utilice OpenSearch Data Prepper. Compruebe que los datos estén organizados en el formato correcto, ya sea mediante una canalización para consumir los datos del origen de datos o con la creación de un conjunto de datos de machine learning.

El siguiente ejemplo usa el conjunto de datos MS MARCO, que incluye una colección de consultas de usuarios reales para tareas de procesamiento del lenguaje natural. El conjunto de datos está estructurado en formato JSONL, donde cada línea representa una solicitud enviada al modelo de incrustación de ML:

{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...

Para realizar una prueba con el conjunto de datos MS MARCO, imagine un escenario en el que cree mil millones de solicitudes de entrada distribuidas en 100 archivos, cada uno con 10 millones de solicitudes. Los archivos se almacenarían en Amazon S3 con el prefijo s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/. El proceso de OpenSearch Ingestion escanearía estos 100 archivos simultáneamente e iniciaría una tarea en lotes de SageMaker con 100 trabajadores para su procesamiento paralelo, lo que permitiría la vectorización y la ingesta eficientes de los mil millones de documentos en OpenSearch.

En los entornos de producción, puede utilizar una canalización de OpenSearch Ingestion para generar archivos de S3 para la entrada de inferencias en lotes. La canalización admite varios orígenes de datos y funciona según un cronograma para transformar de manera continua los datos de origen en archivos de S3. Luego, los servidores de IA procesan automáticamente estos archivos mediante tareas en lotes programadas sin conexión, lo que garantiza el procesamiento y la ingesta continuos de datos.

Paso 4: Supervisar la tarea de inferencia en lotes

Puede supervisar las tareas de inferencia en lotes mediante la consola de SageMaker o la AWS CLI. También puede utilizar la API Get Task para supervisar tareas en lotes:

GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }

La API devuelve una lista de tareas en lotes activas:

{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }

Tras supervisar la tarea de inferencia en lotes y confirmar que se ha completado, puede ejecutar varios tipos de búsquedas de IA, incluidas las semánticas, las híbridas, las conversacionales (con RAG), las dispersas neuronales y las multimodales. Para obtener más información sobre las búsquedas de IA compatibles con OpenSearch Service, consulte Búsqueda de IA.

Para buscar vectores sin procesar, utilice el tipo de consulta knn, proporcione la matriz vector como entrada y especifique el k número de resultados devueltos:

GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }

Para ejecutar una búsqueda basada en la IA, utilice el tipo de consulta neural. Especifique la entrada query_text, el model_id del modelo de incrustación que configuró en la canalización de OpenSearch Ingestion y el número k de resultados devueltos. Para excluir las incrustaciones de los resultados de búsqueda, especifique el nombre del campo de incrustación en el parámetro: _source.excludes

GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }