Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Ignorar ambiente virtual
.venv/

# Ignorar dados públicos e suas versões comprimidas
dados-publicos/
dados-publicos-zip/
Empty file.
217 changes: 72 additions & 145 deletions dados_cnpj_baixa.py
Original file line number Diff line number Diff line change
@@ -1,153 +1,80 @@
# -*- coding: utf-8 -*-
"""

lista relação de arquivos na página de dados públicos da receita federal
e faz o download
https://www.gov.br/receitafederal/pt-br/assuntos/orientacao-tributaria/cadastros/consultas/dados-publicos-cnpj
https://dadosabertos.rfb.gov.br/CNPJ/
http://200.152.38.155/CNPJ/
"""
import os
import requests
from bs4 import BeautifulSoup
import requests, wget, os, sys, time, glob, parfive

#url = 'http://200.152.38.155/CNPJ/dados_abertos_cnpj/2024-08/' #padrão a partir de agosto/2024
#url_dados_abertos = 'https://dadosabertos.rfb.gov.br/CNPJ/dados_abertos_cnpj/'
#url_dados_abertos = 'http://200.152.38.155/CNPJ/dados_abertos_cnpj/'
url_dados_abertos = 'https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/'

pasta_zip = r"dados-publicos-zip" #local dos arquivos zipados da Receita
pasta_cnpj = 'dados-publicos'

def requisitos():
#se pastas não existirem, cria automaticamente
if not os.path.isdir(pasta_cnpj):
os.mkdir(pasta_cnpj)
if not os.path.isdir(pasta_zip):
os.mkdir(pasta_zip)

arquivos_existentes = list(glob.glob(pasta_cnpj +'/*.*')) + list(glob.glob(pasta_zip + '/*.*'))
if len(arquivos_existentes):
#eg.msgbox("Este programa baixa arquivos csv.zip de dados abertos da Receita Federal e converte para uso na RedeCNPJ aplicativo.\nIMPORTANTE: Para prosseguir, as pastas 'dados-publicos' e 'dados-publicos-zip', devem estar vazias, senão poderá haver inconsistências (juntar dados de meses distintos).\n",'Criar Bases RedeCNPJ')
#if eg.ynbox('Deseja apagar os arquivos das pastas ' + pasta_cnpj + ' e ' + pasta_zip + '?\nNÃO SERÁ POSSÍVEL REVERTER!!!!\n' + '\n'.join(arquivos_existentes) + '\nATENÇÃO: SE FOR EXECUTAR APENAS ALGUMA PARTE DO PROGRAMA, NÃO SELECIONE ESTA OPÇÃO, APAGUE MANUALMENTE.','Criar Bases RedeCNPJ', ['SIM-APAGAR', 'NÃO']):
r = input('Deseja apagar os arquivos das pastas ' + pasta_cnpj + ' e ' + pasta_zip + '?\n' + '\n'.join(arquivos_existentes) + '\nATENÇÃO: SE FOR EXECUTAR APENAS ALGUMA PARTE DO PROGRAMA, NÃO SELECIONE ESTA OPÇÃO, APAGUE MANUALMENTE. \nNÃO SERÁ POSSÍVEL REVERTER!!!!\nDeseja prosseguir e apagar os arquivos (y/n)??')
if r and r.upper()=='Y':
for arq in arquivos_existentes:
print('Apagando arquivo ' + arq)
os.remove(arq)
else:
print('Parando... Apague os arquivos ' + pasta_cnpj + ' e ' + pasta_zip +' e tente novamente')
input('Pressione Enter')
sys.exit(1)
# else:
# eg.msgbox("Este programa baixa arquivos csv.zip de dados abertos da Receita Federal e converte para uso na RedeCNPJ aplicativo.",'Criar Bases RedeCNPJ')

# if len(glob.glob(os.path.join(pasta_zip,'*.zip'))):
# print(f'Há arquivos zip na pasta {pasta_zip}. Apague ou mova esses arquivos zip e tente novamente')
# input('Pressione Enter')
# sys.exit(1)
requisitos()

print(time.asctime(), f'Início de {sys.argv[0]}:')

soup_pagina_dados_abertos = BeautifulSoup(requests.get(url_dados_abertos).text, features="lxml")
try:
ultima_referencia = sorted([link.get('href') for link in soup_pagina_dados_abertos.find_all('a') if link.get('href').startswith('20')])[-1]
except:
print('Não encontrou pastas em ' + url_dados_abertos)
r = input('Pressione Enter.')
sys.exit(1)


url = url_dados_abertos + ultima_referencia
# page = requests.get(url)
# data = page.text
soup = BeautifulSoup(requests.get(url).text, features="lxml")
lista = []
print('Relação de Arquivos em ' + url)
for link in soup.find_all('a'):
if str(link.get('href')).endswith('.zip'):
cam = link.get('href')
if not cam.startswith('http'):
print(url+cam)
lista.append(url+cam)
else:
print(cam)
lista.append(cam)

if __name__ == '__main__':
resp = input(f'Deseja baixar os arquivos acima para a pasta {pasta_zip} (y/n)?')
if resp.lower()!='y' and resp.lower()!='s':
sys.exit()
# URL base da Receita Federal
URL_BASE = "https://arquivos.receitafederal.gov.br/cnpj/dados_abertos_cnpj/2024-11/"
DIRETORIO_DOWNLOAD = "./downloads_cnpj"

# Configuração de sessão com User-Agent
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
})

def listar_arquivos(url):
"""Obtém a lista de arquivos disponíveis para download."""
try:
print(f"Acessando {url}...")
response = session.get(url, timeout=10)
response.raise_for_status()

# Parse do HTML
soup = BeautifulSoup(response.text, "html.parser")
arquivos = []

# Procura links de arquivos
for link in soup.find_all("a"):
href = link.get("href")
if href and href.endswith(".zip"):
arquivos.append(href)

return arquivos

except requests.exceptions.RequestException as e:
print(f"Erro ao acessar a URL base: {e}")
return []

def baixar_arquivo(url_arquivo, caminho_destino):
"""Faz o download de um arquivo."""
try:
print(f"Baixando {url_arquivo}...")
response = session.get(url_arquivo, stream=True, timeout=30)
response.raise_for_status()

print(time.asctime(), 'Início do Download dos arquivos...')
with open(caminho_destino, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)

if True: #baixa usando parfive, download em paralelo
downloader = parfive.Downloader()
for url in lista:
downloader.enqueue_file(url, path=pasta_zip, filename=os.path.split(url)[1])
downloader.download()
else: #baixar sequencial, rotina antiga
def bar_progress(current, total, width=80):
if total>=2**20:
tbytes='Megabytes'
unidade = 2**20
print(f"Arquivo salvo em {caminho_destino}")
except requests.exceptions.RequestException as e:
print(f"Erro ao baixar {url_arquivo}: {e}")

def main():
"""Executa o fluxo principal."""
print("Iniciando o processo de download dos arquivos do CNPJ...")

# Garantir que o diretório de downloads exista
os.makedirs(DIRETORIO_DOWNLOAD, exist_ok=True)

# Listar arquivos disponíveis
arquivos = listar_arquivos(URL_BASE)

if not arquivos:
print("Não foi possível listar os arquivos.")
return

print(f"Arquivos encontrados: {arquivos}")

# Baixar cada arquivo
for arquivo in arquivos:
url_arquivo = os.path.join(URL_BASE, arquivo)
caminho_destino = os.path.join(DIRETORIO_DOWNLOAD, arquivo)

if not os.path.exists(caminho_destino): # Evita baixar novamente
baixar_arquivo(url_arquivo, caminho_destino)
else:
tbytes='kbytes'
unidade = 2**10
progress_message = f"Baixando: %d%% [%d / %d] {tbytes}" % (current / total * 100, current//unidade, total//unidade)
# Don't use print() as it will print in new line every time.
sys.stdout.write("\r" + progress_message)
sys.stdout.flush()

for k, url in enumerate(lista):
print('\n' + time.asctime() + f' - item {k}: ' + url)
wget.download(url, out=os.path.join(pasta_zip, os.path.split(url)[1]), bar=bar_progress)
print(f"Arquivo já existe: {caminho_destino}")


print('\n\n'+ time.asctime(), f' Finalizou {sys.argv[0]}!!!')
print(f"Baixou {len(glob.glob(os.path.join(pasta_zip,'*.zip')))} arquivos.")
if __name__ == '__main__':
input('Pressione Enter')

#lista dos arquivos (até julho/2024)
'''
http://200.152.38.155/CNPJ/Cnaes.zip
http://200.152.38.155/CNPJ/Empresas0.zip
http://200.152.38.155/CNPJ/Empresas1.zip
http://200.152.38.155/CNPJ/Empresas2.zip
http://200.152.38.155/CNPJ/Empresas3.zip
http://200.152.38.155/CNPJ/Empresas4.zip
http://200.152.38.155/CNPJ/Empresas5.zip
http://200.152.38.155/CNPJ/Empresas6.zip
http://200.152.38.155/CNPJ/Empresas7.zip
http://200.152.38.155/CNPJ/Empresas8.zip
http://200.152.38.155/CNPJ/Empresas9.zip
http://200.152.38.155/CNPJ/Estabelecimentos0.zip
http://200.152.38.155/CNPJ/Estabelecimentos1.zip
http://200.152.38.155/CNPJ/Estabelecimentos2.zip
http://200.152.38.155/CNPJ/Estabelecimentos3.zip
http://200.152.38.155/CNPJ/Estabelecimentos4.zip
http://200.152.38.155/CNPJ/Estabelecimentos5.zip
http://200.152.38.155/CNPJ/Estabelecimentos6.zip
http://200.152.38.155/CNPJ/Estabelecimentos7.zip
http://200.152.38.155/CNPJ/Estabelecimentos8.zip
http://200.152.38.155/CNPJ/Estabelecimentos9.zip
http://200.152.38.155/CNPJ/Motivos.zip
http://200.152.38.155/CNPJ/Municipios.zip
http://200.152.38.155/CNPJ/Naturezas.zip
http://200.152.38.155/CNPJ/Paises.zip
http://200.152.38.155/CNPJ/Qualificacoes.zip
http://200.152.38.155/CNPJ/Simples.zip
http://200.152.38.155/CNPJ/Socios0.zip
http://200.152.38.155/CNPJ/Socios1.zip
http://200.152.38.155/CNPJ/Socios2.zip
http://200.152.38.155/CNPJ/Socios3.zip
http://200.152.38.155/CNPJ/Socios4.zip
http://200.152.38.155/CNPJ/Socios5.zip
http://200.152.38.155/CNPJ/Socios6.zip
http://200.152.38.155/CNPJ/Socios7.zip
http://200.152.38.155/CNPJ/Socios8.zip
http://200.152.38.155/CNPJ/Socios9.zip
'''
if __name__ == "__main__":
main()
54 changes: 54 additions & 0 deletions dados_cnpj_extrair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from pathlib import Path
import zipfile
from tqdm import tqdm

# Caminhos das pastas
PASTA_ZIPS = Path("dados-publicos-zip")
PASTA_DESTINO = Path("dados-publicos")

def excluir_arquivos_existentes(pasta_destino):
"""Exclui todos os arquivos existentes na pasta de destino."""
if pasta_destino.exists():
for arquivo in pasta_destino.rglob("*"):
if arquivo.is_file():
arquivo.unlink()

def extrair_arquivos_zip():
"""Extrai arquivos ZIP da pasta origem para a pasta destino com barra de progresso."""
if not PASTA_ZIPS.exists():
print(f"A pasta {PASTA_ZIPS} não existe. Verifique o caminho.")
return

# Excluir arquivos existentes na pasta de destino
excluir_arquivos_existentes(PASTA_DESTINO)

# Criar a pasta de destino, se não existir
PASTA_DESTINO.mkdir(parents=True, exist_ok=True)

# Obter lista de arquivos ZIP
arquivos_zip = list(PASTA_ZIPS.glob("*.zip"))

if not arquivos_zip:
print("Nenhum arquivo ZIP encontrado na pasta origem.")
return

total_arquivos = len(arquivos_zip)
sucesso = 0

# Barra de progresso
print("Extraindo arquivos...")
with tqdm(total=total_arquivos, unit="arquivo", desc="Progresso") as barra:
for arquivo_zip in arquivos_zip:
try:
with zipfile.ZipFile(arquivo_zip, 'r') as zip_ref:
zip_ref.extractall(PASTA_DESTINO)
sucesso += 1
except Exception as e:
print(f"Erro ao extrair {arquivo_zip.name}: {e}")
finally:
barra.update(1)

print(f"Extração concluída: {sucesso}/{total_arquivos} arquivos extraídos com sucesso.")

if __name__ == "__main__":
extrair_arquivos_zip()