Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remote Inputs and Asynchronous Execution #5

Merged
merged 15 commits into from
Aug 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 51 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ It allows you to:
* [Executing](#executing)
* [Discovering Automatically Published Products](#discovering-automatically-published-products)
- [Setting Up Your Development Environment](#setting-up-your-development-environment)
- [CLI](#cli)
- [TODO List](#todo-list)
- [Ideas](#ideas)

Expand Down Expand Up @@ -341,7 +342,7 @@ Here you can see a metadata example for the [L0](/tests/cosmo/processes/L0.py) p
"identifier": "crude",
"title": "Crude data",
"abstract": "Downloaded from the satellite to the ground station",
"dataType": "string"
"format": "GEOTIFF"
}
],
"outputs": [
Expand Down Expand Up @@ -375,7 +376,7 @@ You can retrieve a list of the published processes, using the _{host}/configurat
"metadata": ["Level 0", "Processor"],
"inputs": [
{
"dataType": "string",
"format": "GEOTIFF",
"format": null,
"abstract": "Downloaded from the satellite to the ground station",
"identifier": "crude",
Expand Down Expand Up @@ -485,7 +486,7 @@ _LycheePy_ will calculate and show you which are inputs and the outputs of the c
"metadata": ["Cosmo", "Skymed", "Mission", "Chain"],
"inputs": [
{
"dataType": "string",
"format": "GEOTIFF",
"format": null,
"abstract": "Downloaded from the satellite to the ground station",
"identifier": "crude",
Expand Down Expand Up @@ -656,10 +657,19 @@ _{host}/wps?service=WPS&request=describeprocess&version=1.0.0&identifier=**L0**_
<Input minOccurs="1" maxOccurs="1">
<ows:Identifier>crude</ows:Identifier>
<ows:Title>Crude data</ows:Title>
<LiteralData>
<ows:DataType ows:reference="urn:ogc:def:dataType:OGC:1.1:string">string</ows:DataType>
<ows:AnyValue/>
</LiteralData>
<ows:Abstract></ows:Abstract>
<ComplexData maximumMegabytes="10">
<Default>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Default>
<Supported>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Supported>
</ComplexData>
</Input>
</DataInputs>
<ProcessOutputs>
Expand Down Expand Up @@ -698,10 +708,19 @@ _{host}/wps?service=WPS&request=describeprocess&version=1.0.0&identifier=**Cosmo
<Input minOccurs="1" maxOccurs="1">
<ows:Identifier>crude</ows:Identifier>
<ows:Title>Crude data</ows:Title>
<LiteralData>
<ows:DataType ows:reference="urn:ogc:def:dataType:OGC:1.1:string">string</ows:DataType>
<ows:AnyValue/>
</LiteralData>
<ows:Abstract></ows:Abstract>
<ComplexData maximumMegabytes="10">
<Default>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Default>
<Supported>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Supported>
</ComplexData>
</Input>
</DataInputs>
<ProcessOutputs>
Expand Down Expand Up @@ -758,13 +777,12 @@ The following example will execute the _L0_ processor:
<wps:Execute service="WPS" version="1.0.0" xmlns:wps="http://www.opengis.net/wps/1.0.0" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 ../wpsExecute_request.xsd">
<ows:Identifier>L0</ows:Identifier>
<wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Data>
<wps:LiteralData>/cosmo/downlinks/matera/vesubio.sar</wps:LiteralData>
</wps:Data>
</wps:Input>
</wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Reference xlink:href="http://repository:8080/geoserver/ows?service=WCS&amp;version=2.0.0&amp;request=GetCoverage&amp;coverageId=nurc:Img_Sample&amp;format=image/tiff">
</wps:Reference>
</wps:Input>
</wps:DataInputs>
</wps:Execute>
```

Expand All @@ -784,7 +802,8 @@ As a response, you will receive something like this:
<wps:Output>
<ows:Identifier>RAW</ows:Identifier>
<ows:Title>RAW product</ows:Title>
<wps:Reference xlink:href="http://wps/wps/outputs/CSKS2_GEC_B_HI_16_HH_RA_SF_20130301045754_20130301045801.S01.QLKwcDF6_.tif" mimeType="image/tiff; subtype=geotiff"/>
<ows:Abstract></ows:Abstract>
<wps:Reference href="http://{host_name_or_ip}/outputs/1f638322-9e9f-11e8-b5db-0242ac12000a/ows_yu78Ak" mimeType="image/tiff; subtype=geotiff" encoding="" schema=""/>
</wps:Output>
</wps:ProcessOutputs>
</wps:ExecuteResponse>
Expand All @@ -801,13 +820,12 @@ So, this request is identical to the [previous](#executing-a-process), since the
<wps:Execute service="WPS" version="1.0.0" xmlns:wps="http://www.opengis.net/wps/1.0.0" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 ../wpsExecute_request.xsd">
<ows:Identifier>Cosmo Skymed</ows:Identifier>
<wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Data>
<wps:LiteralData>/cosmo/downlinks/matera/vesubio.sar</wps:LiteralData>
</wps:Data>
</wps:Input>
</wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Reference xlink:href="http://repository:8080/geoserver/ows?service=WCS&amp;version=2.0.0&amp;request=GetCoverage&amp;coverageId=nurc:Img_Sample&amp;format=image/tiff">
</wps:Reference>
</wps:Input>
</wps:DataInputs>
</wps:Execute>
```

Expand All @@ -827,12 +845,12 @@ As a response, you will receive something like this:
<wps:Output>
<ows:Identifier>GEC</ows:Identifier>
<ows:Title>GEC Product</ows:Title>
<wps:Reference xlink:href="http://wps/wps/outputs/CSKS2_GEC_B_HI_16_HH_RA_SF_20130301045754_20130301045801.S01.QLKOfl_oP.tif" mimeType="image/tiff; subtype=geotiff"/>
<wps:Reference href="http://{host_name_or_ip}/outputs/1f638322-9e9f-11e8-b5db-0242ac12000a/ows_yu78Ak" mimeType="image/tiff; subtype=geotiff" encoding="" schema=""/>
</wps:Output>
<wps:Output>
<ows:Identifier>GTC</ows:Identifier>
<ows:Title>GTC Product</ows:Title>
<wps:Reference xlink:href="http://wps/wps/outputs/CSKS2_GEC_B_HI_16_HH_RA_SF_20130301045754_20130301045801.S01.QLKZShsyw.tif" mimeType="image/tiff; subtype=geotiff"/>
<wps:Reference href="http://{host_name_or_ip}/outputs/1f638322-9e9f-11e8-b5db-0242ac12000a/ows_yu78Ak" mimeType="image/tiff; subtype=geotiff" encoding="" schema=""/>
</wps:Output>
</wps:ProcessOutputs>
</wps:ExecuteResponse>
Expand Down Expand Up @@ -1052,6 +1070,11 @@ sudo ./start.sh
And now you're able to use your LycheePy instance.


## CLI

LycheePy also has a web client, which you can find [here](https://github.com/gabrielbazan/lycheepy.cli).


## TODO List

To be defined.
Expand Down
10 changes: 5 additions & 5 deletions lycheepy/configuration/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
FROM python:2

ADD requirements.txt /root/
ADD configuration/ /root/configuration/
ADD wait-service.sh /root/

WORKDIR /root/

ADD requirements.txt /root/

RUN virtualenv venv
RUN /root/venv/bin/pip install -r requirements.txt

ADD configuration/ /root/configuration/
ADD wait-service.sh /root/

CMD ./wait-service.sh persistence 5432 && \
./wait-service.sh processes 21 && \
/root/venv/bin/python /root/configuration/models.py && \
/root/venv/bin/python /root/configuration/parametric.py && \
cd /root/configuration; /root/venv/bin/gunicorn -b 0.0.0.0:80 wsgi
#/root/venv/bin/python /root/configuration/app.py
9 changes: 7 additions & 2 deletions lycheepy/configuration/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class Process(Describable):
version = Column(Text, nullable=False)
meta_data = relationship('Metadata', secondary='process_metadata', backref='processes')

@property
def chains(self):
steps = self.steps_before + self.steps_after
return [step.chain for step in steps]


class Chain(Describable):
__tablename__ = 'chain'
Expand Down Expand Up @@ -94,9 +99,9 @@ class Step(Model):
__tablename__ = 'step'
id = Column(Integer, primary_key=True)
after_id = Column(Integer, ForeignKey('process.id'), nullable=False)
after = relationship('Process', foreign_keys=after_id)
after = relationship('Process', foreign_keys=after_id, backref='steps_after')
before_id = Column(Integer, ForeignKey('process.id'), nullable=False)
before = relationship('Process', foreign_keys=before_id)
before = relationship('Process', foreign_keys=before_id, backref='steps_before')
chain_id = Column(Integer, ForeignKey('chain.id'), nullable=False)
chain = relationship('Chain', backref='steps')

Expand Down
30 changes: 25 additions & 5 deletions lycheepy/configuration/configuration/serializers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from simplyrestful.serializers import Serializer
from simplyrestful.database import session
from simplyrestful.models import get_or_create
from simplyrestful.exceptions import Conflict
from simplyrestful.exceptions import Conflict, NotFound

from models import *
from validators import ProcessValidator
Expand All @@ -19,13 +19,11 @@ class ProcessSerializer(Serializer):

def create(self, data):
self.save_file(data.get('identifier'), creation=True)
serial = super(ProcessSerializer, self).create(data)
return serial
return super(ProcessSerializer, self).create(data)

def update(self, identifier, data):
self.save_file(data.get('identifier'), creation=False)
serial = super(ProcessSerializer, self).update(identifier, data)
return serial
return super(ProcessSerializer, self).update(identifier, data)

def deserialize(self, data, instance):
instance.identifier = data.get('identifier', instance.identifier)
Expand Down Expand Up @@ -112,3 +110,25 @@ def save_file(self, identifier, creation=True):
@staticmethod
def is_valid_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1] in ALLOWED_PROCESSES_EXTENSIONS

def delete(self, identifier):
try:
instance = self.query.filter_by(id=identifier).one_or_none()

if not instance:
raise NotFound('The process does not exist')

if instance.chains:
raise Conflict('The process belongs to a chain, so it cannot be deleted')

# Delete related objects
session.query(ProcessMetadata).filter_by(process_id=identifier).delete()
session.query(Output).filter_by(process_id=identifier).delete()
session.query(Input).filter_by(process_id=identifier).delete()

# Delete object
session.delete(instance)
session.commit()
except:
session.rollback()
raise
63 changes: 39 additions & 24 deletions lycheepy/configuration/configuration/validators/chain.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,54 @@
from networkx import DiGraph, is_directed_acyclic_graph
from simplyrestful.exceptions import Conflict
from simplyrestful.validators import Validator
from models import Process, Output
from validators.executable import ExecutableValidator


class ChainValidator(ExecutableValidator):
class ChainValidator(Validator):

def validate(self, data, instance=None):
super(ChainValidator, self).validate(data, instance=instance)
steps = data.get('steps', [])
self._validate_at_least_one_step(steps)
graph = self._build_digraph(steps)
self._validate_acyclic(graph)
self._validate_match(steps, graph)
ChainValidator.validate_at_least_one_step(steps)
graph = ChainValidator.build_digraph(steps)
ChainValidator.validate_acyclic(graph)
ChainValidator.validate_match(steps, graph)
ChainValidator.validate_unique(data)

def _validate_at_least_one_step(self, steps):
@staticmethod
def validate_unique(data):
identifier = data.get('identifier')
process = Process.query.filter_by(identifier=identifier).one_or_none()
if process:
raise Conflict('Already exists a process with the given identifier')

@staticmethod
def validate_at_least_one_step(steps):
if not steps:
raise Conflict('At least one step is required')

def _validate_acyclic(self, graph):
@staticmethod
def validate_acyclic(graph):
if not is_directed_acyclic_graph(graph):
raise Conflict('Graph must be directed and acyclic')

def _validate_match(self, steps, graph):
processes = self._get_chain_processes(graph)
@staticmethod
def validate_match(steps, graph):
m = 'The "{}" input of "{}" requires an explicit match, or only one predecessor output with the same identifier'
processes = ChainValidator.get_chain_processes(graph)
for process in processes:
process_identifier = process.identifier
predecessors = graph.predecessors(process_identifier)
if predecessors:
predecessors_outputs = self._get_outpus(predecessors)
predecessors_outputs = ChainValidator.get_outputs(predecessors)
for process_input in process.inputs:
input_identifier = process_input.identifier
has_match = self._has_match(process_identifier, input_identifier, steps)
matchable = self._is_matchable(input_identifier, predecessors_outputs)
has_match = ChainValidator.has_match(process_identifier, input_identifier, steps)
matchable = ChainValidator.is_matchable(input_identifier, predecessors_outputs)
if not has_match and not matchable:
raise Conflict(
'The "{}" input of "{}" requires an explicit match, or only one predecessor output with the same identifier'.format(input_identifier, process_identifier)
)
raise Conflict(m.format(input_identifier, process_identifier))

def _has_match(self, process, process_input, steps):
@staticmethod
def has_match(process, process_input, steps):
has = False
for step in steps:
if step.get('after') == process:
Expand All @@ -47,27 +57,32 @@ def _has_match(self, process, process_input, steps):
has = True
return has

def _is_matchable(self, process_input, predecessors_outputs):
@staticmethod
def is_matchable(process_input, predecessors_outputs):
count = 0
for output in predecessors_outputs:
if output.identifier == process_input:
count += 1
return count == 1

def _get_outpus(self, processes):
@staticmethod
def get_outputs(processes):
return Output.query.filter(Output.process.has(Process.identifier.in_(processes))).all()

def _get_chain_processes(self, graph):
@staticmethod
def get_chain_processes(graph):
return Process.query.filter(Process.identifier.in_(graph.nodes())).all()

def _build_digraph(self, steps):
@staticmethod
def build_digraph(steps):
graph = DiGraph()
for step in steps:
self._validate_repeated(graph, step['before'], step['after'])
ChainValidator.validate_repeated(graph, step['before'], step['after'])
graph.add_edge(step['before'], step['after'])
return graph

def _validate_repeated(self, graph, before, after):
@staticmethod
def validate_repeated(graph, before, after):
nodes = graph.nodes()
if before in nodes and after in nodes and graph.number_of_edges(before, after):
raise Conflict('{} to {} step is repeated'.format(before, after))
18 changes: 0 additions & 18 deletions lycheepy/configuration/configuration/validators/executable.py

This file was deleted.

Loading