Skip to content

Commit

Permalink
sdk: Implement force_flush for span processors (#389)
Browse files Browse the repository at this point in the history
open-telemetry/opentelemetry-specification#370 added
the requirement to have a "force_flush" method in the span processors.

This commit exposes an already existing internal method on the batch span
processor that does exactly the same, it also adds it to the span processor
interface and as a no-op to the simple span processor.
  • Loading branch information
mauriciovasquezbernal authored Feb 4, 2020
1 parent fa1f50b commit 9ec1f1f
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
5 changes: 5 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ def shutdown(self) -> None:
"""Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown.
"""

def force_flush(self) -> None:
"""Export all ended spans to the configured Exporter that have not
yet been exported.
"""


class MultiSpanProcessor(SpanProcessor):
"""Implementation of :class:`SpanProcessor` that forwards all received
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def on_end(self, span: Span) -> None:
def shutdown(self) -> None:
self.span_exporter.shutdown()

def force_flush(self) -> None:
pass


class BatchExportSpanProcessor(SpanProcessor):
"""Batch span processor implementation.
Expand Down Expand Up @@ -171,7 +174,7 @@ def worker(self):
timeout = self.schedule_delay_millis / 1e3 - duration

# be sure that all spans are sent
self._flush()
self.force_flush()

def export(self) -> None:
"""Exports at most max_export_batch_size spans."""
Expand All @@ -197,7 +200,7 @@ def export(self) -> None:
for index in range(idx):
self.spans_list[index] = None

def _flush(self):
def force_flush(self):
# export all elements until queue is empty
while self.queue:
self.export()
Expand Down
36 changes: 32 additions & 4 deletions opentelemetry-sdk/tests/trace/export/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def _create_start_and_end_span(name, span_processor):


class TestBatchExportSpanProcessor(unittest.TestCase):
def test_batch_span_processor(self):
def test_shutdown(self):
spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
Expand All @@ -109,9 +109,35 @@ def test_batch_span_processor(self):
_create_start_and_end_span(name, span_processor)

span_processor.shutdown()
self.assertTrue(my_exporter.is_shutdown)

# check that spans are exported without an explicitly call to
# force_flush()
self.assertListEqual(span_names, spans_names_list)

self.assertTrue(my_exporter.is_shutdown)
def test_flush(self):
spans_names_list = []

my_exporter = MySpanExporter(destination=spans_names_list)
span_processor = export.BatchExportSpanProcessor(my_exporter)

span_names0 = ["xxx", "bar", "foo"]
span_names1 = ["yyy", "baz", "fox"]

for name in span_names0:
_create_start_and_end_span(name, span_processor)

span_processor.force_flush()
self.assertListEqual(span_names0, spans_names_list)

# create some more spans to check that span processor still works
for name in span_names1:
_create_start_and_end_span(name, span_processor)

span_processor.force_flush()
self.assertListEqual(span_names0 + span_names1, spans_names_list)

span_processor.shutdown()

def test_batch_span_processor_lossless(self):
"""Test that no spans are lost when sending max_queue_size spans"""
Expand All @@ -127,8 +153,9 @@ def test_batch_span_processor_lossless(self):
for _ in range(512):
_create_start_and_end_span("foo", span_processor)

span_processor.shutdown()
span_processor.force_flush()
self.assertEqual(len(spans_names_list), 512)
span_processor.shutdown()

def test_batch_span_processor_many_spans(self):
"""Test that no spans are lost when sending many spans"""
Expand All @@ -150,8 +177,9 @@ def test_batch_span_processor_many_spans(self):

time.sleep(0.05) # give some time for the exporter to upload spans

span_processor.shutdown()
span_processor.force_flush()
self.assertEqual(len(spans_names_list), 1024)
span_processor.shutdown()

def test_batch_span_processor_scheduled_delay(self):
"""Test that spans are exported each schedule_delay_millis"""
Expand Down

0 comments on commit 9ec1f1f

Please sign in to comment.