Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

05-05 pipeline fixed #133

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lab-materials/05/05-05/get_claims.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ def get_claims(claim_ids=None):
claim_ids = claim_ids or int(os.environ.get("claim_id"))

if not claim_ids:
logger.info("Getting unprocessed claims")
claim_ids = get_unprocessed_claims()
else:
logger.info(f"Using claim {claim_ids}")
claim_ids = [claim_ids]

with open('claims.json', 'w') as f:
Expand Down
129 changes: 129 additions & 0 deletions lab-materials/05/05-05/pipeline_components.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import kfp
import kfp.dsl as dsl
from kfp.dsl import (
component,
Input,
Output,
Dataset,
Metrics,
)
from kfp import kubernetes

@dsl.container_component
def initialize():
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''cd /shared-data
rm -r * 2>/dev/null
git clone https://github.com/rh-aiservices-bu/parasol-insurance
cd parasol-insurance
git checkout dev
ls
''',
],
args=[]
)

@dsl.container_component
def get_claims(claim_ids: int):
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''claim_id="$0"
export claim_id=$claim_id
export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local
cd /shared-data
cd parasol-insurance/lab-materials/05/05-05
python get_claims.py
ls
''',
],
args=[claim_ids]
)

@dsl.container_component
def get_accident_time():
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local
cd /shared-data
cd parasol-insurance/lab-materials/05/05-05
python get_accident_time.py
''',
],
args=[]
)

@dsl.container_component
def get_location():
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local
cd /shared-data
cd parasol-insurance/lab-materials/05/05-05
python get_location.py
''',
],
args=[]
)

@dsl.container_component
def get_sentiment():
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local
cd /shared-data
cd parasol-insurance/lab-materials/05/05-05
python get_sentiment.py
''',
],
args=[]
)

@dsl.container_component
def detect_objects(detection_endpoint: str):
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''detection_endpoint="$0"
export detection_endpoint=$detection_endpoint
export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local
cd /shared-data
cd parasol-insurance/lab-materials/05/05-05
python detect_objects.py
''',
],
args=[detection_endpoint]
)

@dsl.container_component
def summarize_text():
return dsl.ContainerSpec(
image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
command=[
'sh',
'-c',
'''export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local
cd /shared-data
cd parasol-insurance/lab-materials/05/05-05
python summarize_text.py
''',
],
args=[]
)
232 changes: 232 additions & 0 deletions lab-materials/05/05-05/process-claims-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
# PIPELINE DEFINITION
# Name: process-claims-pipeline
# Description: Processes claims.
# Inputs:
# claim_ids: int [Default: 0.0]
# detection_endpoint: str [Default: 'https://some-endpoint']
components:
comp-detect-objects:
executorLabel: exec-detect-objects
inputDefinitions:
parameters:
detection_endpoint:
parameterType: STRING
comp-get-accident-time:
executorLabel: exec-get-accident-time
comp-get-claims:
executorLabel: exec-get-claims
inputDefinitions:
parameters:
claim_ids:
parameterType: NUMBER_INTEGER
comp-get-location:
executorLabel: exec-get-location
comp-get-sentiment:
executorLabel: exec-get-sentiment
comp-initialize:
executorLabel: exec-initialize
comp-summarize-text:
executorLabel: exec-summarize-text
deploymentSpec:
executors:
exec-detect-objects:
container:
args:
- '{{$.inputs.parameters[''detection_endpoint'']}}'
command:
- sh
- -c
- "detection_endpoint=\"$0\"\n export detection_endpoint=$detection_endpoint\n\
\ export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n\
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python detect_objects.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-accident-time:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python get_accident_time.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-claims:
container:
args:
- '{{$.inputs.parameters[''claim_ids'']}}'
command:
- sh
- -c
- "claim_id=\"$0\"\n export claim_id=$claim_id\n export\
\ POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n cd /shared-data\n\
\ cd parasol-insurance/lab-materials/05/05-05\n python\
\ get_claims.py\n ls\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-location:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python get_location.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-sentiment:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python get_sentiment.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-initialize:
container:
command:
- sh
- -c
- "cd /shared-data\n rm -r * 2>/dev/null\n git clone\
\ https://github.com/rh-aiservices-bu/parasol-insurance\n cd\
\ parasol-insurance\n git checkout dev\n ls\n \
\ "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-summarize-text:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python summarize_text.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
pipelineInfo:
description: Processes claims.
name: process-claims-pipeline
root:
dag:
tasks:
detect-objects:
cachingOptions:
enableCache: true
componentRef:
name: comp-detect-objects
dependentTasks:
- get-claims
inputs:
parameters:
detection_endpoint:
componentInputParameter: detection_endpoint
taskInfo:
name: detect-objects
get-accident-time:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-accident-time
dependentTasks:
- get-claims
taskInfo:
name: get-accident-time
get-claims:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-claims
dependentTasks:
- initialize
inputs:
parameters:
claim_ids:
componentInputParameter: claim_ids
taskInfo:
name: get-claims
get-location:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-location
dependentTasks:
- get-claims
taskInfo:
name: get-location
get-sentiment:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-sentiment
dependentTasks:
- get-claims
taskInfo:
name: get-sentiment
initialize:
cachingOptions:
enableCache: true
componentRef:
name: comp-initialize
taskInfo:
name: initialize
summarize-text:
cachingOptions:
enableCache: true
componentRef:
name: comp-summarize-text
dependentTasks:
- get-claims
taskInfo:
name: summarize-text
inputDefinitions:
parameters:
claim_ids:
defaultValue: 0.0
parameterType: NUMBER_INTEGER
detection_endpoint:
defaultValue: https://some-endpoint
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.8.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-detect-objects:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-accident-time:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-claims:
fieldPathAsEnv:
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-location:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-sentiment:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-initialize:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-summarize-text:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
Loading
Loading