Commit

added POI support
tozesm committed Apr 3, 2019
1 parent 5a49afa commit adbcdcc
Showing 2 changed files with 64 additions and 6 deletions.
File renamed without changes.
70 changes: 64 additions & 6 deletions parserToElk.py
@@ -9,10 +9,17 @@
import getopt
import sys

def addToElastic(indexSuffix,payload):
def addToElasticBulk(payload):
    try:
        logging.debug(payload)
        r = requests.post("http://elasticsearch.tz:9200/hackacity-%s/doc/_bulk"%(indexSuffix,), data=payload, headers={"Content-Type": "application/json"})
        logging.debug(r.text)
    except Exception as e:
        logging.error("Failed to push event to elastic: " + str(e))

def addToElastic(payload):
    try:
        logging.debug(payload)
        logging.debug(indexSuffix)
        r = requests.post("http://elasticsearch.tz:9200/hackacity-%s/doc"%(indexSuffix,), json=payload)
        logging.debug(r.text)
    except Exception as e:
@@ -30,9 +30,47 @@ def getDocuments(index,query={"query": {"match_all": {}}}):

    return results

def addInBulkMode(payload,sendLast=False):
    global batchData, batchCount

    if sendLast:
        addToElasticBulk(batchData)
        return

    batchData += '{"index": {"_index": "%s", "_type": "doc"}}'%("hackacity-"+indexSuffix) + "\n"
    batchData += json.dumps(payload) + "\n"
    batchCount += 1

    if batchCount >= batchSize:
        addToElasticBulk(batchData)
        batchData = ""
        batchCount = 0

def parseCsvFile_POI(filename):
    index,category,datatype=filename.split("/")[1].split(".")[0].split("_")
    index = index.lower()
    payload = {}
    payload["category"] = category
    payload["type"] = datatype
    with open(filename, mode='r') as csv_file:
        line_count = 0
        csv_reader = csv.reader(csv_file, delimiter=',')
        for row in csv_reader:
            if line_count == 0:
                headers = row
                line_count = 1
                continue
            try:
                payload["value"] = float(row[3])
                location = {}
                location["lat"] = row[1]
                location["lon"] = row[2]
                payload["location"] = location
            except ValueError as ve:
                logging.error("Failed to parse entry: %s due to %s "%(str(row),str(ve)) )
            addInBulkMode(payload)

    addInBulkMode(None,True)

def parseCsvFile_IOT(filename):
    payload = {}
    payload["category"] = category
    payload["type"] = datatype
@@ -53,7 +98,9 @@ def parseCsvFile_POI(filename):
payload["location"] = location
except ValueError as ve:
logging.error("Failed to parse entry: %s due to %s "%(str(row),str(ve)) )
addToElastic(index,payload)
addInBulkMode(payload)

addInBulkMode(None,True)

if __name__ == '__main__':

@@ -62,6 +109,10 @@ def parseCsvFile_POI(filename):

    es = elasticsearch.Elasticsearch(['http://elasticsearch.tz:9200'],timeout=30)

    batchData = ""
    batchCount = 0
    batchSize = 1000

    try:
        opts, args = getopt.getopt(sys.argv[1:], "f:e:")
    except getopt.GetoptError, err:
@@ -76,5 +127,12 @@ def parseCsvFile_POI(filename):
        else:
            sys.exit(2)

    #filename = "output/IOT_AirQuality_CO.csv"
    index,category,datatype=filename.split("/")[1].split(".")[0].split("_")
    indexSuffix = index.lower()
    parseCsvFile_POI(filename)

    parseCsvFile_POI("output/IOT_AirQuality_CO.csv")
    """
    IOT_AirQuality_CO.csv IOT_AirQuality_NO2.csv IOT_AirQuality_O3.csv IOT_AirQuality_Ox.csv IOT_AirQuality_PM10.csv IOT_AirQuality_PM1.csv IOT_AirQuality_PM25.csv IOT_NoiseLevelObserved_LAeq.csv
    """
