Skip to content

Commit

Permalink
Merge pull request #11 from qiqb-osaka/feature/SSE
Browse files Browse the repository at this point in the history
Add client functions for SSE
  • Loading branch information
snuffkin authored Apr 16, 2024
2 parents 998430c + 674ecc7 commit 1230267
Show file tree
Hide file tree
Showing 7 changed files with 1,073 additions and 0 deletions.
5 changes: 5 additions & 0 deletions quri_parts/riqu/backend/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
RiquSamplingResult,
)

from .sse import (
RiquSseJob,
)

__all__ = [
"RiquConfig",
"RiquSamplingBackend",
"RiquSamplingJob",
"RiquSamplingResult",
"RiquSseJob",
]
132 changes: 132 additions & 0 deletions quri_parts/riqu/backend/sse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A module to run sse job on riqu server.
"""
import os
import base64
from typing import Optional

from quri_parts.backend import (
BackendError
)

from .sampling import RiquSamplingBackend, RiquConfig, RiquSamplingJob
from ..rest import ApiClient, Configuration, JobApi

class RiquSseJob:

def __init__(
self,
config: Optional[RiquConfig] = None
):
# if config is None, load them from file
if config is None:
self.config = RiquConfig.from_file()
else:
self.config = config

# construct JobApi
rest_config = Configuration()
rest_config.host = self.config.url
if self.config.proxy:
rest_config.proxy = self.config.proxy
api_client = ApiClient(
configuration=rest_config,
header_name="q-api-token",
header_value=self.config.api_token,
)
self._job_api: JobApi = JobApi(api_client=api_client)
self.job = None

def run_sse(
self,
file_path: str,
remark: Optional[str] = ""
) -> RiquSamplingJob:
# if file_path is not set, raise ValueError
if file_path is None:
raise ValueError("file_path is not set.")

# if the file does not exist, raise ValueError
if not os.path.exists(file_path):
raise ValueError(f'The file does not exist: {file_path}')

# get the base name and the extension of the file
base_name, ext = os.path.splitext(file_path)

# if the extension is not .py, raise ValueError
if ext != ".py":
raise ValueError(f"The file is not python file: {file_path}")

max_file_size = 10 * 1024 * 1024 # 10MB

# if the file size is larger than max_file_size, raise ValueError
if os.path.getsize(file_path) >= max_file_size:
raise ValueError(f"file size is larger than {max_file_size}")

# set sse job type
jobType = "sse"

try:
response = self._job_api.post_ssejob(up_file=file_path, remark=remark, job_type=jobType)

job_id = response["job_id"]

# make an instance of RiquSamplingBackend
riqu_sampling_backend = RiquSamplingBackend(config=self.config)
self.job = riqu_sampling_backend.retrieve_job(job_id=job_id)
except Exception as e:
raise BackendError("To perform sse on riqu server is failed.") from e

return self.job

def download_log(
self,
job_id: str = None,
download_path: str = None,
) -> str:

# if job_id is not set, raise ValueError
if job_id is None:
if self.job is not None:
job_id = self.job.id
else:
raise ValueError("job_id is not set.")

try:
response = self._job_api.download_file(job_id=job_id)
except Exception as e:
raise BackendError("To perform sse on riqu server is failed.") from e

if response is None or not "file" in response or not "filename" in response \
or not response["file"] or not response["filename"]:
raise BackendError("To perform sse on riqu server is failed. The response does not contain valid file data.")

data = response["file"]
filename = response["filename"]

if download_path is None:
download_path = os.getcwd()
else:
if not os.path.exists(download_path):
raise ValueError(f"The destination path does not exist: {download_path}")

file_path = os.path.join(download_path, filename)

# if the file already exists, raise ValueError
if os.path.exists(file_path):
raise ValueError(f"The file already exists: {file_path}")

# decode the base64 encoded data and write it to the file
decoded_zip = base64.b64decode(data)
with open(file_path, 'bw') as t_file:
t_file.write(decoded_zip)

return file_path
1 change: 1 addition & 0 deletions quri_parts/riqu/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
from quri_parts.riqu.rest.models.inline_response400 import InlineResponse400
from quri_parts.riqu.rest.models.job import Job
from quri_parts.riqu.rest.models.jobs_body import JobsBody
from quri_parts.riqu.rest.models.ssejobs_body import SsejobsBody
196 changes: 196 additions & 0 deletions quri_parts/riqu/rest/api/job_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,101 @@ def delete_job_with_http_info(self, job_id, **kwargs): # noqa: E501
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def download_file(self, job_id, **kwargs): # noqa: E501
"""Download file # noqa: E501
download file of the job. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.download_file(job_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str job_id: Job ID (required)
:return: object
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.download_file_with_http_info(job_id, **kwargs) # noqa: E501
else:
(data) = self.download_file_with_http_info(job_id, **kwargs) # noqa: E501
return data

def download_file_with_http_info(self, job_id, **kwargs): # noqa: E501
"""Download file # noqa: E501
download file of the job. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.download_file_with_http_info(job_id, async_req=True)
>>> result = thread.get()
:param async_req bool
:param str job_id: Job ID (required)
:return: object
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['job_id'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
all_params.append('_request_timeout')

params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method download_file" % key
)
params[key] = val
del params['kwargs']
# verify the required parameter 'job_id' is set
if ('job_id' not in params or
params['job_id'] is None):
raise ValueError("Missing the required parameter `job_id` when calling `download_file`") # noqa: E501

collection_formats = {}

path_params = {}
if 'job_id' in params:
path_params['job_id'] = params['job_id'] # noqa: E501

query_params = []

header_params = {}

form_params = []
local_var_files = {}

body_params = None
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
['application/json']) # noqa: E501

# Authentication setting
auth_settings = ['apiKeyAuth'] # noqa: E501

return self.api_client.call_api(
'/ssejobs/{job_id}/download-log', 'GET',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type='object', # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def get_job(self, job_id, **kwargs): # noqa: E501
"""Get Job # noqa: E501
Expand Down Expand Up @@ -313,6 +408,107 @@ def post_job_with_http_info(self, **kwargs): # noqa: E501
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def post_ssejob(self, **kwargs): # noqa: E501
"""Post SSE Job # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.post_ssejob(async_req=True)
>>> result = thread.get()
:param async_req bool
:param str up_file:
:param str remark:
:param str job_type:
:return: object
If the method is called asynchronously,
returns the request thread.
"""
kwargs['_return_http_data_only'] = True
if kwargs.get('async_req'):
return self.post_ssejob_with_http_info(**kwargs) # noqa: E501
else:
(data) = self.post_ssejob_with_http_info(**kwargs) # noqa: E501
return data

def post_ssejob_with_http_info(self, **kwargs): # noqa: E501
"""Post SSE Job # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.post_ssejob_with_http_info(async_req=True)
>>> result = thread.get()
:param async_req bool
:param str up_file:
:param str remark:
:param str job_type:
:return: object
If the method is called asynchronously,
returns the request thread.
"""

all_params = ['up_file', 'remark', 'job_type'] # noqa: E501
all_params.append('async_req')
all_params.append('_return_http_data_only')
all_params.append('_preload_content')
all_params.append('_request_timeout')

params = locals()
for key, val in six.iteritems(params['kwargs']):
if key not in all_params:
raise TypeError(
"Got an unexpected keyword argument '%s'"
" to method post_ssejob" % key
)
params[key] = val
del params['kwargs']

collection_formats = {}

path_params = {}

query_params = []

header_params = {}

form_params = []
local_var_files = {}
if 'up_file' in params:
local_var_files['up_file'] = params['up_file'] # noqa: E501
if 'remark' in params:
form_params.append(('remark', params['remark'])) # noqa: E501
if 'job_type' in params:
form_params.append(('job_type', params['job_type'])) # noqa: E501

body_params = None
# HTTP header `Accept`
header_params['Accept'] = self.api_client.select_header_accept(
['application/json']) # noqa: E501

# HTTP header `Content-Type`
header_params['Content-Type'] = self.api_client.select_header_content_type( # noqa: E501
['multipart/form-data']) # noqa: E501

# Authentication setting
auth_settings = ['apiKeyAuth'] # noqa: E501

return self.api_client.call_api(
'/ssejobs', 'POST',
path_params,
query_params,
header_params,
body=body_params,
post_params=form_params,
files=local_var_files,
response_type='object', # noqa: E501
auth_settings=auth_settings,
async_req=params.get('async_req'),
_return_http_data_only=params.get('_return_http_data_only'),
_preload_content=params.get('_preload_content', True),
_request_timeout=params.get('_request_timeout'),
collection_formats=collection_formats)

def put_jobs_job_id_cancel(self, job_id, **kwargs): # noqa: E501
"""Cancel Job # noqa: E501
Expand Down
1 change: 1 addition & 0 deletions quri_parts/riqu/rest/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@
from quri_parts.riqu.rest.models.inline_response400 import InlineResponse400
from quri_parts.riqu.rest.models.job import Job
from quri_parts.riqu.rest.models.jobs_body import JobsBody
from quri_parts.riqu.rest.models.ssejobs_body import SsejobsBody
Loading

0 comments on commit 1230267

Please sign in to comment.