diff --git a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
index bd9cf11647..ea6ecdaae5 100644
--- a/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
+++ b/sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -26,7 +26,12 @@ class DataprocJobMixin:
     def __init__(
-        self, job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None]
+        self,
+        job: Job,
+        refresh_fn: Callable[[], Job],
+        cancel_fn: Callable[[], None],
+        project: str,
+        region: str,
     ):
         """
         Implementation of common methods for different types of SparkJob running on Dataproc cluster.
@@ -39,6 +44,8 @@ def __init__(
         self._job = job
         self._refresh_fn = refresh_fn
         self._cancel_fn = cancel_fn
+        self._project = project
+        self._region = region
 
     def get_id(self) -> str:
         """
@@ -134,7 +141,10 @@ def get_start_time(self):
         return self._job.status.state_start_time
 
     def get_log_uri(self) -> Optional[str]:
-        return self._job.driver_output_resource_uri
+        return (
+            f"https://console.cloud.google.com/dataproc/jobs/{self.get_id()}"
+            f"?region={self._region}&project={self._project}"
+        )
 
 
 class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob):
@@ -147,6 +157,8 @@ def __init__(
         job: Job,
         refresh_fn: Callable[[], Job],
         cancel_fn: Callable[[], None],
+        project: str,
+        region: str,
         output_file_uri: str,
     ):
         """
@@ -155,7 +167,7 @@ def __init__(
         Args:
             output_file_uri (str): Uri to the historical feature retrieval job output file.
         """
-        super().__init__(job, refresh_fn, cancel_fn)
+        super().__init__(job, refresh_fn, cancel_fn, project, region)
         self._output_file_uri = output_file_uri
 
     def get_output_file_uri(self, timeout_sec=None, block=True):
@@ -187,9 +199,11 @@ def __init__(
         job: Job,
         refresh_fn: Callable[[], Job],
         cancel_fn: Callable[[], None],
+        project: str,
+        region: str,
         job_hash: str,
     ) -> None:
-        super().__init__(job, refresh_fn, cancel_fn)
+        super().__init__(job, refresh_fn, cancel_fn, project, region)
         self._job_hash = job_hash
 
     def get_hash(self) -> str:
@@ -379,21 +393,39 @@ def historical_feature_retrieval(
             job_params, {"dev.feast.outputuri": job_params.get_destination_path()}
         )
         return DataprocRetrievalJob(
-            job, refresh_fn, cancel_fn, job_params.get_destination_path()
+            job=job,
+            refresh_fn=refresh_fn,
+            cancel_fn=cancel_fn,
+            project=self.project_id,
+            region=self.region,
+            output_file_uri=job_params.get_destination_path(),
         )
 
     def offline_to_online_ingestion(
         self, ingestion_job_params: BatchIngestionJobParameters
     ) -> BatchIngestionJob:
         job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
-        return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)
+        return DataprocBatchIngestionJob(
+            job=job,
+            refresh_fn=refresh_fn,
+            cancel_fn=cancel_fn,
+            project=self.project_id,
+            region=self.region,
+        )
 
     def start_stream_to_online_ingestion(
         self, ingestion_job_params: StreamIngestionJobParameters
     ) -> StreamIngestionJob:
         job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
         job_hash = ingestion_job_params.get_job_hash()
-        return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)
+        return DataprocStreamingIngestionJob(
+            job=job,
+            refresh_fn=refresh_fn,
+            cancel_fn=cancel_fn,
+            project=self.project_id,
+            region=self.region,
+            job_hash=job_hash,
+        )
 
     def get_job_by_id(self, job_id: str) -> SparkJob:
         job = self.job_client.get_job(
@@ -414,14 +446,34 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:
         if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower():
            output_path = job.pyspark_job.properties.get("dev.feast.outputuri", "")
-            return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path)
+            return DataprocRetrievalJob(
+                job=job,
+                refresh_fn=refresh_fn,
+                cancel_fn=cancel_fn,
+                project=self.project_id,
+                region=self.region,
+                output_file_uri=output_path,
+            )
 
         if job_type == SparkJobType.BATCH_INGESTION.name.lower():
-            return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)
+            return DataprocBatchIngestionJob(
+                job=job,
+                refresh_fn=refresh_fn,
+                cancel_fn=cancel_fn,
+                project=self.project_id,
+                region=self.region,
+            )
 
         if job_type == SparkJobType.STREAM_INGESTION.name.lower():
             job_hash = job.labels[self.JOB_HASH_LABEL_KEY]
-            return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)
+            return DataprocStreamingIngestionJob(
+                job=job,
+                refresh_fn=refresh_fn,
+                cancel_fn=cancel_fn,
+                project=self.project_id,
+                region=self.region,
+                job_hash=job_hash,
+            )
 
         raise ValueError(f"Unrecognized job type: {job_type}")
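
For reviewers who want to sanity-check the new link format without submitting a real Dataproc job, here is a minimal sketch of the URL construction that `get_log_uri` now performs. The `FakeJob` stub, the `build_log_uri` helper, and the sample ids are hypothetical illustrations; only the f-string mirrors the diff above.

```python
from dataclasses import dataclass


@dataclass
class FakeJob:
    """Hypothetical stand-in for google.cloud.dataproc_v1.types.Job."""

    job_id: str


def build_log_uri(job: FakeJob, project: str, region: str) -> str:
    # Same URL composition as the new DataprocJobMixin.get_log_uri: a Cloud
    # Console job page link, rather than the driver_output_resource_uri GCS path.
    return (
        f"https://console.cloud.google.com/dataproc/jobs/{job.job_id}"
        f"?region={region}&project={project}"
    )


print(build_log_uri(FakeJob("feast-hist-retrieval-1234"), "my-gcp-project", "us-central1"))
# -> https://console.cloud.google.com/dataproc/jobs/feast-hist-retrieval-1234?region=us-central1&project=my-gcp-project
```

A console link is presumably more useful to surface to users than the raw `driver_output_resource_uri`, since the job page aggregates driver output, status, and configuration, and resolves even while the job is still starting up.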