Skip to content

Commit

Permalink
Merge pull request #5 from gabrielbazan/development
Browse files Browse the repository at this point in the history
Remote Inputs and Asynchronous Execution
  • Loading branch information
gabrielbazan authored Aug 13, 2018
2 parents 6b0cc9c + d8c82bf commit b136b44
Show file tree
Hide file tree
Showing 39 changed files with 272 additions and 425 deletions.
79 changes: 51 additions & 28 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ It allows you to:
* [Executing](#executing)
* [Discovering Automatically Published Products](#discovering-automatically-published-products)
- [Setting Up Your Development Environment](#setting-up-your-development-environment)
- [CLI](#cli)
- [TODO List](#todo-list)
- [Ideas](#ideas)

Expand Down Expand Up @@ -341,7 +342,7 @@ Here you can see a metadata example for the [L0](/tests/cosmo/processes/L0.py) p
"identifier": "crude",
"title": "Crude data",
"abstract": "Downloaded from the satellite to the ground station",
"dataType": "string"
"format": "GEOTIFF"
}
],
"outputs": [
Expand Down Expand Up @@ -375,7 +376,7 @@ You can retrieve a list of the published processes, using the _{host}/configurat
"metadata": ["Level 0", "Processor"],
"inputs": [
{
"dataType": "string",
"format": "GEOTIFF",
"dataType": null,
"abstract": "Downloaded from the satellite to the ground station",
"identifier": "crude",
Expand Down Expand Up @@ -485,7 +486,7 @@ _LycheePy_ will calculate and show you which are inputs and the outputs of the c
"metadata": ["Cosmo", "Skymed", "Mission", "Chain"],
"inputs": [
{
"dataType": "string",
"format": "GEOTIFF",
"dataType": null,
"abstract": "Downloaded from the satellite to the ground station",
"identifier": "crude",
Expand Down Expand Up @@ -656,10 +657,19 @@ _{host}/wps?service=WPS&request=describeprocess&version=1.0.0&identifier=**L0**_
<Input minOccurs="1" maxOccurs="1">
<ows:Identifier>crude</ows:Identifier>
<ows:Title>Crude data</ows:Title>
<LiteralData>
<ows:DataType ows:reference="urn:ogc:def:dataType:OGC:1.1:string">string</ows:DataType>
<ows:AnyValue/>
</LiteralData>
<ows:Abstract></ows:Abstract>
<ComplexData maximumMegabytes="10">
<Default>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Default>
<Supported>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Supported>
</ComplexData>
</Input>
</DataInputs>
<ProcessOutputs>
Expand Down Expand Up @@ -698,10 +708,19 @@ _{host}/wps?service=WPS&request=describeprocess&version=1.0.0&identifier=**Cosmo
<Input minOccurs="1" maxOccurs="1">
<ows:Identifier>crude</ows:Identifier>
<ows:Title>Crude data</ows:Title>
<LiteralData>
<ows:DataType ows:reference="urn:ogc:def:dataType:OGC:1.1:string">string</ows:DataType>
<ows:AnyValue/>
</LiteralData>
<ows:Abstract></ows:Abstract>
<ComplexData maximumMegabytes="10">
<Default>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Default>
<Supported>
<Format>
<MimeType>image/tiff; subtype=geotiff</MimeType>
</Format>
</Supported>
</ComplexData>
</Input>
</DataInputs>
<ProcessOutputs>
Expand Down Expand Up @@ -758,13 +777,12 @@ The following example will execute the _L0_ processor:
<wps:Execute service="WPS" version="1.0.0" xmlns:wps="http://www.opengis.net/wps/1.0.0" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 ../wpsExecute_request.xsd">
<ows:Identifier>L0</ows:Identifier>
<wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Data>
<wps:LiteralData>/cosmo/downlinks/matera/vesubio.sar</wps:LiteralData>
</wps:Data>
</wps:Input>
</wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Reference xlink:href="http://repository:8080/geoserver/ows?service=WCS&amp;version=2.0.0&amp;request=GetCoverage&amp;coverageId=nurc:Img_Sample&amp;format=image/tiff">
</wps:Reference>
</wps:Input>
</wps:DataInputs>
</wps:Execute>
```

Expand All @@ -784,7 +802,8 @@ As a response, you will receive something like this:
<wps:Output>
<ows:Identifier>RAW</ows:Identifier>
<ows:Title>RAW product</ows:Title>
<wps:Reference xlink:href="http://wps/wps/outputs/CSKS2_GEC_B_HI_16_HH_RA_SF_20130301045754_20130301045801.S01.QLKwcDF6_.tif" mimeType="image/tiff; subtype=geotiff"/>
<ows:Abstract></ows:Abstract>
<wps:Reference href="http://{host_name_or_ip}/outputs/1f638322-9e9f-11e8-b5db-0242ac12000a/ows_yu78Ak" mimeType="image/tiff; subtype=geotiff" encoding="" schema=""/>
</wps:Output>
</wps:ProcessOutputs>
</wps:ExecuteResponse>
Expand All @@ -801,13 +820,12 @@ So, this request is identical to the [previous](#executing-a-process), since the
<wps:Execute service="WPS" version="1.0.0" xmlns:wps="http://www.opengis.net/wps/1.0.0" xmlns:ows="http://www.opengis.net/ows/1.1" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.opengis.net/wps/1.0.0 ../wpsExecute_request.xsd">
<ows:Identifier>Cosmo Skymed</ows:Identifier>
<wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Data>
<wps:LiteralData>/cosmo/downlinks/matera/vesubio.sar</wps:LiteralData>
</wps:Data>
</wps:Input>
</wps:DataInputs>
<wps:Input>
<ows:Identifier>crude</ows:Identifier>
<wps:Reference xlink:href="http://repository:8080/geoserver/ows?service=WCS&amp;version=2.0.0&amp;request=GetCoverage&amp;coverageId=nurc:Img_Sample&amp;format=image/tiff">
</wps:Reference>
</wps:Input>
</wps:DataInputs>
</wps:Execute>
```

Expand All @@ -827,12 +845,12 @@ As a response, you will receive something like this:
<wps:Output>
<ows:Identifier>GEC</ows:Identifier>
<ows:Title>GEC Product</ows:Title>
<wps:Reference xlink:href="http://wps/wps/outputs/CSKS2_GEC_B_HI_16_HH_RA_SF_20130301045754_20130301045801.S01.QLKOfl_oP.tif" mimeType="image/tiff; subtype=geotiff"/>
<wps:Reference href="http://{host_name_or_ip}/outputs/1f638322-9e9f-11e8-b5db-0242ac12000a/ows_yu78Ak" mimeType="image/tiff; subtype=geotiff" encoding="" schema=""/>
</wps:Output>
<wps:Output>
<ows:Identifier>GTC</ows:Identifier>
<ows:Title>GTC Product</ows:Title>
<wps:Reference xlink:href="http://wps/wps/outputs/CSKS2_GEC_B_HI_16_HH_RA_SF_20130301045754_20130301045801.S01.QLKZShsyw.tif" mimeType="image/tiff; subtype=geotiff"/>
<wps:Reference href="http://{host_name_or_ip}/outputs/1f638322-9e9f-11e8-b5db-0242ac12000a/ows_yu78Ak" mimeType="image/tiff; subtype=geotiff" encoding="" schema=""/>
</wps:Output>
</wps:ProcessOutputs>
</wps:ExecuteResponse>
Expand Down Expand Up @@ -1052,6 +1070,11 @@ sudo ./start.sh
And now you're able to use your LycheePy instance.


## CLI

LycheePy also has a command-line interface (CLI) client, which you can find [here](https://github.com/gabrielbazan/lycheepy.cli).


## TODO List

To be defined.
Expand Down
10 changes: 5 additions & 5 deletions lycheepy/configuration/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
FROM python:2

ADD requirements.txt /root/
ADD configuration/ /root/configuration/
ADD wait-service.sh /root/

WORKDIR /root/

ADD requirements.txt /root/

RUN virtualenv venv
RUN /root/venv/bin/pip install -r requirements.txt

ADD configuration/ /root/configuration/
ADD wait-service.sh /root/

CMD ./wait-service.sh persistence 5432 && \
./wait-service.sh processes 21 && \
/root/venv/bin/python /root/configuration/models.py && \
/root/venv/bin/python /root/configuration/parametric.py && \
cd /root/configuration; /root/venv/bin/gunicorn -b 0.0.0.0:80 wsgi
#/root/venv/bin/python /root/configuration/app.py
9 changes: 7 additions & 2 deletions lycheepy/configuration/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ class Process(Describable):
version = Column(Text, nullable=False)
meta_data = relationship('Metadata', secondary='process_metadata', backref='processes')

@property
def chains(self):
    """Return the distinct chains in which this process takes part.

    A process is tied to a chain through the steps that reference it,
    either on the ``before`` side (``steps_before`` backref) or on the
    ``after`` side (``steps_after`` backref). A process used by several
    steps of one chain would otherwise be reported once per step, so each
    chain is listed a single time, in first-seen order.
    """
    unique_chains = []
    for step in self.steps_before + self.steps_after:
        chain = step.chain
        # Compare by identity so that no assumption is made about the
        # equality or hashing behaviour of the chain objects.
        if not any(chain is known for known in unique_chains):
            unique_chains.append(chain)
    return unique_chains


class Chain(Describable):
__tablename__ = 'chain'
Expand Down Expand Up @@ -94,9 +99,9 @@ class Step(Model):
__tablename__ = 'step'
id = Column(Integer, primary_key=True)
after_id = Column(Integer, ForeignKey('process.id'), nullable=False)
after = relationship('Process', foreign_keys=after_id)
after = relationship('Process', foreign_keys=after_id, backref='steps_after')
before_id = Column(Integer, ForeignKey('process.id'), nullable=False)
before = relationship('Process', foreign_keys=before_id)
before = relationship('Process', foreign_keys=before_id, backref='steps_before')
chain_id = Column(Integer, ForeignKey('chain.id'), nullable=False)
chain = relationship('Chain', backref='steps')

Expand Down
30 changes: 25 additions & 5 deletions lycheepy/configuration/configuration/serializers/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from simplyrestful.serializers import Serializer
from simplyrestful.database import session
from simplyrestful.models import get_or_create
from simplyrestful.exceptions import Conflict
from simplyrestful.exceptions import Conflict, NotFound

from models import *
from validators import ProcessValidator
Expand All @@ -19,13 +19,11 @@ class ProcessSerializer(Serializer):

def create(self, data):
self.save_file(data.get('identifier'), creation=True)
serial = super(ProcessSerializer, self).create(data)
return serial
return super(ProcessSerializer, self).create(data)

def update(self, identifier, data):
self.save_file(data.get('identifier'), creation=False)
serial = super(ProcessSerializer, self).update(identifier, data)
return serial
return super(ProcessSerializer, self).update(identifier, data)

def deserialize(self, data, instance):
instance.identifier = data.get('identifier', instance.identifier)
Expand Down Expand Up @@ -112,3 +110,25 @@ def save_file(self, identifier, creation=True):
@staticmethod
def is_valid_file(filename):
    """Tell whether the file name carries one of the allowed extensions.

    The extension is whatever follows the last dot; a name without any dot
    is never valid.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension in ALLOWED_PROCESSES_EXTENSIONS

def delete(self, identifier):
    """Delete the process with the given id, together with its dependent rows.

    Raises NotFound when no process has that id, and Conflict when the
    process is still referenced by a chain. Whenever anything fails, the
    session is rolled back and the error is re-raised.
    """
    try:
        process = self.query.filter_by(id=identifier).one_or_none()

        if not process:
            raise NotFound('The process does not exist')

        if process.chains:
            raise Conflict('The process belongs to a chain, so it cannot be deleted')

        # Remove the rows that point at the process before the process itself
        for related_model in (ProcessMetadata, Output, Input):
            session.query(related_model).filter_by(process_id=identifier).delete()

        session.delete(process)
        session.commit()
    except:
        # Deliberately catches everything: the transaction has to be undone
        # whatever went wrong, and the error is passed on untouched.
        session.rollback()
        raise
63 changes: 39 additions & 24 deletions lycheepy/configuration/configuration/validators/chain.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,54 @@
from networkx import DiGraph, is_directed_acyclic_graph
from simplyrestful.exceptions import Conflict
from simplyrestful.validators import Validator
from models import Process, Output
from validators.executable import ExecutableValidator


class ChainValidator(ExecutableValidator):
class ChainValidator(Validator):

def validate(self, data, instance=None):
super(ChainValidator, self).validate(data, instance=instance)
steps = data.get('steps', [])
self._validate_at_least_one_step(steps)
graph = self._build_digraph(steps)
self._validate_acyclic(graph)
self._validate_match(steps, graph)
ChainValidator.validate_at_least_one_step(steps)
graph = ChainValidator.build_digraph(steps)
ChainValidator.validate_acyclic(graph)
ChainValidator.validate_match(steps, graph)
ChainValidator.validate_unique(data)

def _validate_at_least_one_step(self, steps):
@staticmethod
def validate_unique(data):
    """Reject the data when its identifier is already used by a process.

    Raises Conflict if a Process with the same identifier exists.
    """
    existing = Process.query.filter_by(identifier=data.get('identifier')).one_or_none()
    if existing:
        raise Conflict('Already exists a process with the given identifier')

@staticmethod
def validate_at_least_one_step(steps):
    """Raise Conflict when the given steps collection is falsy (e.g. empty)."""
    if steps:
        return
    raise Conflict('At least one step is required')

def _validate_acyclic(self, graph):
@staticmethod
def validate_acyclic(graph):
    """Raise Conflict unless the graph is a directed acyclic graph."""
    acyclic = is_directed_acyclic_graph(graph)
    if not acyclic:
        raise Conflict('Graph must be directed and acyclic')

def _validate_match(self, steps, graph):
processes = self._get_chain_processes(graph)
@staticmethod
def validate_match(steps, graph):
m = 'The "{}" input of "{}" requires an explicit match, or only one predecessor output with the same identifier'
processes = ChainValidator.get_chain_processes(graph)
for process in processes:
process_identifier = process.identifier
predecessors = graph.predecessors(process_identifier)
if predecessors:
predecessors_outputs = self._get_outpus(predecessors)
predecessors_outputs = ChainValidator.get_outputs(predecessors)
for process_input in process.inputs:
input_identifier = process_input.identifier
has_match = self._has_match(process_identifier, input_identifier, steps)
matchable = self._is_matchable(input_identifier, predecessors_outputs)
has_match = ChainValidator.has_match(process_identifier, input_identifier, steps)
matchable = ChainValidator.is_matchable(input_identifier, predecessors_outputs)
if not has_match and not matchable:
raise Conflict(
'The "{}" input of "{}" requires an explicit match, or only one predecessor output with the same identifier'.format(input_identifier, process_identifier)
)
raise Conflict(m.format(input_identifier, process_identifier))

def _has_match(self, process, process_input, steps):
@staticmethod
def has_match(process, process_input, steps):
has = False
for step in steps:
if step.get('after') == process:
Expand All @@ -47,27 +57,32 @@ def _has_match(self, process, process_input, steps):
has = True
return has

def _is_matchable(self, process_input, predecessors_outputs):
@staticmethod
def is_matchable(process_input, predecessors_outputs):
    """Tell whether the input can be matched implicitly by identifier.

    That is the case only when exactly one of the predecessors' outputs has
    the same identifier as the input; zero or several candidates are not
    matchable.
    """
    same_named = [
        candidate for candidate in predecessors_outputs
        if candidate.identifier == process_input
    ]
    return len(same_named) == 1

def _get_outpus(self, processes):
@staticmethod
def get_outputs(processes):
    """Return every Output whose process identifier is among `processes`."""
    owned_by_given = Output.process.has(Process.identifier.in_(processes))
    return Output.query.filter(owned_by_given).all()

def _get_chain_processes(self, graph):
@staticmethod
def get_chain_processes(graph):
    """Return the Process rows whose identifier is a node of the graph."""
    is_graph_node = Process.identifier.in_(graph.nodes())
    return Process.query.filter(is_graph_node).all()

def _build_digraph(self, steps):
@staticmethod
def build_digraph(steps):
graph = DiGraph()
for step in steps:
self._validate_repeated(graph, step['before'], step['after'])
ChainValidator.validate_repeated(graph, step['before'], step['after'])
graph.add_edge(step['before'], step['after'])
return graph

def _validate_repeated(self, graph, before, after):
@staticmethod
def validate_repeated(graph, before, after):
    """Raise Conflict when the graph already holds a before -> after edge."""
    known_nodes = graph.nodes()
    # An edge can only exist between nodes that are already in the graph
    if before not in known_nodes or after not in known_nodes:
        return
    if graph.number_of_edges(before, after):
        raise Conflict('{} to {} step is repeated'.format(before, after))
18 changes: 0 additions & 18 deletions lycheepy/configuration/configuration/validators/executable.py

This file was deleted.

Loading

0 comments on commit b136b44

Please sign in to comment.