Programar tarefas personalizadas do Spark e do Spark SQL

O Dataplex oferece suporte à programação de execução de código personalizada, como uma execução única, em uma programação regular ou sob demanda. A opção "Sob demanda" está em Pré-lançamento e só está disponível por meio da API. É possível programar transformações de dados do cliente usando o Spark (Java), o PySpark (limitado à versão 3.2 do Spark) ou o Spark SQL. O Dataplex executa o código usando o processamento sem servidor do Spark e um programador sem servidor integrado.

Terminologia

Tarefa
Uma tarefa do Dataplex representa o trabalho que você quer que o Dataplex realize em uma programação. Ela encapsula seu código, seus parâmetros e a programação.
Job

Um job representa uma única execução de uma tarefa do Dataplex. Por exemplo, se uma tarefa estiver programada para ser executada diariamente, o Dataplex criará um job todos os dias.

Para jobs criados a partir de 10 de maio de 2023, o campo Acionador mostra o tipo de acionador de execução do job.

Veja a seguir os tipos de gatilho de execução de jobs:

  • RUN_REQUEST: indica que o job foi executado devido à chamada da API RunTask.

  • TASK_CONFIG: indica que o job foi executado devido à configuração TriggerSpec da tarefa.

Modos de programação

O Dataplex é compatível com os seguintes modos de programação:

Executar uma vez
Use esse modo para executar a tarefa apenas uma vez. Você pode optar por executá-lo imediatamente ou em um horário definido no futuro. Se você executar a tarefa imediatamente, talvez leve até dois minutos para a execução ser iniciada.
Executar em uma programação
Use esse modo para executar a tarefa em uma frequência repetida. As repetições aceitas são diárias, semanais, mensais ou personalizadas.
Executar sob demanda

Use esse modo para executar uma tarefa criada anteriormente sob demanda. O modo de execução sob demanda é compatível apenas com a API RunTask. Quando o job é executado sob demanda, o Dataplex usa os parâmetros atuais para criar um job. É possível especificar os argumentos ExecutionSpec e os rótulos para executar o job.

Antes de começar

  1. Ativar a API Dataproc.

    Ativar a API Dataproc

  2. Ative o Acesso privado do Google para a rede e a sub-rede. Ative o Acesso privado do Google na rede usada com as tarefas do Dataplex. Se você não especificar uma rede ou uma sub-rede ao criar a tarefa do Dataplex, o Dataplex usará a sub-rede padrão, e você precisará ativar o Acesso privado do Google para a sub-rede padrão.

  3. Crie uma conta de serviço. É necessário ter uma conta de serviço para programar qualquer tarefa do Dataplex. A conta de serviço precisa pertencer ao projeto em que você executa as tarefas. A conta de serviço precisa ter as seguintes permissões:

    • Acesso aos dados do BigQuery e do Cloud Storage que estão sendo processados.

    • Papel de worker do Dataproc no projeto em que você executa a tarefa.

    • Se a tarefa precisar ler ou atualizar a instância do Dataproc Metastore anexada ao lake, a conta de serviço precisará do papel de Leitor ou editor do Metastore do Dataproc. Esse papel precisa ser concedido no projeto em que o lago do Dataplex está configurado.

    • Se a tarefa for um job do Spark SQL, conceda à conta de serviço o papel de Desenvolvedor do Dataplex. Esse papel precisa ser concedido no projeto em que o lake do Dataplex está configurado.

    • Se a tarefa for um job do Spark SQL, serão necessárias permissões de administrador do Cloud Storage no bucket em que os resultados são gravados.

    • Para programar e executar tarefas do Spark SQL e personalizadas do Spark, você precisa ter os papéis do IAM de Leitor de metadados do Dataplex (roles/dataplex.metadataReader), Leitor do Dataplex (roles/dataplex.viewer) e Usuário de metadados do Dataproc Metastore (roles/metastore.metadataUser) na conta de serviço.

  4. Conceda ao usuário que envia o job o papel de usuário da conta de serviço (roles/iam.serviceAccountUser) na conta de serviço. Veja mais instruções em Gerenciar o acesso às contas de serviço.

  5. Conceda à conta de serviço de lake do Dataplex as permissões para usar a conta de serviço. É possível encontrar a conta de serviço de lake do Dataplex na página Detalhes do lago do Console do Google Cloud.

  6. Se o projeto que contém o lake do Dataplex for diferente do projeto em que a tarefa será executada, conceda à conta de serviço do Dataplex lake o papel de editor do Dataproc no projeto em que a tarefa será executada.

  7. Coloque os artefatos de código necessários (arquivos de script JARs, Python ou SQL) ou arquivos arquivados (.jar, .tar, .tar.gz, .tgz, .zip) em um caminho do Cloud Storage.

  8. Verifique se a conta de serviço tem a permissão storage.objects.get necessária para o bucket do Cloud Storage que está armazenando esses artefatos de código.

Agendar uma tarefa do Spark (Java ou Python)

Console

  1. No console do Google Cloud, acesse a página do Dataplex:

    Acessar o Dataplex

  2. Navegue até a visualização Processo.

  3. Clique em Criar tarefa.

  4. Em Criar tarefa personalizada do Spark, clique em Criar tarefa.

  5. Escolha um lake do Dataplex.

  6. Dê um nome à tarefa.

  7. Crie um ID para a tarefa.

  8. Na seção Configuração da tarefa, em Tipo, selecione Spark ou PySpark.

  9. Insira os argumentos relevantes.

  10. No campo Conta de serviço, insira uma conta de serviço de usuário com que sua tarefa personalizada do Spark possa ser executada.

  11. Clique em Continuar.

  12. Opcional: Definir programação: selecione Executar uma vez ou Repetir. Preencha os campos obrigatórios.

  13. Clique em Continuar.

  14. Opcional: Personalizar recursos e Adicionar outras configurações.

  15. Clique em Criar.

gcloud

É possível programar uma tarefa do Spark (Java / Python) usando o comando da CLI gcloud. A tabela abaixo lista os parâmetros obrigatórios e opcionais que podem ser usados:

Parâmetro Descrição
--lake O ID do lake para o recurso de lake do serviço do Dataplex.
--location O local do serviço Dataplex.
--spark-main-class A principal classe de motorista. O arquivo jar que contém a classe precisa estar no CLASSPATH padrão.
--spark-main-jar-file-uri O URI do Cloud Storage do arquivo jar que contém a classe principal.
--spark-archive-uris Opcional: URIs do Cloud Storage de arquivos a serem extraídos para o diretório de trabalho de cada executor. Tipos de arquivos aceitos: .jar, .tar, .tar.gz, .tgz e .zip.
--spark-file-uris Opcional: URIs do Cloud Storage de arquivos a serem colocados no diretório de trabalho de cada executor.
--batch-executors-count Opcional: o número total de executores de job. O valor padrão é 2.
--batch-max-executors-count Opcional: o número máximo de executores configuráveis. O valor padrão é 1.000. Se batch-max-executors-count for maior que batch-executors-count, o Dataplex vai ativar o escalonamento automático.
--container-image-java-jars Opcional: uma lista de JARS do Java para adicionar ao caminho de classe. As entradas válidas incluem URIs do Cloud Storage para binários Jar.
Por exemplo, gs://bucket-name/my/path/to/file.jar.
--container-image-properties Opcional: chaves de propriedade, especificadas em um formato prefix:property.
Por exemplo, core:hadoop.tmp.dir.
Para mais informações, consulte Propriedades do cluster.
--vpc-network-tags Opcional: uma lista de tags de rede a serem aplicadas ao job.
--vpc-network-name Opcional: a rede de nuvem privada virtual em que o job é executado. Por padrão, o Dataplex usa a rede VPC chamada Default no projeto.
Use apenas --vpc-network-name ou --vpc-sub-network-name.
--vpc-sub-network-name Opcional: a sub-rede VPC em que o job é executado.
Use apenas --vpc-sub-network-name ou --vpc-network-name.
--trigger-type Tipo de acionador da tarefa especificada pelo usuário. Os valores precisam ser:
ON_DEMAND: a tarefa é executada uma vez logo após a criação dela.
RECURRING: a tarefa é executada periodicamente em uma programação.
--trigger-start-time Opcional: o horário da primeira execução da tarefa. O formato é `{year}-{month}-{day}T{hour}:{min}:{sec}Z`, em que o fuso horário é UTC. Por exemplo, "2017-01-15T01:30:00Z" codifica 01:30 UTC em 15 de janeiro de 2017. Se esse valor não for especificado, a tarefa será executada após o envio se o tipo de acionador for ON_DEMAND ou na programação especificada se o tipo for RECURRING.
--trigger-disabled Opcional: impede a execução da tarefa. Esse parâmetro não cancela as tarefas que já estão em execução, mas desativa temporariamente as tarefas RECURRING.
--trigger-max-retires Opcional: o número de novas tentativas antes do cancelamento. Defina o valor como zero para nunca tentar repetir uma tarefa com falha.
--trigger-schedule Programação do cron para executar tarefas periodicamente.
--description Opcional: descrição da tarefa.
--display-name Opcional: nome de exibição da tarefa.
--labels Opcional: lista de pares de KEY=VALUE rótulos a serem adicionados.
--execution-args Opcional: os argumentos a serem transmitidos para a tarefa. Os argumentos podem ser uma combinação de pares de chave-valor. É possível transmitir uma lista separada por vírgulas de pares de chave-valor como argumentos de execução. Para transmitir argumentos posicionais, defina a chave como TASK_ARGS e defina o valor como uma string separada por vírgulas de todos os argumentos posicionais. Para usar um delimitador diferente de uma vírgula, consulte escapamento.
Caso key-value e argumentos posicionais sejam transmitidos juntos, TASK_ARGS vai ser transmitido como o último argumento.
--execution-service-account Conta de serviço a ser usada para executar uma tarefa.
--max-job-execution-lifetime Opcional: a duração máxima antes da execução do job expirar.
--container-image Opcional: imagem personalizada do contêiner para o ambiente de execução do job. Se não for especificado, uma imagem de contêiner padrão será usada.
--kms-key Opcional: a chave do Cloud KMS a ser usada para criptografia, no formato:
projects/{project_number}/locations/{location_id}/keyRings/{key-ring-name}/cryptoKeys/{key-name}

Exemplo em Java:

glcoud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=ON_DEMAND –spark-main-jar-file-uri=<gcs location to java file> --execution-service-account=<service-account-email> --trigger-start-time=<timestamp after which job starts ex. 2099-01-01T00:00:00Z> --labels=key1=value1,key2=value3,key3=value3 --execution-args=arg1=value1,arg2=value3,arg3=value3 <task-id>

Exemplo do PySpark:

gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --trigger-type=RECURRING --trigger-schedule=<Cron schedule https://en.wikipedia.org/wiki/Cron> --spark-python-script-file=<gcs location to python script> --execution-service-account=<service-account-email> --execution-args=^::^arg1=value1::arg2=value2::TASK_ARGS="pos-arg1, pos-arg2" <task-id>

REST

Para criar uma tarefa, use o APIs Explorer.

Agendar uma tarefa do Spark SQL

gcloud

Para programar uma tarefa do Spark SQL, execute o mesmo comando da CLI gcloud usado em Programar uma tarefa do Spark (Java ou Python), com os seguintes parâmetros extras:

Parâmetro Descrição
--spark-sql-script O texto da consulta SQL. É preciso usar spark-sql-script ou spark-sql-script-file.
--spark-sql-script-file Uma referência a um arquivo de consulta. Esse valor pode ser o URI do Cloud Storage do arquivo de consulta ou o caminho para o conteúdo do script SQL. É preciso usar spark-sql-script ou spark-sql-script-file.
--execution-args Para tarefas do Spark SQL, os argumentos a seguir são obrigatórios e precisam ser transmitidos como argumentos posicionais:
--output_location, <GCS uri of the output directory>
--output_format, <output file format>.
Os formatos com suporte são arquivo CSV, arquivo JSON, parquet e orc.
gcloud dataplex tasks create --project=<project-name> --location=<location> --lake=<lake-id> --execution-service-account=<service-account-email> --trigger-type=ON_DEMAND --spark-sql-script=<sql-script> --execution-args=^::^TASK_ARGS="--output_location, <gcs folder location>, --output_format, json" <sql-task-id>

REST

Para criar uma tarefa, use o APIs Explorer.

Monitorar sua tarefa

Console

  1. No console do Google Cloud, acesse a página do Dataplex:

    Acessar o Dataplex

  2. Navegue até a visualização Processo.

  3. Na guia Tasks, há uma lista de tarefas, filtradas por tipos de modelo de tarefa.

  4. Na coluna Nome, clique em qualquer tarefa que você queira ver.

  5. Clique no ID do job que você quer acessar.

    A página do Dataproc é aberta no console do Google Cloud, permitindo visualizar os detalhes de monitoramento e de saída.

gcloud

A tabela a seguir lista os comandos da CLI gcloud para monitorar suas tarefas.

Ação Comando da CLI gcloud
Como listar tarefas gcloud dataplex tasks list --project=<project-name> --location=<location> --lake=<lake-id>
Como visualizar os detalhes da tarefa gcloud dataplex tasks describe --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Como listar jobs de uma tarefa gcloud dataplex tasks jobs list --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id>
Como visualizar detalhes do job gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

O Dataplex executa jobs no Dataproc sem servidor (lotes). Para visualizar os registros de execução de um job do Dataplex, siga estas etapas:

  1. Consiga o ID do job do Dataproc sem servidor (lotes). Execute este comando:

    gcloud dataplex tasks jobs describe --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>
    
  2. Veja os registros. Execute o comando a seguir usando o ID do job que você recebeu ao executar o comando anterior:

    gcloud beta dataproc batches wait --project=<project-name> --region=<location> <job-id>
    

REST

Para get ou list uma tarefa ou job, use as APIs Explorer.

Gerenciar a programação

No console do Google Cloud, no Dataplex, é possível editar a programação de uma tarefa, excluir uma tarefa ou cancelar um job em andamento. A tabela a seguir lista os comandos da CLI gcloud para essas ações.

Ação Comando da CLI gcloud
Editar programação de tarefas gcloud dataplex tasks update --project=<project-name> --location=<location> --lake=<lake-id> --trigger-schedule=<updated-schedule> <task-id>
Excluir uma tarefa gcloud dataplex tasks delete --project=<project-name> --location=<location> --lake=<lake-id> <task-id>
Cancelar um job gcloud dataplex tasks jobs cancel --project=<project-name> --location=<location> --lake=<lake-id> --task=<task-id> <job-id>

A seguir