Universalizando o acesso a dados de qualidade.
Neste documento, mostra-se como configurar o ambiente para executar os 2 processos de dados existentes na BD: Elaboração de pipelines; Elaboração de códigos de ELT/ETL para subida de dados com frequência de atualização baixa.
Este guia é dedicado para novos integrantes da equipe da BD e voluntários que desejam colaborar com o projeto.
Warning
Você precisa ter uma conta no GitHub e ter o git configurado.
- git
- uv
- Um editor de texto (recomendado VS Code)
- WSL 2, apenas para usuário Windows
- Se você usa o Windows é essencial Instalar o WSL 2 (Ubuntu). Siga esse passo a passo
Clone esse repositório
git clone git@github.com:basedosdados/pipelines.gitEntre no repositório clonado
cd pipelinesuv syncInstalar os hooks de pré-commit (ver https://pre-commit.com/ para entendimento dos hooks)
uv run pre-commit install --install-hooksInstale as dependências do dbt
uv run dbt depsCrie um variável ambiente BD_SERVICE_ACCOUNT_DEV apontado para o arquivo da conta de serviço.
Abra o arquivo ~/.bashrc com seu editor ou use nano ~/.bashrc e adicione no final do arquivo.
Note
Certifique-se de o arquivo JSON existir. No exemplo abaixo ele está em $HOME/.basedosdados/credentials/staging.json
export BD_SERVICE_ACCOUNT_DEV="$HOME/.basedosdados/credentials/staging.json"Salve, feche e execute exec bash
- Se atente para sempre carregar o arquivo
.envcom o comandosource .env- Há a extensão do vscode chamada Python Environment Manager que você consegue ver e configurar as envs. Segue o link: Python Environment Manager
- Garanta que o arquivo
.envestá certinho: - Não deve ter espaços após o
: - Não pode ter
_a mais nem a menos
- Não se esqueça de criar o arquivo
auth.tomlna pasta$HOME/.prefectconforme descrito noREADME.md- Caso você não tenha a api_key do arquivo auth.toml, mande mensagem para a Laura, uma vez que é uma chave pessoal.
Essa seção cobre o desenvolvimento de pipelines. Pipelines são construídas usando o prefect e são para dados com frequência de atualização alta.
datasets/ # diretório raiz para o órgão
|-- projeto1/ # diretório de projeto
|-- |-- __init__.py # vazio
|-- |-- constants.py # valores constantes para o projeto
|-- |-- flows.py # declaração dos flows
|-- |-- schedules.py # declaração dos schedules
|-- |-- tasks.py # declaração das tasks
|-- |-- utils.py # funções auxiliares para o projeto
...
|-- __init__.py # importa todos os flows de todos os projetos
|-- constants.py # valores constantes para o órgão
|-- flows.py # declaração de flows genéricos do órgão
|-- schedules.py # declaração de schedules genéricos do órgão
|-- tasks.py # declaração de tasks genéricas do órgão
|-- utils.py # funções auxiliares para o órgão
...
utils/
|-- __init__.py
|-- flow1/
|-- |-- __init__.py
|-- |-- flows.py
|-- |-- tasks.py
|-- |-- utils.py
|-- flows.py # declaração de flows genéricos
|-- tasks.py # declaração de tasks genéricas
|-- utils.py # funções auxiliares
constants.py # valores constantes para todos os órgãos
O script manage.py é responsável por criar e listar pipelines desse repositório.
Você pode obter mais informações sobre os comandos com
uv run manage.py --helpO comando add-pipeline permite que você adicione uma nova pipeline a partir do template padrão. Para fazê-lo, basta executar
uv run manage.py add-pipeline nome_da_pipelineIsso irá criar um diretório com o nome da pipeline em pipelines/datasets/ com o template padrão, já adaptado ao nome do órgão. O nome do órgão deve estar em snake case e deve ser único. Qualquer conflito com um projeto já existente será reportado.
Para listar os órgãos existentes e nomes reservados
uv run manage.py list-pipelinesEscolha a pipeline que deseja executar (exemplo pipelines.rj_escritorio.test_pipeline.flows.flow). Crie um arquivo test.py na pasta e importe o flow
from pipelines.datasets.test_pipeline.flows import flow
from pipelines.utils.utils import run_local
run_local(flow, parameters={"param": "val"})Faça a cópia do arquivo .env.example para um novo arquivo nomeado .env:
cp .env.example .envSubstitua os valores das seguintes variáveis pelos seus respectivos valores:
GOOGLE_APPLICATION_CREDENTIALS: Path para um arquivo JSON com as credenciais da API do Google Cloud de uma conta de serviço com acesso de escrita ao bucketbasedosdados-devno Google Cloud Storage.VAULT_TOKEN: deve ter o valor do token do órgão para o qual você está desenvolvendo. Caso não saiba o token, entre em contato.
Carregue as variáveis de ambiente do arquivo .env:
source .envTambém, garanta que o arquivo $HOME/.prefect/auth.toml exista e tenha um conteúdo semelhante ao seguinte:
["prefect.basedosdados.org"]
api_key = "<sua-api-key>"
tenant_id = "<tenant-id>"O valor da chave tenant_id pode ser coletada através da seguinte URL: https://prefect.basedosdados.org/default/api. Devendo ser executado a seguinte query:
query {
tenant {
id
}
}Em seguida, tenha certeza que você já tem acesso à UI do Prefect, tanto para realizar a submissão da run, como para acompanhá-la durante o processo de execução.
Crie o arquivo test.py com a pipeline que deseja executar e adicione a função run_cloud com os parâmetros necessários:
from pipelines.utils.utils import run_cloud
from pipelines.[secretaria].[pipeline].flows import flow # Complete com as infos da sua pipeline
run_cloud(
flow, # O flow que você deseja executar
labels=[
"example", # Label para identificar o agente que irá executar a pipeline (ex: basedosdados-dev)
],
parameters = {
"param": "val", # Parâmetros que serão passados para a pipeline (opcional)
}
)Execute a pipeline com:
uv run test.pyA saída deverá se assemelhar ao exemplo abaixo:
[2022-02-19 12:22:57-0300] INFO - prefect.GCS | Uploading xxxxxxxx-development/2022-02-19t15-22-57-694759-00-00 to basedosdados-dev
Flow URL: http://localhost:8080/default/flow/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
└── ID: xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
└── Project: main
└── Labels: []
Run submitted, please check it at:
https://prefect.basedosdados.org/flow-run/xxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
(Opcional, mas recomendado) Quando acabar de desenvolver sua pipeline, delete todas as versões da mesma pela UI do Prefect.
Essa seção explica como enviar dados para o datalake da BD.
- Escolher a base e entender mais dos dados
- Preencher as tabelas de arquitetura
- As tabelas de arquitetura determinam qual a estrutura de cada tabela do seu conjunto de dados. Elas definem, por exemplo, o nome, ordem e metadados das variáveis, além de compatibilizações quando há mudanças em versões (por exemplo, se uma variável muda de nome de um ano para o outro).
- Template da tabela de arquitetura para preencher
- Leia o Manual de estido da BD para preencher a tabela de arquitetura
- Compartilhe a tabela com a equipe de dados da BD para aprovar.
- Escrever código de captura e limpeza de dados
- Os arquivos devem estar no formato CSV ou Parquet
- Chave de acesso ao Google Cloud dos voluntários
Você precisa adicionar seu código em models/<dataset-id>/code/ na pasta do dataset específico.
Para subir os dados, use o módulo da BD.
import basedosdados as bd
DATASET_ID = "dataset_id" # Nome do dataset
TABLE_ID = "table_id" # Nome da tabela
tb = bd.Table(dataset_id=DATASET_ID, table_id=TABLE_ID)No próximo passo carregamos os dados para o Storage da BD e criamos uma tabela BigQuery que por uma conexão externa acessa esses dados diretamente do Storage
Deixamos os parâmetros que mais usamos visíveis aqui, mas é importante explorar outras opções de parâmetros na documentação.
tb.create(
path=path_to_data, # Caminho para o arquivo csv ou parquet
if_storage_data_exists="raise",
if_table_exists="replace",
source_format="csv",
)from databasers_utils import TableArchitecture
arch = TableArchitecture(
dataset_id="<dataset-id>",
tables={
"<table-id>": "URL da arquiterura do Google Sheet", # Exemplo https://docs.google.com/spreadsheets/d/1K1svie4Gyqe6NnRjBgJbapU5sTsLqXWTQUmTRVIRwQc/edit?usp=drive_link
},
)
# Cria o yaml file
arch.create_yaml_file()
# Cria os arquivos sql
arch.create_sql_files()
# Atualiza o dbt_project.yml
arch.update_dbt_project()
# Faz o upload das colunas para o DJango
# Para essa etapa é necessário ter duas varíaveis de ambiente configurada
# BD_DJANGO_EMAIL="seuemail@basedosdados.org"
# BD_DJANGO_PASSWORD="password"
arch.upload_columns()Os arquivos sql do dbt usam a macro set_datalake_project que indica de qual projeto (basedosdados-staging ou basedosdados-dev) serão consumidos os dados. Ao criar os arquivos usando a função create_sql_files a macro será inserida.
select
col_name
from {{ set_datalake_project("<DATASET_ID>_staging.<TABLE_ID>") }}Important
Não use a macro para fazer join, joins devem ser feitos com a tabelas na zona de produção usando o projeto basedosdados basedosdados.<DATASET_ID>.<TABLE_ID> Exemplo: modelo br_bd_diretorios_brasil__distrito_2022.sql
Important
Ative o ambiente virtual (venv) com source .venv/bin/activate para executar os comandos dbt.
Important
No arquivo de configuração do DBT schema.yml o target é pré definido como dev. Com esta configuração, quando um modelo dbt for executado os dados serão consumidos do projeto basedosdados-dev.*_staging. Deste modo, não é preciso informar a flag --target no momento de testagem e validação de modelos em ambientes locais.
Materializa um único modelo pelo nome em basedosdados-dev consumindo os dados de basedosdados-dev.{table_id}_staging
dbt run --select dataset_id__table_idMaterializa todos os modelos em uma pasta em basedosdados-dev consumindo os dados de basedosdados-dev.{table_id}_staging
dbt run --select model.dateset_id.dateset_id__table_idMaterializa todos os modelos no caminho em basedosdados-dev consumindo os dados de basedosdados-dev.{table_id}_staging
dbt run --select models/dataset_idMaterializa um único modelo pelo caminho do arquivo sql em basedosdados-dev consumindo os dados de basedosdados-dev.{table_id}_staging
dbt run --select models/dataset/table_id.sqlOs testes do modelo são definidos no arquivo schema.yml
---
models:
- name: orders
columns:
- name: id_municipio
tests:
- relationships:
to: ref('br_bd_diretorios_brasil__municipio')
field: id_municipio---
models:
- name: orders
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns: [country_code, order_id]Note
Os testes abaixo são customizados, eles ficam em tests-dbt/generic
---
models:
- name: dataset_id__table_id
description:
tests:
- not_null_proportion_multiple_columns:
at_least: 0.95Permite explicitar valores que vão ser ignorados no momento de realizar o teste de relação e a proporção de valores sem correspondência que será tolerada.
Warning
Caso utilize esse teste é essencial documentar nos metadados da tabela quais exceções foram aplicadas e porque elas são aceitáveis. De preferência adicionar essas informações na descrição da tabela para que o usuário seja informado dessas exceções
---
models:
- name: dataset_id__table_id
description: Table description
tests:
- custom_relationships:
to: ref('br_bd_diretorios_mundo__sistema_harmonizado')
field: id_sh4
ignore_values: ['5410'] # O valor 5410 será ignorado
proportion_allowed_failures: 0Permite inserir uma proporção de chaves únicas que repetidas que pode ser tolerada. Usar com parcimônia! Pode mascarar repetição de linhas.
Warning
Caso utilize esse teste é essencial documentar nos metadados da tabela quais exceções foram aplicadas e porque elas são aceitáveis. De preferência adicionar essas informações na descrição da tabela para que o usuário seja informado dessas exceções
---
models:
- name: dataset_id__table_id
description: Table description
tests:
- custom_unique_combinations_of_columns:
combination_of_columns: [column_1, column_2]
proportion_allowed_failures: 0.05Para tabelas muito grandes é importante que o teste rode apenas nas linhas novas que serão incluidas. Para isso usaremos o config where e uma das keywords __most_recent_year_month__ | __most_recent_date__ | __most_recent_year__
É inserido à nível do teste e permite inserir lógica SQL para filtrar os dados.
---
models:
- name: dataset_id__table_id
description: Table description
tests:
- custom_unique_combinations_of_columns:
combination_of_columns: [column_1, column_2]
proportion_allowed_failures: 0.05
config:
where: 'date_column = '2024-01-01''A macro custom_get_where_subquery detecta a presença de uma das keywords acima e executa uma consulta para determinar os valores mais recentes de ano e mês, data ou ano de uma tabela. Em seguida, substitui a keyword declarada pelos valores mais recentes encontrados, o que garante que o teste seja executado apenas nas linhas mais recentes da tabela.
__most_recent_year_month__: A macro faz um query usando as colunasanoemespara identificar a data mais recente__most_recent_date__: A macro faz um query usando a colunadatapara identificar a data mais recente__most_recent_year__: A macro faz um query usando a colunaanopara identificar o ano
---
models:
- name: dataset_id__table_id
description: Table description
tests:
- custom_unique_combinations_of_columns:
combination_of_columns: [column_1]
proportion_allowed_failures: 0.05
config:
where: __most_recent_year_month__Testa um único modelo
dbt test --select dataset_id__table_idTesta todos os modelos em uma pasta
dbt test --select model.dateset_id.dateset_id__table_idTesta todos os modelos no caminho
dbt test --select models/dataset_id