Skip to content

Commit

Permalink
Added a new endpoint in EpTO to get the list of delivered events, closes
Browse files Browse the repository at this point in the history
  • Loading branch information
robzenn92 committed Mar 15, 2018
1 parent d5c46fa commit ca25d4f
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]

### Added
- EpTO endpoint to retrieve the list of delivered events. [#20](https://github.com/robzenn92/EpTODocker/issues/20)
- EpTO endpoint to broadcast a new event.[#18](https://github.com/robzenn92/EpTODocker/issues/18)
- EpTO endpoint to get the Ball set at any time for debugging purposes. [#16](https://github.com/robzenn92/EpTODocker/issues/16)
- API version v1. [#15](https://github.com/robzenn92/EpTODocker/issues/15)
Expand Down
12 changes: 11 additions & 1 deletion epto_project/api/eptoApplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import os
import time
import random

from event.event import Event
from .configuration import logger
from .eptoDissemination import EpTODissemination
from apscheduler.schedulers.background import BackgroundScheduler
Expand All @@ -11,9 +13,10 @@
class EpTOApplication(object):

def __init__(self):
self.delivered_events = []
logger.debug('Epto Dissemination component will broadcast Balls every delta time unit.', delta=10, unit='seconds')
logger.debug('Epto Ordering component will deliver Balls every delta time unit.', delta=15, unit='seconds')
self.dissemination = EpTODissemination(10, 15)
self.dissemination = EpTODissemination(10, 15, self)
# self.schedule_probabilistic_broadcast(10, 10)

def schedule_probabilistic_broadcast(self, initial_delay, interval):
Expand All @@ -34,3 +37,10 @@ def probabilistic_broadcast(self):

def broadcast(self, data: str):
self.dissemination.broadcast(data)

def deliver_event(self, event: Event):
self.delivered_events.append(event.to_json())
logger.critical("I delivered " + str(event))

def get_delivered_events(self):
return self.delivered_events
14 changes: 5 additions & 9 deletions epto_project/api/eptoDissemination.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@

class EpTODissemination(object):

def __init__(self, initial_delay, interval):
def __init__(self, initial_delay, interval, application):
self.ip = my_ip()
self.api_version = 'v1'
self.next_ball = Ball()
self.view = []
self.fanout = int(os.environ['FANOUT'])
self.ttl = int(os.environ['TTL'])
self.stability_oracle = StabilityOracle(self.ttl)
self.ordering = EpTOOrdering(self.stability_oracle)
self.ordering = EpTOOrdering(self.stability_oracle, application)
self.schedule_repeated_task(initial_delay, interval)

def schedule_repeated_task(self, initial_delay, interval):
Expand Down Expand Up @@ -85,10 +85,6 @@ def repeated_task(self):
response = self.send_next_ball(destination)
logger.info('I sent next ball to ' + destination + ' and I got ' + str(response))

self.ordering.order(self.next_ball.copy())
self.next_ball = Ball()
logger.debug("I reset next_ball.", next_ball=self.next_ball)

else:

logger.debug('My next_ball is still empty! Need to wait', next_ball=self.next_ball)
self.ordering.order(self.next_ball.copy())
self.next_ball = Ball()
logger.debug("I reset next_ball.", next_ball=self.next_ball)
7 changes: 5 additions & 2 deletions epto_project/api/eptoOrdering.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@

class EpTOOrdering(object):

def __init__(self, stability_oracle):
def __init__(self, stability_oracle, application):
self.received = set() # received map of (id, event) pairs with all known but not yet delivered events
self.delivered = [] # list of all the events already delivered to the application
self.stability_oracle = stability_oracle
self.last_delivered_ts = 0
self.application = application # reference to the application layer

# Procedure order is called every round and its goal is to deliver events to the application.
# The main task of this procedure is to move events from the received set to the delivered set,
# preserving the total order of the events.
def order(self, ball):

logger.debug('This Ordering component has been invoked.', ball=ball, received=self.received)

# We start by incrementing the ttl of all events previously received to indicate the start of a new round.
for event in self.received:
event.increase_ttl()
Expand Down Expand Up @@ -67,4 +70,4 @@ def order(self, ball):
def deliver(self, event):
self.delivered.append(event)
self.last_delivered_ts = event.ts
logger.critical("I Delivered " + str(event))
self.application.deliver_event(event)
3 changes: 2 additions & 1 deletion epto_project/api/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
# The below routes are for production use.

path('send-ball', views.receive_ball, name='send-ball'),
path('broadcast', views.broadcast_event, name='broadcast')
path('broadcast', views.broadcast_event, name='broadcast'),
path('delivered-events', views.get_delivered_events, name='delivered-events')
])),
]
11 changes: 11 additions & 0 deletions epto_project/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ def get_ball(request):
return JsonResponse({"error": {"message": "Only the GET method is allowed."}}, status=403)


@csrf_exempt
def get_delivered_events(request):
if request.method == 'GET':
if not request.GET:
return JsonResponse(epto.get_delivered_events(), safe=False)
else:
return JsonResponse({"error": {"message": "The list of parameters has to be empty."}}, status=500)
else:
return JsonResponse({"error": {"message": "Only the GET method is allowed."}}, status=403)


# ---------------------
# Production routes
# The below routes are for production use.
Expand Down

0 comments on commit ca25d4f

Please sign in to comment.