-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbases.py
293 lines (234 loc) · 9.83 KB
/
bases.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
""" base classes for temporal models """
import abc
import collections
import contextlib
import datetime as dt
import typing
import uuid
import warnings
import sqlalchemy as sa
import sqlalchemy.dialects.postgresql as sap
import sqlalchemy.orm as orm
import sqlalchemy.orm.attributes as attributes
import psycopg2.extras as psql_extras
from temporal_sqlalchemy import nine
from temporal_sqlalchemy.metadata import STRICT_MODE_KEY
# Pair of ranges stamped on every history row: ``effective`` is the time
# range and ``vclock`` is the version-clock range (see
# TemporalOption.make_clock, which builds these).
_ClockSet = collections.namedtuple('_ClockSet', ('effective', 'vclock'))

# Constrained type variable covering the ORM property kinds that can be
# tracked.  The name handed to TypeVar must match the variable it is bound
# to, otherwise static type checkers reject the declaration.
T_PROPS = typing.TypeVar(
    'T_PROPS', orm.RelationshipProperty, orm.ColumnProperty)

# Unique marker meaning "no newly added value was found for this property".
# A dedicated object is used so that None stays a legitimate property value.
NOT_FOUND_SENTINEL = object()
class EntityClock:
    """Base for clock models -- every clock model inherits these columns."""

    # Primary key: a UUID handled as a Python ``uuid.UUID`` and generated
    # with ``uuid.uuid4`` when no value is supplied.
    id = sa.Column(
        sap.UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4,
    )

    # Tick number recorded by this clock entry; may not be NULL.
    tick = sa.Column(sa.Integer, nullable=False)

    # Timezone-aware timestamp; the database fills in its current timestamp
    # when no value is supplied.
    timestamp = sa.Column(
        sa.DateTime(timezone=True),
        server_default=sa.func.current_timestamp(),
    )
class TemporalProperty:
    """Mixin used when building a property history table.

    Every attribute below is a ``None`` placeholder; the type comments
    record what each one is expected to hold on a concrete history model.
    """

    # Table backing the history model.
    __table__ = None  # type: sa.Table

    # Identifier of the entity the history row belongs to.
    entity_id = None  # type: orm.ColumnProperty

    # Relationship to the entity the history row belongs to.
    entity = None  # type: orm.RelationshipProperty

    # Time range attached to the history row.
    effective = None  # type: psql_extras.DateTimeRange

    # Version-clock range attached to the history row.
    vclock = None  # type: psql_extras.NumericRange
class TemporalActivityMixin:
    """Stub giving an Activity class an ``id`` member.

    NOTE: this class does not use ``abc.ABCMeta``, so the decorator below
    only marks ``id`` as abstract; it does not prevent instantiation.
    """

    @abc.abstractmethod
    def id(self):
        """Placeholder to be overridden by the concrete Activity class."""
        return None
class TemporalOption:
    """Temporal options stored on a model.

    Holds the mapping from each temporal property to its history model,
    the clock model and the optional activity class, and implements the
    bookkeeping that writes history rows for a clocked object.
    """

    def __init__(self,
                 history_models: typing.Dict[T_PROPS, nine.Type[TemporalProperty]],
                 temporal_props: typing.Iterable[T_PROPS],
                 clock_model: nine.Type[EntityClock],
                 activity_cls: nine.Type[TemporalActivityMixin] = None,
                 allow_persist_on_commit: bool = False):
        """Store the temporal configuration.

        :param history_models: maps each tracked property to the history
            model that records its values.
        :param temporal_props: the tracked properties.
        :param clock_model: model used to record clock ticks.
        :param activity_cls: optional activity class; ``None`` when the
            model has no activity.
        :param allow_persist_on_commit: stored as given; this class does
            not read it.
        """
        self.history_models = history_models
        self.temporal_props = temporal_props
        self.clock_model = clock_model
        self.activity_cls = activity_cls
        self.allow_persist_on_commit = allow_persist_on_commit

    @property
    def clock_table(self):
        """ DEPRECATED: use .clock_model instead -- Clock Model for this Model"""
        # stacklevel=2 attributes the warning to the code reading the
        # property rather than to this module.
        warnings.warn(
            'use TemporalOption.clock_model instead',
            PendingDeprecationWarning,
            stacklevel=2)
        return self.clock_model

    @property
    def history_tables(self):
        """ DEPRECATED: use .history_models instead -- list of history models for this Model"""
        # stacklevel=2 attributes the warning to the code reading the
        # property rather than to this module.
        warnings.warn(
            'use TemporalOption.history_models instead',
            PendingDeprecationWarning,
            stacklevel=2)
        return self.history_models

    @staticmethod
    def make_clock(effective_lower: dt.datetime,
                   vclock_lower: int,
                   **kwargs) -> _ClockSet:
        """Construct a clock set tuple.

        :param effective_lower: lower bound of the effective time range.
        :param vclock_lower: lower bound of the version-clock range.
        :param kwargs: optional ``effective_upper`` and ``vclock_upper``
            bounds; each defaults to ``None``.  Other keys are ignored.
        :return: ``_ClockSet(effective, vclock)`` holding the two ranges.
        """
        effective_upper = kwargs.get('effective_upper')
        vclock_upper = kwargs.get('vclock_upper')

        effective = psql_extras.DateTimeTZRange(
            effective_lower, effective_upper)
        vclock = psql_extras.NumericRange(vclock_lower, vclock_upper)

        return _ClockSet(effective, vclock)

    def record_history(self,
                       clocked: 'Clocked',
                       session: orm.Session,
                       timestamp: dt.datetime):
        """Record all history for a given clocked object.

        For every temporal property that has a newly added value, the
        previous open history row is capped and a new row is added to
        ``session``.

        :param clocked: the object whose changes are recorded.
        :param session: session the new history rows are added to; its
            ``info`` must contain ``STRICT_MODE_KEY``.
        :param timestamp: lower bound of the new rows' effective range.
        :raises AssertionError: in strict mode, when a temporal property
            changed although the vclock did not.
        """
        new_tick = self._get_new_tick(clocked)

        is_strict_mode = session.info[STRICT_MODE_KEY]
        is_vclock_unchanged = self._is_vclock_unchanged(clocked, new_tick)

        new_clock = self.make_clock(timestamp, new_tick)

        for prop, cls in self.history_models.items():
            value = self._get_prop_value(clocked, prop)
            if value is NOT_FOUND_SENTINEL:
                continue

            # Raised explicitly (rather than with ``assert``) so the
            # invariant is still enforced when Python runs with -O.
            if is_strict_mode and is_vclock_unchanged:
                raise AssertionError(
                    'flush() has triggered for a changed temporalized property outside of a clock tick')

            self._write_history_row(
                clocked, session, cls, prop, value, new_clock)

    def record_history_on_commit(self,
                                 clocked: 'Clocked',
                                 changes: dict,
                                 session: orm.Session,
                                 timestamp: dt.datetime):
        """Record history for a clocked object from precomputed changes.

        :param clocked: the object whose changes are recorded.
        :param changes: maps a temporal property to its new value; only
            properties present in this dict get a history row.
        :param session: session the new history rows are added to.
        :param timestamp: lower bound of the new rows' effective range.
        """
        new_tick = self._get_new_tick(clocked)
        new_clock = self.make_clock(timestamp, new_tick)

        for prop, cls in self.history_models.items():
            if prop in changes:
                self._write_history_row(
                    clocked, session, cls, prop, changes[prop], new_clock)

    def get_history(self, clocked: 'Clocked'):
        """Return pending changes and whether the vclock is unchanged.

        :param clocked: the object to inspect.
        :return: ``(history, is_vclock_unchanged)`` where ``history`` maps
            each temporal property with a newly added value to that value
            and ``is_vclock_unchanged`` is a bool.
        """
        new_tick = self._get_new_tick(clocked)
        is_vclock_unchanged = self._is_vclock_unchanged(clocked, new_tick)

        history = {}
        for prop in self.history_models:
            value = self._get_prop_value(clocked, prop)
            if value is not NOT_FOUND_SENTINEL:
                history[prop] = value

        return history, is_vclock_unchanged

    def _write_history_row(self, clocked, session, cls, prop, value, new_clock):
        """Cap the previous history row of ``cls`` and add the new one."""
        self._cap_previous_history_row(clocked, new_clock, cls)

        # Add new history row
        hist = {'entity': clocked, prop.key: value}
        session.add(
            cls(
                vclock=new_clock.vclock,
                effective=new_clock.effective,
                **hist,
            ),
        )

    @staticmethod
    def _is_vclock_unchanged(clocked, new_tick) -> bool:
        """Tell whether ``new_tick`` equals the unchanged vclock value."""
        vclock_history = attributes.get_history(clocked, 'vclock')
        return bool(vclock_history.unchanged and
                    new_tick == vclock_history.unchanged[0])

    @staticmethod
    def _cap_previous_history_row(clocked, new_clock, cls):
        """ Cap previous history row if exists """
        if sa.inspect(clocked).identity is None:
            # Object has no identity yet, so there is nothing to cap.
            return

        # Rebuild both ranges as [old lower, new lower) so the previously
        # open-ended row ends where the new clock starts.
        effective_close = sa.func.tstzrange(
            sa.func.lower(cls.effective),
            new_clock.effective.lower,
            '[)')
        vclock_close = sa.func.int4range(
            sa.func.lower(cls.vclock),
            new_clock.vclock.lower,
            '[)')

        # The attribute is looked up by the backref name of the history
        # model's ``entity`` relationship; it is used as a query here.
        history_query = getattr(
            clocked, cls.entity.property.backref[0])
        # Only rows whose ranges are both still open-ended are capped.
        history_query.filter(
            sa.and_(
                sa.func.upper_inf(cls.effective),
                sa.func.upper_inf(cls.vclock),
            ),
        ).update(
            {
                cls.effective: effective_close,
                cls.vclock: vclock_close,
            }, synchronize_session=False,
        )

    @staticmethod
    def _get_prop_value(clocked, prop):
        """Return the newly added value of ``prop`` on ``clocked``.

        :return: the first added value from the attribute history, or
            ``NOT_FOUND_SENTINEL`` when nothing was added.
        """
        state = attributes.instance_state(clocked)

        # fires a load on any deferred columns
        if prop.key not in state.dict:
            getattr(clocked, prop.key)

        if isinstance(prop, orm.RelationshipProperty):
            changes = attributes.get_history(
                clocked, prop.key,
                passive=attributes.PASSIVE_NO_INITIALIZE)
        else:
            changes = attributes.get_history(clocked, prop.key)

        if changes.added:
            return changes.added[0]

        return NOT_FOUND_SENTINEL

    @staticmethod
    def _get_new_tick(clocked):
        """Return the current ``vclock`` value of ``clocked``.

        Reads the value from the instance state dict and falls back to
        regular attribute access when it is not there.
        """
        state = attributes.instance_state(clocked)
        try:
            new_tick = state.dict['vclock']
        except KeyError:  # pragma: no cover
            # TODO understand why this is necessary
            new_tick = clocked.vclock

        return new_tick
class Clocked:
    """Mixin providing the default implementations for clocked data.

    Combine it with ``add_clock`` to make a model temporal:

    >>> import sqlalchemy as sa
    >>> import sqlalchemy.ext.declarative as declarative
    >>> import temporal_sqlalchemy
    >>>
    >>> @temporal_sqlalchemy.add_clock('prop1', 'prop2')
    ... class MyModel(Clocked, declarative.declarative_base()):
    ...     prop1 = sa.Column(sa.INTEGER)
    ...     prop2 = sa.Column(sa.TEXT)
    >>>
    >>> my_instance = MyModel(prop1=1, prop2='foo')
    >>> assert my_instance.temporal_options is MyModel.temporal_options
    >>> assert my_instance.vclock == 1
    """

    # Version clock of the object; the column default is 1.
    vclock = sa.Column(sa.Integer, default=1)

    # Placeholders, ``None`` on the mixin itself; the type comments state
    # what each is expected to hold on a temporal model.
    clock = None  # type: orm.relationship
    temporal_options = None  # type: TemporalOption
    first_tick = None  # type: EntityClock
    latest_tick = None  # type: EntityClock

    @property
    def date_created(self):
        """Timestamp of ``first_tick``."""
        earliest = self.first_tick
        return earliest.timestamp

    @property
    def date_modified(self):
        """Timestamp of ``latest_tick``."""
        newest = self.latest_tick
        return newest.timestamp

    @contextlib.contextmanager
    def clock_tick(self, activity: TemporalActivityMixin = None):
        """Context manager that ticks the vclock when the object changed.

        Yields this object with the session's autoflush disabled.  When
        the block exits normally and the session reports the object as
        modified, ``vclock`` is incremented by 1 and a new clock model
        instance for that tick is added to the session.

        :param activity: attached to the new clock entry when given.
        :raises ValueError: when the model has an activity class
            configured but no ``activity`` was passed.
        """
        warnings.warn("clock_tick is going away in 0.5.0",
                      PendingDeprecationWarning)

        activity_required = self.temporal_options.activity_cls is not None
        if activity_required and activity is None:
            raise ValueError("activity is missing on edit") from None

        session = orm.object_session(self)
        with session.no_autoflush:
            yield self

        # Nothing changed inside the block: no tick is recorded.
        if not session.is_modified(self):
            return

        self.vclock = self.vclock + 1

        tick_entry = self.temporal_options.clock_model(
            entity=self, tick=self.vclock)
        if activity is not None:
            tick_entry.activity = activity

        session.add(tick_entry)