Skip to content

Commit

Permalink
WIP : bypassed the message incompatibility error, but now failing on …
Browse files Browse the repository at this point in the history
…a validation error on the rss source schema
  • Loading branch information
dagardner-nv committed Apr 18, 2024
1 parent ab3b144 commit c401291
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
3 changes: 2 additions & 1 deletion examples/llm/vdb_upload/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from morpheus.config import Config
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.pipeline.pipeline import Pipeline
from morpheus.stages.general.linear_modules_source import LinearModuleSourceStage

Expand Down Expand Up @@ -73,7 +74,7 @@ def setup_rss_source(pipe: Pipeline, config: Config, source_name: str, rss_confi
module_config={"rss_config": rss_config},
)
rss_pipe = pipe.add_stage(
LinearModuleSourceStage(config, module_definition, output_type=ControlMessage, output_port_name="output"))
LinearModuleSourceStage(config, module_definition, output_type=MessageMeta, output_port_name="output"))

return rss_pipe

Expand Down
11 changes: 7 additions & 4 deletions examples/llm/vdb_upload/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from morpheus.stages.general.trigger_stage import TriggerStage
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.output.write_to_vector_db_stage import WriteToVectorDBStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_nlp_stage import PreprocessNLPStage

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,6 +65,8 @@ def pipeline(pipeline_config: Config,

vdb_sources = process_vdb_sources(pipe, pipeline_config, source_config)

deserialize_stage = pipe.add_stage(DeserializeStage(pipeline_config))

trigger = None
if (isolate_embeddings):
trigger = pipe.add_stage(TriggerStage(pipeline_config))
Expand All @@ -85,13 +88,13 @@ def pipeline(pipeline_config: Config,

# Connect the pipeline
for source_output in vdb_sources:
if (isolate_embeddings):
pipe.add_edge(source_output, trigger)
else:
pipe.add_edge(source_output, nlp_stage)
pipe.add_edge(source_output, deserialize_stage)

if (isolate_embeddings):
pipe.add_edge(deserialize_stage, trigger)
pipe.add_edge(trigger, nlp_stage)
else:
pipe.add_edge(deserialize_stage, nlp_stage)

pipe.add_edge(nlp_stage, monitor_1)
pipe.add_edge(monitor_1, embedding_stage)
Expand Down

0 comments on commit c401291

Please sign in to comment.