Skip to content

Commit

Permalink
Stackless issue python#222: add a test case
Browse files Browse the repository at this point in the history
Add a test case for a main tasklet, that terminates with
serial_last_jump != initial_stub.serial.
(cherry picked from commit a0f56e7)
  • Loading branch information
Anselm Kruis committed Jul 1, 2019
1 parent 6ab2e5b commit 95b4c30
Showing 1 changed file with 86 additions and 1 deletion.
87 changes: 86 additions & 1 deletion Stackless/unittests/test_outside.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
from __future__ import absolute_import

import unittest
import threading
import stackless
from stackless import test_cframe_nr, test_outside
from stackless import tasklet, channel, run
from _teststackless import test_cframe, test_cstate

from support import test_main # @UnusedImport
from support import StacklessTestCase
from support import StacklessTestCase, withThreads, get_serial_last_jump


def current_initial_stub(threadid=-1):
"""Get the initial stub of the given thread.
"""
# The second argument of get_thread_info() is intentionally undocumented.
# See C source.
return stackless.get_thread_info(threadid, 1 << 30)[5]


MAIN_INITIAL_STUB = current_initial_stub()


class TestOutside(StacklessTestCase):
Expand Down Expand Up @@ -70,6 +84,77 @@ def test_outside5(self):
tasklet(test_cframe_nr)(100)
test_outside()

@unittest.skipUnless(withThreads, "requires thread support")
def test_main_exit_with_serial_mismatch(self):
# Test the exit of a main-tasklet with a current C-stack serial number
# unequal to the serial number of the initial stub.
# The purpose of this test is to exercise a special code path in
# slp_tasklet_end(PyObject *retval), scheduling.c:1444

# Note: this test only works in thread for two reasons:
# - The main tasklet terminates
# - The sequence of operations used to change the serial of the
# main tasklet depends on the fact, that the thread state is
# not the initial thread state of the interpreter.

# sanity checks
self.assertEqual(get_serial_last_jump(), MAIN_INITIAL_STUB.serial)
self.result = None

def run():
try:
# Test preconditions: current is main at level 0
if stackless.enable_softswitch(None):
self.assertEqual(stackless.current.nesting_level, 0)
self.assertIs(stackless.current, stackless.main)
thread_initial_stub = current_initial_stub()
# sanity check
self.assertEqual(get_serial_last_jump(), thread_initial_stub.serial)

# a tasklet with a different C-stack
# test_cstate forces a hard switch from schedule_remove
t = tasklet(test_cstate)(stackless.schedule_remove)
t_cstate = t.cstate

# now t has a cstate, that belongs to thread_initial_stub
# check it
self.assertIsNot(t_cstate, thread_initial_stub)
self.assertEqual(t_cstate.serial, thread_initial_stub.serial)

# Run t from a new entry point.
# Hard switch to t,
# run t and
# hard switch back to the main tasklet
test_outside() # run scheduled tasklets
self.assertEqual(t.nesting_level, 1) # It was a hard switch back to main

# t has now it's own stack created from a different entry point
self.assertIsNot(t.cstate, t_cstate)
self.assertNotEqual(t.cstate.serial, thread_initial_stub.serial)
t_serial = t.cstate.serial

# soft-to-hard switch to t, finish t and soft-switch back
t.run()

self.assertEqual(t.nesting_level, 0) # Soft switching was possible
if stackless.enable_softswitch(None):
# Final test: the current serial has changed.
self.assertNotEqual(get_serial_last_jump(), thread_initial_stub.serial)
self.assertEqual(get_serial_last_jump(), t_serial)
else:
self.assertEqual(get_serial_last_jump(), thread_initial_stub.serial)
except Exception as e:
self.result = e
else:
self.result = False

thread = threading.Thread(target=run, name=str(self.id()))
thread.start()
thread.join()
if self.result:
raise self.result
self.assertIs(self.result, False)


class TestCframe(StacklessTestCase):
n = 100
Expand Down

0 comments on commit 95b4c30

Please sign in to comment.