Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.12] gh-120868: Fix breaking change in logging.config when using QueueHandler (GH-120872) #121077

Merged
merged 2 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions Lib/logging/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -787,25 +787,44 @@ def configure_handler(self, config):
# if 'handlers' not in config:
# raise ValueError('No handlers specified for a QueueHandler')
if 'queue' in config:
from multiprocessing.queues import Queue as MPQueue
from multiprocessing import Manager as MM
proxy_queue = MM().Queue()
proxy_joinable_queue = MM().JoinableQueue()
qspec = config['queue']
if not isinstance(qspec, (queue.Queue, MPQueue,
type(proxy_queue), type(proxy_joinable_queue))):
if isinstance(qspec, str):
q = self.resolve(qspec)
if not callable(q):
raise TypeError('Invalid queue specifier %r' % qspec)
q = q()
elif isinstance(qspec, dict):
if '()' not in qspec:
raise TypeError('Invalid queue specifier %r' % qspec)
q = self.configure_custom(dict(qspec))
else:

if isinstance(qspec, str):
q = self.resolve(qspec)
if not callable(q):
raise TypeError('Invalid queue specifier %r' % qspec)
config['queue'] = q
config['queue'] = q()
elif isinstance(qspec, dict):
if '()' not in qspec:
raise TypeError('Invalid queue specifier %r' % qspec)
config['queue'] = self.configure_custom(dict(qspec))
else:
from multiprocessing.queues import Queue as MPQueue

if not isinstance(qspec, (queue.Queue, MPQueue)):
# Safely check if 'qspec' is an instance of Manager.Queue
# / Manager.JoinableQueue

from multiprocessing import Manager as MM
from multiprocessing.managers import BaseProxy

# if it's not an instance of BaseProxy, it also can't be
# an instance of Manager.Queue / Manager.JoinableQueue
if isinstance(qspec, BaseProxy):
# Sometimes manager or queue creation might fail
# (e.g. see issue gh-120868). In that case, any
# exception during the creation of these queues will
# propagate up to the caller and be wrapped in a
# `ValueError`, whose cause will indicate the details of
# the failure.
mm = MM()
proxy_queue = mm.Queue()
proxy_joinable_queue = mm.JoinableQueue()
if not isinstance(qspec, (type(proxy_queue), type(proxy_joinable_queue))):
raise TypeError('Invalid queue specifier %r' % qspec)
else:
raise TypeError('Invalid queue specifier %r' % qspec)

if 'listener' in config:
lspec = config['listener']
if isinstance(lspec, type):
Expand Down
45 changes: 45 additions & 0 deletions Lib/test/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import weakref

from http.server import HTTPServer, BaseHTTPRequestHandler
from unittest.mock import patch
from urllib.parse import urlparse, parse_qs
from socketserver import (ThreadingUDPServer, DatagramRequestHandler,
ThreadingTCPServer, StreamRequestHandler)
Expand Down Expand Up @@ -3895,6 +3896,50 @@ def test_config_queue_handler(self):
msg = str(ctx.exception)
self.assertEqual(msg, "Unable to configure handler 'ah'")

@threading_helper.requires_working_threading()
@support.requires_subprocess()
@patch("multiprocessing.Manager")
def test_config_queue_handler_does_not_create_multiprocessing_manager(self, manager):
# gh-120868

from multiprocessing import Queue as MQ

q1 = {"()": "queue.Queue", "maxsize": -1}
q2 = MQ()
q3 = queue.Queue()

for qspec in (q1, q2, q3):
self.apply_config(
{
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": qspec,
},
},
}
)
manager.assert_not_called()

@patch("multiprocessing.Manager")
def test_config_queue_handler_invalid_config_does_not_create_multiprocessing_manager(self, manager):
# gh-120868

with self.assertRaises(ValueError):
self.apply_config(
{
"version": 1,
"handlers": {
"queue_listener": {
"class": "logging.handlers.QueueHandler",
"queue": object(),
},
},
}
)
manager.assert_not_called()

@support.requires_subprocess()
def test_multiprocessing_queues(self):
# See gh-119819
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,7 @@ Hrvoje Nikšić
Gregory Nofi
Jesse Noller
Bill Noon
Janek Nouvertné
Stefan Norberg
Tim Northover
Joe Norton
Expand Down
Loading