Esempi per Kinesis con SDK per Rust - AWS SDK per Rust

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi per Kinesis con SDK per Rust

I seguenti esempi di codice mostrano come eseguire azioni e implementare scenari comuni utilizzando l'AWSSDK per Rust con Kinesis.

Le azioni sono estratti di codice da programmi più grandi e devono essere eseguite nel contesto. Sebbene le azioni mostrino come richiamare le singole funzioni del servizio, è possibile visualizzarle contestualizzate negli scenari correlati.

Ogni esempio include un link al codice sorgente completo, in cui vengono fornite le istruzioni su come configurare ed eseguire il codice nel contesto.

Azioni

Il seguente esempio di codice mostra come utilizzare. CreateStream

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn make_stream(client: &Client, stream: &str) -> Result<(), Error> { client .create_stream() .stream_name(stream) .shard_count(4) .send() .await?; println!("Created stream"); Ok(()) }
  • Per i dettagli sulle API, consulta il riferimento CreateStreamall'API AWS SDK for Rust.

Il seguente esempio di codice mostra come utilizzareDeleteStream.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn remove_stream(client: &Client, stream: &str) -> Result<(), Error> { client.delete_stream().stream_name(stream).send().await?; println!("Deleted stream."); Ok(()) }
  • Per i dettagli sulle API, consulta il riferimento DeleteStreamall'API AWS SDK for Rust.

Il seguente esempio di codice mostra come utilizzareDescribeStream.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn show_stream(client: &Client, stream: &str) -> Result<(), Error> { let resp = client.describe_stream().stream_name(stream).send().await?; let desc = resp.stream_description.unwrap(); println!("Stream description:"); println!(" Name: {}:", desc.stream_name()); println!(" Status: {:?}", desc.stream_status()); println!(" Open shards: {:?}", desc.shards.len()); println!(" Retention (hours): {}", desc.retention_period_hours()); println!(" Encryption: {:?}", desc.encryption_type.unwrap()); Ok(()) }
  • Per i dettagli sulle API, consulta la DescribeStreamguida di riferimento all'API AWS SDK for Rust.

Il seguente esempio di codice mostra come utilizzareListStreams.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn show_streams(client: &Client) -> Result<(), Error> { let resp = client.list_streams().send().await?; println!("Stream names:"); let streams = resp.stream_names; for stream in &streams { println!(" {}", stream); } println!("Found {} stream(s)", streams.len()); Ok(()) }
  • Per i dettagli sulle API, consulta la ListStreamsguida di riferimento all'API AWS SDK for Rust.

Il seguente esempio di codice mostra come utilizzarePutRecord.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> { let blob = Blob::new(data); client .put_record() .data(blob) .partition_key(key) .stream_name(stream) .send() .await?; println!("Put data into stream."); Ok(()) }
  • Per i dettagli sulle API, consulta la PutRecordguida di riferimento all'API AWS SDK for Rust.

Esempi serverless

L’esempio di codice seguente mostra come implementare una funzione Lambda che riceve un evento attivato dalla ricezione di record da un flusso Kinesis. La funzione recupera il payload Kinesis, lo decodifica da Base64 e registra il contenuto del record.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di Esempi serverless.

Utilizzo di un evento Kinesis con Lambda tramite Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

L’esempio di codice seguente mostra come implementare una risposta batch parziale per funzioni Lambda che ricevono eventi da un flusso Kinesis. La funzione riporta gli errori degli elementi batch nella risposta, segnalando a Lambda di riprovare tali messaggi in un secondo momento.

SDK per Rust
Nota

C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di Esempi serverless.

Segnalazione di errori di elementi batch di Kinesis con Lambda tramite Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }