

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisation d'Apache Hudi avec Apache Flink
<a name="tutorial-hudi-for-flink"></a>

Apache Hudi est un framework de gestion de données open source avec des opérations de niveau enregistrement telles que l'insertion, la mise à jour, la modification et la suppression, que vous pouvez utiliser pour simplifier la gestion des données et le développement de pipelines de données. Associé à une gestion efficace des données dans Amazon S3, Hudi vous permet d'ingérer et de mettre à jour des données en temps réel. Hudi gère les métadonnées de toutes les opérations que vous exécutez sur le jeu de données, afin que toutes les actions restent atomiques et cohérentes. 

Apache Hudi est disponible sur Amazon EMR sur EKS avec Apache Flink avec Amazon EMR versions 7.2.0 et supérieures. Consultez les étapes suivantes pour savoir comment démarrer et soumettre des tâches Apache Hudi.

## Soumettre une offre d'emploi Apache Hudi
<a name="tutorial-hudi-for-flink-submit-jobs"></a>

Consultez les étapes suivantes pour savoir comment soumettre une tâche Apache Hudi.

1. Créez une base AWS de données Glue nommée`default`.

   ```
   aws glue create-database --database-input "{\"Name\":\"default\"}"
   ```

1. Suivez l'[exemple SQL de l'opérateur Flink Kubernetes](https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example) pour créer le fichier. `flink-sql-runner.jar`

1. Créez un script SQL Hudi comme suit.

   ```
   CREATE CATALOG hudi_glue_catalog WITH (
   'type' = 'hudi',
   'mode' = 'hms',
   'table.external' = 'true',
   'default-database' = 'default',
   'hive.conf.dir' = '/glue/confs/hive/conf/',
   'catalog.path' = 's3://<hudi-example-bucket>/FLINK_HUDI/warehouse/'
   );
   
   USE CATALOG hudi_glue_catalog;
   CREATE DATABASE IF NOT EXISTS hudi_db;
   use hudi_db;
   
   CREATE TABLE IF NOT EXISTS hudi-flink-example-table(
       uuid VARCHAR(20),
       name VARCHAR(10),
       age INT,
       ts TIMESTAMP(3),
       `partition` VARCHAR(20)
   )
   PARTITIONED BY (`partition`)
   WITH (
     'connector' = 'hudi',
     'path' = 's3://<hudi-example-bucket>/hudi-flink-example-table',
     'hive_sync.enable' = 'true',
     'hive_sync.mode' = 'glue',
     'hive_sync.table' = 'hudi-flink-example-table',
     'hive_sync.db' = 'hudi_db',
     'compaction.delta_commits' = '1',
     'hive_sync.partition_fields' = 'partition',
     'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
     'table.type' = 'COPY_ON_WRITE'
   );
   
   EXECUTE STATEMENT SET
   BEGIN
   
   INSERT INTO hudi-flink-example-table VALUES
       ('id1','Alex',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
       ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
       ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
       ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
       ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
       ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
       ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
       ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
   
   END;
   ```

1. Téléchargez votre script Hudi SQL et le `flink-sql-runner.jar` fichier dans un emplacement S3.

1. Dans votre fichier `FlinkDeployments` YAML, définissez sur`hudi.enabled`. `true`

   ```
   spec:
     flinkConfiguration:
       hudi.enabled: "true"
   ```

1. Créez un fichier YAML pour exécuter votre configuration. Ce fichier d'exemple est nommé`hudi-write.yaml`.

   ```
   apiVersion: flink.apache.org/v1beta1
   kind: FlinkDeployment
   metadata:
     name: hudi-write-example
   spec:
     flinkVersion: v1_18
     flinkConfiguration:
       taskmanager.numberOfTaskSlots: "2"
       hudi.enabled: "true"
     executionRoleArn: "<JobExecutionRole>"
     emrReleaseLabel: "emr-7.12.0-flink-latest"
     jobManager:
       highAvailabilityEnabled: false
       replicas: 1
       resource:
         memory: "2048m"
         cpu: 1
     taskManager:
       resource:
         memory: "2048m"
         cpu: 1
     job:
       jarURI: local:///opt/flink/usrlib/flink-sql-runner.jar
       args: ["/opt/flink/scripts/hudi-write.sql"]
       parallelism: 1
       upgradeMode: stateless
     podTemplate:
       spec:
         initContainers:
           - name: flink-sql-script-download
             args: 
               - s3
               - cp
               - s3://<s3_location>/hudi-write.sql
               - /flink-scripts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-scripts
                 name: flink-scripts
           - name: flink-sql-runner-download
             args: 
               - s3
               - cp
               - s3://<s3_location>/flink-sql-runner.jar
               - /flink-artifacts
             image: amazon/aws-cli:latest
             imagePullPolicy: Always
             resources: {}
             terminationMessagePath: /dev/termination-log
             terminationMessagePolicy: File
             volumeMounts:
               - mountPath: /flink-artifacts
                 name: flink-artifact
         containers:
           - name: flink-main-container
             volumeMounts:
               - mountPath: /opt/flink/scripts
                 name: flink-scripts
               - mountPath: /opt/flink/usrlib
                 name: flink-artifact
         volumes:
           - emptyDir: {}
             name: flink-scripts
           - emptyDir: {}
             name: flink-artifact
   ```

1. Soumettez une tâche Flink Hudi à l'opérateur [Flink](https://docs.aws.amazon.com/emr/latest/EMR-on-EKS-DevelopmentGuide/jobruns-flink-kubernetes-operator.html) Kubernetes.

   ```
   kubectl apply -f hudi-write.yaml
   ```