[BUG] kafka pipeline-nlp fails in context.cpp (22.06) #244

Closed
pdmack opened this issue Jun 29, 2022 · 2 comments · Fixed by #245
Labels: bug (Something isn't working), Needs Triage (Need team to review and classify)

Comments

pdmack (Contributor) commented Jun 29, 2022

Describe the bug
The Kafka NLP pipeline run fails. Note that this run uses the correct (non-LFS) Morpheus data files for the labels and vocab hash (per #242).

RuntimeError: multiple edges to a source detected; use an operator to select proper behavior

Configuring Pipeline via CLI
Loaded labels file. Current labels: [['address', 'bank_acct', 'credit_card', 'email', 'govt_id', 'name', 'password', 'phone_num', 'secret_keys', 'user']]
Starting pipeline via CLI... Ctrl+C to Quit
Config:
{
  "ae": null,
  "class_labels": [
    "address",
    "bank_acct",
    "credit_card",
    "email",
    "govt_id",
    "name",
    "password",
    "phone_num",
    "secret_keys",
    "user"
  ],
  "debug": false,
  "edge_buffer_size": 32,
  "feature_length": 256,
  "fil": null,
  "log_config_file": null,
  "log_level": 10,
  "mode": "NLP",
  "model_max_batch_size": 32,
  "num_threads": 4,
  "pipeline_batch_size": 8192
}
WARNING: Logging before InitGoogleLogging() is written to STDERR
W20220629 17:35:30.460322  4155 triton_inference.cpp:248] Failed to connect to Triton at 'ai-engine:8001'. Default gRPC port of (8001) was detected but C++ InferenceClientStage uses HTTP protocol. Retrying with default HTTP port (8000)
E20220629 17:35:30.467113  4155 controller.cpp:62] exception caught while performing update - this is fatal - issuing kill
E20220629 17:35:30.467154  4155 context.cpp:125] rank: 0; size: 1; tid: 139764432090880; fid: 0x7f1b90041000: set_exception issued; issuing kill to current runnable. Exception msg: RuntimeError: multiple edges to a source detected; use an operator to select proper behavior

At:
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/stages/general/monitor_stage.py(238): _build_single
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/single_port_stage.py(84): _build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(322): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py(159): inner_build
E20220629 17:35:30.467185  4155 context.cpp:125] rank: 0; size: 1; tid: 139764432090880; fid: 0x7f1b90041000: set_exception issued; issuing kill to current runnable. Exception msg: RuntimeError: multiple edges to a source detected; use an operator to select proper behavior

At:
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/stages/general/monitor_stage.py(238): _build_single
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/single_port_stage.py(84): _build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(322): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/stream_wrapper.py(344): build
  /opt/conda/envs/morpheus/lib/python3.8/site-packages/morpheus/pipeline/pipeline.py(159): inner_build
E20220629 17:35:30.467211  4155 manager.cpp:85] error detected on controller
E20220629 17:35:30.467285  3987 runner.cpp:190] Runner::await_join - an exception was caught while awaiting on one or more contexts/instances - rethrowing
CPP Enabled: True
====Registering Pipeline====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Pipeline====
Added source: <from-kafka-0; KafkaSourceStage(bootstrap_servers=broker:9092, input_topic=morpheus-input, group_id=custreamz, poll_interval=10millis, disable_commit=False, disable_pre_filtering=False)>
  └─> morpheus.MessageMeta
Added stage: <monitor-1; MonitorStage(description=From Kafka rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <deserialize-2; DeserializeStage()>
  └─ morpheus.MessageMeta -> morpheus.MultiMessage
Added stage: <monitor-3; MonitorStage(description=Deserialization rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
  └─ morpheus.MultiMessage -> morpheus.MultiMessage
Added stage: <preprocess-nlp-4; PreprocessNLPStage(vocab_hash_file=/common/data/model_data/bert-base-uncased-hash.txt, truncation=True, do_lower_case=True, add_special_tokens=False, stride=-1)>
  └─ morpheus.MultiMessage -> morpheus.MultiInferenceNLPMessage
Added stage: <monitor-5; MonitorStage(description=Preprocessing rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
  └─ morpheus.MultiInferenceNLPMessage -> morpheus.MultiInferenceNLPMessage
Added stage: <inference-6; TritonInferenceStage(model_name=sid-minibert-onnx, server_url=ai-engine:8001, force_convert_inputs=True, use_shared_memory=True)>
  └─ morpheus.MultiInferenceNLPMessage -> morpheus.MultiResponseProbsMessage
Added stage: <monitor-7; MonitorStage(description=Inference rate, smoothing=0.001, unit=inf, delayed_start=False, determine_count_fn=None)>
  └─ morpheus.MultiResponseProbsMessage -> morpheus.MultiResponseProbsMessage
Added stage: <serialize-8; SerializeStage(include=[], exclude=['^ts_'], fixed_columns=True)>
  └─ morpheus.MultiResponseProbsMessage -> morpheus.MessageMeta
Added stage: <monitor-9; MonitorStage(description=Serialization rate, smoothing=0.05, unit=messages, delayed_start=False, determine_count_fn=None)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
Added stage: <to-kafka-10; WriteToKafkaStage(bootstrap_servers=broker:9092, output_topic=morpheus-output)>
  └─ morpheus.MessageMeta -> morpheus.MessageMeta
====Pipeline Complete====

Steps/Code to reproduce bug

morpheus --log_level=DEBUG run \
  --num_threads=4 --pipeline_batch_size=8192 --model_max_batch_size=32 --edge_buffer_size=32 \
  pipeline-nlp --labels_file=/common/data/model_data/labels_nlp.txt --model_seq_length=256 \
  from-kafka --input_topic morpheus-input --bootstrap_servers broker:9092 \
  monitor --description='From Kafka rate' \
  deserialize \
  monitor --description='Deserialization rate' \
  preprocess --vocab_hash_file=/common/data/model_data/bert-base-uncased-hash.txt --truncation=True --do_lower_case=True --add_special_tokens=False \
  monitor --description='Preprocessing rate' \
  inf-triton --force_convert_inputs=True --model_name=sid-minibert-onnx --server_url=ai-engine:8001 --use_shared_memory=True \
  monitor --description='Inference rate' --smoothing=0.001 --unit inf \
  serialize --exclude '^ts_' \
  monitor --description='Serialization rate' \
  to-kafka --output_topic morpheus-output --bootstrap_servers broker:9092 \
  monitor --description='To Kafka rate'

Expected behavior
Successful completion of pipeline run.

Environment overview (please complete the following information)

  • Environment location: [Bare-metal, Docker]
  • Method of Morpheus install: [Docker/k8s]

Environment details
https://gist.github.com/pdmack/8f9342321523251bf9ecdd8cd349a029

Additional context
Needed by #237

pdmack added the bug and Needs Triage labels on Jun 29, 2022
pdmack (Contributor, Author) commented Jun 29, 2022

    void complete_edge(std::shared_ptr<channel::IngressHandle> untyped_ingress) override
    {
        CHECK(untyped_ingress);
        if (m_ingress != nullptr)
        {
            // todo(ryan) - we could specialize this exception, then if we catch it in segment::Builder::make_edge, we
            // could enhance the error description and rethrow the same exception
            throw exceptions::SrfRuntimeError(
                "multiple edges to a source detected; use an operator to select proper behavior");
        }
        m_ingress = std::dynamic_pointer_cast<channel::Ingress<T>>(untyped_ingress);
        CHECK(m_ingress);
    }
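
(For reference, this is the edge-completion check in SRF where the exception is raised: once a source already has an ingress attached, wiring a second edge to the same source throws. That appears to be what the trailing 'To Kafka rate' MonitorStage runs into at build time when the to-kafka stage hands back its unchanged input stream.)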

mdemoret-nv (Contributor) commented

OK, found the issue: WriteToKafkaStage needs to return node, input_stream[1] at the end of _build_single instead of input_stream.
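
For illustration only, here is a minimal, self-contained Python sketch of the failure mode. It is not Morpheus or SRF code; Node, make_edge, and build_write_to_kafka are made-up stand-ins. It shows why returning the unmodified input stream lets the next stage attach a second edge to the upstream node, and why returning the newly created node (as in the fix above) avoids the error:

# Hypothetical stand-ins, not the Morpheus/SRF API.
class Node:
    def __init__(self, name: str):
        self.name = name
        self.downstream = []


def make_edge(src: Node, dst: Node) -> None:
    # Mirrors the complete_edge() check quoted above: a source accepts one edge.
    if src.downstream:
        raise RuntimeError(
            "multiple edges to a source detected; use an operator to select proper behavior")
    src.downstream.append(dst)


def build_write_to_kafka(upstream: Node, fixed: bool) -> Node:
    node = Node("to-kafka")
    make_edge(upstream, node)        # upstream -> to-kafka
    # Buggy: hand the caller the upstream node again, so the next stage
    # (the trailing monitor) tries to attach a second edge to it.
    # Fixed: hand back the new node, making the stage a pass-through.
    return node if fixed else upstream


serialize = Node("serialize-monitor")
out = build_write_to_kafka(serialize, fixed=False)
try:
    make_edge(out, Node("to-kafka-monitor"))   # second edge onto upstream -> raises
except RuntimeError as exc:
    print("buggy:", exc)

serialize = Node("serialize-monitor")
out = build_write_to_kafka(serialize, fixed=True)
make_edge(out, Node("to-kafka-monitor"))       # attaches to "to-kafka"; no error
print("fixed: monitor attached to", out.name)

In the real stage the return value is a stream pair, hence node, input_stream[1]: the newly created node plus the unchanged message type.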

ghost closed this as completed in #245 on Jun 30, 2022
ghost pushed a commit that referenced this issue on Jun 30, 2022
Fixes #244 

The kafka stage needs to be a pass through node.

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: #245