-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.py
67 lines (57 loc) · 2.09 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import ReadFromParquet
from apache_beam.io.parquetio import WriteToParquet
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
import re
from utils import *
pipeline_options = PipelineOptions(argv = None)
pipeline = beam.Pipeline(options = pipeline_options)
dengue = (
pipeline
| "Leitura do dataset de dengue" >>
ReadFromText('casos_dengue.txt', skip_header_lines = 1)
| "Dengue - texto para lista" >>
beam.Map(texto_para_lista)
| "Dengue - lista para dicionario" >>
beam.Map(lista_para_dicionario, colunas_dengue)
| "Dengue - Criar campo ano_mes" >>
beam.Map(trata_datas, coluna_data = 'data_iniSE')
| "Dengue - Criar chave pelo estado" >>
beam.Map(chave_uf)
| "Dengue - Agrupar pelo estado" >>
beam.GroupByKey()
| "Dengue - Descompactar casos de dengue" >>
beam.FlatMap(casos_dengue)
| "Dengue - Soma dos casos pela chave" >>
beam.CombinePerKey(sum)
#| "Dengue - Mostrar resultados" >> beam.Map(print)
)
chuvas = (
pipeline
| "Leitura do dataset de chuvas" >>
ReadFromParquet('chuvas.parquet')
| 'Chuvas - Criar chave uf_ano_mes' >>
beam.Map(chuva_chave_uf_ano_mes_lista)
| 'Chuvas - Soma dos mm pela chave' >>
beam.CombinePerKey(sum)
| 'Chuvas - Arredondar resultados' >>
beam.Map(arredonda)
#| "Chuvas teste - Mostrar resultados" >> beam.Map(print)
)
final = (
({'chuvas': chuvas, 'dengue': dengue})
| "Final - Mesclar pcols" >>
beam.CoGroupByKey()
| "Final - filtrar valores vazios" >>
beam.Filter(filtra_campos_nao_vazios)
| "Final - descompacta a saída" >>
beam.Map(descompacta_elementos)
| "Final - preparar csv" >>
beam.Map(preparar_csv)
#| "Final - Mostrar resultados" >> beam.Map(print)
)
header = 'UF;ANO;MES;CHUVA;DENGUE'
final | "Criar arquivo csv" >> WriteToText('final', file_name_suffix='.csv', header = header)
pipeline.run()