Skip to content

Commit

Permalink
Initial updated pipeline (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
RHRolun authored Sep 26, 2024
1 parent 1b39692 commit cd60a58
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 1,028 deletions.
File renamed without changes.
File renamed without changes.
2 changes: 2 additions & 0 deletions lab-materials/05/05-05/get_claims.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ def get_claims(claim_ids=None):
claim_ids = claim_ids or int(os.environ.get("claim_id"))

if not claim_ids:
logger.info("Getting unprocessed claims")
claim_ids = get_unprocessed_claims()
else:
logger.info(f"Using claim {claim_ids}")
claim_ids = [claim_ids]

with open('claims.json', 'w') as f:
Expand Down
129 changes: 129 additions & 0 deletions lab-materials/05/05-05/pipeline_components.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import kfp
import kfp.dsl as dsl
from kfp.dsl import (
component,
Input,
Output,
Dataset,
Metrics,
)
from kfp import kubernetes

@dsl.container_component
def initialize():
    """Reset the shared volume and check out the lab repository.

    The container empties ``/shared-data``, clones the ``parasol-insurance``
    repository into it and switches the clone to the ``dev`` branch. The
    final ``ls`` only prints the checkout for the step log.
    """
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=[
            'sh',
            '-c',
            # `set -e` stops at the first failing command. Without it a
            # failed `cd /shared-data` would let `rm` run in whatever the
            # working directory happens to be, and a failed clone/checkout
            # would be masked by the exit status of the trailing `ls`.
            'set -e\n'
            'cd /shared-data\n'
            # `-f` keeps an already-empty volume (unmatched glob) from
            # counting as an error now that errors abort the script; the
            # `./` prefix protects against names starting with a dash.
            'rm -rf ./*\n'
            'git clone https://github.com/rh-aiservices-bu/parasol-insurance\n'
            'cd parasol-insurance\n'
            'git checkout dev\n'
            'ls\n',
        ],
        args=[]
    )

@dsl.container_component
def get_claims(claim_ids: int):
    """Run ``get_claims.py`` from the checkout on the shared volume.

    Args:
        claim_ids: Value passed to the script through the ``claim_id``
            environment variable.
    """
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=[
            'sh',
            '-c',
            # Abort on the first failure: previously the trailing `ls`
            # decided the exit status, so a crashing get_claims.py still
            # left the step reported as successful.
            'set -e\n'
            # With `sh -c SCRIPT ARG`, ARG is bound to $0; that is how the
            # single entry in `args` reaches the script.
            'claim_id="$0"\n'
            'export claim_id\n'
            'export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n'
            'cd /shared-data\n'
            'cd parasol-insurance/lab-materials/05/05-05\n'
            'python get_claims.py\n'
            'ls\n',
        ],
        args=[claim_ids]
    )

@dsl.container_component
def get_accident_time():
    """Run ``get_accident_time.py`` from the checkout under ``/shared-data``.

    ``POSTGRES_HOST`` is exported first, built from the ``NAMESPACE``
    environment variable of the container.
    """
    script_lines = (
        'export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local',
        'cd /shared-data',
        'cd parasol-insurance/lab-materials/05/05-05',
        'python get_accident_time.py',
    )
    # One command per line, with a newline after the last one as well.
    shell_script = '\n'.join(script_lines) + '\n'
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=['sh', '-c', shell_script],
        args=[],
    )

@dsl.container_component
def get_location():
    """Run ``get_location.py`` from the checkout under ``/shared-data``.

    ``POSTGRES_HOST`` is exported first, built from the ``NAMESPACE``
    environment variable of the container.
    """
    script_lines = (
        'export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local',
        'cd /shared-data',
        'cd parasol-insurance/lab-materials/05/05-05',
        'python get_location.py',
    )
    # One command per line, with a newline after the last one as well.
    shell_script = '\n'.join(script_lines) + '\n'
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=['sh', '-c', shell_script],
        args=[],
    )

@dsl.container_component
def get_sentiment():
    """Run ``get_sentiment.py`` from the checkout under ``/shared-data``.

    ``POSTGRES_HOST`` is exported first, built from the ``NAMESPACE``
    environment variable of the container.
    """
    script_lines = (
        'export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local',
        'cd /shared-data',
        'cd parasol-insurance/lab-materials/05/05-05',
        'python get_sentiment.py',
    )
    # One command per line, with a newline after the last one as well.
    shell_script = '\n'.join(script_lines) + '\n'
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=['sh', '-c', shell_script],
        args=[],
    )

@dsl.container_component
def detect_objects(detection_endpoint: str):
    """Run ``detect_objects.py`` from the checkout under ``/shared-data``.

    Args:
        detection_endpoint: Value exported to the script as the
            ``detection_endpoint`` environment variable.
    """
    script_lines = (
        # The single entry of `args` arrives in the script as $0.
        'detection_endpoint="$0"',
        'export detection_endpoint=$detection_endpoint',
        'export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local',
        'cd /shared-data',
        'cd parasol-insurance/lab-materials/05/05-05',
        'python detect_objects.py',
    )
    # One command per line, with a newline after the last one as well.
    shell_script = '\n'.join(script_lines) + '\n'
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=['sh', '-c', shell_script],
        args=[detection_endpoint],
    )

@dsl.container_component
def summarize_text():
    """Run ``summarize_text.py`` from the checkout under ``/shared-data``.

    ``POSTGRES_HOST`` is exported first, built from the ``NAMESPACE``
    environment variable of the container.
    """
    script_lines = (
        'export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local',
        'cd /shared-data',
        'cd parasol-insurance/lab-materials/05/05-05',
        'python summarize_text.py',
    )
    # One command per line, with a newline after the last one as well.
    shell_script = '\n'.join(script_lines) + '\n'
    return dsl.ContainerSpec(
        image='quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1',
        command=['sh', '-c', shell_script],
        args=[],
    )
232 changes: 232 additions & 0 deletions lab-materials/05/05-05/process-claims-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
# PIPELINE DEFINITION
# Name: process-claims-pipeline
# Description: Processes claims.
# Inputs:
# claim_ids: int [Default: 0.0]
# detection_endpoint: str [Default: 'https://some-endpoint']
components:
comp-detect-objects:
executorLabel: exec-detect-objects
inputDefinitions:
parameters:
detection_endpoint:
parameterType: STRING
comp-get-accident-time:
executorLabel: exec-get-accident-time
comp-get-claims:
executorLabel: exec-get-claims
inputDefinitions:
parameters:
claim_ids:
parameterType: NUMBER_INTEGER
comp-get-location:
executorLabel: exec-get-location
comp-get-sentiment:
executorLabel: exec-get-sentiment
comp-initialize:
executorLabel: exec-initialize
comp-summarize-text:
executorLabel: exec-summarize-text
deploymentSpec:
executors:
exec-detect-objects:
container:
args:
- '{{$.inputs.parameters[''detection_endpoint'']}}'
command:
- sh
- -c
- "detection_endpoint=\"$0\"\n export detection_endpoint=$detection_endpoint\n\
\ export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n\
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python detect_objects.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-accident-time:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python get_accident_time.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-claims:
container:
args:
- '{{$.inputs.parameters[''claim_ids'']}}'
command:
- sh
- -c
- "claim_id=\"$0\"\n export claim_id=$claim_id\n export\
\ POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n cd /shared-data\n\
\ cd parasol-insurance/lab-materials/05/05-05\n python\
\ get_claims.py\n ls\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-location:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python get_location.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-get-sentiment:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python get_sentiment.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-initialize:
container:
command:
- sh
- -c
- "cd /shared-data\n rm -r * 2>/dev/null\n git clone\
\ https://github.com/rh-aiservices-bu/parasol-insurance\n cd\
\ parasol-insurance\n git checkout dev\n ls\n \
\ "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
exec-summarize-text:
container:
command:
- sh
- -c
- "export POSTGRES_HOST=claimdb.$NAMESPACE.svc.cluster.local\n \
\ cd /shared-data\n cd parasol-insurance/lab-materials/05/05-05\n\
\ python summarize_text.py\n "
image: quay.io/rh-aiservices-bu/rhoai-lab-insurance-claim-processing-pipeline:1.1
pipelineInfo:
description: Processes claims.
name: process-claims-pipeline
root:
dag:
tasks:
detect-objects:
cachingOptions:
enableCache: true
componentRef:
name: comp-detect-objects
dependentTasks:
- get-claims
inputs:
parameters:
detection_endpoint:
componentInputParameter: detection_endpoint
taskInfo:
name: detect-objects
get-accident-time:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-accident-time
dependentTasks:
- get-claims
taskInfo:
name: get-accident-time
get-claims:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-claims
dependentTasks:
- initialize
inputs:
parameters:
claim_ids:
componentInputParameter: claim_ids
taskInfo:
name: get-claims
get-location:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-location
dependentTasks:
- get-claims
taskInfo:
name: get-location
get-sentiment:
cachingOptions:
enableCache: true
componentRef:
name: comp-get-sentiment
dependentTasks:
- get-claims
taskInfo:
name: get-sentiment
initialize:
cachingOptions:
enableCache: true
componentRef:
name: comp-initialize
taskInfo:
name: initialize
summarize-text:
cachingOptions:
enableCache: true
componentRef:
name: comp-summarize-text
dependentTasks:
- get-claims
taskInfo:
name: summarize-text
inputDefinitions:
parameters:
claim_ids:
defaultValue: 0.0
parameterType: NUMBER_INTEGER
detection_endpoint:
defaultValue: https://some-endpoint
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.8.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-detect-objects:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-accident-time:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-claims:
# NOTE(review): the NAMESPACE entry below appears six times, and this is the
# only executor in this platform spec that receives it, even though the
# commands of the other executors (all except exec-initialize) also expand
# $NAMESPACE. This looks like the pipeline definition attached the env var to
# the get-claims task once per task instead of to each task -- confirm
# against the pipeline source and recompile rather than editing this file.
fieldPathAsEnv:
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
- fieldPath: metadata.namespace
name: NAMESPACE
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-location:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-get-sentiment:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-initialize:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
exec-summarize-text:
pvcMount:
- constant: processing-pipeline-storage
mountPath: /shared-data
Loading

0 comments on commit cd60a58

Please sign in to comment.