
Commit ba82a8b

Properly support null values.
Based on the suggestion by @gunnaraasen in influxdata/influxdb#2429.
1 parent c069a0b commit ba82a8b
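
The change is easiest to read with the storage model in mind: InfluxDB has no way to write a null field value, so instead of skipping null datapoints on append (the old behaviour) the backend now writes a boolean marker field, value_null, and maps it back to None on read. A minimal sketch of the two point shapes written by _append_multiple, based on the hunks below; the measurement name shown is only a placeholder for the stream UUID:

# Sketch only; 'example-stream-uuid' stands in for stream.uuid.
point_with_value = {
    'measurement': 'example-stream-uuid',
    'fields': {'value': 42},
}

# A null datapoint carries no 'value' field at all; the boolean marker
# 'value_null' records that the value was explicitly null.
point_with_null = {
    'measurement': 'example-stream-uuid',
    'fields': {'value_null': True},
}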

File tree

4 files changed: +184 -175 lines changed

datastream/backends/influxdb.py (+13 -8)

@@ -562,8 +562,12 @@ def _deserialize_value(self, value):
     def _format_datapoint(self, datapoint):
         result = {}

-        if 'value' in datapoint:
-            result['v'] = self._deserialize_value(datapoint['value'])
+        if 'value' in datapoint or 'value_null' in datapoint:
+            if datapoint.get('value_null', False):
+                result['v'] = None
+            else:
+                result['v'] = self._deserialize_value(datapoint['value'])
+
         result['t'] = dateutil.parser.parse(datapoint['time'])
         if result['t'].tzinfo is None:
             result['t'] = result['t'].replace(tzinfo=pytz.utc)
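
A minimal standalone sketch of the read path above; format_datapoint is an illustrative name, and the backend's self._deserialize_value is swapped for a pass-through so the snippet runs on its own. Rows that carry value_null come back as None, everything else is deserialized as before:

import dateutil.parser
import pytz


def format_datapoint(datapoint, deserialize=lambda value: value):
    # Simplified stand-in for the backend's _format_datapoint.
    result = {}
    if 'value' in datapoint or 'value_null' in datapoint:
        if datapoint.get('value_null', False):
            result['v'] = None
        else:
            result['v'] = deserialize(datapoint['value'])
    result['t'] = dateutil.parser.parse(datapoint['time'])
    if result['t'].tzinfo is None:
        result['t'] = result['t'].replace(tzinfo=pytz.utc)
    return result


# An explicit null round-trips back to None; a regular value is kept.
assert format_datapoint({'value_null': True, 'time': '2000-01-01T12:00:00Z'})['v'] is None
assert format_datapoint({'value': 10, 'time': '2000-01-01T12:00:01Z'})['v'] == 10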
@@ -1211,9 +1215,6 @@ def _append_multiple(self, datapoints, raw=False):
         grouped_datapoints = {}
         now = datetime.datetime.now(pytz.utc)
         for datapoint in datapoints:
-            if datapoint['value'] is None:
-                continue
-
             stream = self._get_stream(datapoint['stream_id'])
             stream_ids.append(stream.uuid)

@@ -1222,11 +1223,14 @@ def _append_multiple(self, datapoints, raw=False):
             if stream.derived_from is not None:
                 raise exceptions.AppendToDerivedStreamNotAllowed

+            if datapoint['value'] is None:
+                fields = {'value_null': True}
+            else:
+                fields = {'value': self._validate_type(stream, datapoint['value'])}
+
             point = {
                 'measurement': stream.uuid,
-                'fields': {
-                    'value': self._validate_type(stream, datapoint['value']),
-                },
+                'fields': fields,
             }

             # Remove subsecond precision.
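
The matching write path in isolation: instead of skipping None (the continue deleted in the previous hunk), the fields dict switches to the marker. fields_for_value is a hypothetical helper name used only for this sketch, and validate stands in for self._validate_type(stream, value):

def fields_for_value(value, validate=lambda value: value):
    # Mirrors the branch added to _append_multiple: a null value becomes a
    # boolean marker field rather than being dropped.
    if value is None:
        return {'value_null': True}
    return {'value': validate(value)}


assert fields_for_value(None) == {'value_null': True}
assert fields_for_value(42) == {'value': 42}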
@@ -1521,6 +1525,7 @@ def add_condition(field, operator, value):
             select.append((VALUE_DOWNSAMPLER_MAP[downsampler] % 'value') + ' AS %s' % downsampler)
         else:
             select.append('value')
+            select.append('value_null')

         if start is not None:
             add_condition('time', '>=', start)
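
On the read query, the raw (non-downsampled) branch now selects both columns so _format_datapoint can tell an explicit null from an absent value. A sketch of the select-list logic under stated assumptions: build_select is a hypothetical helper, and VALUE_DOWNSAMPLER_MAP is reduced here to a single assumed entry mapping 'mean' to an aggregate template:

# Assumed, minimal stand-in for the real VALUE_DOWNSAMPLER_MAP.
VALUE_DOWNSAMPLER_MAP = {'mean': 'MEAN(%s)'}


def build_select(downsamplers=None):
    select = []
    if downsamplers:
        for downsampler in downsamplers:
            select.append((VALUE_DOWNSAMPLER_MAP[downsampler] % 'value') + ' AS %s' % downsampler)
    else:
        # Raw reads also fetch the null marker column.
        select.append('value')
        select.append('value_null')
    return select


assert build_select(['mean']) == ['MEAN(value) AS mean']
assert build_select() == ['value', 'value_null']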

setup.py (+1 -1)

@@ -4,7 +4,7 @@

 from setuptools import setup, find_packages

-VERSION = '0.5.1'
+VERSION = '0.5.2'

 if __name__ == '__main__':
     setup(

tests/test_common.py (+170 -2)

@@ -496,7 +496,7 @@ def test_granularities(self):
     def test_null_values(self):
         stream_id = self.datastream.ensure_stream({'name': 'foo'}, {}, self.value_downsamplers, datastream.Granularity.Seconds)

-        # Basic test with one stream
+        # Basic test with one stream.
         ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
         self.datastream.append(stream_id, None, ts)

@@ -511,11 +511,19 @@ def test_null_values(self):

         ts = datetime.datetime(2000, 1, 1, 12, 2, 0, tzinfo=pytz.utc)
         self.datastream.append(stream_id, 2, ts)
+        end_ts = ts
+
+        # Test highest granularity.
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(stream_id, self.datastream.Granularity.Seconds, start=ts, end=end_ts)
+        data = list(data)

+        self.assertEqual([x['v'] for x in data], [None, 10, None, None, 2])
+
+        # Test downsampled.
         with self.time_offset():
             self.datastream.downsample_streams()

-        end_ts = ts
         ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
         data = self.datastream.get_data(stream_id, self.datastream.Granularity.Seconds10, start=ts, end=end_ts)
         data = list(data)
@@ -540,6 +548,166 @@ def test_null_values(self):
         self.assertEqual(data[1]['v']['s'], None) # sum
         self.assertEqual(data[1]['v']['u'], None) # maximum

+        # Sum: two streams, only one has a null value for a datapoint
+        other_stream_id = self.datastream.ensure_stream({'name': 'bar1'}, {}, self.value_downsamplers, datastream.Granularity.Seconds)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, 1, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 1, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, 1, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 1, 0, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, 1, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 1, 1, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, 1, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 2, 0, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, 1, ts)
+
+        sum_stream_id = self.datastream.ensure_stream(
+            {'name': 'null_sum1'},
+            {},
+            self.value_downsamplers,
+            datastream.Granularity.Seconds,
+            derive_from=[stream_id, other_stream_id],
+            derive_op='sum',
+        )
+        self.datastream.backprocess_streams()
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(sum_stream_id, self.datastream.Granularity.Seconds, start=ts)
+        data = list(data)
+        self._test_data_types(data)
+
+        self.assertEqual([x['v'] for x in data], [1, 11, 1, 1, 3])
+
+        # Sum: two streams, both have null values for a datapoint
+        other_stream_id = self.datastream.ensure_stream({'name': 'bar2'}, {}, self.value_downsamplers, datastream.Granularity.Seconds)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, None, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 1, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, None, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 1, 0, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, None, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 1, 1, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, None, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 2, 0, tzinfo=pytz.utc)
+        self.datastream.append(other_stream_id, None, ts)
+
+        sum_stream_id = self.datastream.ensure_stream(
+            {'name': 'null_sum2'},
+            {},
+            self.value_downsamplers,
+            datastream.Granularity.Seconds,
+            derive_from=[stream_id, other_stream_id],
+            derive_op='sum',
+        )
+        self.datastream.backprocess_streams()
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(sum_stream_id, self.datastream.Granularity.Seconds, start=ts)
+        data = list(data)
+        self._test_data_types(data)
+
+        self.assertEqual([x['v'] for x in data], [None, 10, None, None, 2])
+
+        # Derivative
+        derivative_stream_id = self.datastream.ensure_stream(
+            {'name': 'null_derivative'},
+            {},
+            self.value_downsamplers,
+            datastream.Granularity.Seconds,
+            derive_from=[stream_id],
+            derive_op='derivative',
+        )
+        self.datastream.backprocess_streams()
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(derivative_stream_id, self.datastream.Granularity.Seconds, start=ts)
+        data = list(data)
+        self._test_data_types(data)
+
+        self.assertEqual([x['v'] for x in data], [None, None, None])
+
+        # Counter derivative
+        reset_stream_id = self.datastream.ensure_stream({'name': 'reset'}, {}, self.value_downsamplers, datastream.Granularity.Seconds)
+        counter_derivative_stream_id = self.datastream.ensure_stream(
+            {'name': 'null_counter_derivative'},
+            {},
+            self.value_downsamplers,
+            datastream.Granularity.Seconds,
+            derive_from=[
+                {'name': 'reset', 'stream': reset_stream_id},
+                {'stream': stream_id},
+            ],
+            derive_op='counter_derivative',
+        )
+        self.datastream.backprocess_streams()
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(counter_derivative_stream_id, self.datastream.Granularity.Seconds, start=ts)
+        data = list(data)
+        self._test_data_types(data)
+
+        self.assertEqual([x['v'] for x in data], [None, None, None])
+
+        # Counter reset
+        reset_stream_id = self.datastream.ensure_stream(
+            {'name': 'null_reset'},
+            {},
+            ['count'],
+            datastream.Granularity.Seconds,
+            value_type='nominal',
+            derive_from=stream_id,
+            derive_op='counter_reset',
+        )
+
+        self.datastream.backprocess_streams()
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(reset_stream_id, self.datastream.Granularity.Seconds, start=ts)
+        data = list(data)
+        self._test_data_types(data)
+
+        self.assertEqual([x['v'] for x in data], [1])
+
+        # Test null value insertion on downsampling
+        stream_id = self.datastream.ensure_stream({'name': 'bar'}, {}, self.value_downsamplers, datastream.Granularity.Seconds)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        self.datastream.append(stream_id, 10, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 10, 0, tzinfo=pytz.utc)
+        self.datastream.append(stream_id, 20, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 13, 45, tzinfo=pytz.utc)
+        with self.time_offset():
+            self.datastream.downsample_streams(until=ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 20, 0, tzinfo=pytz.utc)
+        self.datastream.append(stream_id, 30, ts)
+
+        ts = datetime.datetime(2000, 1, 1, 12, 21, 0, tzinfo=pytz.utc)
+        self.datastream.append(stream_id, 30, ts)
+        end_ts = ts
+
+        with self.time_offset():
+            self.datastream.downsample_streams()
+
+        ts = datetime.datetime(2000, 1, 1, 12, 0, 0, tzinfo=pytz.utc)
+        data = self.datastream.get_data(stream_id, self.datastream.Granularity.Minutes, start=ts, end_exclusive=end_ts)
+        data = list(data)
+        self._test_data_types(data)
+
+        self.assertEqual([x['v']['m'] for x in data], [10.] + [None] * 9 + [20.] + [None] * 9 + [30.])
+
     def test_find_streams(self):
         # Make sure we are starting with an empty database.
         self.assertEqual(len(self.datastream.find_streams()), 0)

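A quick check of the final assertion in the new test: the 'bar' stream has values at 12:00, 12:10, 12:20 and 12:21, and the Minutes-granularity query runs from 12:00 with end_exclusive at 12:21, so it spans 21 one-minute intervals (12:00 through 12:20). With this commit, downsampling emits a null datapoint for every empty interval, which is what the expected list of means encodes (the extra append at 12:21 presumably just closes the 12:20 interval so it gets downsampled):

# Reconstructing the expected per-minute means from the test above: values
# land in the 12:00, 12:10 and 12:20 buckets; the other 18 buckets are empty
# and now downsample to None instead of being absent.
expected = [10.] + [None] * 9 + [20.] + [None] * 9 + [30.]
assert len(expected) == 21
assert [v for v in expected if v is not None] == [10., 20., 30.]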