From 9ec1f1fba52973df3b8c7b22426927d3ac4244c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mauricio=20V=C3=A1squez?= Date: Tue, 4 Feb 2020 15:06:57 -0500 Subject: [PATCH] sdk: Implement force_flush for span processors (#389) https://github.com/open-telemetry/opentelemetry-specification/pull/370 added the requirement to have a "force_flush" method in the span processors. This commit exposes an already existing internal method on the batch span processor that does exactly the same, it also adds it to the span processor interface and as a no-op to the simple span processor. --- .../src/opentelemetry/sdk/trace/__init__.py | 5 +++ .../sdk/trace/export/__init__.py | 7 ++-- .../tests/trace/export/test_export.py | 36 ++++++++++++++++--- 3 files changed, 42 insertions(+), 6 deletions(-) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index fd279e603af..23f1aaf79be 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -70,6 +70,11 @@ def shutdown(self) -> None: """Called when a :class:`opentelemetry.sdk.trace.Tracer` is shutdown. """ + def force_flush(self) -> None: + """Export all ended spans to the configured Exporter that have not + yet been exported. + """ + class MultiSpanProcessor(SpanProcessor): """Implementation of :class:`SpanProcessor` that forwards all received diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py index b70fb010190..5db2c1e957d 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/export/__init__.py @@ -83,6 +83,9 @@ def on_end(self, span: Span) -> None: def shutdown(self) -> None: self.span_exporter.shutdown() + def force_flush(self) -> None: + pass + class BatchExportSpanProcessor(SpanProcessor): """Batch span processor implementation. @@ -171,7 +174,7 @@ def worker(self): timeout = self.schedule_delay_millis / 1e3 - duration # be sure that all spans are sent - self._flush() + self.force_flush() def export(self) -> None: """Exports at most max_export_batch_size spans.""" @@ -197,7 +200,7 @@ def export(self) -> None: for index in range(idx): self.spans_list[index] = None - def _flush(self): + def force_flush(self): # export all elements until queue is empty while self.queue: self.export() diff --git a/opentelemetry-sdk/tests/trace/export/test_export.py b/opentelemetry-sdk/tests/trace/export/test_export.py index 54fdee2629b..43299ebe6a4 100644 --- a/opentelemetry-sdk/tests/trace/export/test_export.py +++ b/opentelemetry-sdk/tests/trace/export/test_export.py @@ -97,7 +97,7 @@ def _create_start_and_end_span(name, span_processor): class TestBatchExportSpanProcessor(unittest.TestCase): - def test_batch_span_processor(self): + def test_shutdown(self): spans_names_list = [] my_exporter = MySpanExporter(destination=spans_names_list) @@ -109,9 +109,35 @@ def test_batch_span_processor(self): _create_start_and_end_span(name, span_processor) span_processor.shutdown() + self.assertTrue(my_exporter.is_shutdown) + + # check that spans are exported without an explicitly call to + # force_flush() self.assertListEqual(span_names, spans_names_list) - self.assertTrue(my_exporter.is_shutdown) + def test_flush(self): + spans_names_list = [] + + my_exporter = MySpanExporter(destination=spans_names_list) + span_processor = export.BatchExportSpanProcessor(my_exporter) + + span_names0 = ["xxx", "bar", "foo"] + span_names1 = ["yyy", "baz", "fox"] + + for name in span_names0: + _create_start_and_end_span(name, span_processor) + + span_processor.force_flush() + self.assertListEqual(span_names0, spans_names_list) + + # create some more spans to check that span processor still works + for name in span_names1: + _create_start_and_end_span(name, span_processor) + + span_processor.force_flush() + self.assertListEqual(span_names0 + span_names1, spans_names_list) + + span_processor.shutdown() def test_batch_span_processor_lossless(self): """Test that no spans are lost when sending max_queue_size spans""" @@ -127,8 +153,9 @@ def test_batch_span_processor_lossless(self): for _ in range(512): _create_start_and_end_span("foo", span_processor) - span_processor.shutdown() + span_processor.force_flush() self.assertEqual(len(spans_names_list), 512) + span_processor.shutdown() def test_batch_span_processor_many_spans(self): """Test that no spans are lost when sending many spans""" @@ -150,8 +177,9 @@ def test_batch_span_processor_many_spans(self): time.sleep(0.05) # give some time for the exporter to upload spans - span_processor.shutdown() + span_processor.force_flush() self.assertEqual(len(spans_names_list), 1024) + span_processor.shutdown() def test_batch_span_processor_scheduled_delay(self): """Test that spans are exported each schedule_delay_millis"""