merge with master
roytman committed Feb 7, 2024
2 parents b43df3f + 1c9ac5c commit da30c1a
Showing 116 changed files with 10,832 additions and 11,051 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -85,3 +85,6 @@ __pycache__
# Coverage
.coverage
.coverage*

# kfp local execution default directory
local_outputs/
4 changes: 2 additions & 2 deletions backend/src/apiserver/model/run.go
@@ -363,11 +363,11 @@ var runAPIToModelFieldMap = map[string]string{
"storage_state": "StorageState",
"status": "Conditions",
"namespace": "Namespace", // v2beta1 API
"experiment_id": "ExperimentId", // v2beta1 API
"experiment_id": "ExperimentUUID", // v2beta1 API
"state": "State", // v2beta1 API
"state_history": "StateHistory", // v2beta1 API
"runtime_details": "PipelineRuntimeManifest", // v2beta1 API
"recurring_run_id": "RecurringRunId", // v2beta1 API
"recurring_run_id": "JobUUID", // v2beta1 API
}

// APIToModelFieldMap returns a map from API names to field names for model Run.
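The map above translates v2beta1 API field names, as they appear in filter and sort_by expressions, into model struct field names. This commit repoints experiment_id at ExperimentUUID and recurring_run_id at JobUUID, fields that correspond to real run table columns, and the TestRunAPIFieldMap test added later in this commit enforces that every mapped field is a column the run store selects (the task.go change below does the same for run_id and parent_task_id). The following sketch illustrates why the mapping matters when an API sort key is resolved to a column. It is a simplified, self-contained stand-in, not the apiserver's actual list/filter code; sortColumnForAPIKey and the two package-level maps are illustrative names only.

package main

import "fmt"

// Illustrative subset of the API-to-model field map after this commit.
var runAPIToModelField = map[string]string{
    "experiment_id":    "ExperimentUUID", // previously mapped to "ExperimentId"
    "recurring_run_id": "JobUUID",        // previously mapped to "RecurringRunId"
    "storage_state":    "StorageState",
}

// Stand-in for the set of columns the run store selects; membership in this
// set is what TestRunAPIFieldMap asserts for every mapped field.
var runColumns = map[string]bool{
    "UUID": true, "StorageState": true, "ExperimentUUID": true, "JobUUID": true,
}

// sortColumnForAPIKey resolves an API sort key to a store column and fails
// fast if the mapping points at a field that is not a real column, which is
// the bug class the UUID renames above address.
func sortColumnForAPIKey(apiKey string) (string, error) {
    field, ok := runAPIToModelField[apiKey]
    if !ok {
        return "", fmt.Errorf("unknown API field %q", apiKey)
    }
    if !runColumns[field] {
        return "", fmt.Errorf("API field %q maps to %q, which is not a run store column", apiKey, field)
    }
    return field, nil
}

func main() {
    col, err := sortColumnForAPIKey("experiment_id")
    fmt.Println(col, err) // ExperimentUUID <nil>
}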
6 changes: 3 additions & 3 deletions backend/src/apiserver/model/task.go
@@ -81,8 +81,8 @@ var taskAPIToModelFieldMap = map[string]string{
"namespace": "Namespace",
"pipeline_name": "PipelineName", // v2beta1 API
"pipelineName": "PipelineName", // v1beta1 API
"run_id": "RunId", // v2beta1 API
"runId": "RunId", // v1beta1 API
"run_id": "RunUUID", // v2beta1 API
"runId": "RunUUID", // v1beta1 API
"display_name": "Name", // v2beta1 API
"execution_id": "MLMDExecutionID", // v2beta1 API
"create_time": "CreatedTimestamp", // v2beta1 API
@@ -91,7 +91,7 @@ var taskAPIToModelFieldMap = map[string]string{
"fingerprint": "Fingerprint",
"state": "State", // v2beta1 API
"state_history": "StateHistory", // v2beta1 API
"parent_task_id": "ParentTaskId", // v2beta1 API
"parent_task_id": "ParentTaskUUID", // v2beta1 API
"mlmdExecutionID": "MLMDExecutionID", // v1beta1 API
"created_at": "CreatedTimestamp", // v1beta1 API
"finished_at": "FinishedTimestamp", // v1beta1 API
@@ -15,6 +15,7 @@
package storage

import (
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"testing"

"github.com/stretchr/testify/assert"
@@ -93,3 +93,9 @@ func TestUnsetDefaultExperimentIdIfIdMatches(t *testing.T) {

db.Close()
}

func TestExperimentAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Experiment{}).APIToModelFieldMap() {
assert.Contains(t, experimentColumns, modelField)
}
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/storage/job_store_test.go
@@ -964,3 +964,9 @@ func TestDeleteJob_InternalError(t *testing.T) {
assert.Equal(t, codes.Internal, err.(*util.UserError).ExternalStatusCode(),
"Expected delete job to return internal error")
}

func TestJobAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Job{}).APIToModelFieldMap() {
assert.Contains(t, jobColumns, modelField)
}
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/storage/run_store_test.go
@@ -1421,3 +1421,9 @@ func TestParseResourceReferences(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, expectedResourceReferences, actualResourceReferences)
}

func TestRunAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Run{}).APIToModelFieldMap() {
assert.Contains(t, runColumns, modelField)
}
}
6 changes: 6 additions & 0 deletions backend/src/apiserver/storage/task_store_test.go
@@ -617,3 +617,9 @@ func TestTaskStore_UpdateOrCreateTasks(t *testing.T) {
})
}
}

func TestTaskAPIFieldMap(t *testing.T) {
for _, modelField := range (&model.Task{}).APIToModelFieldMap() {
assert.Contains(t, taskColumns, modelField)
}
}
53 changes: 33 additions & 20 deletions backend/src/v2/metadata/client.go
@@ -700,29 +700,42 @@ func (c *Client) GetExecutionsInDAG(ctx context.Context, dag *DAG, pipeline *Pip
// Note, because MLMD does not have index on custom properties right now, we
// take a pipeline run context to limit the number of executions the DB needs to
// iterate through to find sub-executions.
res, err := c.svc.GetExecutionsByContext(ctx, &pb.GetExecutionsByContextRequest{
ContextId: pipeline.pipelineRunCtx.Id,
Options: &pb.ListOperationOptions{
FilterQuery: &parentDAGFilter,
},
})
if err != nil {
return nil, err
}
execs := res.GetExecutions()
for _, e := range execs {
execution := &Execution{execution: e}
taskName := execution.TaskName()
if taskName == "" {
return nil, fmt.Errorf("empty task name for execution ID: %v", execution.GetID())

nextPageToken := ""
for {
res, err := c.svc.GetExecutionsByContext(ctx, &pb.GetExecutionsByContextRequest{
ContextId: pipeline.pipelineRunCtx.Id,
Options: &pb.ListOperationOptions{
FilterQuery: &parentDAGFilter,
NextPageToken: &nextPageToken,
},
})
if err != nil {
return nil, err
}

execs := res.GetExecutions()
for _, e := range execs {
execution := &Execution{execution: e}
taskName := execution.TaskName()
if taskName == "" {
return nil, fmt.Errorf("empty task name for execution ID: %v", execution.GetID())
}
existing, ok := executionsMap[taskName]
if ok {
// TODO(Bobgy): to support retry, we need to handle multiple tasks with the same task name.
return nil, fmt.Errorf("two tasks have the same task name %q, id1=%v id2=%v", taskName, existing.GetID(), execution.GetID())
}
executionsMap[taskName] = execution
}
existing, ok := executionsMap[taskName]
if ok {
// TODO(Bobgy): to support retry, we need to handle multiple tasks with the same task name.
return nil, fmt.Errorf("two tasks have the same task name %q, id1=%v id2=%v", taskName, existing.GetID(), execution.GetID())

nextPageToken = res.GetNextPageToken()

if nextPageToken == "" {
break
}
executionsMap[taskName] = execution
}

return executionsMap, nil
}

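The change above replaces the single GetExecutionsByContext call with a loop: each request passes along the next-page token returned by the previous response, and the loop stops once the service returns an empty token, so a DAG with more executions than one MLMD page no longer yields a truncated executionsMap. Below is a minimal, self-contained sketch of the same drain-all-pages pattern; pager, drainAll, and fakePager are illustrative names, not KFP or MLMD APIs.

package main

import "fmt"

// pager abstracts any list API that returns one page of results plus a
// continuation token, the shape MLMD's GetExecutionsByContext follows.
type pager interface {
    ListPage(pageToken string) (items []string, nextPageToken string, err error)
}

// drainAll keeps requesting pages until the service returns an empty
// next-page token. An empty token on the first response (a single page, or no
// results at all) exits immediately, mirroring the break in GetExecutionsInDAG.
func drainAll(p pager) ([]string, error) {
    var all []string
    token := ""
    for {
        items, next, err := p.ListPage(token)
        if err != nil {
            return nil, err
        }
        all = append(all, items...)
        if next == "" {
            return all, nil
        }
        token = next
    }
}

// fakePager serves three items two at a time so the loop runs twice.
type fakePager struct{}

func (fakePager) ListPage(token string) ([]string, string, error) {
    if token == "" {
        return []string{"exec-1", "exec-2"}, "page-2", nil
    }
    return []string{"exec-3"}, "", nil
}

func main() {
    items, err := drainAll(fakePager{})
    fmt.Println(items, err) // [exec-1 exec-2 exec-3] <nil>
}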
2 changes: 1 addition & 1 deletion components/google-cloud/Dockerfile
@@ -44,7 +44,7 @@ RUN pip3 install -U "fsspec>=0.7.4" "gcsfs>=0.6.0" "pandas<=1.3.5" "scikit-learn
RUN pip3 install -U google-cloud-notebooks

# Install main package
RUN pip3 install "git+https://github.com/kubeflow/pipelines.git@google-cloud-pipeline-components-2.8.0#egg=google-cloud-pipeline-components&subdirectory=components/google-cloud"
RUN pip3 install "git+https://github.com/kubeflow/pipelines.git@google-cloud-pipeline-components-2.9.0#egg=google-cloud-pipeline-components&subdirectory=components/google-cloud"

# Note that components can override the container entry point.
ENTRYPOINT ["python3","-m","google_cloud_pipeline_components.container.v1.aiplatform.remote_runner"]
10 changes: 10 additions & 0 deletions components/google-cloud/RELEASE.md
@@ -1,8 +1,18 @@
## Upcoming release
* Fix the missing output of pipeline remote runner. `AutoMLImageTrainingJobRunOp` now passes the model artifacts correctly to downstream components.
* Fix the metadata of the Model Evaluation resource when row-based metrics are disabled in `preview.model_evaluation.evaluation_llm_text_generation_pipeline`.

## Release 2.9.0
* Use `large_model_reference` for `model_reference_name` when uploading models from `preview.llm.rlhf_pipeline` instead of hardcoding the value as `text-bison@001`.
* Disable caching when resolving model display names for RLHF-tuned models so a unique name is generated on each `preview.llm.rlhf_pipeline` run.
* Upload the tuned adapter to Model Registry instead of the model checkpoint from `preview.llm.rlhf_pipeline`.
* Fix the naming of AutoSxS's question answering task. "question_answer" -> "question_answering".
* Add Vertex model get component (`v1.model.ModelGetOp`).
* Migrate to Protobuf 4 (`protobuf>=4.21.1,<5`). Require `kfp>=2.6.0`.
* Support setting version aliases in (`v1.model.ModelUploadOp`).
* Only run `preview.llm.bulk_inference` pipeline after RLHF tuning for third-party models when `eval_dataset` is provided.
* Update LLM Evaluation Pipelines to use `text-bison@002` model by default.
* Apply latest GCPC image vulnerability resolutions (base OS and software updates).

## Release 2.8.0
* Release AutoSxS pipeline to preview.
5 changes: 5 additions & 0 deletions components/google-cloud/docs/source/versions.json
@@ -1,4 +1,9 @@
[
{
"version": "https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.9.0",
"title": "2.9.0",
"aliases": []
},
{
"version": "https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-2.8.0",
"title": "2.8.0",
@@ -1,4 +1,4 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
# Copyright 2024 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,10 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""KFP Container component for preprocessing predictions for the Arbiter."""
"""Component for running LLM Batch Prediction jobs side-by-side."""

import os
from typing import Dict, List
from typing import Any, Dict, List

from google_cloud_pipeline_components import _placeholders
from google_cloud_pipeline_components import utils as gcpc_utils
@@ -24,94 +24,94 @@

def _resolve_image() -> str:
"""Determines the image URI to create a container from."""
return (
os.environ.get('AUTOSXS_IMAGE_OVERRIDE')
or utils.get_default_image_uri('autosxs'))
return os.environ.get(
'AUTOSXS_IMAGE_OVERRIDE'
) or utils.get_default_image_uri('autosxs')


# pylint: disable=unused-argument,dangerous-default-value
@dsl.container_component
def arbiter_preprocess(
def batch_prediction_pairwise(
display_name: str,
evaluation_dataset: str,
id_columns: List[str],
response_column_a: str,
response_column_b: str,
task: str,
is_bp_output_a: bool,
is_bp_output_b: bool,
autorater_prompt_parameters: Dict[str, Dict[str, str]],
response_column_a: str,
response_column_b: str,
preprocessed_evaluation_dataset: dsl.Output[dsl.Dataset], # pylint: disable=unused-argument # pytype: disable=unsupported-operands
preprocessed_evaluation_dataset_uri: dsl.OutputPath(str), # pylint: disable=unused-argument # pytype: disable=invalid-annotation
gcp_resources: dsl.OutputPath(str), # pytype: disable=invalid-annotation
prediction_uris_a: str = '',
prediction_uris_b: str = '',
metadata: dsl.OutputPath(Dict[str, Any]), # pytype: disable=invalid-annotation
model_a: str = '',
model_b: str = '',
model_a_prompt_parameters: Dict[str, Dict[str, str]] = {},
model_b_prompt_parameters: Dict[str, Dict[str, str]] = {},
model_a_parameters: Dict[str, str] = {},
model_b_parameters: Dict[str, str] = {},
human_preference_column: str = '',
) -> dsl.ContainerSpec: # pylint: disable=g-doc-args
"""Preprocesses predictions tables for the AutoSxS Arbiter.
"""Runs up to two LLM Batch Prediction jobs side-by-side.
Args:
display_name: Display name for the batch prediction job.
evaluation_dataset: GCS or BigQuery URIs representing a dataset of prompts
and responses.
id_columns: The columns which distinguish unique evaluation examples.
response_column_a: The column containing responses for model a.
response_column_b: The column containing responses for model b.
task: Task to evaluate.
output_path: Path where preprocessed predictions are stored.
is_bp_output_a: If True, the prediction URIs will be parsed as if they came
from Vertex Batch Prediction, where response_column_a represents a field
in the model output containing the response. If False, the expected format
will be a table containing all model_prompt_parameters and the
response_column.
is_bp_output_b: If True, the prediction URIs will be parsed as if they came
from Vertex Batch Prediction, where response_column_b represents a field
in the model output containing the response. If False, the expected format
will be a table containing all model_prompt_parameters and the
response_column.
prediction_uris_a: A list of GCS or BigQuery URIs representing a dataset of
prompts and responses for model a.
prediction_uris_b: A list of GCS or BigQuery URIs representing a dataset of
prompts and responses for model b.
autorater_prompt_parameters: Map of autorater prompt template parameters to
columns or templates.
response_column_a: The column containing responses for model a.
response_column_b: The column containing responses for model b.
model_a: A fully-qualified model resource name
(`projects/{project}/locations/{location}/models/{model}@{version}`) or
publisher model resource name (`publishers/{publisher}/models/{model}`).
This parameter is optional if Model A responses are specified.
model_b: A fully-qualified model resource name
(`projects/{project}/locations/{location}/models/{model}@{version}`) or
publisher model resource name (`publishers/{publisher}/models/{model}`).
This parameter is optional if Model B responses are specified.
model_a_prompt_parameters: Map of model A prompt template parameters to
columns or templates.
model_b_prompt_parameters: Map of model B prompt template parameters to
columns or templates.
autorater_prompt_parameters: Map of autorater prompt template parameters to
columns or templates.
model_a_parameters: The parameters that govern the predictions from model A,
such as temperature or maximum output tokens.
model_b_parameters: The parameters that govern the predictions from model B,
such as temperature or maximum output tokens.
human_preference_column: The column containing ground truths. The default
value is an empty string if not provided by users.
Returns:
preprocessed_evaluation_dataset: Dataset of the table containing the inputs
expected by the Arbiter.
preprocessed_evaluation_dataset_uri: URI of the table containing the inputs
expected by the Arbiter.
gcp_resources: Tracker for GCP resources created by this component.
metadata_path: Path to write the object that stores computed metrics
metadata for the task preprocess component.
"""
return gcpc_utils.build_serverless_customjob_container_spec(
project=_placeholders.PROJECT_ID_PLACEHOLDER,
location=_placeholders.LOCATION_PLACEHOLDER,
custom_job_payload=utils.build_payload(
display_name='arbiter_preprocess',
display_name='batch_prediction_pairwise',
machine_type='n1-standard-4',
image_uri=_resolve_image(),
args=[
'--', # Used to mark the start of component flags.
'arbiter_preprocess',
'batch_prediction_sxs',
f'--display_name={display_name}',
f'--evaluation_dataset={evaluation_dataset}',
f'--prediction_uris_a={prediction_uris_a}',
f'--prediction_uris_b={prediction_uris_b}',
(
'--id_columns='
"{{$.inputs.parameters['id_columns'].json_escape[0]}}"
),
(
'--autorater_prompt_parameters='
"{{$.inputs.parameters['autorater_prompt_parameters']"
'.json_escape[0]}}'
),
f'--task={task}',
f'--project={_placeholders.PROJECT_ID_PLACEHOLDER}',
f'--location={_placeholders.LOCATION_PLACEHOLDER}',
f'--model_a={model_a}',
f'--model_b={model_b}',
(
'--model_a_prompt_parameters='
"{{$.inputs.parameters['model_a_prompt_parameters']"
@@ -122,14 +122,26 @@ def arbiter_preprocess(
"{{$.inputs.parameters['model_b_prompt_parameters']"
'.json_escape[0]}}'
),
(
'--autorater_prompt_parameters='
"{{$.inputs.parameters['autorater_prompt_parameters']"
'.json_escape[0]}}'
),
f'--response_column_a={response_column_a}',
f'--response_column_b={response_column_b}',
(
'--model_a_parameters='
"{{$.inputs.parameters['model_a_parameters'].json_escape[0]}}"
),
(
'--model_b_parameters='
"{{$.inputs.parameters['model_b_parameters'].json_escape[0]}}"
),
f'--human_preference_column={human_preference_column}',
f'--task={task}',
f'--is_batch_prediction_output_a={is_bp_output_a}',
f'--is_batch_prediction_output_b={is_bp_output_b}',
f'--output_dir={dsl.PIPELINE_ROOT_PLACEHOLDER}',
f'--staging_dir={dsl.PIPELINE_ROOT_PLACEHOLDER}',
f'--preprocessed_evaluation_dataset_uri={preprocessed_evaluation_dataset_uri}',
f'--metadata_path={metadata}',
f'--gcp_resources_path={gcp_resources}',
'--executor_input={{$.json_escape[1]}}',
],
),