-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils.py
127 lines (97 loc) · 3.13 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import ReadFromParquet
from apache_beam.io import WriteToParquet
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions
import re
colunas_dengue = [
'id',
'data_iniSE',
'casos',
'ibge_code',
'cidade',
'uf',
'cep',
'latitude',
'longitude'
]
colunas_chuvas = [
'data',
'mm',
'uf'
]
def texto_para_lista(elemento, delimitador ='|'):
"""
Função que retorna uma lista apos receber uma string
"""
return elemento.split(delimitador)
def lista_para_dicionario(elemento, colunas):
"""
Função que retorna o dicionario apos a entrada de uma lista
"""
return dict(zip(colunas, elemento))
def trata_datas(elemento, coluna_data):
"""
Recebe um dicionario e altera a coluna de data
"""
elemento['ano_mes'] = '-'.join(elemento[coluna_data].split('-')[:2])
return elemento
def chave_uf(elemento):
"""
Recebe um dicionario e retorna uma tupla (UF, dicionario)
"""
chave = elemento['uf']
return chave, elemento
def chuva_chave_uf_ano_mes_lista(elemento):
"""
Recebe um dicionario de elementos {'data': '2015-01-31', 'mm': 0.0, 'uf': 'CE'} e retorna uma tupla contendo uma chave e o valor de chuva em mm
('UF-ANO-MES', '1.3')
"""
data, mm, uf = elemento['data'], elemento['mm'], elemento['uf']
if mm < 0: mm = 0
ano_mes = '-'.join(data.split('-')[:2])
chave = f"{uf}-{ano_mes}"
return chave, mm
def arredonda(elemento):
"""
Recebe uma tupla e retorna a mesma com o valor mm arredondado
"""
chave , mm = elemento
return chave, round(mm,1)
def casos_dengue(elemento):
"""
Recebe uma tupla ('RS', [{},{}]) e retorna uma tupla com uf_ano_mes e casos ('RS-2014-12', 8.0)
"""
uf, registros = elemento
for registro in registros:
if re.search(r'\d',registro['casos']):
yield (f"{uf}-{registro['ano_mes']}", float(registro['casos']))
else:
yield (f"{uf}-{registro['ano_mes']}", 0.0)
def filtra_campos_nao_vazios(elemento):
"""
Remove elementos que tenham chaves vazias
"""
chave, dados = elemento
if all([
dados['chuvas'],
dados['dengue']
]):
return True
return False
def descompacta_elementos(elemento):
"""
Recebe uma tupla com uma chuva e um dicionario ('CE-2015-11', {'chuvas': 0.4, 'dengue': 21.0})
Retorna uma tupla com uf,ano,mes,mm de chuva,casos de dengue ('CE', '2015','11', '0.4','21.0')
"""
chave, dados = elemento
chuva = dados['chuvas'][0]
dengue = dados['dengue'][0]
uf, ano, mes = chave.split('-')
return uf, ano, mes, str(chuva), str(dengue)
def preparar_csv(elemento, delimitador = ';'):
"""
Recebe uma tupla e retorna uma string delimitada
"""
return delimitador.join(elemento)