stormbrigade_sheriff/sbsheriff/Lib/site-packages/greenlet/tests/test_leaks.py

179 lines
6.5 KiB
Python

import unittest
import sys
import gc
import time
import weakref
import threading
import greenlet
class TestLeaks(unittest.TestCase):
def test_arg_refs(self):
args = ('a', 'b', 'c')
refcount_before = sys.getrefcount(args)
# pylint:disable=unnecessary-lambda
g = greenlet.greenlet(
lambda *args: greenlet.getcurrent().parent.switch(*args))
for _ in range(100):
g.switch(*args)
self.assertEqual(sys.getrefcount(args), refcount_before)
def test_kwarg_refs(self):
kwargs = {}
# pylint:disable=unnecessary-lambda
g = greenlet.greenlet(
lambda **kwargs: greenlet.getcurrent().parent.switch(**kwargs))
for _ in range(100):
g.switch(**kwargs)
self.assertEqual(sys.getrefcount(kwargs), 2)
assert greenlet.GREENLET_USE_GC # Option to disable this was removed in 1.0
def recycle_threads(self):
# By introducing a thread that does sleep we allow other threads,
# that have triggered their __block condition, but did not have a
# chance to deallocate their thread state yet, to finally do so.
# The way it works is by requiring a GIL switch (different thread),
# which does a GIL release (sleep), which might do a GIL switch
# to finished threads and allow them to clean up.
def worker():
time.sleep(0.001)
t = threading.Thread(target=worker)
t.start()
time.sleep(0.001)
t.join()
def test_threaded_leak(self):
gg = []
def worker():
# only main greenlet present
gg.append(weakref.ref(greenlet.getcurrent()))
for _ in range(2):
t = threading.Thread(target=worker)
t.start()
t.join()
del t
greenlet.getcurrent() # update ts_current
self.recycle_threads()
greenlet.getcurrent() # update ts_current
gc.collect()
greenlet.getcurrent() # update ts_current
for g in gg:
self.assertIsNone(g())
def test_threaded_adv_leak(self):
gg = []
def worker():
# main and additional *finished* greenlets
ll = greenlet.getcurrent().ll = []
def additional():
ll.append(greenlet.getcurrent())
for _ in range(2):
greenlet.greenlet(additional).switch()
gg.append(weakref.ref(greenlet.getcurrent()))
for _ in range(2):
t = threading.Thread(target=worker)
t.start()
t.join()
del t
greenlet.getcurrent() # update ts_current
self.recycle_threads()
greenlet.getcurrent() # update ts_current
gc.collect()
greenlet.getcurrent() # update ts_current
for g in gg:
self.assertIsNone(g())
def test_issue251_killing_cross_thread_leaks_list(self, manually_collect_background=True):
# See https://github.com/python-greenlet/greenlet/issues/251
# Killing a greenlet (probably not the main one)
# in one thread from another thread would
# result in leaking a list (the ts_delkey list).
# For the test to be valid, even empty lists have to be tracked by the
# GC
assert gc.is_tracked([])
def count_objects(kind=list):
# pylint:disable=unidiomatic-typecheck
# Collect the garbage.
for _ in range(3):
gc.collect()
gc.collect()
return sum(
1
for x in gc.get_objects()
if type(x) is kind
)
# XXX: The main greenlet of a dead thread is only released
# when one of the proper greenlet APIs is used from a different
# running thread. See #252 (https://github.com/python-greenlet/greenlet/issues/252)
greenlet.getcurrent()
greenlets_before = count_objects(greenlet.greenlet)
background_glet_running = threading.Event()
background_glet_killed = threading.Event()
background_greenlets = []
def background_greenlet():
# Throw control back to the main greenlet.
greenlet.getcurrent().parent.switch()
def background_thread():
glet = greenlet.greenlet(background_greenlet)
background_greenlets.append(glet)
glet.switch() # Be sure it's active.
# Control is ours again.
del glet # Delete one reference from the thread it runs in.
background_glet_running.set()
background_glet_killed.wait()
# To trigger the background collection of the dead
# greenlet, thus clearing out the contents of the list, we
# need to run some APIs. See issue 252.
if manually_collect_background:
greenlet.getcurrent()
t = threading.Thread(target=background_thread)
t.start()
background_glet_running.wait()
lists_before = count_objects()
assert len(background_greenlets) == 1
self.assertFalse(background_greenlets[0].dead)
# Delete the last reference to the background greenlet
# from a different thread. This puts it in the background thread's
# ts_delkey list.
del background_greenlets[:]
background_glet_killed.set()
# Now wait for the background thread to die.
t.join(10)
del t
# Free the background main greenlet by forcing greenlet to notice a difference.
greenlet.getcurrent()
greenlets_after = count_objects(greenlet.greenlet)
lists_after = count_objects()
# On 2.7, we observe that lists_after is smaller than
# lists_before. No idea what lists got cleaned up. All the
# Python 3 versions match exactly.
self.assertLessEqual(lists_after, lists_before)
self.assertEqual(greenlets_before, greenlets_after)
@unittest.expectedFailure
def test_issue251_issue252_need_to_collect_in_background(self):
# This still fails because the leak of the list
# still exists when we don't call a greenlet API before exiting the
# thread. The proximate cause is that neither of the two greenlets
# from the background thread are actually being destroyed, even though
# the GC is in fact visiting both objects.
# It's not clear where that leak is? For some reason the thread-local dict
# holding it isn't being cleaned up.
self.test_issue251_killing_cross_thread_leaks_list(manually_collect_background=False)