Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

made changes in the script #37

Merged
merged 1 commit into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 35 additions & 25 deletions observations/observation_realtime_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,17 +289,17 @@ def obj_creation(obSub):
obsAppName = ''
userRolesArrUnique = []
roleObj = {}
roleObj["role_title"] = userSubType
roleObj["user_boardName"] = boardName
roleObj["user_type"] = user_type
roleObj["roleTitle"] = userSubType
roleObj["userBoardName"] = boardName
roleObj["userType"] = user_type
userRolesArrUnique.append(roleObj)
try:
orgArr = orgName(obSub["userProfile"]["organisations"])
if len(orgArr) >0:
for org in orgArr:
for obj in userRolesArrUnique:
obj["organisation_id"] = org["orgId"]
obj["organisation_name"] = org["orgName"]
obj["organisationId"] = org["orgId"]
obj["organisationName"] = org["orgName"]
except KeyError:
pass

Expand All @@ -326,13 +326,13 @@ def creatingObj(answer, quesexternalId, ans_val, instNumber, responseLabel, usrR
observationSubQuestionsObj['appName'] = obsAppName
try:
if obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == True:
observationSubQuestionsObj['solution_type'] = "observation_with_rubric"
observationSubQuestionsObj['solutionType'] = "observation_with_rubric"
elif obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == False:
observationSubQuestionsObj['solution_type'] = "observation_with_rubric_no_criteria_level_report"
observationSubQuestionsObj['solutionType'] = "observation_with_rubric_no_criteria_level_report"
else:
observationSubQuestionsObj['solution_type'] = "observation_with_out_rubric"
observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric"
except KeyError:
observationSubQuestionsObj['solution_type'] = "observation_with_out_rubric"
observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric"
observationSubQuestionsObj['entity'] = str(obSub['entityId'])
observationSubQuestionsObj['entityExternalId'] = obSub['entityExternalId']
# observationSubQuestionsObj['entityName'] = obSub['entityInformation']['name']
Expand Down Expand Up @@ -452,9 +452,9 @@ def creatingObj(answer, quesexternalId, ans_val, instNumber, responseLabel, usrR
observationSubQuestionsObj['questionResponseType'] = answer['responseType']
if answer['responseType'] == 'number':
if responseLabel:
observationSubQuestionsObj['questionResponseLabel_number'] = responseLabel
observationSubQuestionsObj['questionResponseLabelNumber'] = responseLabel
else:
observationSubQuestionsObj['questionResponseLabel_number'] = 0
observationSubQuestionsObj['questionResponseLabelNumber'] = 0
try:
if responseLabel:
if answer['responseType'] == 'text':
Expand Down Expand Up @@ -486,8 +486,8 @@ def creatingObj(answer, quesexternalId, ans_val, instNumber, responseLabel, usrR
else:
multipleFiles = multipleFiles + ' , ' + filedetail['sourcePath']
observationSubQuestionsObj['evidences'] = multipleFiles
observationSubQuestionsObj['evidence_count'] = str(len(answer['fileName']))
observationSubQuestionsObj['total_evidences'] = evidence_sub_count
observationSubQuestionsObj['evidenceCount'] = str(len(answer['fileName']))
observationSubQuestionsObj['totalEvidences'] = evidence_sub_count
# to fetch the parent question of matrix
if ans['responseType']=='matrix':
observationSubQuestionsObj['instanceParentQuestion'] = ans['question'][0]
Expand All @@ -512,7 +512,7 @@ def creatingObj(answer, quesexternalId, ans_val, instNumber, responseLabel, usrR
observationSubQuestionsObj['instanceParentExternalId'] = ''
observationSubQuestionsObj['instanceParentEcmSequence'] = ''
observationSubQuestionsObj['channel'] = rootOrgId
observationSubQuestionsObj['parent_channel'] = "SHIKSHALOKAM"
observationSubQuestionsObj['parentChannel'] = "SHIKSHALOKAM"
### Assessment Domain Logic - Start ###
domainArr = []
for domain in obSub['themes']:
Expand Down Expand Up @@ -727,6 +727,7 @@ def fetchingQuestiondetails(ansFn, instNumber):
usrRol
)
if finalObj["completedDate"]:
print(f"at line 730 == {finalObj}")
producer.send(
(config.get("KAFKA", "observation_druid_topic")),
json.dumps(finalObj).encode('utf-8')
Expand All @@ -743,6 +744,7 @@ def fetchingQuestiondetails(ansFn, instNumber):
None
)
if finalObj["completedDate"]:
print(f"at line 747 == {finalObj}")
producer.send(
(config.get("KAFKA", "observation_druid_topic")),
json.dumps(finalObj).encode('utf-8')
Expand All @@ -769,6 +771,7 @@ def fetchingQuestiondetails(ansFn, instNumber):
usrRol
)
if finalObj["completedDate"]:
print(f"at line 774 == {finalObj}")
producer.send(
(config.get("KAFKA", "observation_druid_topic")),
json.dumps(finalObj).encode('utf-8')
Expand All @@ -785,6 +788,7 @@ def fetchingQuestiondetails(ansFn, instNumber):
None
)
if finalObj["completedDate"]:
print(f"at line 791 == {finalObj}")
producer.send(
(config.get("KAFKA", "observation_druid_topic")),
json.dumps(finalObj).encode('utf-8')
Expand All @@ -807,6 +811,7 @@ def fetchingQuestiondetails(ansFn, instNumber):
usrRol
)
if finalObj["completedDate"]:
print(f"at line 814 == {finalObj}")
producer.send(
(config.get("KAFKA", "observation_druid_topic")),
json.dumps(finalObj).encode('utf-8')
Expand All @@ -824,6 +829,7 @@ def fetchingQuestiondetails(ansFn, instNumber):
None
)
if finalObj["completedDate"]:
print(f"at line 832 == {finalObj}")
producer.send(
(config.get("KAFKA", "observation_druid_topic")),
json.dumps(finalObj).encode('utf-8')
Expand Down Expand Up @@ -891,13 +897,13 @@ def main_data_extraction(obSub):
observationSubQuestionsObj['solutionName'] = ''
try:
if obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == True:
observationSubQuestionsObj['solution_type'] = "observation_with_rubric"
observationSubQuestionsObj['solutionType'] = "observation_with_rubric"
elif obSub["isRubricDriven"] == True and obSub["criteriaLevelReport"] == False:
observationSubQuestionsObj['solution_type'] = "observation_with_rubric_no_criteria_level_report"
observationSubQuestionsObj['solutionType'] = "observation_with_rubric_no_criteria_level_report"
else:
observationSubQuestionsObj['solution_type'] = "observation_with_out_rubric"
observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric"
except KeyError:
observationSubQuestionsObj['solution_type'] = "observation_with_out_rubric"
observationSubQuestionsObj['solutionType'] = "observation_with_out_rubric"

try:
observationSubQuestionsObj['completedDate'] = obSub['completedDate']
Expand Down Expand Up @@ -935,7 +941,7 @@ def main_data_extraction(obSub):
else:
# Handle the case when the list is empty
user_type = None
observationSubQuestionsObj['user_type'] = user_type
observationSubQuestionsObj['userType'] = user_type

observationSubQuestionsObj['solutionExternalId'] = obSub.get('solutionExternalId', '')
observationSubQuestionsObj['solutionId'] = obSub.get('solutionId', '')
Expand All @@ -950,10 +956,10 @@ def main_data_extraction(obSub):
orgArr = orgName(obSub.get('userProfile', {}).get('organisations',None))
if orgArr:
# observationSubQuestionsObj['schoolId'] = orgArr[0].get("organisation_id")
observationSubQuestionsObj['organisation_name'] = orgArr[0].get("organisation_name")
observationSubQuestionsObj['organisationName'] = orgArr[0].get("organisation_name")
else:
# observationSubQuestionsObj['schoolId'] = None
observationSubQuestionsObj['organisation_name'] = None
observationSubQuestionsObj['organisationName'] = None

# Insert data to sl-observation-meta druid datasource if status is anything
_id = observationSubQuestionsObj.get('observationSubmissionId', None)
Expand All @@ -962,6 +968,7 @@ def main_data_extraction(obSub):
if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-meta"):
infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-meta datasource")
# Upload observation submission data to Druid topic
print(f"at line 971 == {observationSubQuestionsObj}")
producer.send((config.get("KAFKA", "observation_meta_druid_topic")), json.dumps(observationSubQuestionsObj).encode('utf-8'))
producer.flush()
infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-meta datasource.")
Expand All @@ -977,15 +984,16 @@ def main_data_extraction(obSub):
if obSub['status'] == 'started':
observation_status['observationSubmissionId'] = obSub['_id']
try:
observation_status['started_at'] = obSub['completedDate']
observation_status['startedAt'] = obSub['completedDate']
except KeyError:
observation_status['started_at'] = ''
observation_status['startedAt'] = ''
_id = observation_status.get('observationSubmissionId', None)
try :
if _id:
if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-status-started"):
infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-status-started datasource")
# Upload observation status data to Druid topic
print(f"at line 996 == {observation_status}")
producer.send((config.get("KAFKA", "observation_started_druid_topic")), json.dumps(observation_status).encode('utf-8'))
producer.flush()
infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-status-started datasource.")
Expand All @@ -1000,13 +1008,14 @@ def main_data_extraction(obSub):
# Insert data to sl-observation-status-started druid datasource if status is inprogress
elif obSub['status'] == 'inprogress':
observation_status['observationSubmissionId'] = obSub['_id']
observation_status['inprogress_at'] = obSub['completedDate']
observation_status['inprogressAt'] = obSub['completedDate']
_id = observation_status.get('observationSubmissionId', None)
try :
if _id:
if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-status-inprogress"):
infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-status-inprogress datasource")
# Upload observation status data to Druid topic
print(f"at line 996 == {observation_status}")
producer.send((config.get("KAFKA", "observation_inprogress_druid_topic")), json.dumps(observation_status).encode('utf-8'))
producer.flush()
infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-status-inprogress datasource.")
Expand All @@ -1020,13 +1029,14 @@ def main_data_extraction(obSub):

elif obSub['status'] == 'completed':
observation_status['observationSubmissionId'] = obSub['_id']
observation_status['completed_at'] = obSub['completedDate']
observation_status['completedAt'] = obSub['completedDate']
_id = observation_status.get('observationSubmissionId', None)
try :
if _id:
if check_observation_submission_id_existance(_id,"observationSubmissionId","sl-observation-status-completed"):
infoLogger.info(f"No data duplection for the Submission ID : {_id} in sl-observation-status-completed datasource")
# Upload observation status data to Druid topic
print(f"at line 996 == {observation_status}")
producer.send((config.get("KAFKA", "observation_completed_druid_topic")), json.dumps(observation_status).encode('utf-8'))
producer.flush()
infoLogger.info(f"Data with submission_id {_id} is being inserted into the sl-observation-status-completed datasource")
Expand Down
12 changes: 6 additions & 6 deletions survey/survey_realtime_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,11 @@ def creatingObj(answer,quesexternalId,ans_val,instNumber,responseLabel):
# Extract response label for number response type
if answer['responseType'] == 'number':
if responseLabel:
surveySubQuestionsObj['questionResponseLabel_number'] = responseLabel
surveySubQuestionsObj['questionResponseLabelNumber'] = responseLabel
else:
surveySubQuestionsObj['questionResponseLabel_number'] = 0
surveySubQuestionsObj['questionResponseLabelNumber'] = 0
else:
surveySubQuestionsObj['questionResponseLabel_number'] = 0
surveySubQuestionsObj['questionResponseLabelNumber'] = 0

# Extract response label for other response types
try:
Expand Down Expand Up @@ -684,7 +684,7 @@ def main_data_extraction(obSub):
# Insert data to sl-survey-status-started druid datasource if status is started
if obSub['status'] == 'started':
survey_status['surveySubmissionId'] = obSub['_id']
survey_status['started_at'] = obSub['completedDate']
survey_status['startedAt'] = obSub['completedDate']
_id = survey_status.get('surveySubmissionId', None)
try :
if _id:
Expand All @@ -705,7 +705,7 @@ def main_data_extraction(obSub):
# Insert data to sl-survey-status-started druid datasource if status is inprogress
elif obSub['status'] == 'inprogress':
survey_status['surveySubmissionId'] = obSub['_id']
survey_status['inprogress_at'] = obSub['completedDate']
survey_status['inprogressAt'] = obSub['completedDate']
_id = survey_status.get('surveySubmissionId', None)
try :
if _id:
Expand All @@ -725,7 +725,7 @@ def main_data_extraction(obSub):

elif obSub['status'] == 'completed':
survey_status['surveySubmissionId'] = obSub['_id']
survey_status['completed_at'] = obSub['completedDate']
survey_status['completedAt'] = obSub['completedDate']
_id = survey_status.get('surveySubmissionId', None)
try :
if _id:
Expand Down