Commit

Use dataproc console url instead of gcs for log uri (#1263)
* Use dataproc console url instead of gcs for log uri

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

* Fix positional argument to Job

Signed-off-by: Khor Shu Heng <khor.heng@gojek.com>

Co-authored-by: Khor Shu Heng <khor.heng@gojek.com>
khorshuheng authored Jan 12, 2021
1 parent f4f345e commit e8d8b72
Showing 1 changed file with 62 additions and 10 deletions.
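
For context, get_log_uri now builds a Dataproc Cloud Console link instead of returning the driver's GCS output URI (driver_output_resource_uri). A minimal sketch of the resulting URL format, using hypothetical project, region, and job id values:

# Sketch of the console URL format this commit introduces.
# project, region, and job_id are hypothetical example values.
project = "my-gcp-project"
region = "us-central1"
job_id = "feast-job-abc123"

log_uri = (
    f"https://console.cloud.google.com/dataproc/jobs/{job_id}"
    f"?region={region}&project={project}"
)
print(log_uri)
# https://console.cloud.google.com/dataproc/jobs/feast-job-abc123?region=us-central1&project=my-gcp-project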
sdk/python/feast/pyspark/launchers/gcloud/dataproc.py
@@ -26,7 +26,12 @@

 class DataprocJobMixin:
     def __init__(
-        self, job: Job, refresh_fn: Callable[[], Job], cancel_fn: Callable[[], None]
+        self,
+        job: Job,
+        refresh_fn: Callable[[], Job],
+        cancel_fn: Callable[[], None],
+        project: str,
+        region: str,
     ):
         """
         Implementation of common methods for different types of SparkJob running on Dataproc cluster.
@@ -39,6 +44,8 @@ def __init__(
         self._job = job
         self._refresh_fn = refresh_fn
         self._cancel_fn = cancel_fn
+        self._project = project
+        self._region = region

     def get_id(self) -> str:
         """
@@ -134,7 +141,10 @@ def get_start_time(self):
         return self._job.status.state_start_time

     def get_log_uri(self) -> Optional[str]:
-        return self._job.driver_output_resource_uri
+        return (
+            f"https://console.cloud.google.com/dataproc/jobs/{self.get_id()}"
+            f"?region={self._region}&project={self._project}"
+        )


 class DataprocRetrievalJob(DataprocJobMixin, RetrievalJob):
@@ -147,6 +157,8 @@ def __init__(
         job: Job,
         refresh_fn: Callable[[], Job],
         cancel_fn: Callable[[], None],
+        project: str,
+        region: str,
         output_file_uri: str,
     ):
         """
@@ -155,7 +167,7 @@ def __init__(
         Args:
             output_file_uri (str): Uri to the historical feature retrieval job output file.
         """
-        super().__init__(job, refresh_fn, cancel_fn)
+        super().__init__(job, refresh_fn, cancel_fn, project, region)
         self._output_file_uri = output_file_uri

     def get_output_file_uri(self, timeout_sec=None, block=True):
@@ -187,9 +199,11 @@ def __init__(
         job: Job,
         refresh_fn: Callable[[], Job],
         cancel_fn: Callable[[], None],
+        project: str,
+        region: str,
         job_hash: str,
     ) -> None:
-        super().__init__(job, refresh_fn, cancel_fn)
+        super().__init__(job, refresh_fn, cancel_fn, project, region)
         self._job_hash = job_hash

     def get_hash(self) -> str:
@@ -379,21 +393,39 @@ def historical_feature_retrieval(
             job_params, {"dev.feast.outputuri": job_params.get_destination_path()}
         )
         return DataprocRetrievalJob(
-            job, refresh_fn, cancel_fn, job_params.get_destination_path()
+            job=job,
+            refresh_fn=refresh_fn,
+            cancel_fn=cancel_fn,
+            project=self.project_id,
+            region=self.region,
+            output_file_uri=job_params.get_destination_path(),
         )

     def offline_to_online_ingestion(
         self, ingestion_job_params: BatchIngestionJobParameters
     ) -> BatchIngestionJob:
         job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
-        return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)
+        return DataprocBatchIngestionJob(
+            job=job,
+            refresh_fn=refresh_fn,
+            cancel_fn=cancel_fn,
+            project=self.project_id,
+            region=self.region,
+        )

     def start_stream_to_online_ingestion(
         self, ingestion_job_params: StreamIngestionJobParameters
     ) -> StreamIngestionJob:
         job, refresh_fn, cancel_fn = self.dataproc_submit(ingestion_job_params, {})
         job_hash = ingestion_job_params.get_job_hash()
-        return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)
+        return DataprocStreamingIngestionJob(
+            job=job,
+            refresh_fn=refresh_fn,
+            cancel_fn=cancel_fn,
+            project=self.project_id,
+            region=self.region,
+            job_hash=job_hash,
+        )

     def get_job_by_id(self, job_id: str) -> SparkJob:
         job = self.job_client.get_job(
@@ -414,14 +446,34 @@ def _dataproc_job_to_spark_job(self, job: Job) -> SparkJob:

         if job_type == SparkJobType.HISTORICAL_RETRIEVAL.name.lower():
             output_path = job.pyspark_job.properties.get("dev.feast.outputuri", "")
-            return DataprocRetrievalJob(job, refresh_fn, cancel_fn, output_path)
+            return DataprocRetrievalJob(
+                job=job,
+                refresh_fn=refresh_fn,
+                cancel_fn=cancel_fn,
+                project=self.project_id,
+                region=self.region,
+                output_file_uri=output_path,
+            )

         if job_type == SparkJobType.BATCH_INGESTION.name.lower():
-            return DataprocBatchIngestionJob(job, refresh_fn, cancel_fn)
+            return DataprocBatchIngestionJob(
+                job=job,
+                refresh_fn=refresh_fn,
+                cancel_fn=cancel_fn,
+                project=self.project_id,
+                region=self.region,
+            )

         if job_type == SparkJobType.STREAM_INGESTION.name.lower():
             job_hash = job.labels[self.JOB_HASH_LABEL_KEY]
-            return DataprocStreamingIngestionJob(job, refresh_fn, cancel_fn, job_hash)
+            return DataprocStreamingIngestionJob(
+                job=job,
+                refresh_fn=refresh_fn,
+                cancel_fn=cancel_fn,
+                project=self.project_id,
+                region=self.region,
+                job_hash=job_hash,
+            )

         raise ValueError(f"Unrecognized job type: {job_type}")
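The diff also switches every Dataproc*Job construction to keyword arguments. With project and region now inserted before output_file_uri (and before job_hash) in the constructor signatures, a stale positional call would bind its fourth argument to project rather than the output path. A simplified sketch of that pitfall, using stand-in classes rather than the actual Feast code:

# Simplified stand-ins for DataprocJobMixin / DataprocRetrievalJob,
# illustrating why keyword arguments are safer after this signature change.
class JobMixinSketch:
    def __init__(self, job, refresh_fn, cancel_fn, project, region):
        self._project = project
        self._region = region


class RetrievalJobSketch(JobMixinSketch):
    def __init__(self, job, refresh_fn, cancel_fn, project, region, output_file_uri):
        super().__init__(job, refresh_fn, cancel_fn, project, region)
        self._output_file_uri = output_file_uri


# An old-style positional call binds the output path to `project`
# and then fails because `region` and `output_file_uri` are missing:
try:
    RetrievalJobSketch(None, None, None, "gs://bucket/output")
except TypeError as err:
    print(err)  # missing 2 required positional arguments: 'region' and 'output_file_uri'

# Keyword arguments keep every value bound to the intended parameter:
ok = RetrievalJobSketch(
    job=None,
    refresh_fn=None,
    cancel_fn=None,
    project="my-gcp-project",
    region="us-central1",
    output_file_uri="gs://bucket/output",
)

Naming every parameter at the call sites, as the diff does, keeps them robust to this kind of signature growth.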