365bet体育在线世界杯-365bet大陆-365手机安全卫士下载

— 怀旧经典 · 永恒记忆 —

python中sys _current_frames函数用法示例代码

python中sys _current_frames函数用法示例代码

本文搜集整理了关于python中sys _current_frames方法/函数的使用示例。

Namespace/Package: sys

Method/Function: _current_frames

导入包: sys

每个示例代码都附有代码来源和完整的源代码,希望对您的程序开发有帮助。

示例1

def wrapped(*args, **kwargs):

old_threads = set(sys._current_frames().keys())

res = func(*args, **kwargs)

new_threads = set(sys._current_frames().keys())

new_threads -= old_threads

global_exclude_thread_ids.update(new_threads)

return res

浏览完整代码 来源:Debug.py 项目:chagge/returnn

示例2

def metrics(self):

metrics = {}

# get total threads

metrics['totalThreads'] = len(sys._current_frames().keys())

# get free threads

freeThreads = 0

for frame in sys._current_frames().values():

_self = frame.f_locals.get('self')

if getattr(_self, '__module__', None) == ZRendevous.__module__:

freeThreads += 1

metrics['freeThreads'] = freeThreads

try:

metrics['activeSessions'] = len(self.context.unrestrictedTraverse('/temp_folder/session_data'))

except Exception:

metrics['activeSessions'] = -1

global _REQUEST_TOTAL, _REQUEST_COUNT, _REQUEST_TIME

metrics["requestTotal"] = _REQUEST_TOTAL

metrics["request1m"] = max(_REQUEST_COUNT.query(60), 1)

metrics["requestTimeAvg1m"] = _REQUEST_TIME.query(60) / float(metrics["request1m"])

for key, value in self._getVmStats():

metrics[key] = value

return metrics

浏览完整代码 来源:stats.py 项目:SteelHouseLabs/zenoss-prodbin

示例3

def do_frame(self, arg):

"""frame [thread-Name|thread-number] [frame-number]

Move the current frame to the specified frame number. If a

Thread-Name is given, move the current frame to that. Dot (.) can be used

to indicate the name of the current frame. A thread number can be used

in Python 2.5 or greater.

0 is the most recent frame. A negative number indicates position from

the other end. So 'frame -1' moves when gdb dialect is in

effect moves to the oldest frame, and 'frame 0' moves to the

newest frame."""

args = arg.split()

if len(args) > 0:

thread_name = args[0]

try:

thread_id = int(thread_name)

if hasattr(sys, '_current_frames'):

threads = sys._current_frames()

if thread_id not in threads.keys():

self.errmsg("I don't know about thread number %s" %

thread_name)

self.info_thread_terse()

return

frame = threads[thread_id]

newframe = find_nondebug_frame(self, frame)

if newframe is not None: frame = newframe

self.stack, self.curindex = self.get_stack(frame, None)

if len(args) == 1:

arg = '0'

else:

arg = ' '.join(args[1:])

except ValueError:

# Must be frame command without a frame number

if thread_name == '.':

thread_name = threading.currentThread().getName()

if thread_name not in self.traced.keys():

self.errmsg("I don't know about thread %s" % thread_name)

return

thread_id = self.traced[thread_name]

if hasattr(sys, '_current_frames'):

frames = sys._current_frames()

if thread_id in frames.keys():

self.curframe_thread_name = thread_name

frame = frames[thread_id]

else:

self.errmsg("Frame selection not supported. Upgrade to")

self.errmsg("Python 2.5 or install threadframe.")

return

newframe = find_nondebug_frame(self, frame)

if newframe is not None: frame = newframe

self.stack, self.curindex = self.get_stack(frame, None)

if len(args) == 1:

arg = '0'

else:

arg = ' '.join(args[1:])

self.thread_name = threading.currentThread().getName()

pydb.Pdb.do_frame(self, arg)

浏览完整代码 来源:threaddbg.py 项目:gsliu/emacs_C

示例4

def streaming(self):

next_call = time.time()

while not self.stopped.wait(next_call - time.time()): #timer compensate

self.x=self.x+1

self.s.send(self.input_update)

print "have sent update command to streamer.."

# print "recv data1 from streamer.."

resp1 = self.s.recv(1024*10)

# print "received simu's data1:",resp1

print "received data from streamer!!"

xdata["1"].append(self.x)

xdata["2"].append(self.x)

# print resp1,"dd"

resp_A=resp1.split('

')[0]

resp_B=resp1.split('

')[1]

rexp = re.compile(r'[^d.,]+')

data1=map(float,rexp.sub('', resp_A).split(','))[1]

data2=map(float,rexp.sub('', resp_B).split(','))[1]

ydata["1"].append(data1)

ydata["2"].append(data2)

thread2.on_running(xdata, ydata)

print sys._current_frames()

next_call = next_call+1 #timer=1s

浏览完整代码 来源:socket_Q_server.py 项目:liangcai2012/savant-torrey-ranch

示例5

def test_current_frames(self):

import sys

import time

import thread

# XXX workaround for now: to prevent deadlocks, call

# sys._current_frames() once before starting threads.

# This is an issue in non-translated versions only.

sys._current_frames()

thread_id = thread.get_ident()

def other_thread():

print "thread started"

lock2.release()

lock1.acquire()

lock1 = thread.allocate_lock()

lock2 = thread.allocate_lock()

lock1.acquire()

lock2.acquire()

thread.start_new_thread(other_thread, ())

def f():

lock2.acquire()

return sys._current_frames()

frames = f()

lock1.release()

thisframe = frames.pop(thread_id)

assert thisframe.f_code.co_name in ('f', '?')

assert len(frames) == 1

_, other_frame = frames.popitem()

assert other_frame.f_code.co_name in ('other_thread', '?')

浏览完整代码 来源:test_sysmodule.py 项目:yuyichao/pypy

示例6

def __dumpstacks(self, context=1, sighandler_deep=2):

""" Signal handler: dump a stack trace for each existing thread."""

currentThreadId = threading.currentThread().ident

def unique_count(l):

d = collections.defaultdict(lambda: 0)

for v in l:

d[tuple(v)] += 1

return list((k, v) for k, v in d.items())

stack_displays = []

for threadId, stack in sys._current_frames().items():

stack_display = []

for filename, lineno, name, line in traceback.extract_stack(stack):

stack_display.append(' File: "%s", line %d, in %s'

% (filename, lineno, name))

if line:

stack_display.append(" %s" % (line.strip()))

if currentThreadId == threadId:

stack_display = stack_display[:- (sighandler_deep * 2)]

stack_display.append(' => Stopped to handle current signal. ')

stack_displays.append(stack_display)

stacks = unique_count(stack_displays)

self.ui.debug('thread', "** Thread List:

")

for stack, times in stacks:

if times == 1:

msg = "%s Thread is at:

%s

"

else:

msg = "%s Threads are at:

%s

"

self.ui.debug('thread', msg % (times, '

'.join(stack[- (context * 2):])))

self.ui.debug('thread', "Dumped a total of %d Threads." %

len(sys._current_frames().keys()))

浏览完整代码 来源:init.py 项目:ordnungswidrig/offlineimap

示例7

def adminInfo(handler):

handler.title('Information')

requirePriv(handler, 'Admin')

print "

"

print "

Uptime

"

loadTime = getLoadtime()

print "Started %s
" % loadTime

print "Up for %s
" % timesince(loadTime)

print "Total requests: %d
" % server().getTotalRequests()

print "

"

print Button('Restart', type = 'submit').negative()

print "

"

print "

Threads

"

print "

"

print "

"

for thread in sorted(threads(), key = lambda thread: thread.name):

print "

" % ('yes' if thread.isAlive() else 'no', 'yes' if thread.daemon else 'no')

print "

IDNameAliveDaemon
%s" % ('None' if thread.ident is None else "%x" % abs(thread.ident))

print thread.name

print "
"

try:

print CollapsibleBox('Traceback', formatTrace(traceback.extract_stack(sys._current_frames()[thread.ident])))

except Exception:

pass

print "

  
"

print "

Locks

"

print "

"

print "

"

for (name, lock) in sorted(locks.iteritems()):

print "

" % ('yes' if avail else 'no', ' ' if avail else (lock.owner or '???'), 'yes' if lock.reentrant() else 'no')

浏览完整代码 来源:admin.py 项目:mrozekma/Sprint

示例8

def getCallingModuleName():

import sys

if sys.version_info[0] == 3:

f = list(sys._current_frames().values())[0]

else:

f = sys._current_frames().values()[0]

f = f.f_back

return f.f_back.f_globals['__name__']

浏览完整代码 来源:tulipqtplugins.py 项目:kdbanman/browseRDF

示例9

def synthesize_thread_stacks():

threads = dict([(th.ident, th) for th in threading.enumerate()])

ostr = StringIO()

if len(sys._current_frames()) > 1 or (

sys._current_frames().values()[0] != inspect.currentframe()):

# Multi-threaded

ostr.write('

All threads:

')

for thread_id, stack in sys._current_frames().items():

AppExceptionHandler.print_stack(thread_id, threads[thread_id], stack, ostr, indent=2)

return ostr.getvalue()

浏览完整代码 来源:exception_handler.py 项目:billwei/commons

示例10

def info_thread_missing(self):

"""Show information about threads we might not know about"""

if hasattr(sys, "_current_frames") and

len(self.traced) != len(sys._current_frames()):

frames = sys._current_frames()

thread_ids = frames.keys()

self.msg("Untraced/unknown threads:")

for thread_id in thread_ids:

if thread_id not in self.traced.values():

self.msg(" %d" % thread_id)

return

浏览完整代码 来源:threaddbg.py 项目:gsliu/emacs_C

示例11

def test_top_frame(self):

self.assertEqual(1, len(sys._current_frames()))

frame = sys._current_frames().items()[0][1]

# frame filename and name

self.assertEqual((frame.f_code.co_filename, frame.f_code.co_name),

_snakemeter.get_top_frame()[:2])

# line number of current frame

self.assertEqual(sys._current_frames().items()[0][1].f_lineno, _snakemeter.get_top_frame()[2])

浏览完整代码 来源:test_native.py 项目:gchp/snakemeter

示例12

def synthesize_thread_stacks():

threads = dict([(th.ident, th) for th in threading.enumerate()])

ostr = Compatibility.StringIO()

# _current_frames not yet implemented on pypy and not guaranteed anywhere but

# cpython in practice.

if hasattr(sys, '_current_frames') and (len(sys._current_frames()) > 1 or

sys._current_frames().values()[0] != inspect.currentframe()):

# Multi-threaded

ostr.write('

All threads:

')

for thread_id, stack in sys._current_frames().items():

BasicExceptionHandler.print_stack(thread_id, threads[thread_id], stack, ostr, indent=2)

return ostr.getvalue()

浏览完整代码 来源:__init__.py 项目:BabyDuncan/commons

示例13

def info_thread_missing(obj):

"""Show information about threads we might not know about"""

if not hasattr(obj, "traced"): return

if (hasattr(sys, "_current_frames") and

len(obj.traced) != len(sys._current_frames())):

frames = sys._current_frames()

thread_ids = frames.keys()

obj.msg("Untraced/unknown threads:")

for thread_id in thread_ids:

if thread_id not in obj.traced.values():

obj.msg(" %d" % thread_id)

return

浏览完整代码 来源:threadinfo.py 项目:carlgao/lenga

示例14

def dumpThread(threadId):

import sys

if not hasattr(sys, "_current_frames"):

print "Warning: dumpThread: no sys._current_frames"

return

if threadId not in sys._current_frames():

print("Thread %d not found" % threadId)

return

stack = sys._current_frames()[threadId]

better_exchook.print_traceback(stack)

浏览完整代码 来源:utils.py 项目:BMXE/music-player

示例15

def dumpThread(threadId):

import threading, sys, traceback

if threadId not in sys._current_frames():

print "Thread", threadId, "not found"

return

code = []

stack = sys._current_frames()[threadId]

for filename, lineno, name, line in traceback.extract_stack(stack):

code.append('File: "%s", line %d, in %s' % (filename, lineno, name))

if line:

code.append(" %s" % (line.strip()))

print "

".join(code)

浏览完整代码 来源:utils.py 项目:gijs/music-player

示例16

def getCallingModuleName():

import sys

if sys.version_info[0] == 3:

frames = list(sys._current_frames().values())

else:

frames = sys._current_frames().values()

for i in range(len(frames)):

f = frames[i]

if f.f_globals['__name__'] == "tulipplugins":

f = f.f_back

break

while f.f_globals['__name__'] == "tulipplugins":

f = f.f_back

return f.f_globals['__name__']

浏览完整代码 来源:tulipplugins.py 项目:tulip5/tulip

示例17

def wrapped(*args, **kwargs):

"""

:param args:

:param kwargs:

:return:

"""

# noinspection PyProtectedMember

old_threads = set(sys._current_frames().keys())

res = func(*args, **kwargs)

# noinspection PyProtectedMember

new_threads = set(sys._current_frames().keys())

new_threads -= old_threads

global_exclude_thread_ids.update(new_threads)

return res

浏览完整代码 来源:Debug.py 项目:rwth-i6/returnn

示例18

def debug_dump_threads():

print("

===== %s dump thread stack frames. %i threads. conductor lock = %s:

" % (

format_unixts(time.time()),

len(sys._current_frames()),

the_conductor.lock), file=sys.stderr)

idx=0

for thread_id, frame in sys._current_frames().items():

print("===== thread #%i [%#x] refcount = %s" % (idx, thread_id, sys.getrefcount(frame)), file=sys.stderr)

if thread_id != _debug_thread_id:

traceback.print_stack(frame, file = sys.stderr)

else:

print(" debug thread, skipping", file=sys.stderr)

idx += 1

print("=====

", file=sys.stderr)

浏览完整代码 来源:conductor.py 项目:mickours/execo

示例19

def save_transaction(self, transaction):

"""Saves the specified transaction away under the thread ID of

the current executing thread. Will also cache a reference to the

greenlet if using coroutines. This is so we can later determine

the stack trace for a transaction when using greenlets.

"""

thread_id = transaction.thread_id

if thread_id in self._cache:

raise RuntimeError('transaction already active')

self._cache[thread_id] = transaction

# We judge whether we are actually running in a coroutine by

# seeing if the current thread ID is actually listed in the set

# of all current frames for executing threads. If we are

# executing within a greenlet, then thread.get_ident() will

# return the greenlet identifier. This will not be a key in

# dictionary of all current frames because that will still be

# the original standard thread which all greenlets are running

# within.

transaction._greenlet = None

if hasattr(sys, '_current_frames'):

if thread_id not in sys._current_frames():

greenlet = sys.modules.get('greenlet')

if greenlet:

transaction._greenlet = weakref.ref(greenlet.getcurrent())

浏览完整代码 来源:transaction_cache.py 项目:Mause/table_select_web

示例20

def _find_thread_stack(id):

"""Returns a stack object that can be used to dump a stack trace for

the given thread id (or None if the id is not found)."""

for thread_id, stack in sys._current_frames().items():

if thread_id == id:

return stack

return None

浏览完整代码 来源:__init__.py 项目:ackdesha/celery

示例21

def cry(): # pragma: no cover

"""Return stacktrace of all active threads.

From https://gist.github.com/737056

"""

tmap = {}

main_thread = None

# get a map of threads by their ID so we can print their names

# during the traceback dump

for t in threading.enumerate():

if getattr(t, "ident", None):

tmap[t.ident] = t

else:

main_thread = t

out = StringIO()

sep = "=" * 49 + "

"

for tid, frame in sys._current_frames().iteritems():

thread = tmap.get(tid, main_thread)

out.write("%s

" % (thread.getName(),))

out.write(sep)

traceback.print_stack(frame, file=out)

out.write(sep)

out.write("LOCAL VARIABLES

")

out.write(sep)

pprint(frame.f_locals, stream=out)

out.write("

")

return out.getvalue()

浏览完整代码 来源:__init__.py 项目:ackdesha/celery

示例22

def current_frames_without_threads(self):

# Not much happens here: there is only one thread, with artificial

# "thread id" 0.

d = sys._current_frames()

self.assertEqual(len(d), 1)

self.assertIn(0, d)

self.assertTrue(d[0] is sys._getframe())

浏览完整代码 来源:test_sys.py 项目:plirof/minibloq_v0.83

示例23

def dumpstacks(sig=None, frame=None):

""" Signal handler: dump a stack trace for each existing thread."""

code = []

def extract_stack(stack):

for filename, lineno, name, line in traceback.extract_stack(stack):

yield 'File: "%s", line %d, in %s' % (filename, lineno, name)

if line:

yield " %s" % (line.strip(),)

# code from http://stackoverflow.com/questions/132058/getting-stack-trace-from-a-running-python-application#answer-2569696

# modified for python 2.5 compatibility

threads_info = dict([(th.ident, {'name': th.name, 'uid': getattr(th, 'uid', 'n/a')})

for th in threading.enumerate()])

for threadId, stack in sys._current_frames().items():

thread_info = threads_info.get(threadId)

code.append("

# Thread: %s (id:%s) (uid:%s)" %

(thread_info and thread_info['name'] or 'n/a',

threadId,

thread_info and thread_info['uid'] or 'n/a'))

for line in extract_stack(stack):

code.append(line)

if openerp.evented:

# code from http://stackoverflow.com/questions/12510648/in-gevent-how-can-i-dump-stack-traces-of-all-running-greenlets

import gc

from greenlet import greenlet

for ob in gc.get_objects():

if not isinstance(ob, greenlet) or not ob:

continue

code.append("

# Greenlet: %r" % (ob,))

for line in extract_stack(ob.gr_frame):

code.append(line)

_logger.info("

".join(code))

浏览完整代码 来源:misc.py 项目:bentahir/odoo-8.0

示例24

def run( self ):

self.file = open( self.fname, "a" )

print >> self.file, "Heartbeat for pid %d thread started at %s" % ( self.pid, time.asctime() )

print >> self.file

self.file_nonsleeping = open( self.fname_nonsleeping, "a" )

print >> self.file_nonsleeping, "Non-Sleeping-threads for pid %d thread started at %s" % ( self.pid, time.asctime() )

print >> self.file_nonsleeping

try:

while not self.should_stop:

# Print separator with timestamp

print >> self.file, "Traceback dump for all threads at %s:" % time.asctime()

print >> self.file

# Print the thread states

threads = get_current_thread_object_dict()

for thread_id, frame in iteritems(sys._current_frames()):

if thread_id in threads:

object = repr( threads[thread_id] )

else:

object = ""

print >> self.file, "Thread %s, %s:" % ( thread_id, object )

print >> self.file

traceback.print_stack( frame, file=self.file )

print >> self.file

print >> self.file, "End dump"

print >> self.file

self.file.flush()

self.print_nonsleeping(threads)

# Sleep for a bit

self.wait_event.wait( self.period )

finally:

print >> self.file, "Heartbeat for pid %d thread stopped at %s" % ( self.pid, time.asctime() )

print >> self.file

# Cleanup

self.file.close()

self.file_nonsleeping.close()

浏览完整代码 来源:heartbeat.py 项目:AbhishekKumarSingh/galaxy

示例25

def current_frames_with_threads(self):

import threading, thread

import traceback

# Spawn a thread that blocks at a known place. Then the main

# thread does sys._current_frames(), and verifies that the frames

# returned make sense.

entered_g = threading.Event()

leave_g = threading.Event()

thread_info = [] # the thread's id

def f123():

g456()

def g456():

thread_info.append(thread.get_ident())

entered_g.set()

leave_g.wait()

t = threading.Thread(target=f123)

t.start()

entered_g.wait()

# At this point, t has finished its entered_g.set(), although it's

# impossible to guess whether it's still on that line or has moved on

# to its leave_g.wait().

self.assertEqual(len(thread_info), 1)

thread_id = thread_info[0]

d = sys._current_frames()

main_id = thread.get_ident()

self.assertIn(main_id, d)

self.assertIn(thread_id, d)

# Verify that the captured main-thread frame is _this_ frame.

frame = d.pop(main_id)

self.assertTrue(frame is sys._getframe())

# Verify that the captured thread frame is blocked in g456, called

# from f123. This is a litte tricky, since various bits of

# threading.py are also in the thread's call stack.

frame = d.pop(thread_id)

stack = traceback.extract_stack(frame)

for i, (filename, lineno, funcname, sourceline) in enumerate(stack):

if funcname == "f123":

break

else:

self.fail("didn't find f123() on thread's call stack")

self.assertEqual(sourceline, "g456()")

# And the next record must be for g456().

filename, lineno, funcname, sourceline = stack[i + 1]

self.assertEqual(funcname, "g456")

self.assertIn(sourceline, ["leave_g.wait()", "entered_g.set()"])

# Reap the spawned thread.

leave_g.set()

t.join()

浏览完整代码 来源:test_sys.py 项目:plirof/minibloq_v0.83

示例26

def cry(): # pragma: no cover

"""Return stacktrace of all active threads.

From https://gist.github.com/737056

"""

tmap = {}

main_thread = None

# get a map of threads by their ID so we can print their names

# during the traceback dump

for t in threading.enumerate():

if getattr(t, 'ident', None):

tmap[t.ident] = t

else:

main_thread = t

out = StringIO()

P = partial(print, file=out)

sep = '=' * 49

for tid, frame in sys._current_frames().iteritems():

thread = tmap.get(tid, main_thread)

if not thread:

# skip old junk (left-overs from a fork)

continue

P('{0.name}'.format(thread))

P(sep)

traceback.print_stack(frame, file=out)

P(sep)

P('LOCAL VARIABLES')

P(sep)

pprint(frame.f_locals, stream=out)

P('

')

return out.getvalue()

浏览完整代码 来源:__init__.py 项目:KWMalik/celery

示例27

def dumpState(self):

from meh import ExceptionInfo

from meh.dump import ReverseExceptionDump

from inspect import stack as _stack

from traceback import format_stack

# Skip the frames for dumpState and the signal handler.

stack = _stack()[2:]

stack.reverse()

exn = ReverseExceptionDump(ExceptionInfo(None, None, stack),

self.mehConfig)

# gather up info on the running threads

threads = "

Threads

-------

"

# Every call to sys._current_frames() returns a new dict, so it is not

# modified when threads are created or destroyed. Iterating over it is

# thread safe.

for thread_id, frame in sys._current_frames().items():

threads += "

Thread %s

" % (thread_id,)

threads += "".join(format_stack(frame))

# dump to a unique file

(fd, filename) = mkstemp(prefix="anaconda-tb-", dir="/tmp")

dump_text = exn.traceback_and_object_dump(self)

dump_text += threads

dump_text_bytes = dump_text.encode("utf-8")

os.write(fd, dump_text_bytes)

os.close(fd)

# append to a given file

with open("/tmp/anaconda-tb-all.log", "a+") as f:

f.write("--- traceback: %s ---

" % filename)

f.write(dump_text + "

")

浏览完整代码 来源:anaconda.py 项目:rvykydal/anaconda

示例28

def api_callstack(self, ignores=None):

# pylint: disable=line-too-long

"""

return a list of function calls that form the stack

Example:

[' File "bastproxy.py", line 280, in ',

' main()',

' File "bastproxy.py", line 253, in main',

' start(listen_port)',

' File "/home/endavis/src/games/bastproxy/bp/libs/event.py", line 60, in new_func',

' return func(*args, **kwargs)',

' File "/home/endavis/src/games/bastproxy/bp/libs/net/proxy.py", line 63, in handle_read',

" 'convertansi':tconvertansi})", '

"""

# pylint: enable=line-too-long

if ignores is None:

ignores = []

callstack = []

for _, frame in sys._current_frames().items(): # pylint: disable=protected-access

for i in traceback.format_stack(frame)[:-1]:

if True in [tstr in i for tstr in ignores]:

continue

tlist = i.split('

')

for tline in tlist:

if tline:

if self.BASEPATH:

tline = tline.replace(self.BASEPATH + "/", "")

callstack.append(tline.rstrip())

return callstack

浏览完整代码 来源:api.py 项目:endavis/bastproxy

示例29

def __init__(self):

#dynamically determine the unittest.case frame and use it to get the name of the test method

ident = threading.current_thread().ident

upperf = sys._current_frames()[ident]

while (upperf.f_globals['__name__'] != 'unittest.case'):

upperf = upperf.f_back

def handleList(items):

ret = []

# items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception())

for i in items:

s = i[0].id()

#Handle the _ErrorHolder objects from skipModule failures

if "setUpModule (" in s:

ret.append(s.replace("setUpModule (", "").replace(")",""))

else:

ret.append(s)

# Append also the test without the full path

testname = s.split('.')[-1]

if testname:

ret.append(testname)

return ret

self.faillist = handleList(upperf.f_locals['result'].failures)

self.errorlist = handleList(upperf.f_locals['result'].errors)

self.skiplist = handleList(upperf.f_locals['result'].skipped)

浏览完整代码 来源:decorators.py 项目:openembedded/openembedded-core

示例30

def cry(out=None, sepchr='=', seplen=49): # pragma: no cover

"""Return stack-trace of all active threads,

taken from https://gist.github.com/737056."""

import threading

out = WhateverIO() if out is None else out

P = partial(print, file=out)

# get a map of threads by their ID so we can print their names

# during the traceback dump

tmap = {t.ident: t for t in threading.enumerate()}

sep = sepchr * seplen

for tid, frame in items(sys._current_frames()):

thread = tmap.get(tid)

if not thread:

# skip old junk (left-overs from a fork)

continue

P('{0.name}'.format(thread))

P(sep)

traceback.print_stack(frame, file=out)

P(sep)

P('LOCAL VARIABLES')

P(sep)

pprint(frame.f_locals, stream=out)

P('

')

return out.getvalue()

浏览完整代码 来源:__init__.py 项目:277800076/celery

相关推荐

365bet大陆 双世宠妃剧情介绍

双世宠妃剧情介绍

📅 07-03 👁️ 9474
365bet体育在线世界杯 炉石传说在哪里换卡背

炉石传说在哪里换卡背

📅 07-04 👁️ 8113
365手机安全卫士下载 重庆概况

重庆概况

📅 09-18 👁️ 5124
365bet大陆 猾怎么读

猾怎么读

📅 09-11 👁️ 1306
365bet大陆 刁 古文中的意思

刁 古文中的意思

📅 09-16 👁️ 205
365bet大陆 老虎油延时喷剂用一次顶多久?

老虎油延时喷剂用一次顶多久?

📅 08-31 👁️ 1701
365bet体育在线世界杯 皮科专家说丨去角质,多久一次合适?

皮科专家说丨去角质,多久一次合适?

📅 07-20 👁️ 1621
365手机安全卫士下载 iphone7换个壳多少,苹果7换个壳子多少钱

iphone7换个壳多少,苹果7换个壳子多少钱

📅 09-17 👁️ 7976
365bet体育在线世界杯 基本字义解释

基本字义解释

📅 07-11 👁️ 1592
NameAvailableReentrant
"

print name

avail = lock.avail()

if not avail:

print "
"

writer = ResponseWriter()

try:

owner, tb = lock.owner, lock.tb

name = ("%x" % abs(owner)) if owner else 'None'

#TODO Is there no O(1) way to do this?

for thread in threads():

if thread.ident == owner:

name = "%s (%x)" % (thread.name, abs(owner))

break

print "Owned by: %s

" % name

if tb:

print "Acquisition traceback:
"

print formatTrace(tb)

print "
"

print "Current traceback:
"

print formatTrace(traceback.extract_stack(sys._current_frames()[owner]))

except Exception, e:

writer.clear()

print "(Unable to retrieve stack trace)"

print CollapsibleBox('Ownership', writer.done())

print "

%s