

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Panoramica delle funzionalità della pipeline in Amazon Ingestion OpenSearch
<a name="osis-features-overview"></a>

Amazon OpenSearch Ingestion fornisce *pipeline* costituite da una fonte, un buffer, zero o più processori e uno o più sink. Le pipeline di ingestione sono alimentate da Data Prepper come motore di dati. Per una panoramica dei vari componenti di una pipeline, vedere. [Concetti chiave di Amazon OpenSearch Ingestion](ingestion-process.md)

Le seguenti sezioni forniscono una panoramica di alcune delle funzionalità più comunemente utilizzate in Amazon OpenSearch Ingestion.

**Nota**  
Questo non è un elenco esaustivo delle funzionalità disponibili per le pipeline. Per una documentazione completa di tutte le funzionalità della pipeline disponibili, consulta la documentazione di [Data](https://opensearch.org/docs/latest/data-prepper/pipelines/pipelines/) Prepper. Tieni presente che OpenSearch Ingestion impone alcuni vincoli ai plugin e alle opzioni che puoi utilizzare. Per ulteriori informazioni, consulta [Plugin e opzioni supportati per le pipeline di Amazon OpenSearch Ingestion](pipeline-config-reference.md).

**Topics**
+ [Buffering persistente](#persistent-buffering)
+ [Divisione](#osis-features-splitting)
+ [Concatenamento](#osis-features-chaining)
+ [Code di lettere morte](#osis-features-dlq)
+ [Gestione degli indici](#osis-features-index-management)
+ [End-to-end riconoscimento](#osis-features-e2e)
+ [Contropressione alla fonte](#osis-features-backpressure)

## Buffering persistente
<a name="persistent-buffering"></a>

Un buffer persistente archivia i dati in un buffer basato su disco su più zone di disponibilità per migliorare la durabilità dei dati. È possibile utilizzare il buffering persistente per importare dati da tutte le fonti basate su push supportate senza configurare un buffer autonomo. Queste fonti includono HTTP e OpenTelemetry per i log, le tracce e le metriche. Per abilitare il buffering persistente, scegli **Abilita buffer persistente** quando crei o aggiorni una pipeline. Per ulteriori informazioni, consulta [Creazione di pipeline Amazon OpenSearch Ingestion](creating-pipeline.md). 

OpenSearch L'ingestione determina dinamicamente il numero di file OCUs da utilizzare per il buffering persistente, tenendo conto dell'origine dei dati, delle trasformazioni di streaming e della destinazione del sink. Poiché ne alloca una parte OCUs al buffering, potrebbe essere necessario aumentare il valore minimo e massimo per mantenere lo stesso throughput di ingestione. OCUs Le pipeline conservano i dati nel buffer per un massimo di 72 ore.

Se abiliti il buffering persistente per una pipeline, le dimensioni massime predefinite del payload delle richieste sono le seguenti:
+ **Fonti HTTP: 10 MB**
+ **OpenTelemetry sorgenti** — 4 MB

Per le sorgenti HTTP, è possibile aumentare la dimensione massima del payload a 20 MB. La dimensione del payload della richiesta include l'intera richiesta HTTP, che in genere contiene più eventi. Ogni evento non può superare i 3,5 MB.

Le pipeline con buffering persistente dividono le unità di pipeline configurate tra unità di calcolo e unità buffer. Se una pipeline utilizza un processore che richiede un uso intensivo della CPU come grok, key-value o split string, alloca le unità in un rapporto 1:1. buffer-to-compute Altrimenti, le alloca in un rapporto 3:1, privilegiando sempre le unità di calcolo.

Esempio:
+ Pipeline con grok e 2 unità max: 1 unità di calcolo e 1 unità buffer
+ Pipeline con grok e 5 unità max: 3 unità di calcolo e 2 unità buffer
+ Pipeline senza processori e massimo 2 unità: 1 unità di calcolo e 1 unità buffer
+ Pipeline senza processori e massimo 4 unità: 1 unità di calcolo e 3 unità buffer
+ Pipeline con grok e 5 unità max: 2 unità di calcolo e 3 unità buffer

Per impostazione predefinita, le pipeline utilizzano an Chiave di proprietà di AWS per crittografare i dati del buffer. Queste pipeline non necessitano di autorizzazioni aggiuntive per il ruolo della pipeline. 

In alternativa, puoi specificare una chiave gestita dal cliente e aggiungere le seguenti autorizzazioni IAM al ruolo della pipeline:

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "KeyAccess",
            "Effect": "Allow",
            "Action": [
              "kms:Decrypt",
              "kms:GenerateDataKeyWithoutPlaintext"
            ],
            "Resource": "arn:aws:kms:us-east-1:111122223333:key/ASIAIOSFODNN7EXAMPLE"
        }
    ]
}
```

------

Per ulteriori informazioni, consulta [Customer managed keys](https://docs.aws.amazon.com/kms/latest/developerguide/concepts.html#customer-cmk) nella *Guida per sviluppatori AWS Key Management Service *. 

**Nota**  
Se disabiliti il buffering persistente, la pipeline inizia a funzionare interamente con il buffering in memoria.

## Divisione
<a name="osis-features-splitting"></a>

È possibile configurare una pipeline di OpenSearch ingestione per *suddividere* gli eventi in entrata in una sottopipeline, in modo da eseguire diversi tipi di elaborazione sullo stesso evento in entrata.

La pipeline di esempio seguente divide gli eventi in entrata in due sotto-pipeline. Ogni sottopipeline utilizza il proprio processore per arricchire e manipolare i dati, quindi invia i dati a indici diversi. OpenSearch 

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  sink:
    - pipeline:
        name: "logs_enriched_one_pipeline"
    - pipeline:
        name: "logs_enriched_two_pipeline"

logs_enriched_one_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
        index: "enriched_one_logs"

logs_enriched_two_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_two_logs"
```

## Concatenamento
<a name="osis-features-chaining"></a>

È possibile *concatenare* più sotto-pipeline per eseguire l'elaborazione e l'arricchimento dei dati in blocchi. In altre parole, è possibile arricchire un evento in entrata con determinate funzionalità di elaborazione in una sottopipeline, quindi inviarlo a un'altra sottopipeline per un ulteriore arricchimento con un processore diverso e infine inviarlo al relativo sink. OpenSearch 

Nell'esempio seguente, la `log_pipeline` sub-pipeline arricchisce un evento di registro in entrata con un set di processori, quindi invia l'evento a un indice denominato. OpenSearch `enriched_logs` La pipeline invia lo stesso evento alla `log_advanced_pipeline` pipeline secondaria, che lo elabora e lo invia a un indice diverso denominato. OpenSearch `enriched_advanced_logs` 

```
version: "2"
log-pipeline:
  source:
    http:
    ...
  processor:
    ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_logs"
    - pipeline:
        name: "log_advanced_pipeline"

log_advanced_pipeline:
  source:
    pipeline:
      name: "log-pipeline"
  processor:
   ...
  sink:
    - opensearch:
        # Provide a domain or collection endpoint
        # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection
        aws:
          ...
          index: "enriched_advanced_logs"
```

## Code di lettere morte
<a name="osis-features-dlq"></a>

Le code di lettere morte (DLQs) sono destinazioni di eventi che una pipeline non riesce a scrivere in un sink. In OpenSearch Ingestion, è necessario specificare un bucket Amazon S3 con autorizzazioni di scrittura appropriate da utilizzare come DLQ. Puoi aggiungere una configurazione DLQ a ogni sink all'interno di una pipeline. Quando una pipeline incontra errori di scrittura, crea oggetti DLQ nel bucket S3 configurato. Gli oggetti DLQ esistono all'interno di un file JSON come una serie di eventi falliti.

Una pipeline scrive eventi nel DLQ quando viene soddisfatta una delle seguenti condizioni:
+ Il numero **massimo di tentativi** per il OpenSearch lavandino è stato esaurito. OpenSearch L'ingestione richiede un minimo di 16 per questa impostazione.
+ Il sink rifiuta gli eventi dovuti a una condizione di errore.

### Configurazione
<a name="osis-features-dlq-config"></a>

Per configurare una coda di lettere non scritte per una pipeline secondaria, scegli **Abilita S3 DLQ** quando configuri la destinazione del sink. Quindi, specifica le impostazioni richieste per la coda. Per ulteriori informazioni, consulta [Configurazione](https://opensearch.org/docs/latest/data-prepper/pipelines/dlq/#configuration) nella documentazione di Data Prepper DLQ.

I file scritti in questo S3 DLQ hanno il seguente schema di denominazione:

```
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
```

Per istruzioni su come configurare manualmente il ruolo della pipeline per consentire l'accesso al bucket S3 su cui il DLQ scrive, consulta. [Autorizzazioni per scrivere su Amazon S3 o su una coda di lettere non scritte](pipeline-security-overview.md#pipeline-security-dlq)

### Esempio
<a name="osis-features-dlq-example"></a>

Considerate il seguente file DLQ di esempio:

```
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
```

Ecco un esempio di dati che non sono stati scritti nel sink e che vengono inviati al bucket DLQ S3 per ulteriori analisi:

```
Record_0	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index		  "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log		    "sample log"
timestamp	    "2023-04-14T10:36:01.070Z"

Record_1	
pluginId            "opensearch"
pluginName          "opensearch"
pipelineName        "apache-log-pipeline"
failedData	
index               "logs"
indexId		 null
status		  0
message		"Number of retries reached the limit of max retries (configured value 15)"
document	
log                 "another sample log"
timestamp           "2023-04-14T10:36:01.071Z"
```

## Gestione degli indici
<a name="osis-features-index-management"></a>

Amazon OpenSearch Ingestion offre molte funzionalità di gestione degli indici, tra cui le seguenti.

### Creazione di indici
<a name="osis-features-index-management-create"></a>

È possibile specificare un nome di indice in un sink di pipeline e OpenSearch Ingestion crea l'indice quando effettua il provisioning della pipeline. Se esiste già un indice, la pipeline lo utilizza per indicizzare gli eventi in entrata. Se si arresta e si riavvia una pipeline o se si aggiorna la configurazione YAML, la pipeline tenta di creare nuovi indici se non esistono già. Una pipeline non può mai eliminare un indice.

I sinks di esempio seguenti creano due indici quando viene eseguito il provisioning della pipeline:

```
sink:
  - opensearch:
      index: apache_logs
  - opensearch:
      index: nginx_logs
```

### Generazione di nomi e modelli di indici
<a name="osis-features-index-management-patterns"></a>

È possibile generare nomi di indice dinamici utilizzando variabili dai campi degli eventi in arrivo. Nella configurazione sink, usa il formato `string${}` per segnalare l'interpolazione delle stringhe e usa un puntatore JSON per estrarre i campi dagli eventi. Le opzioni per sono o. `index_type` `custom` `management_disabled` Poiché l'`index_type`impostazione predefinita è per i OpenSearch domini e `custom` `management_disabled` per le raccolte OpenSearch Serverless, può essere lasciata non impostata.

Ad esempio, la seguente pipeline seleziona il `metadataType` campo dagli eventi in entrata per generare i nomi degli indici.

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}"
```

La seguente configurazione continua a generare un nuovo indice ogni giorno o ogni ora.

```
pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd}"

pipeline:
  ...
  sink:
    opensearch:
      index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
```

Il nome dell'indice può anche essere una stringa semplice con un modello data-ora come suffisso, ad esempio. `my-index-%{yyyy.MM.dd}` Quando il sink invia dati a OpenSearch, sostituisce il modello data-ora con l'ora UTC e crea un nuovo indice per ogni giorno, ad esempio. `my-index-2022.01.25` Per ulteriori informazioni, consultate la classe. [DateTimeFormatter](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html)

Questo nome di indice può anche essere una stringa formattata (con o senza un suffisso del modello data-ora), ad esempio. `my-${index}-name` Quando il sink invia dati a OpenSearch, sostituisce la `"${index}"` parte con il valore dell'evento in fase di elaborazione. Se il formato è`"${index1/index2/index3}"`, sostituisce il campo `index1/index2/index3` con il relativo valore nell'evento.

### Generazione di un documento IDs
<a name="osis-features-index-management-ids"></a>

Una pipeline può generare un ID di documento durante l'indicizzazione dei documenti su. OpenSearch Può dedurre questi documenti IDs dai campi all'interno degli eventi in arrivo.

Questo esempio utilizza il `uuid` campo di un evento in entrata per generare un ID del documento.

```
pipeline:
  ...
  sink:
    opensearch:
      index_type: custom
      index: "metadata-${metadataType}-%{yyyy.MM.dd}" 
      "document_id": "uuid"
```

Nell'esempio seguente, il processore [Add entries](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/add-entries/) unisce i campi `uuid` e `other_field` l'evento in entrata per generare un ID del documento.

L'`create`azione garantisce che i documenti identici IDs non vengano sovrascritti. La pipeline elimina i documenti duplicati senza alcun nuovo tentativo o evento DLQ. Si tratta di un'aspettativa ragionevole per gli autori della pipeline che utilizzano questa azione, poiché l'obiettivo è evitare l'aggiornamento dei documenti esistenti.

```
pipeline:
  ...
  processor:
   - add_entries:
      entries:
        - key: "my_doc_id_field"
          format: "${uuid}-${other_field}"
  sink:
    - opensearch:
       ...
       action: "create"
       document_id: "my_doc_id"
```

Potresti voler impostare l'ID del documento di un evento su un campo di un oggetto secondario. Nell'esempio seguente, il plugin OpenSearch sink utilizza l'oggetto secondario `info/id` per generare un ID del documento.

```
sink:
  - opensearch:
       ...
       document_id: info/id
```

Dato il seguente evento, la pipeline genererà un documento con il `_id` campo impostato su: `json001`

```
{
   "fieldA":"arbitrary value",
   "info":{
      "id":"json001",
      "fieldA":"xyz",
      "fieldB":"def"
   }
}
```

### Generazione del routing IDs
<a name="osis-features-index-management-routing-ids"></a>

È possibile utilizzare l'`routing_field`opzione all'interno del plug-in OpenSearch sink per impostare il valore di una proprietà di routing del documento (`_routing`) su un valore proveniente da un evento in arrivo.

Il routing supporta la sintassi del puntatore JSON, quindi sono disponibili anche campi annidati, non solo campi di primo livello.

```
sink:
  - opensearch:
       ...
       routing_field: metadata/id
       document_id: id
```

Dato il seguente evento, il plugin genera un documento con il campo impostato su: `_routing` `abcd`

```
{
   "id":"123",
   "metadata":{
      "id":"abcd",
      "fieldA":"valueA"
   },
   "fieldB":"valueB"
}
```

Per istruzioni su come creare modelli di indice che le pipeline possono utilizzare durante la creazione dell'indice, vedete [Modelli di indice](https://opensearch.org/docs/latest/im-plugin/index-templates/).

## End-to-end riconoscimento
<a name="osis-features-e2e"></a>

OpenSearch *L'ingestione garantisce la durata e l'affidabilità dei dati tracciandone la trasmissione dalla fonte ai pozzi nelle condutture stateless mediante riconoscimento. end-to-end*

**Nota**  
[Attualmente, solo il plug-in sorgente S3 supporta il riconoscimento.](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/) end-to-end

Con il end-to-end riconoscimento, il plug-in pipeline source crea un set di *riconoscimenti per monitorare un batch di eventi*. Riceve un riconoscimento positivo quando tali eventi vengono inviati con successo ai rispettivi sink o un riconoscimento negativo quando nessuno degli eventi non può essere inviato ai rispettivi sink.

In caso di guasto o arresto anomalo di un componente della pipeline, o se una fonte non riceve una conferma, la fonte scade e intraprende le azioni necessarie, come riprovare o registrare l'errore. **Se nella pipeline sono configurati più sink o più subpipeline, i riconoscimenti a livello di evento vengono inviati solo dopo che l'evento è stato inviato a tutti i sink di tutte le pipeline secondarie.** Se un sink ha un DLQ configurato, i riconoscimenti tengono traccia anche degli eventi scritti nel DLQ. end-to-end

Per abilitare il end-to-end riconoscimento, espandi **Opzioni aggiuntive** nella configurazione di origine di Amazon S3 e **scegli end-to-end** Abilita il riconoscimento dei messaggi.

## Contropressione alla fonte
<a name="osis-features-backpressure"></a>

Una pipeline può subire una contropressione quando è impegnata nell'elaborazione dei dati o se i suoi sink sono temporaneamente inattivi o rallentano l'acquisizione dei dati. OpenSearch Ingestion ha diversi modi di gestire la contropressione a seconda del plug-in di origine utilizzato da una pipeline.

### Origine HTTP
<a name="osis-features-backpressure-http"></a>

Le pipeline che utilizzano il plug-in di [origine HTTP](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/http-source/) gestiscono la contropressione in modo diverso a seconda del componente della pipeline che è congestionato:
+ **Buffer**: quando i buffer sono pieni, la pipeline inizia a restituire lo stato HTTP `REQUEST_TIMEOUT` con il codice di errore 408 all'endpoint di origine. Quando i buffer vengono liberati, la pipeline riavvia l'elaborazione degli eventi HTTP.
+ **Thread di origine**: quando tutti i thread di origine HTTP sono impegnati nell'esecuzione di richieste e la dimensione della coda delle richieste non elaborate ha superato il numero massimo consentito di richieste, la pipeline inizia a restituire lo stato HTTP `TOO_MANY_REQUESTS` con il codice di errore 429 all'endpoint di origine. Quando la coda delle richieste scende al di sotto della dimensione massima consentita, la pipeline riavvia l'elaborazione delle richieste.

### OTel fonte
<a name="osis-features-backpressure-otel"></a>

Quando i buffer sono pieni per le pipeline che utilizzano OpenTelemetry sorgenti ([OTel log](https://github.com/opensearch-project/data-prepper/tree/main/data-prepper-plugins/otel-logs-source), [OTel metriche](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-metrics-source/) e [OTel trace](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/otel-trace/)), la pipeline inizia a restituire lo stato HTTP `REQUEST_TIMEOUT` con il codice di errore 408 all'endpoint di origine. Quando i buffer vengono liberati, la pipeline riprende a elaborare gli eventi.

### Fonte S3
<a name="osis-features-backpressure-s3"></a>

Quando i buffer sono pieni per le pipeline con una sorgente [S3](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/s3/), le pipeline interrompono l'elaborazione delle notifiche SQS. Man mano che i buffer vengono liberati, le pipeline riprendono a elaborare le notifiche. 

Se un sink è inattivo o non è in grado di inserire dati e la end-to-end conferma è abilitata per la fonte, la pipeline interrompe l'elaborazione delle notifiche SQS finché non riceve un riconoscimento riuscito da tutti i sink.