Uso de una canalización de OpenSearch Ingestion con machine learning e inferencia en lotes sin conexión
Las canalizaciones de Amazon OpenSearch Ingestion (OSI) admiten el procesamiento de inferencia en lotes fuera de línea con machine learning (ML) para enriquecer de manera eficiente grandes volúmenes de datos a bajo costo. Utilice la inferencia en lotes sin conexión siempre que tenga conjuntos de datos grandes que puedan procesarse de forma asíncrona. La inferencia en lotes sin conexión funciona con los modelos de Amazon Bedrock y SageMaker. Esta característica está disponible en todas las Regiones de AWS compatibles con OpenSearch Ingestion con dominios de OpenSearch Service 2.17 y versiones posteriores.
nota
Para el procesamiento de inferencias en tiempo real, utilice Conectores de ML de Amazon OpenSearch Service para plataformas de terceros.
El procesamiento de inferencias en lotes sin conexión aprovecha una característica de OpenSearch llamada ML Commons. ML Commons proporciona algoritmos de ML mediante llamadas a la API de REST y de transporte. Esas llamadas eligen los nodos y los recursos correctos para cada solicitud de ML y supervisan las tareas de ML para garantizar el tiempo de actividad. De esta manera, ML Commons le permite aprovechar los algoritmos de ML de código abierto existentes y reducir el esfuerzo necesario para desarrollar nuevas características de ML. Para obtener más información sobre ML Commons, consulte Machine learning
Funcionamiento
Para crear una canalización de inferencias en lotes sin conexión en OpenSearch Ingestion, puede agregar un procesador de inferencias de machine learning a una canalización
OpenSearch Ingestion utiliza el procesador ml_inference con ML Commons para crear tareas de inferencia en lotes sin conexión. Luego, ML Commons utiliza la API batch_predict
Los componentes de la canalización funcionan de la siguiente manera:
Canalización 1 (Preparación y transformación de datos)*:
-
Origen: los datos se analizan desde un origen externo con la ayuda de OpenSearch Ingestion.
-
Procesadores de datos: los datos sin procesar se procesan y transforman al formato correcto para su inferencia en lotes en el servicio de IA integrado.
-
S3 (receptor): los datos procesados se almacenan en un bucket de Amazon S3 y están listos para servir como entrada para ejecutar tareas de inferencia en lotes en el servicio de IA integrado.
Canalización 2 (Activar batch_inference de ML):
-
Origen: detección automática de eventos de S3 de nuevos archivos creados por la salida de la Canalización 1.
-
Procesador ml_inference: procesador que genera inferencias de ML mediante una tarea en lotes asíncrona. Se conecta a los servicios de IA a través del conector de IA configurado que se ejecuta en el dominio de destino.
-
ID de tarea: cada tarea en lotes está asociada a un ID de tarea en ml-commons para su seguimiento y administración.
-
OpenSearch ML Commons: ML Commons, que aloja el modelo de búsqueda neuronal en tiempo real, administra los conectores a los servidores remotos de IA y proporciona las API para la inferencia en lotes y la administración de tareas.
-
Servicios de IA: OpenSearch ML Commons interactúa con servicios de IA como Amazon Bedrock y Amazon SageMaker para realizar inferencias en lotes de los datos y generar predicciones o información. Los resultados se guardan de forma asíncrona en un archivo S3 independiente.
Canalización 3 (Ingesta masiva):
-
S3 (origen): los resultados de las tareas en lotes se almacenan en S3, que es el origen de esta canalización.
-
Procesadores de transformación de datos: el procesamiento y la transformación adicionales se aplican al resultado de la inferencia en lotes antes de la ingesta. Esto garantiza que los datos estén mapeados correctamente en el índice de OpenSearch.
-
Índice de OpenSearch (receptor): los resultados procesados se indexan en OpenSearch para almacenarlos, buscarlos y analizarlos más a fondo.
nota
* El proceso descrito en Canalización 1 es opcional. Si lo prefiere, puede omitir ese proceso y solo cargar los datos preparados en el receptor de S3 para crear tareas en lotes.
Acerca del procesador ml_inference
OpenSearch Ingestion utiliza una integración especializada entre el análisis de S3 y el procesador de inferencias de ML para el procesamiento en lotes. El S3 Scan funciona solo en modo de metadatos para recopilar de manera eficiente la información de los archivos S3 sin leer el contenido real del archivo. El procesador ml_inference utiliza las URL de los archivos de S3 a fin de coordinarlas con ML Commons para el procesamiento en lotes. Este diseño optimiza el flujo de trabajo de inferencia en lotes al minimizar la transferencia de datos innecesaria durante la fase de digitalización. El procesador ml_inference se define mediante parámetros. A continuación se muestra un ejemplo:
processor: - ml_inference: # The endpoint URL of your OpenSearch domain host: "https://AWStest-offlinebatch-123456789abcdefg.us-west-2.es.amazonaws.com" # Type of inference operation: # - batch_predict: for batch processing # - predict: for real-time inference action_type: "batch_predict" # Remote ML model service provider (Amazon Bedrock or SageMaker) service_name: "bedrock" # Unique identifier for the ML model model_id: "AWSTestModelID123456789abcde" # S3 path where batch inference results will be stored output_path: "s3://amzn-s3-demo-bucket/" # Supports ISO_8601 notation strings like PT20.345S or PT15M # These settings control how long to keep your inputs in the processor for retry on throttling errors retry_time_window: "PT9M" # AWS configuration settings aws: # Región de AWS where the Lambda function is deployed region: "us-west-2" # IAM role ARN for Lambda function execution sts_role_arn: "arn:aws::iam::account_id:role/Admin" # Dead-letter queue settings for storing errors dlq: s3: region: us-west-2 bucket: batch-inference-dlq key_path_prefix: bedrock-dlq sts_role_arn: arn:aws:iam::account_id:role/OSI-invoke-ml# Conditional expression that determines when to trigger the processor # In this case, only process when bucket matches "amzn-s3-demo-bucket" ml_when: /bucket == "amzn-s3-demo-bucket"
Mejoras en el rendimiento de la ingesta mediante el procesador ml_inference
El procesador ml_inference de OpenSearch Inestion mejora de forma significativa el rendimiento de la ingesta de datos en las búsquedas habilitadas para ML. El procesador es ideal para casos de uso que requieren datos generados por modelos de machine learning, como la búsqueda semántica, la búsqueda multimodal, el enriquecimiento de documentos y la comprensión de consultas. En la búsqueda semántica, el procesador puede acelerar la creación e ingesta de vectores de gran volumen y alta dimensión en un orden de magnitud.
La capacidad de inferencia en lotes sin conexión del procesador ofrece claras ventajas en comparación con la invocación de modelos en tiempo real. Si bien el procesamiento en tiempo real requiere un servidor modelo en vivo con limitaciones de capacidad, la inferencia en lotes escala de forma dinámica los recursos de cómputo bajo demanda y procesa los datos en paralelo. Por ejemplo, cuando la canalización de OpenSearch Ingestion recibe mil millones de solicitudes de datos de origen, crea 100 archivos de S3 para la entrada de inferencias en lotes de ML. A continuación, el procesador ml_inference inicia una tarea en lotes de SageMaker con 100 instancias ml.m4.xlarge de Amazon Elastic Compute Cloud (Amazon EC2) y completa la vectorización de mil millones de solicitudes en 14 horas, una tarea que sería prácticamente imposible de realizar en tiempo real.
Configurar el procesador ml_inference de modo que ingiera las solicitudes de datos para una búsqueda semántica
En los siguientes procedimientos, se explica el proceso de instalación y configuración del procesador ml_inference de OpenSearch Ingestion para procesar mil millones de solicitudes de datos con fines de búsqueda semántica mediante un modelo de incrustación de texto.
Temas
Paso 1: Cree conectores y registre modelos en OpenSearch
Para el siguiente procedimiento, utilice batch_inference_sagemaker_connector_blueprint
Para crear conectores y registrar modelos en OpenSearch
-
Cree un modelo ML de Deep Java Library (DJL) en SageMaker para la transformación por lotes. Para ver otros modelos de DJL, consulte semantic_search_with_CFN_template_for_Sagemaker
en GitHub: POST https://api.sagemaker.us-east-1.amazonaws.com/CreateModel { "ExecutionRoleArn": "arn:aws:iam::123456789012:role/aos_ml_invoke_sagemaker", "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "PrimaryContainer": { "Environment": { "SERVING_LOAD_MODELS" : "djl://ai.djl.huggingface.pytorch/sentence-transformers/all-MiniLM-L6-v2" }, "Image": "763104351884.dkr.ecr.us-east-1.amazonaws.com/djl-inference:0.29.0-cpu-full" } } -
Cree un conector con
batch_predictcomo el nuevo tipoactionen el campoactions:POST /_plugins/_ml/connectors/_create { "name": "DJL Sagemaker Connector: all-MiniLM-L6-v2", "version": "1", "description": "The connector to sagemaker embedding model all-MiniLM-L6-v2", "protocol": "aws_sigv4", "credential": { "roleArn": "arn:aws:iam::111122223333:role/SageMakerRole" }, "parameters": { "region": "us-east-1", "service_name": "sagemaker", "DataProcessing": { "InputFilter": "$.text", "JoinSource": "Input", "OutputFilter": "$" }, "MaxConcurrentTransforms": 100, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformInput": { "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/msmarcotests/" } }, "SplitType": "Line" }, "TransformJobName": "djl-batch-transform-1-billion", "TransformOutput": { "AssembleWith": "Line", "Accept": "application/json", "S3OutputPath": "s3://offlinebatch/msmarcotestsoutputs/" }, "TransformResources": { "InstanceCount": 100, "InstanceType": "ml.m4.xlarge" }, "BatchStrategy": "SingleRecord" }, "actions": [ { "action_type": "predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://runtime.sagemaker.us-east-1.amazonaws.com/endpoints/OpenSearch-sagemaker-060124023703/invocations", "request_body": "${parameters.input}", "pre_process_function": "connector.pre_process.default.embedding", "post_process_function": "connector.post_process.default.embedding" }, { "action_type": "batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/CreateTransformJob", "request_body": """{ "BatchStrategy": "${parameters.BatchStrategy}", "ModelName": "${parameters.ModelName}", "DataProcessing" : ${parameters.DataProcessing}, "MaxConcurrentTransforms": ${parameters.MaxConcurrentTransforms}, "TransformInput": ${parameters.TransformInput}, "TransformJobName" : "${parameters.TransformJobName}", "TransformOutput" : ${parameters.TransformOutput}, "TransformResources" : ${parameters.TransformResources}}""" }, { "action_type": "batch_predict_status", "method": "GET", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/DescribeTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" }, { "action_type": "cancel_batch_predict", "method": "POST", "headers": { "content-type": "application/json" }, "url": "https://api.sagemaker.us-east-1.amazonaws.com/StopTransformJob", "request_body": """{ "TransformJobName" : "${parameters.TransformJobName}"}""" } ] } -
Utilice el ID de conector devuelto para registrar el modelo de SageMaker:
POST /_plugins/_ml/models/_register { "name": "SageMaker model for batch", "function_name": "remote", "description": "test model", "connector_id": "example123456789-abcde" } -
Invoque el modelo con el tipo de acción
batch_predict:POST /_plugins/_ml/models/teHr3JABBiEvs-eod7sn/_batch_predict { "parameters": { "TransformJobName": "SM-offline-batch-transform" } }La respuesta contiene un identificador de tarea para la tarea en lotes:
{ "task_id": "exampleIDabdcefd_1234567", "status": "CREATED" } -
Compruebe el estado de la tarea en lotes llamando a la API Get Task con el ID de tarea:
GET /_plugins/_ml/tasks/exampleIDabdcefd_1234567La respuesta contiene el estado de la tarea:
{ "model_id": "nyWbv5EB_tT1A82ZCu-e", "task_type": "BATCH_PREDICTION", "function_name": "REMOTE", "state": "RUNNING", "input_type": "REMOTE", "worker_node": [ "WDZnIMcbTrGtnR4Lq9jPDw" ], "create_time": 1725496527958, "last_update_time": 1725496527958, "is_async": false, "remote_job": { "TransformResources": { "InstanceCount": 1, "InstanceType": "ml.c5.xlarge" }, "ModelName": "DJL-Text-Embedding-Model-imageforjsonlines", "TransformOutput": { "Accept": "application/json", "AssembleWith": "Line", "KmsKeyId": "", "S3OutputPath": "s3://offlinebatch/output" }, "CreationTime": 1725496531.935, "TransformInput": { "CompressionType": "None", "ContentType": "application/json", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://offlinebatch/sagemaker_djl_batch_input.json" } }, "SplitType": "Line" }, "TransformJobArn": "arn:aws:sagemaker:us-east-1:111122223333:transform-job/SM-offline-batch-transform15", "TransformJobStatus": "InProgress", "BatchStrategy": "SingleRecord", "TransformJobName": "SM-offline-batch-transform15", "DataProcessing": { "InputFilter": "$.content", "JoinSource": "Input", "OutputFilter": "$" } } }
(Procedimiento alternativo) Paso 1: Cree conectores y modelos mediante una plantilla de integración de CloudFormation
Si lo prefiere, puede utilizar AWS CloudFormation para crear automáticamente todos los conectores y modelos de Amazon SageMaker necesarios para la inferencia de ML. Este enfoque simplifica la configuración mediante el uso de una plantilla preconfigurada disponible en la consola de Amazon OpenSearch Service. Para obtener más información, consulte Uso de CloudFormation para configurar la inferencia remota para la búsqueda semántica.
Para implementar una pila de CloudFormation que cree todos los conectores y modelos de SageMaker necesarios
-
Abra la consola de Amazon OpenSearch Service.
-
En el panel de navegación, elija Integraciones.
-
En el campo de búsqueda, introduzca y, a continuación
SageMaker, seleccione Integración con modelos de incrustación de texto a través de Amazon SageMaker. -
Seleccione Configurar dominio y, a continuación, seleccione Configurar dominio de VPC o Configurar dominio público.
-
Ingrese la información en los campos de la plantilla. En Habilitar la inferencia en lotes sin conexión, elija true para aprovisionar recursos para el procesamiento en lotes sin conexión.
-
Elija Crear para crear la pila CloudFormation.
-
Una vez creada la pila, abra la pestaña Salidas en la CloudFormation consola y busque el connector_id y el model_id. Necesitará estos valores más adelante cuando configure la canalización.
Paso 2: Crear una canalización de OpenSearch Ingestion para la inferencia en lotes de ML sin conexión
Utilice el siguiente ejemplo a fin de crear una canalización de OpenSearch Ingestion para la inferencia en lotes de ML sin conexión. Para obtener más información sobre la creación de una canalización para OpenSearch Ingestion, consulte Creación de canalizaciones de Amazon OpenSearch Ingestion.
Antes de empezar
En el siguiente ejemplo, se especifica un ARN de rol de IAM para el parámetro sts_role_arn. Utilice el siguiente procedimiento para comprobar que este rol está asignado al rol de backend que tiene acceso a ml-commons en OpenSearch.
-
Desplácese hasta el complemento de OpenSearch Dashboards para ver su dominio de OpenSearch Service. Puede encontrar el punto de conexión de paneles en el panel del dominio de la consola de OpenSearch Service.
-
En el menú principal, seleccione Seguridad, Roles y seleccione el rol ml_full_access.
-
Seleccione Usuarios asignados, Administrar mapeo.
-
En Roles de backend, escriba el ARN del rol de Lambda que necesita permiso para llamar a su dominio. Este es un ejemplo: arn:aws:iam::
111122223333:role/lambda-role -
Seleccione Asignar y confirme que el usuario o el rol aparecen en Usuarios asignados.
Ejemplo para crear una canalización de OpenSearch Ingestion destinada la inferencia en lotes de ML sin conexión
version: '2' extension: osis_configuration_metadata: builder_type: visual sagemaker-batch-job-pipeline: source: s3: acknowledgments: true delete_s3_objects_on_read: false scan: buckets: - bucket: name:namedata_selection: metadata_only filter: include_prefix: - sagemaker/sagemaker_djl_batch_input exclude_suffix: - .manifest - bucket: name:namedata_selection: data_only filter: include_prefix: - sagemaker/output/ scheduling: interval: PT6M aws: region:namedefault_bucket_owner:account_IDcodec: ndjson: include_empty_objects: false compression: none workers: '1' processor: - ml_inference: host: "https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com" aws_sigv4: true action_type: "batch_predict" service_name: "sagemaker" model_id: "model_ID" output_path: "s3://AWStest-offlinebatch/sagemaker/output" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::account_ID:role/Admin" ml_when: /bucket == "AWStest-offlinebatch" dlq: s3: region:us-west-2bucket:batch-inference-dlqkey_path_prefix:bedrock-dlqsts_role_arn: arn:aws:iam::account_ID:role/OSI-invoke-ml- copy_values: entries: - from_key: /text to_key: chapter - from_key: /SageMakerOutput to_key: chapter_embedding - delete_entries: with_keys: - text - SageMakerOutput sink: - opensearch: hosts: ["https://search-AWStest-offlinebatch-123456789abcdef.us-west-2.es.amazonaws.com"] aws: serverless: false region: us-west-2 routes: - ml-ingest-route index_type: custom index: test-nlp-index routes: - ml-ingest-route: /chapter != null and /title != null
Paso 3: preparar los datos para la ingesta
A fin de preparar los datos para el procesamiento de inferencias en lotes de ML sin conexión, prepare los datos usted mismo con sus propias herramientas o procesos o utilice OpenSearch Data Prepper
El siguiente ejemplo usa el conjunto de datos MS MARCO
{"_id": "1185869", "text": ")what was the immediate impact of the Paris Peace Treaties of 1947?", "metadata": {"world war 2"}} {"_id": "1185868", "text": "_________ justice is designed to repair the harm to victim, the community and the offender caused by the offender criminal act. question 19 options:", "metadata": {"law"}} {"_id": "597651", "text": "what is amber", "metadata": {"nothing"}} {"_id": "403613", "text": "is autoimmune hepatitis a bile acid synthesis disorder", "metadata": {"self immune"}} ...
Para realizar una prueba con el conjunto de datos MS MARCO, imagine un escenario en el que cree mil millones de solicitudes de entrada distribuidas en 100 archivos, cada uno con 10 millones de solicitudes. Los archivos se almacenarían en Amazon S3 con el prefijo s3://offlinebatch/sagemaker/sagemaker_djl_batch_input/. El proceso de OpenSearch Ingestion escanearía estos 100 archivos simultáneamente e iniciaría una tarea en lotes de SageMaker con 100 trabajadores para su procesamiento paralelo, lo que permitiría la vectorización y la ingesta eficientes de los mil millones de documentos en OpenSearch.
En los entornos de producción, puede utilizar una canalización de OpenSearch Ingestion para generar archivos de S3 para la entrada de inferencias en lotes. La canalización admite varios orígenes de datos
Paso 4: Supervisar la tarea de inferencia en lotes
Puede supervisar las tareas de inferencia en lotes mediante la consola de SageMaker o la AWS CLI. También puede utilizar la API Get Task para supervisar tareas en lotes:
GET /_plugins/_ml/tasks/_search { "query": { "bool": { "filter": [ { "term": { "state": "RUNNING" } } ] } }, "_source": ["model_id", "state", "task_type", "create_time", "last_update_time"] }
La API devuelve una lista de tareas en lotes activas:
{ "took": 2, "timed_out": false, "_shards": { "total": 5, "successful": 5, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 0.0, "hits": [ { "_index": ".plugins-ml-task", "_id": "nyWbv5EB_tT1A82ZCu-e", "_score": 0.0, "_source": { "model_id": "nyWbv5EB_tT1A82ZCu-e", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496527958, "last_update_time": 1725496527958 } }, { "_index": ".plugins-ml-task", "_id": "miKbv5EB_tT1A82ZCu-f", "_score": 0.0, "_source": { "model_id": "miKbv5EB_tT1A82ZCu-f", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496528123, "last_update_time": 1725496528123 } }, { "_index": ".plugins-ml-task", "_id": "kiLbv5EB_tT1A82ZCu-g", "_score": 0.0, "_source": { "model_id": "kiLbv5EB_tT1A82ZCu-g", "state": "RUNNING", "task_type": "BATCH_PREDICTION", "create_time": 1725496529456, "last_update_time": 1725496529456 } } ] } }
Paso 5: ejecute la búsqueda
Tras supervisar la tarea de inferencia en lotes y confirmar que se ha completado, puede ejecutar varios tipos de búsquedas de IA, incluidas las semánticas, las híbridas, las conversacionales (con RAG), las dispersas neuronales y las multimodales. Para obtener más información sobre las búsquedas de IA compatibles con OpenSearch Service, consulte Búsqueda de IA
Para buscar vectores sin procesar, utilice el tipo de consulta knn, proporcione la matriz vector como entrada y especifique el k número de resultados devueltos:
GET /my-raw-vector-index/_search { "query": { "knn": { "my_vector": { "vector": [0.1, 0.2, 0.3], "k": 2 } } } }
Para ejecutar una búsqueda basada en la IA, utilice el tipo de consulta neural. Especifique la entrada query_text, el model_id del modelo de incrustación que configuró en la canalización de OpenSearch Ingestion y el número k de resultados devueltos. Para excluir las incrustaciones de los resultados de búsqueda, especifique el nombre del campo de incrustación en el parámetro: _source.excludes
GET /my-ai-search-index/_search { "_source": { "excludes": [ "output_embedding" ] }, "query": { "neural": { "output_embedding": { "query_text": "What is AI search?", "model_id": "mBGzipQB2gmRjlv_dOoB", "k": 2 } } } }