-
Notifications
You must be signed in to change notification settings - Fork 14.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed hanged KubernetesPodOperator #28336
Conversation
09c496a
to
da0d40a
Compare
da0d40a
to
6c19712
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: datetime
62c82b4
to
b368ce9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Look nice for me. However better that also someone else look on this changes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM too. @dstandish @jedcunningham - maybe you can take a look too?
45093c9
to
b56e4a8
Compare
b56e4a8
to
9370669
Compare
64b97ef
to
31ffba3
Compare
f4b28d7
to
f1b17dd
Compare
a81264f
to
51114aa
Compare
@dimberman , @dstandish , @jedcunningham, hi. |
51114aa
to
c9f0629
Compare
Needs re-review @dimberman @dstandish I guess. |
Hi, I dismissed my old review, so it's not blocking. I do have a suggestion though I'm sorry if it's a bit late in the game. And maybe it doesn't have to be done in this PR. But so the thing that stuck out to me when looking at this is, we do a kube api call (in logs_available) every chunk in the log stream. This seems like it could result in a lot of calls and depending on how many such processes on the cluster could cause problems. Just a hunch I guess. But so it would seem to me that to avoid this, perhaps you could run the |
OK I just experimented with our own "event scheduler" helper and it seems that we could use it to limit calls without managing threads. Here's a code sample: import time
from airflow.utils.event_scheduler import EventScheduler
class Tracker:
stop = False
counter = 0
def hello(tracker: Tracker):
tracker.counter += 1
print("hi! %s" % tracker.counter)
if tracker.counter > 10:
tracker.stop = True
e = EventScheduler()
tracker = Tracker()
e.call_regular_interval(2, hello, (tracker,))
while True:
e.run(blocking=False)
time.sleep(0.5)
if tracker.stop is True:
break I believe what this does is, keep track of how long it's been since |
LGTM Let's merge when tests pass |
fe14347
to
6007df5
Compare
@dstandish , thank you for proposing a nicer approach. |
Ah ok thanks 👍 |
Fixes: #23497