fix: serialize dask_awkward.Array to None and store hard ref in closure in behavior #824

Merged: 3 commits into master from fix_dead_weakref, Jun 6, 2023

Conversation

@lgray (Collaborator) commented May 30, 2023

Fixes #823

@lgray (Collaborator, Author) commented May 30, 2023

@jrueb can you please try this one out!

@lgray (Collaborator, Author) commented May 30, 2023

@nsmith- this feels mildly dirty, any idea?

@nsmith- (Member) commented May 30, 2023

Keeping events for the duration of the program seems hazardous, even for dask objects that are all-metadata. To be honest, I don't understand how putting the self-reference in the finalizer allows it to ever be cleaned up, even at the end (but on the other hand, who cares how this kind of stuff is cleaned up at termination).

@lgray (Collaborator, Author) commented May 30, 2023

@nsmith- it'll get called at program exit for sure (with the atexit flag set to true, judging from the docs); this is just a more controllable way of stuffing the reference somewhere that has known behavior.

As for behavior stickiness, it's not that. It's that the events object that gets created holds the behavior behind two levels of indirection: it lives in events._meta.behavior, where _meta is a typetracer array, and events.behavior is actually a property that calls through to it. I cannot figure out the dask-awkward self from within the behavior, since the behavior doesn't know about the dask-awkward array at all!

I think there are ways to get the list of active finalizers on an object, so perhaps we can percolate lifetime control through that?
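
A minimal sketch of the weakref.finalize semantics being referenced here (the Events class and message are stand-ins, not coffea code):

import weakref

class Events:
    pass

ev = Events()

# register a callback that fires when ev is collected or, because
# finalize objects have atexit=True by default, at interpreter exit
fin = weakref.finalize(ev, print, "finalizing events")
print(fin.atexit)  # True: guaranteed to run by program exit at the latest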

@lgray (Collaborator, Author) commented May 30, 2023

So it definitely goes at program exit; I added in a print statement:

(coffea-dev) lgray@dhcp-131-225-173-191 coffea % python jrueb_repro.py                 
/Users/lgray/coffea-dev/coffea/coffea/nanoevents/schemas/nanoaod.py:215: RuntimeWarning: Missing cross-reference index for FatJet_genJetAK8Idx => GenJetAK8
  warnings.warn(
finalizing: dask.awkward<from-uproot, npartitions=1>

I'll add in a __events_finalizer__ weakref in the behavior as an escape hatch for now.

@lgray (Collaborator, Author) commented May 30, 2023

Arg, multiple levels of confusion but it works like I expect.

@nsmith- (Member) commented May 30, 2023

I'm going to push on behavior stickiness because it should be possible: after all, how can a dask-awkward array know its mixin without keeping a reference to the mixin class type somewhere? The only other option is a name in the parameters and a global lookup, but as far as I know ak2 keeps a behavior namespace per object, as ak1 did.
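
A small illustration of that per-object behavior namespace (the Point mixin is made up for this sketch; the point is that a sliced array carries the same behavior dict along):

import awkward as ak

behavior = {}

@ak.mixin_class(behavior)
class Point:
    @property
    def mag2(self):
        return self.x**2 + self.y**2

points = ak.zip(
    {"x": ak.Array([1.0, 3.0]), "y": ak.Array([2.0, 4.0])},
    with_name="Point",
    behavior=behavior,
)

sliced = points[1:]
print(sliced.behavior is behavior)  # True: the namespace travels with the object
print(sliced.mag2)  # [25]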

@lgray (Collaborator, Author) commented May 30, 2023

@nsmith- dask-awkward arrays don't directly know about their behaviors. The behavior is stored in the _meta, which is properly sticky in the way you expect. The problem is one of double indirection more than anything else. What you were talking about on Skype, having the dask-awkward array keep a reference to itself, is probably the best way to go (then it will need a custom pickle so that the reference isn't sent).

The problem with what you want to do is:

  1. The behavior of the _meta of the dask-awkward array doesn't know about the dask-awkward array.
  2. The dask-awkward array itself should really not be sent to the nodes doing the work; it gets intensely confusing really quickly if you don't crisply define which side of the fence you're on.

https://github.com/dask-contrib/dask-awkward/blob/main/src/dask_awkward/lib/core.py#L973-L1045
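
A toy sketch of the "custom pickle to not send it" idea, with hypothetical names (this is not dask-awkward's API): drop the self-reference at serialization time and re-establish it on the receiving side.

import pickle

class DaskArrayLike:
    """Hypothetical stand-in for a dask-awkward array holding a self-ref."""

    def __init__(self, meta):
        self.meta = meta
        self.ref_to_self = self  # kept client-side for cross-references

    def __reduce__(self):
        # don't ship the self-reference to workers; the constructor
        # re-establishes it locally after unpickling
        return (type(self), (self.meta,))

a = DaskArrayLike({"behavior": {}})
b = pickle.loads(pickle.dumps(a))
assert b.ref_to_self is b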

@lgray force-pushed the fix_dead_weakref branch from e3ec2cd to b7ec235 on May 30, 2023 at 20:22
@nsmith- (Member) commented May 30, 2023

The root issue here is: what is the ak2-blessed way of handling self-references that survive slicing?

Stepping back to the idea in 0.7/ak1: you could delete the original events and/or its factory, but with a handle to some sliced events array (which internally held a strong ref to the factory, which was serializable and could either recreate the events or use its existing weakref to an events object), you could call any method that makes use of the original array; the method knew how to get the factory pointer (it was stored in the behavior metadata) and hence the unsliced events (the original via weakref, or re-made). In the case where the weakref was dead and the events were remade, that was more overhead than keeping a strong ref to the events object itself, but at least nobody was surprised that they couldn't do something because they let the factory go out of scope, and we still avoided holding a pointer to an events array that could be really large and unreleasable by the more savvy user.

Nowadays, I guess that point about unreleasable events is moot, since in dask-awkward it is just some metadata. Maybe we can just use a strong reference in the appropriate location?
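
A compact sketch of that 0.7/ak1 pattern, assuming a serializable factory holding a weakref (all names hypothetical):

import weakref

class Events:
    def __init__(self, data):
        self.data = data

class Factory:
    def __init__(self, source):
        self.source = source
        self._events_ref = lambda: None  # starts out "dead"

    def events(self):
        ev = self._events_ref()
        if ev is None:
            # weakref died (or never existed): recreate from the source
            ev = Events(list(self.source))
            self._events_ref = weakref.ref(ev)
        return ev

factory = Factory(range(3))
e1 = factory.events()
assert factory.events() is e1  # weakref alive: reused, no rebuild
del e1                         # user lets the events go out of scope
e2 = factory.events()          # remade on demand, at some extra cost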

@lgray (Collaborator, Author) commented May 30, 2023

@martindurant @jpivarski

@jpivarski (Member) commented:

I've been staring at this, trying to understand the issue. I highly doubt that weak references will be needed in ak2. From what I understood in ak1, the only reason we needed them then was to break reference cycles through C++, and all of that is gone now. Reference cycles are fine; the garbage collector will find them.

The thing I'm trying to parse is "self-references that survive slicing." Does this have something to do with one particle type's record fields being global indexes into another particle type (e.g. muons have a list of closest jets, expressed as indexes, and index 0 is the first jet of the first event, not the first jet of the same event as the muon)? If a behavior computes that slice as part of a method or property implementation, it will need to have a reference to the whole, unsliced array so that the global indexing is always correct.

If that's the issue, I don't see how weak references help. As long as we're talking about cousin cross-references, not cycles in the graph that would break most recursive functions (i.e. still a DAG), you can keep a reference to the original array by burying it deep in an IndexedArray or IndexedOptionArray.

For instance, suppose you're given these data,

import awkward as ak
import numpy as np

events = ak.Array([
    {"muon": [{"pt": 1}, {"pt": 2}, {"pt": 3}], "jet": [{"pt": 1.1}, {"pt": 2.2}]},
    {"muon": [{"pt": 4}, {"pt": 5}], "jet": [{"pt": 3.3}, {"pt": 4.4}]},
    {"muon": [{"pt": 6}], "jet": [{"pt": 5.5}, {"pt": 6.6}, {"pt": 7.7}]},
    {"muon": [{"pt": 7}, {"pt": 8}], "jet": []},
    {"muon": [{"pt": 9}], "jet": [{"pt": 8.8}, {"pt": 9.9}]},
])

and you annotate the types (and provide behaviors, but I don't need to do that in this example).

events["muon"] = ak.with_name(events["muon"], "Muon")
events["jet"] = ak.with_name(events["jet"], "Jet")

Suppose that you're also given the global index of each muon's favorite jet,

muons_favorite_jet_index = ak.Array([[0, 1, -1], [3, 2], [5], [-1, -1], [8]])

which can be assigned as a field of the Muon records (unnecessary; just to demonstrate).

events["muon", "favorite_index"] = muons_favorite_jet_index

Similarly, you can add the whole jet objects in their original order by nesting them within an IndexedArray or IndexedOptionArray,

muons_favorite_jet = ak.Array(
    ak.contents.ListOffsetArray(
        events["muon"].layout.offsets,
        ak.contents.IndexedOptionArray(
            ak.index.Index64(muons_favorite_jet_index.layout.content.data),
            events["jet"].layout.content,
        ),
    )
)

events["muon", "favorite"] = muons_favorite_jet

and

>>> events.type.show()
5 * {
    jet: var * Jet[
        pt: float64
    ],
    muon: var * Muon[
        pt: int64,
        favorite_index: int64,
        favorite: ?Jet[
            pt: float64
        ]
    ]
}

When events is sliced with an event-level or particle-level cut, the whole events["jet"] array remains in there, unsliced with its global index positions intact, because it's protected by the IndexedArray or IndexedOptionArray.

selected_events = events[[True, False, False, True, True]]

So

>>> events["muon", "favorite", "pt"].show()
[[1.1, 2.2, None],
 [4.4, 3.3],
 [6.6],
 [None, None],
 [9.9]]
>>> selected_events["muon", "favorite", "pt"].show()
[[1.1, 2.2, None],
 [None, None],
 [9.9]]

still has the right indexing (the last event, which passed the cut, still has a jet pt of 9.9).

In fact,

>>> selected_events["muon", "favorite"].layout.content.content.content is events["jet"].layout.content
True

This "keeps events alive for the duration of the user session" because even if a user cuts a lot of events and drops the reference to the original array, they're kept alive inside the IndexedArray or IndexedOptionArray. Someone might argue that it's undesirable because the user cut events for the purpose of reducing memory, or they call ak.packed on the array or save it, which projects the IndexedArray or IndexedOptionArray, breaking ties with the original (but then again, that may be what the user wanted).

In summary, events.muon.favorite is not invalidated by slicing. It remains meaningful, but might be more memory use than users intend. Meanwhile, events.muon.favorite_index is only a useful index for (manually) slicing the original array. To use events.muon.favorite_index, users have to know what they're doing and keep a copy of events.jet around, but the memory use is easier to understand.

Also, you're asking about this in the context of dask-awkward, not awkward. The muons_favorite_jet has to be constructed early. Some months ago, we were talking about ways of building arbitrary structures without Dask thinking that all of the pieces in the construction have been touched. I don't remember what the final way to do that was, but this is just another construction; it would be built the way that structures in general are built. Or if this requires a Dask-friendly ak.from_buffers, then maybe it's time for that.

@lgray (Collaborator, Author) commented May 30, 2023

@jpivarski I think @nsmith- and I are in agreement (please correct me if I'm wrong, Nick) that making the user manage multiple lists of jets so they can always index back into the full jet array (or similar) in order to keep cross-references correct is too much unreasonable bookkeeping. The probability of error is very high, and the additional complexity of the code is far too much for someone just learning Python to do physics analysis, who will immediately encounter the need for such patterns.

Long story short: events.Muon.matched_jet should work as expected whether or not events or Muon has been sliced. That's why we're doing things this way in coffea. It is well justified, since matching/cross-referencing is required in more-or-less every non-trivial analysis we've seen so far, and it is an easily automated problem that usefully hides complex code.

@lgray (Collaborator, Author) commented May 30, 2023

Oh, I think I read the old version of your comment, @jpivarski; just a second.

Indeed - I'll give this a try with a form and typetracer first to see if I can demonstrate it from that end.
Will follow up.

@lgray (Collaborator, Author) commented May 30, 2023

A major issue is to be able to do this lazily without inducing unpleasant responses in users. :-)

@jpivarski (Member) commented:

If this thing is constructed without touching (again, I don't know if we need a Dask-friendly from_buffers for that or if what we have now is okay) and then is never touched by analysis code, then it would get optimized away.

See:

def add_form_keys(form, hint=()):
    # recursively assign a unique, path-describing form_key to every node
    add_form_keys.num += 1
    num = add_form_keys.num
    if hasattr(form, "contents"):
        return form.copy(
            contents=[
                add_form_keys(v, hint + (k,))
                for k, v in zip(form.fields, form.contents)
            ],
            form_key=f"node-{num}-{'-'.join(hint)}",
        )
    elif hasattr(form, "content"):
        return form.copy(
            content=add_form_keys(form.content, hint + ('@',)),
            form_key=f"node-{num}-{'-'.join(hint)}",
        )
    else:
        return form.copy(
            form_key=f"node-{num}-{'-'.join(hint)}",
        )

add_form_keys.num = 0

form_with_keys = add_form_keys(events.layout.form)

typetracer, report = ak.typetracer.typetracer_with_report(form_with_keys)
events_tt = ak.Array(typetracer)

and then:

>>> report.data_touched
[]

>>> events_tt.muon.pt + 1
<Array-typetracer [...] type='## * var * int64'>
>>> report.data_touched
['node-5-muon', 'node-7-muon-@-pt']

>>> events_tt.jet.pt + 1
<Array-typetracer [...] type='## * var * float64'>
>>> report.data_touched
['node-5-muon', 'node-7-muon-@-pt', 'node-2-jet', 'node-4-jet-@-pt']

>>> events_tt.muon.favorite.pt + 1
<Array-typetracer [...] type='## * var * ?float64'>
>>> report.data_touched
['node-5-muon', 'node-7-muon-@-pt', 'node-2-jet', 'node-4-jet-@-pt',
 'node-9-muon-@-favorite', 'node-11-muon-@-favorite-@-pt']

Or restart and

>>> typetracer, report = ak.typetracer.typetracer_with_report(form_with_keys)
>>> events_tt = ak.Array(typetracer)

>>> report.data_touched
[]

>>> events_tt.muon.favorite.pt + 1
<Array-typetracer [...] type='## * var * ?float64'>
>>> report.data_touched
['node-5-muon', 'node-9-muon-@-favorite', 'node-11-muon-@-favorite-@-pt']

Touching the jet fields is independent of touching the muon fields (though it does touch the muon offsets because the favorite jets are nested under the muon's ListOffsetArray).

@lgray (Collaborator, Author) commented May 30, 2023

Looking at how we've done things, I think you're correct that we can start everything off as an IndexedOptionArray and work from that, instead of the ListOffsetArrays that we currently do takes on to get an IndexedOptionArray.

It'll take a bit to re-engineer, but it'll be far more correct.

@nsmith- your thoughts?

@nsmith- (Member) commented May 31, 2023

Thanks Jim for also understanding what we were talking about; I should have included an example. Sadly, there are cycles, e.g. gen particle parents, so embedding the reference in the form is not an option. Actually, since cross-references go both ways, events.Muon.matched_jet.matched_muons exists and is another example of a cycle. Another issue is that ak.packed(events) would be a lot larger than people expect, since I assume the cross-references get de-referenced explicitly.

The old solution was basically a smart behavior that knew where to find the un-sliced original array and used it on demand. That introduced a cycle in the array object; I don't recall whether it went through the pybind11 layer, but I suppose it did, otherwise we would not have used a weakref (unless my earlier comment was correct that it was a chance to reduce memory usage).

A minimal example, focusing on one "event" to not cloud the issue with the jagged array indexing mess that has to happen:

import awkward as ak

behavior = {}

@ak.mixin_class(behavior)
class GenPart:
    @property
    def parent(self):
        return self.behavior["__original__"][self.parentIdx]


genpart = ak.zip(
    {
        "pt": ak.Array([0., 0., 13., 14., 15., 16., 17.]),
        "parentIdx": ak.Array([None, None, 0, 0, 1, 2, 3]),
    },
    with_name="GenPart",
    behavior=behavior,
)
# hide in a spot that is sticky to all downstream arrays
genpart.behavior["__original__"] = genpart

# slice
genpart = genpart[3:]
print(genpart.parent)

This works in ak1 and ak2. But for dask-awkward, I am not sure where to keep the reference.

@lgray (Collaborator, Author) commented May 31, 2023

@nsmith-

import awkward as ak

behavior = {}

@ak.mixin_class(behavior)
class GenPart:
    @property
    def parent(self):
        return self.behavior["__original__"][self.parentIdx]

parentIdx = ak.Array([None, None, 0, 0, 1, 2, 3])
pt = ak.Array([0., 0., 13., 14., 15., 16., 17.])

genpart = ak.zip(
    {
        "pt": pt,
        "parentIdx": parentIdx,
    },
    with_name="GenPart",
    behavior=behavior,
)

layout_parents = ak.contents.IndexedOptionArray(
    ak.index.Index64(ak.fill_none(parentIdx, -1)),
    genpart.layout,
)

parents = ak.Array(layout_parents)

# hide in a spot that is sticky to all downstream arrays
genpart.behavior["__original__"] = genpart

# slice
genpart = genpart[3:]
parents = parents[3:]
print(genpart.parent)
print(parents)

appears to work!

@nsmith- (Member) commented May 31, 2023

Ok, and genpart.parent.parent?

@jpivarski (Member) commented:

Copying (and summarizing) from Slack:

Although ak2's Pythonic layout nodes can, in principle, support cycles, you'd have to build them by modifying private attributes and we'd have a lot of trouble memoizing all of the recursive functions to make them cyclic-safe. In many cases, the problem would be conceptual—sure, we can stop recursing once we get to a node we've seen before, but what would the return value be? For example, should purelist_depth return the depth-until-a-cycle or infinity (a non-int)? We'd have to ask hard questions like this about every recursive function, and almost all of our functions are recursive.

So we should consider ourselves restricted to no-cycles. Two use-cases that break this are:

  • genparticles, whose child is itself
  • bidirectional cross-references between cousins, like Muon's favorite_jet and Jet's favorite_muon.

I'll come back to the genparticles case because I don't think we need to make that structurally recursive. There's only one layout node involved—we should be able to sequester it somehow and do all of the iterative self-application in behaviors, rather than structure.

For bidirectional cross-references among cousins, there's a way out. Muon's favorite_jet can be a Jet type without a favorite_muon field, and Jet's favorite_muon can be a Muon type without a favorite_jet field. The two types, Muon_with_favorite and Muon_without_favorite, can share (zero-copy, referencing the same memory) all fields other than the favorite_jet field.

It means users would be able to say

events.muon.favorite_jet.pt

and

events.jet.favorite_muon.pt

but not

events.muon.favorite_jet.favorite_muon.favorite_jet.favorite_muon.pt

Does anybody actually need the last case? It seems eclectic.

To do this, you can define only one mixin class named Muon and only one mixin class named Jet, and use this mixin on both the RecordArray that doesn't contain the favorite_* field and the RecordArray that does. Even if some of the properties and methods defined in this class depend on the existence of the favorite_* field, those properties and methods would just fail when used on a RecordArray without that field. By the magic of Python's duck typing, and because the coupling between ak.behaviors and parameters is loose, all of the operations that don't rely on Muon → Jet → Muon → ... cycles will still work, and I believe those are all of the important operations.

Who needs to literally take advantage of bidirectional cousin cycles? If it's just a technicality, that technicality can be avoided.
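
A minimal sketch (not from the thread) of that shared-fields idea: the "Muon with favorite" flavor points at the plain Jet flavor through an IndexedArray, the pt buffers are shared zero-copy between flavors, and there is no cycle to recurse into. All names are illustrative.

import awkward as ak
import numpy as np

muon_pt = ak.contents.NumpyArray(np.array([10.0, 20.0]))
jet_pt = ak.contents.NumpyArray(np.array([1.1, 2.2, 3.3]))

# plain flavor: no cross-reference fields, hence no cycles anywhere
jet_plain = ak.contents.RecordArray([jet_pt], ["pt"], length=3)

# the "with favorite" muon points at the *plain* jet flavor
favorite_jet = ak.contents.IndexedArray(
    ak.index.Index64(np.array([2, 0], dtype=np.int64)), jet_plain
)
muon_with_fav = ak.contents.RecordArray(
    [muon_pt, favorite_jet], ["pt", "favorite_jet"], length=2
)

muons = ak.Array(muon_with_fav)
print(muons.favorite_jet.pt)  # [3.3, 1.1]
# muons.favorite_jet.favorite_muon would fail: this Jet flavor has no such field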

@NJManganelli (Collaborator) commented May 31, 2023

Ah, apologies, I didn't read Jim's comment all the way. For muons and jets there should be ways to handle things, even if a little less convenient, but something very similar arises in the simple case of gen particles, where you'd lose a lot of convenience if you can't naturally traverse the parentage in multiple directions and find those cousins. That feels like bread and butter when you're, e.g., relating decay products of tops, including all those radiated hadrons.

@lgray (Collaborator, Author) commented May 31, 2023

@nsmith-

appending to the above:

layout_parents_parents = ak.contents.IndexedOptionArray(
    ak.index.Index64(ak.fill_none(parents.parentIdx, -1)),
    parents.layout.content,
)

parents_parents = ak.Array(layout_parents_parents)

print(genpart.parent.parent)
print(parents_parents)

yields:

(coffea-dev) lgray@Lindseys-MBP coffea % python -i index_array_play.py
[{pt: 0, parentIdx: None}, {pt: 0, ...}, {...}, {pt: 14, parentIdx: 0}]
[{pt: 0, parentIdx: None}, {pt: 0, ...}, {...}, {pt: 14, parentIdx: 0}]
[None, None, {pt: 0, parentIdx: None}, {pt: 0, parentIdx: None}]
[None, None, {pt: 0, parentIdx: None}, {pt: 0, parentIdx: None}]

@lgray (Collaborator, Author) commented May 31, 2023

So we can absolutely define a mixin that builds the parents on the fly (as a daskable operation, too); that should also be reasonably traceable, given Jim's indications above.

@jpivarski (Member) commented:

I was just working on a GenPart in which parents can be repeatedly applied and which doesn't stash the original array in the behavior. Apart from that stashing, it may be functionally equivalent to @lgray's new implementation.

We should avoid stashing large data, like arrays, in the behavior because a string representation of the whole behavior is included in the Numba type of an array passed to Numba:

https://github.com/scikit-hep/awkward/blob/ca7637a0b4fa0aba0337095a604145e2491df26f/src/awkward/_connect/numba/arrayview.py#L235-L248

Since behaviors can add new methods and unary/binary operators to records, the way that a Numba function gets compiled depends on the behavior. Change the behavior and we need to recompile the function, and Numba only knows that it needs to recompile the function if the (string) type name changes.

Oh wait—the string representation of an array is not large, so this is not a performance issue. But it would lead to Numba functions getting recompiled when decimal values of the few numbers that do appear in the __original__ GenPart's string representation change. That would be a bad surprise.

So anyway, here is a GenPart implementation based on sequestering the original array in a nested record (so that Awkward won't project away IndexedArrays, as long as the GenPart structure is unbroken).

import awkward as ak
import numpy as np

parentIdx = ak.Array([None, None, 0, 0, 1, 2, 3])
pt = ak.Array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6])

genpart = ak.zip(
    {
        "pt": pt,
        "parentIdx": ak.fill_none(parentIdx, -1),
    }
)

@ak.mixin_class(ak.behavior)
class GenPart:
    @property
    def pt(self):
        # expose a field hidden under "_data"
        return self["_data", "pt"]

    @property
    def parent(self):
        # rebuild the same single-field wrapper, but with the option-type
        # index applied to the *unprojected* original content, so that
        # parentIdx stays valid no matter how the outer array was sliced
        return ak.Array(
            ak.contents.RecordArray(
                [
                    ak.contents.IndexedOptionArray(
                        ak.index.Index64(
                            ak.fill_none(self["_data", "parentIdx"], -1)
                        ),
                        self["_data"].layout.content,
                    ),
                ],
                ["_data"],
            ),
            with_name="GenPart",
        )

# sequester the original genpart under a single "_data" field, behind an
# IndexedOptionArray, so nothing projects the original content away
sequestered = ak.Array(
    ak.contents.RecordArray(
        [
            ak.contents.IndexedOptionArray(
                ak.index.Index64(np.arange(len(genpart), dtype=np.int64)),
                genpart.layout,
            ),
        ],
        ["_data"],
    ),
    with_name="GenPart",
)

With this definition,

>>> sequestered.pt
<Array [0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6] type='7 * ?float64'>
>>> sequestered.parent.pt
<Array [None, None, 0, 0, 1.1, 2.2, 3.3] type='7 * ?float64'>
>>> sequestered.parent.parent.pt
<Array [None, None, None, None, None, 0, 0] type='7 * ?float64'>

and

>>> sequestered[3:].pt
<Array [3.3, 4.4, 5.5, 6.6] type='4 * ?float64'>
>>> sequestered[3:].parent.pt
<Array [0, 1.1, 2.2, 3.3] type='4 * ?float64'>
>>> sequestered[3:].parent.parent.pt
<Array [None, None, 0, 0] type='4 * ?float64'>

and

>>> sequestered[[3, 4, 5, 6]].pt
<Array [3.3, 4.4, 5.5, 6.6] type='4 * ?float64'>
>>> sequestered[[3, 4, 5, 6]].parent.pt
<Array [0, 1.1, 2.2, 3.3] type='4 * ?float64'>
>>> sequestered[[3, 4, 5, 6]].parent.parent.pt
<Array [None, None, 0, 0] type='4 * ?float64'>

This would usually work without the double-nesting (just an IndexedArray or IndexedOptionArray of RecordArray containing pt, parentIdx, etc.) because Awkward has optimizations that leave indexed-of-records as indexed-of-records, but you don't want to depend on that implementation detail. Adding the extra layer of RecordArray between the view of selected/sliced genparticles and the original, full set of genparticles and reconstructing it in the parent property ensures that none of Awkward's internal methods consider it fair game to IndexedArray.project it away, which would make parentIdx inapplicable.

The next step would be to make sure that these sorts of constructions will work in dask-awkward. The method of stashing the original genparticles in behavior["__original__"] would probably be an insurmountable problem for dask-awkward, too, since the behavior is probably cloudpickled and sent over the wire.

@jpivarski (Member) commented:

Just as something to think about—you probably don't want to introduce this complication: it's conceivably possible to make cross-referenced cousins work exactly two levels deep. What I described above made them work one level deep by introducing two RecordArray structures for each particle: "muon", "muon with favorite jets", "jet", and "jet with favorite muons". It's possible to keep going:

  • muon0 has particle attributes only
  • muon1 has a favorite_jet field that points to jet2
  • muon2 has a favorite_jet field that points to jet0
  • jet0 has particle attributes only
  • jet1 has a favorite_muon field that points to muon2
  • jet2 has a favorite_muon field that points to muon0

The events object would include muon1 and jet1. From there, you can go

events.muon1.favorite_jet.favorite_muon.pt

but no deeper:

events.muon1.favorite_jet.favorite_muon.favorite_jet   # is an error!

Similarly, you can go

events.jet1.favorite_muon.favorite_jet.pt

but no deeper:

events.jet1.favorite_muon.favorite_jet.favorite_muon   # is an error!

With enough different RecordArray types, you can go a specified depth without introducing the recursion errors that true cyclic references would cause.

This is probably more complicated than you want to consider, but it's good to know that there's room to grow, if you ever need to. Also, I'm going to assert that while there might be a 1% chance that someone really needs events.muon.favorite_jet.favorite_muon.pt, there's only a 0.01% chance that anyone is going to need events.muon.favorite_jet.favorite_muon.favorite_jet.pt.

@lgray (Collaborator, Author) commented May 31, 2023

So this exploration is interesting, but isn't allowing a reference back to the original array a bit cleaner an implementation at this point? IMO, it's much clearer to a typical physicist what the ref-to-original trick is doing.

The content munging takes a lot longer to understand and is harder to reproduce oneself (as we often like to do to convince ourselves we understand something).

@nsmith- (Member) commented May 31, 2023

I like the sequestered-array approach! Essentially you get recursion by holding onto a tail. The only downside is that each of the array's fields hidden in _data now needs to be explicitly exposed via a property, but that is not such a bad thing.
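
One way to avoid writing a property per hidden field is a small property factory bolted onto the GenPart mixin from above (a sketch, not a coffea API; the field list is illustrative):

def _expose(field):
    # forward a field hidden under "_data" back out to the top level
    return property(lambda self: self["_data", field])

for _field in ("pt", "parentIdx"):
    setattr(GenPart, _field, _expose(_field))

Class properties are found before awkward's field lookup, so sequestered.parentIdx would then work the same way sequestered.pt does.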

@lgray (Collaborator, Author) commented May 31, 2023

It will be nasty to deal with in terms of data versioning (though GenParticles don't change that much).

@lgray (Collaborator, Author) commented May 31, 2023

@jpivarski On the dask-awkward side, the way it's presently implemented shows that you don't even need the pointer-to-original in the task graph being executed; it's basically embedded in the graph itself.

Right now in coffea, the weakref gets pickled to a None (yes, this is a bad override that I would like to see go away) and is not used during execution. Since dask forces the operation into a DAG (infinite recursion is not possible in an analysis you can actually execute), the need to pickle the ref-to-self disappears when dealing with the concrete awkward arrays on worker nodes.
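
A generic sketch of the pickles-to-None override pattern, done here with copyreg for plain weakrefs (illustrative only; this is not the actual coffea override):

import copyreg
import pickle
import weakref

class _AlwaysDead:
    # deserialized stand-in for a weakref: calling it yields None
    def __call__(self):
        return None

def _reduce_weakref(ref):
    # instead of pickle's usual "cannot pickle 'weakref' object" error,
    # ship a stand-in that behaves like a dead reference on the far side
    return (_AlwaysDead, ())

copyreg.pickle(weakref.ref, _reduce_weakref)

class Thing:
    pass

t = Thing()
r = weakref.ref(t)
r2 = pickle.loads(pickle.dumps(r))
print(r2())  # None: the reference did not survive the trip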

@jpivarski (Member) commented:

Oh, but ak.to_packed or ak.to_arrow (and therefore anything that writes the arrays) would blow up memory use, for either ListArray or IndexedArray of RegularArray.

@lgray (Collaborator, Author) commented May 31, 2023

@nsmith- do you find that an acceptable risk?

@lgray (Collaborator, Author) commented Jun 1, 2023

We're discussing on Slack the possibility of just having the self-reference in dask-awkward (some understanding was reached about what was going on that hadn't been there before), so I'll just leave the above implementation as is; it might be moot.

I'm glad we figured all this stuff out though.

@jpivarski (Member) commented:

Okay, once more unto the breach. Here's a solution that's safe for saving (and continues to work after loading a saved file), doesn't explode memory use even after ak.to_packed/ak.to_arrow, and is still $\mathcal{O}(n)$. (I had some thoughts about np.searchsorted, which would make it $\mathcal{O}(n \log n)$, which isn't terrible, but it's nice to not have to do that.)

Instead of keeping the original, uncut data, we allow the data to be cut and always translate the global parentIdx into whatever the new subset (and order) is. We do this the same way that a database keeps track of the identity of table rows: with a surrogate index, a column of numbers that are unique at the time when the global parentIdx applies. Actually, this index also encodes the original order, so I suppose it's something more than a surrogate index.

But anyway, the implementation (global_index is what I'm calling parentIdx now):

import awkward as ak
import numpy as np

pt = ak.Array([0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6])
global_index = ak.Array([None, None, 0, 0, 1, 2, 3])

@ak.mixin_class(ak.behavior)
class GenPart:
    @property
    def parent(self):
        # inverse_map models the original table: slot i holds where
        # original row i has gone in the current (sliced) array, or -1
        surrogate_index = ak.to_numpy(self["surrogate_index"])
        inverse_map = np.full(np.max(surrogate_index) + 1, -1, dtype=np.int64)
        inverse_map[surrogate_index] = np.arange(len(surrogate_index))
        # mask out rows that were cut, then route through the global index
        slicer = ak.Array(
            ak.contents.ByteMaskedArray(
                ak.index.Index8(inverse_map >= 0),
                ak.contents.NumpyArray(inverse_map),
                valid_when=True,
            )
        )[global_index]
        return self[slicer]

genpart = ak.zip(
    {
        "pt": pt,
        "global_index": global_index,
        "surrogate_index": np.arange(len(global_index)),
    },
    with_name="GenPart",
)

Starting with our original genpart,

>>> genpart.show()
[{pt: 0, global_index: None, surrogate_index: 0},
 {pt: 1.1, global_index: None, surrogate_index: 1},
 {pt: 2.2, global_index: 0, surrogate_index: 2},
 {pt: 3.3, global_index: 0, surrogate_index: 3},
 {pt: 4.4, global_index: 1, surrogate_index: 4},
 {pt: 5.5, global_index: 2, surrogate_index: 5},
 {pt: 6.6, global_index: 3, surrogate_index: 6}]

it computes the parents correctly:

>>> genpart.parent.show()
[None,
 None,
 {pt: 0, global_index: None, surrogate_index: 0},
 {pt: 0, global_index: None, surrogate_index: 0},
 {pt: 1.1, global_index: None, surrogate_index: 1},
 {pt: 2.2, global_index: 0, surrogate_index: 2},
 {pt: 3.3, global_index: 0, surrogate_index: 3}]

But then, suppose we had cut out the genparticle at (original, global) index 2.

>>> selected_genpart = genpart[[True, True, False, True, True, True, True]]
>>> selected_genpart.show()
[{pt: 0, global_index: None, surrogate_index: 0},
 {pt: 1.1, global_index: None, surrogate_index: 1},
 {pt: 3.3, global_index: 0, surrogate_index: 3},
 {pt: 4.4, global_index: 1, surrogate_index: 4},
 {pt: 5.5, global_index: 2, surrogate_index: 5},
 {pt: 6.6, global_index: 3, surrogate_index: 6}]

Now, even though all indexes above 2 have shifted, they still go to the right places, because inverse_map models the original table and points to where the items have gone after any filtering, duplication, or rearrangement (i.e. after genpart has been sliced by any integer index, which is more general than just boolean filtering).

>>> selected_genpart.parent.show()
[None,
 None,
 {pt: 0, global_index: None, surrogate_index: 0},
 {pt: 0, global_index: None, surrogate_index: 0},
 {pt: 1.1, global_index: None, surrogate_index: 1},
 None,
 {pt: 3.3, global_index: 0, surrogate_index: 3}]

Since the particle at original index 2 is now missing, parent returns None for that particle because we don't save the original particles anymore. If you're filtering for particles of interest, you can't filter out the parents of particles of interest and expect to still be able to find them.

This is robust because we're always making an inverse_map, which models the original table such that the global_index (i.e. parentIdx) is always applicable to that table. We don't know or even care how many times the set has been sliced, and that continues to be true through serialization and deserialization.

Oh, and also the particle attributes don't have to be modified. Maybe you want to hide such details as surrogate_index.

@jpivarski (Member) commented:

Actually, the size of the inverse_map would have to be

max(np.max(global_index), np.max(surrogate_index)) + 1

instead of just

np.max(surrogate_index) + 1

In principle, it should be the size of the original table, but after slicing we might not know the size of the original table. For the technique to work, it only needs to be larger than the arrays that are going to index it, which is not just surrogate_index, but also global_index.

Setting the size of an array by a value derived from another array (the np.max) might be a problem for a Dask process. In that case, stash the original table size (a single integer) in a parameter (which will ensure that it gets serialized, and the technique still works across serialization and Dask).

There might also be a problem with the mutation in place:

inverse_map[surrogate_index] = np.arange(len(surrogate_index))

which is why I switched to NumPy. (Is it a problem in dask-awkward to switch between Awkward and NumPy? Is dask-awkward well connected to dask.array? Does dask.array follow in-place mutations?) There's probably another way of doing this, too.
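
Sketching the parameter suggestion from above, continuing from the genpart defined earlier (the key name "original_length" is made up; parameters are part of the layout, so they should ride along through slicing and serialization):

import awkward as ak

# stash the original table size as a layout parameter
genpart = ak.with_parameter(genpart, "original_length", len(genpart))

# later, e.g. inside the parent property, size the inverse_map from it
original_length = ak.parameters(genpart)["original_length"]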

@lgray (Collaborator, Author) commented Jun 1, 2023

How can we hide surrogate_index from the user? That seems to have fallen out of my brain.

@lgray (Collaborator, Author) commented Jun 1, 2023

@jpivarski It's OK to use numpy so long as we hide it behind an if ak.backend(self) == "typetracer" check or empty_if_typetracer, and then for the latter, catch that and make length-zero arrays.

The resulting type is still easily determined.
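
A sketch of that guard (build_inverse_map is a hypothetical helper): ak.backend reports "typetracer" for tracing arrays, and the fallback returns a length-zero array of the right dtype, which is enough to determine the resulting type.

import awkward as ak
import numpy as np

def build_inverse_map(surrogate_index, size):
    if ak.backend(surrogate_index) == "typetracer":
        # no concrete values while tracing: a length-zero array of the
        # correct dtype still declares the downstream type
        return np.empty(0, dtype=np.int64)
    idx = ak.to_numpy(surrogate_index)
    inverse_map = np.full(size, -1, dtype=np.int64)
    inverse_map[idx] = np.arange(len(idx), dtype=np.int64)
    return inverse_map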

@jpivarski (Member) commented:

> How can we hide surrogate_index from the user? That seems to have fallen out of my brain.

Only by prepending it with an underscore and saying that the usual Python conventions apply. There isn't a way to make it visible to a library like Coffea and also not visible to users of that library.

Or maybe it could be given a more meaningful name ("position in uncut genparticles") and not hidden. Someone may be able to use it.

@nsmith- (Member) commented Jun 2, 2023

> Since the particle at original index 2 is now missing, parent returns None for that particle because we don't save the original particles anymore. If you're filtering for particles of interest, you can't filter out the parents of particles of interest and expect to still be able to find them.

I think having the parent available even if it was filtered out is a hard requirement here.

@lgray force-pushed the fix_dead_weakref branch from 1a6305e to 2ca83a9 on June 2, 2023 at 15:24
@lgray changed the title from "fix: Use finalizer to keep events alive for duration of user session" to "fix: serialize dask_awkward.Array to None and store hard ref in behavior" on Jun 2, 2023
@lgray (Collaborator, Author) commented Jun 2, 2023

@jrueb OK, a much more reasonable fix is now in place (and an even more reasonable fix than that is to come).

Please give it a try.

@lgray changed the title from "fix: serialize dask_awkward.Array to None and store hard ref in behavior" to "fix: serialize dask_awkward.Array to None and store hard ref in closure in behavior" on Jun 2, 2023
@lgray force-pushed the fix_dead_weakref branch from 2ca83a9 to 0eff911 on June 2, 2023 at 16:04
@lgray (Collaborator, Author) commented Jun 2, 2023

I think I have a possibly better solution for this: if I store (via some indirection) the original HLG layer that got generated, I can always reconstruct the dask array.

@lgray force-pushed the fix_dead_weakref branch 2 times, most recently from 6f40692 to 1c7b204, on June 5, 2023 at 22:43
@lgray (Collaborator, Author) commented Jun 5, 2023

@jrueb @nsmith- I think this is acceptable for now, until transient_attrs is available in awkward. Your opinions?

@lgray force-pushed the fix_dead_weakref branch from 1c7b204 to aa9bc3f on June 6, 2023 at 04:24
@lgray (Collaborator, Author) commented Jun 6, 2023

Merging this end of today FNAL time if there are no further issues.

@nsmith- (Member) commented Jun 6, 2023

Is the change to lookup_tools directly related to the NanoEvents cycles? It wasn't clear to me how they are related.

@lgray (Collaborator, Author) commented Jun 6, 2023

They're actually related! If we are not defaulting weakrefs to serialize to None, then I have to control the lifecycle of the weakref manually (otherwise using the dask distributed client fails, for instance, since it tries to pickle a weakref).

@jrueb (Contributor) commented Jun 6, 2023

My issue indeed seems to be solved with this change.

@lgray merged commit f8a4eb9 into master on Jun 6, 2023
@lgray deleted the fix_dead_weakref branch on June 26, 2023 at 16:53
Linked issue: Dask Nanoevents attributes rely on possibly dead weakref of original array (#823)