Código de inferencia personalizado con los servicios de alojamiento - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Código de inferencia personalizado con los servicios de alojamiento

En esta sección se explica cómo Amazon SageMaker AI interactúa con un contenedor de Docker que ejecuta su propio código de inferencia para los servicios de alojamiento. Utilice esta información para escribir el código de inferencia y crear una imagen de Docker.

Cómo ejecuta la SageMaker IA su imagen de inferencia

Para configurar un contenedor para que se inicie como un archivo ejecutable, utilice una instrucción ENTRYPOINT en un Dockerfile. Tenga en cuenta lo siguiente:

  • Para la inferencia de modelos, la SageMaker IA ejecuta el contenedor de la siguiente manera:

    docker run image serve

    SageMaker La IA anula CMD las sentencias predeterminadas de un contenedor especificando el serve argumento después del nombre de la imagen. El argumento serve anula los argumentos que proporciona con el comando CMD en el Dockerfile.

     

  • SageMaker La IA espera que todos los contenedores se ejecuten con usuarios root. Cree su contenedor de modo que solo utilice usuarios raíz. Cuando la SageMaker IA ejecuta tu contenedor, los usuarios que no tienen acceso desde la raíz pueden provocar problemas de permisos.

     

  • Le recomendamos que utilice la forma exec de la instrucción ENTRYPOINT:

    ENTRYPOINT ["executable", "param1", "param2"]

    Por ejemplo:

    ENTRYPOINT ["python", "k_means_inference.py"]

    La forma exec de la instrucción ENTRYPOINT inicia el archivo ejecutable directamente, no como un elemento secundario de /bin/sh. Esto le permite recibir señales como SIGTERM y SIGKILL desde las operaciones de la SageMaker API, lo cual es un requisito.

     

    Por ejemplo, cuando utilizas la CreateEndpointAPI para crear un punto final, la SageMaker IA proporciona el número de instancias de procesamiento de aprendizaje automático que requiere la configuración del punto final, que especificas en la solicitud. SageMaker La IA ejecuta el contenedor de Docker en esas instancias.

     

    Si reduces el número de instancias que respaldan el punto final (mediante una llamada a la UpdateEndpointWeightsAndCapacitiesAPI), la SageMaker IA ejecuta un comando para detener el contenedor de Docker en las instancias que se van a terminar. El comando envía la señal SIGTERM y, a continuación, envía la señal SIGKILL 30 segundos más tarde.

     

    Si actualizas el punto final (mediante una llamada a la UpdateEndpointAPI), la SageMaker IA lanza otro conjunto de instancias informáticas de aprendizaje automático y ejecuta los contenedores de Docker que contienen tu código de inferencia. A continuación se ejecuta un comando para detener los contenedores de Docker anteriores. Para detener un contenedor de Docker, el comando envía la señal SIGTERM y, a continuación, envía la señal SIGKILL 30 segundos más tarde.

     

  • SageMaker AI usa la definición de contenedor que proporcionaste en tu CreateModelsolicitud para establecer las variables de entorno y el nombre de host DNS del contenedor de la siguiente manera:

     

    • Establece las variables de entorno mediante el ContainerDefinition.Environment string-to-string mapa.

    • Establece el nombre de host de DNS mediante ContainerDefinition.ContainerHostname.

       

  • Si tiene previsto utilizar los dispositivos GPU para inferencias de modelos (especificando instancias de computación de ML basadas en GPU en su solicitud CreateEndpointConfig), asegúrese de que los contenedores son compatibles con nvidia-docker. No cree un paquete con controladores de NVIDIA con la imagen. Para obtener más información sobre nvidia-docker, consulte NVIDIA/nvidia-docker.

     

  • No puedes usar el tini inicializador como punto de entrada en los contenedores de SageMaker IA porque los argumentos train y serve lo confunden.

Cómo carga la SageMaker IA los artefactos de tus modelos

En su solicitud de CreateModelAPI, puede utilizar el S3DataSource parámetro ModelDataUrl o para identificar la ubicación S3 en la que se almacenan los artefactos del modelo. SageMaker La IA copia los artefactos del modelo de la ubicación de S3 al /opt/ml/model directorio para utilizarlos en el código de inferencia. Su contenedor tiene acceso de solo lectura a /opt/ml/model. No escriba en este directorio.

La ModelDataUrl debe apuntar a un archivo tar.gz. De lo contrario, SageMaker AI no descargará el archivo.

Si ha entrenado su modelo en SageMaker IA, los artefactos del modelo se guardan como un único archivo tar comprimido en Amazon S3. Si ha entrenado su modelo fuera de la SageMaker IA, debe crear este único archivo tar comprimido y guardarlo en una ubicación S3. SageMaker AI descomprime este archivo tar en el opt/ml/model directorio/antes de que se inicie el contenedor.

Para implementar modelos de gran tamaño, recomendamos que siga Implementación de modelos sin comprimir.

Cómo debe responder su contenedor a las solicitudes de inferencia

Para obtener inferencias, la aplicación cliente envía una solicitud POST al punto final de la SageMaker IA. SageMaker La IA pasa la solicitud al contenedor y devuelve el resultado de la inferencia del contenedor al cliente.

Para obtener más información sobre las solicitudes de inferencia que recibirá su contenedor, consulte las siguientes acciones en la referencia de la API de Amazon SageMaker AI:

Requisitos para los contenedores de inferencia

Para responder a las solicitudes de inferencia, su contenedor debe cumplir los siguientes requisitos:

  • SageMaker La IA elimina todos los POST encabezados excepto los compatibles con. InvokeEndpoint SageMaker La IA podría añadir encabezados adicionales. Los contenedores de inferencia deben poder ignorar de forma segura estos encabezados adicionales.

  • Para recibir solicitudes de inferencia, el contenedor debe tener un servidor web que realice la escucha en el puerto 8080 y debe aceptar las solicitudes POST en los punto de conexión /invocations y /ping.

  • Los contenedores de modelo de un cliente deben aceptar las solicitudes de conexión del socket en un plazo de 250 ms.

  • Los contenedores de modelos de un cliente deben responder a las solicitudes en un plazo de 60 segundos. El propio modelo puede tardar un tiempo máximo de procesamiento de 60 segundos antes de responder a las /invocations. Si el modelo tiene un tiempo de procesamiento de entre 50 y 60 segundos, deberá establecer el tiempo de espera del conector del SDK en 70 segundos.

  • El contenedor modelo de un cliente que admita la transmisión bidireccional debe:

    • admite WebSockets conexiones en el puerto 8080 a/de forma invocations-bidirectional-stream predeterminada.

    • tienen un servidor web que escucha en el puerto 8080 y deben aceptar las solicitudes POST a los puntos finales /ping.

    • Además de las comprobaciones de estado del contenedor a través de HTTP, el contenedor debe responder con Pong Frame per (RFC6455), si se envía WebSocket Ping Frame.

ejemplo funciones de invocación

En los siguientes ejemplos se muestra cómo el código del contenedor puede procesar las solicitudes de inferencia. Estos ejemplos gestionan las solicitudes que las aplicaciones cliente envían mediante la InvokeEndpoint acción.

FastAPI

FastAPI es un marco web para construir APIs con Python.

from fastapi import FastAPI, status, Request, Response . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # model() is a hypothetical function that gets the inference output: model_resp = await model(Request) response = Response( content=model_resp, status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

En este ejemplo, la invocations función gestiona la solicitud de inferencia que la SageMaker IA envía al /invocations punto final.

Flask

Flask es un marcopara desarrollar aplicaciones web con Python.

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invoke(request): # model() is a hypothetical function that gets the inference output: resp_body = model(request) return flask.Response(resp_body, mimetype='text/plain')

En este ejemplo, la invoke función gestiona la solicitud de inferencia que la SageMaker IA envía al /invocations punto final.

ejemplo funciones de invocación para solicitudes de streaming

En los siguientes ejemplos se muestra cómo el código del contenedor de inferencia puede procesar las solicitudes de streaming. Estos ejemplos gestionan las solicitudes que las aplicaciones cliente envían mediante la InvokeEndpointWithResponseStream acción.

Cuando un contenedor gestiona una solicitud de inferencia de streaming, devuelve la inferencia del modelo como una serie de partes de forma incremental a medida que el modelo las genera. Las aplicaciones cliente comienzan a recibir respuestas inmediatamente cuando están disponibles. No necesitan esperar a que el modelo genere la respuesta completa. Puede implementar la transmisión en streaming para que admita experiencias interactivas rápidas, como chatbots, asistentes virtuales y generadores de música.

FastAPI

FastAPI es un marco web para construir APIs con Python.

from starlette.responses import StreamingResponse from fastapi import FastAPI, status, Request . . . app = FastAPI() . . . @app.post('/invocations') async def invocations(request: Request): # Streams inference response using HTTP chunked encoding async def generate(): # model() is a hypothetical function that gets the inference output: yield await model(Request) yield "\n" response = StreamingResponse( content=generate(), status_code=status.HTTP_200_OK, media_type="text/plain", ) return response . . .

En este ejemplo, la invocations función gestiona la solicitud de inferencia que la SageMaker IA envía al /invocations punto final. Para transmitir la respuesta, el ejemplo utiliza la clase StreamingResponse del marco Starlette.

Flask

Flask es un marcopara desarrollar aplicaciones web con Python.

import flask . . . app = flask.Flask(__name__) . . . @app.route('/invocations', methods=["POST"]) def invocations(request): # Streams inference response using HTTP chunked encoding def generate(): # model() is a hypothetical function that gets the inference output: yield model(request) yield "\n" return flask.Response( flask.stream_with_context(generate()), mimetype='text/plain') . . .

En este ejemplo, la invocations función gestiona la solicitud de inferencia que la SageMaker IA envía al /invocations punto final. Para transmitir la respuesta, el ejemplo utiliza la función flask.stream_with_context del marco Flask.

ejemplo Ejemplos de funciones de invocación para la transmisión bidireccional

Los siguientes ejemplos muestran cómo el código de su contenedor puede procesar las solicitudes y respuestas de inferencia de streaming. Estos ejemplos gestionan las solicitudes de streaming que las aplicaciones cliente envían mediante la InvokeEndpointWithBidirectionalStream acción.

Un contenedor con capacidad de transmisión bidireccional gestiona las solicitudes de inferencia de transmisión en las que las piezas se generan de forma incremental en el cliente y se transmiten al contenedor. Devuelve la inferencia del modelo al cliente como una serie de partes a medida que el modelo las genera. Las aplicaciones cliente comienzan a recibir respuestas inmediatamente cuando están disponibles. No necesitan esperar a que la solicitud se genere por completo en el cliente ni a que el modelo genere la respuesta completa. Puedes implementar la transmisión bidireccional para ofrecer experiencias interactivas rápidas, como chatbots, asistentes de voz interactivos con inteligencia artificial y traducciones en tiempo real, para disfrutar de una experiencia más en tiempo real.

FastAPI

FastAPI es un marco web para construir APIs con Python.

import sys import asyncio import json from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.responses import JSONResponse import uvicorn app = FastAPI() ... @app.websocket("/invocations-bidirectional-stream") async def websocket_invoke(websocket: WebSocket): """ WebSocket endpoint with RFC 6455 ping/pong and fragmentation support Handles: - Text messages (JSON) - including fragmented frames - Binary messages - including fragmented frames - Ping frames (automatically responds with pong) - Pong frames (logs receipt) - Fragmented frames per RFC 6455 Section 5.4 """ await manager.connect(websocket) # Fragment reassembly buffers per RFC 6455 Section 5.4 text_fragments = [] binary_fragments = [] while True: # Use receive() to handle all WebSocket frame types message = await websocket.receive() print(f"Received message: {message}") if message["type"] == "websocket.receive": if "text" in message: # Handle text frames (including fragments) text_data = message["text"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it text_fragments.append(text_data) print(f"Received text fragment: {len(text_data)} chars (more coming)") else: # This is the final frame or a complete message if text_fragments: # Reassemble fragmented message text_fragments.append(text_data) complete_text = "".join(text_fragments) text_fragments.clear() print(f"Reassembled fragmented text message: {len(complete_text)} chars total") await handle_text_message(websocket, complete_text) else: # Complete message in single frame await handle_text_message(websocket, text_data) elif "bytes" in message: # Handle binary frames (including fragments) binary_data = message["bytes"] more_body = message.get("more_body", False) if more_body: # This is a fragment, accumulate it binary_fragments.append(binary_data) print(f"Received binary fragment: {len(binary_data)} bytes (more coming)") else: # This is the final frame or a complete message if binary_fragments: # Reassemble fragmented message binary_fragments.append(binary_data) complete_binary = b"".join(binary_fragments) binary_fragments.clear() print(f"Reassembled fragmented binary message: {len(complete_binary)} bytes total") await handle_binary_message(websocket, complete_binary) else: # Complete message in single frame await handle_binary_message(websocket, binary_data) elif message["type"] == "websocket.ping": # Handle ping frames - RFC 6455 Section 5.5.2 ping_data = message.get("bytes", b"") print(f"Received PING frame with payload: {ping_data}") # FastAPI automatically sends pong response elif message["type"] == "websocket.pong": # Handle pong frames pong_data = message.get("bytes", b"") print(f"Received PONG frame with payload: {pong_data}") elif message["type"] == "websocket.close": # Handle close frames - RFC 6455 Section 5.5.1 close_code = message.get("code", 1000) close_reason = message.get("reason", "") print(f"Received CLOSE frame - Code: {close_code}, Reason: '{close_reason}'") # Send close frame response if not already closing try: await websocket.close(code=close_code, reason=close_reason) print(f"Sent CLOSE frame response - Code: {close_code}") except Exception as e: print(f"Error sending close frame: {e}") break elif message["type"] == "websocket.disconnect": print("Client initiated disconnect") break else: print(f"Received unknown message type: {message['type']}") break async def handle_binary_message(websocket: WebSocket, binary_data: bytes): """Handle incoming binary messages (complete or reassembled from fragments)""" print(f"Processing complete binary message: {len(binary_data)} bytes") try: # Echo back the binary data await websocket.send_bytes(binary_data) except Exception as e: print(f"Error handling binary message: {e}") async def handle_text_message(websocket: WebSocket, data: str): """Handle incoming text messages""" try: # Send response back to the same client await manager.send_personal_message(data, websocket) except Exception as e: print(f"Error handling text message: {e}") def main(): if len(sys.argv) > 1 and sys.argv[1] == "serve": print("Starting server on port 8080...") uvicorn.run(app, host="0.0.0.0", port=8080) else: print("Usage: python app.py serve") sys.exit(1) if __name__ == "__main__": main()

En este ejemplo, la websocket_invoke función gestiona la solicitud de inferencia que la SageMaker IA envía al /invocations-bidirectional-stream punto final. Muestra cómo se gestionan las solicitudes de transmisión y las respuestas de las transmisiones al cliente.

Cómo debe responder su contenedor a las solicitudes de comprobación de estado (ping)

SageMaker La IA lanza nuevos contenedores de inferencia en las siguientes situaciones:

  • Responder a las llamadas a la API CreateEndpoint, UpdateEndpoint y UpdateEndpointWeightsAndCapacities

  • Creación de parches de seguridad

  • Reemplazo de instancias con estado incorrecto

Poco después del inicio del contenedor, la SageMaker IA comienza a enviar solicitudes GET periódicas al /ping punto final.

El requisito más sencillo en el contenedor es responder con un código de estado HTTP 200 y un cuerpo vacío. Esto le indica a SageMaker AI que el contenedor está listo para aceptar solicitudes de inferencia en el /invocations punto final.

Si el contenedor no comienza a superar las comprobaciones de estado y responde de forma constante durante 200 segundos durante los 8 minutos posteriores al inicio, no se inicia la nueva instancia. Esto provoca un error de CreateEndpoint y el punto de conexión quedará en un estado de error. No se ha completado la actualización solicitada por UpdateEndpoint, no se han aplicado los parches de seguridad y no se han reemplazado las instancias en mal estado.

Puesto que la barrera mínima es que el contenedor devuelva un código 200 estático, un desarrollador del contenedor puede usar esta funcionalidad para realizar más comprobaciones. El tiempo de espera de la solicitud en los intentos de /ping es de 2 segundos.

Además, un contenedor que sea capaz de gestionar solicitudes de streaming bidireccionales debe responder con una Ping Frame (según el WebSocket protocolo RFC6455) a una Ping Frame. Si no se recibe ningún Pong Frame durante 5 pings consecutivos, la plataforma de SageMaker IA cerrará la conexión al contenedor. SageMaker La plataforma AI también responderá a los Ping Frames de un contenedor modelo con Pong Frames.

Container Contract to Support Bidirectional Streaming Capabilities

Si desea alojar su contenedor modelo como terminal de SageMaker IA que admita capacidades de transmisión bidireccional, el contenedor modelo debe ser compatible con el siguiente contrato:

1. Etiqueta Docker bidireccional

El contenedor modelo debe tener una etiqueta Docker que indique a la plataforma de SageMaker IA que este contenedor admite la capacidad de transmisión bidireccional.

com.amazonaws.sagemaker.capabilities.bidirectional-streaming=true

2. Support WebSocket Connection para invocaciones

El contenedor modelo de un cliente que admite la transmisión bidireccional debe admitir WebSockets las conexiones en el puerto 8080 de forma predeterminada. /invocations-bidirectional-stream

Esta ruta se puede anular pasando el encabezado X-Amzn-SageMaker-Model -Invocation-Path al invocar la API. InvokeEndpointWithBidirectionalStream Además, los usuarios pueden especificar una cadena de consulta para añadirla a esta ruta pasando el encabezado -Query-String al invocar la API. X-Amzn-SageMaker-Model InvokeEndpointWithBidirectionalStream

3. Gestión del flujo de solicitudes

<Blob>Las cargas útiles de entrada de la InvokeEndpointWithBidirectionalStream API se transmiten como una serie de PayloadParts, que es solo un contenedor de un fragmento binario («Bytes»:):

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE> "P": <String> } }

3.1. Marcos de datos

SageMaker La IA pasa la entrada PayloadParts al contenedor del modelo como marcos de WebSocket datos (RFC6455-Section-5.6)

  1. SageMaker La IA no inspecciona el fragmento binario.

  2. Al recibir una entrada PayloadPart

    • SageMaker La IA crea exactamente un marco de WebSocket datos a partir del PayloadPart.Bytes cual lo pasa al contenedor modelo.

    • SiPayloadPart.DataType = UTF8, SageMaker AI crea un marco de datos de texto

    • Si PayloadPart.DataType no lo presentaPayloadPart.DataType = BINARY, la SageMaker IA crea un marco de datos binario

  3. Para una secuencia de PayloadParts con y terminada PayloadPart con PayloadPart.CompletionState = COMPLETE un signoPayloadPart.CompletionState = PARTIAL, la SageMaker IA la traduce en un mensaje WebSocket fragmentado RFC6455-Sección 5.4: Fragmentación:

    • La inicial PayloadPart con se PayloadPart.CompletionState = PARTIAL traducirá a un marco de WebSocket datos, con el bit FIN libre.

    • El siguiente PayloadParts con se PayloadPart.CompletionState = PARTIAL traducirá a marcos de WebSocket continuación con el bit FIN limpio.

    • El ancho final PayloadPart se PayloadPart.CompletionState = COMPLETE traducirá a un marco de WebSocket continuación con el conjunto de bits FIN.

  4. SageMaker La IA no codifica ni decodifica el fragmento binario de la entrada PayloadPart, sino que los bytes se pasan al contenedor del modelo tal cual.

  5. SageMaker La IA no combina varias entradas en una sola. PayloadParts BinaryDataFrame

  6. SageMaker La IA no divide una entrada PayloadPart en varias BinaryDataFrames.

Ejemplo: flujo de mensajes fragmentado

Client sends: PayloadPart 1: {Bytes: "Hello ", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: "World", DataType: "UTF8", CompletionState: "COMPLETE"} Container receives: Frame 1: Text Data Frame with "Hello " (FIN=0) Frame 2: Continuation Frame with "World" (FIN=1)

3.2. Marcos de control

Además de los marcos de datos, la SageMaker IA también envía marcos de control al contenedor de modelos (RFC6455-Section-5.5):

  1. Cerrar marco: la SageMaker IA puede enviar un marco cerrado (RFC6455-Section-5.5.1) al contenedor modelo en caso de que la conexión se cierre por cualquier motivo.

  2. Ping Frame: la SageMaker IA envía Ping Frame (RFC6455-Section-5.5.2) una vez cada 60 segundos, el contenedor modelo debe responder con Pong Frame. Si no se recibe ningún Pong Frame (RFC6455-Sección-5.5.3) durante 5 pings consecutivos, la IA cerrará la conexión. SageMaker

  3. Pong Frame: la SageMaker IA responderá a los Ping Frames desde un contenedor modelo con Pong Frames.

4. Manejo del flujo de respuesta

La salida se transmite como una serie de PayloadParts, ModelStreamErrors o InternalStreamFailures.

{ "PayloadPart": { "Bytes": <Blob>, "DataType": <String: UTF8 | BINARY>, "CompletionState": <String: PARTIAL | COMPLETE>, }, "ModelStreamError": { "ErrorCode": <String>, "Message": <String> }, "InternalStreamFailure": { "Message": <String> } }

4.1. Marcos de datos

SageMaker La IA convierte los marcos de datos recibidos del contenedor del modelo en resultados PayloadParts:

  1. Al recibir un marco de datos de WebSocket texto del contenedor del modelo, la SageMaker IA obtiene los bytes sin procesar del marco de datos de texto y los agrupa en una respuesta PayloadPart, mientras está configuradaPayloadPart.DataType = UTF8.

  2. Al recibir un marco de datos WebSocket binario del contenedor del modelo, la SageMaker IA agrupa directamente los bytes del marco de datos en una respuesta PayloadPart, mientras está configurada. PayloadPart.DataType = BINARY

  3. Para mensajes fragmentados, tal como se define en la RFC6455Sección -5.4: Fragmentación:

    • El marco de datos inicial con el bit FIN libre se traducirá a un ancho. PayloadPart PayloadPart.CompletionState = PARTIAL

    • Los fotogramas de continuación siguientes con el bit FIN limpio se traducirán a PayloadParts conPayloadPart.CompletionState = PARTIAL.

    • El último cuadro de continuación con el conjunto de bits FIN se traducirá a PayloadPart conPayloadPart.CompletionState = COMPLETE.

  4. SageMaker La IA no codifica ni decodifica los bytes recibidos de los contenedores del modelo, sino que los pasa al contenedor del modelo tal cual.

  5. SageMaker La IA no combina varios marcos de datos recibidos del contenedor modelo en una sola respuesta. PayloadPart

  6. SageMaker La IA no divide un marco de datos recibido del contenedor del modelo en respuestas PayloadParts múltiples.

Ejemplo: flujo de respuesta en streaming

Container sends: Frame 1: Text Data Frame with "Generating" (FIN=0) Frame 2: Continuation Frame with " response..." (FIN=1) Client receives: PayloadPart 1: {Bytes: "Generating", DataType: "UTF8", CompletionState: "PARTIAL"} PayloadPart 2: {Bytes: " response...", DataType: "UTF8", CompletionState: "COMPLETE"}

4.2. Marcos de control

SageMaker La IA responde a los siguientes marcos de control desde el contenedor del modelo:

  1. Al recibir un cuadro cerrado (RFC6455-Sección-5.5.1) del contenedor modelo, la SageMaker IA empaquetará el código de estado (RFC6455-Sección-7.4) y los mensajes de error y los transmitirá al usuario ModelStreamError final.

  2. Al recibir un Ping Frame (RFC6455-Section-5.5.2) del contenedor modelo, la IA responderá con un Pong Frame. SageMaker

  3. Pong Frame (RFC6455-Sección-5.5.3): Si no se recibe ningún Pong Frame durante 5 pings consecutivos, la IA cerrará la conexión. SageMaker