"""
- apply(self.log_message, args)
+ self.log_message(*args)
def log_message(self, format, *args):
"""Log an arbitrary message.
def count(self, item): return self.data.count(item)
def index(self, item): return self.data.index(item)
def reverse(self): self.data.reverse()
- def sort(self, *args): apply(self.data.sort, args)
+ def sort(self, *args): self.data.sort(*args)
def extend(self, other):
if isinstance(other, UserList):
self.data.extend(other.data)
while _exithandlers:
func, targs, kargs = _exithandlers.pop()
- apply(func, targs, kargs)
+ func(*targs, **kargs)
def register(func, *targs, **kargs):
"""register a function to be executed upon normal program termination
res = None
try:
try:
- res = apply(func, args)
+ res = func(*args)
except BdbQuit:
pass
finally:
log = nolog
else:
log = dolog
- apply(log, allargs)
+ log(*allargs)
def dolog(fmt, *args):
"""Write a log message to the log file. See initlog() for docs."""
return x.__copy__()
if hasattr(x, '__getinitargs__'):
args = x.__getinitargs__()
- y = apply(x.__class__, args)
+ y = x.__class__(*args)
else:
y = _EmptyClass()
y.__class__ = x.__class__
if hasattr(x, '__getinitargs__'):
args = x.__getinitargs__()
args = deepcopy(args, memo)
- y = apply(x.__class__, args)
+ y = x.__class__(*args)
else:
y = _EmptyClass()
y.__class__ = x.__class__
def add_hor_rule(self, *args, **kw):
if not self.hard_break:
self.writer.send_line_break()
- apply(self.writer.send_hor_rule, args, kw)
+ self.writer.send_hor_rule(*args, **kw)
self.hard_break = self.nospace = 1
self.have_label = self.para_end = self.softspace = self.parskip = 0
def note(self, *args):
if self.verbose:
- apply(self.message, args)
+ self.message(*args)
def message(self, format, *args):
if args:
def path_islink(self, x): return os.path.islink(x)
# etc.
- def openfile(self, *x): return apply(open, x)
+ def openfile(self, *x): return open(*x)
openfile_error = IOError
def listdir(self, x): return os.listdir(x)
listdir_error = os.error
"""
name = 'SEARCH'
if charset:
- typ, dat = apply(self._simple_command, (name, 'CHARSET', charset) + criteria)
+ typ, dat = self._simple_command(name, 'CHARSET', charset, *criteria)
else:
- typ, dat = apply(self._simple_command, (name,) + criteria)
+ typ, dat = self._simple_command(name, *criteria)
return self._untagged_response(typ, dat, name)
# raise self.error('unimplemented extension command: %s' % name)
if (sort_criteria[0],sort_criteria[-1]) != ('(',')'):
sort_criteria = '(%s)' % sort_criteria
- typ, dat = apply(self._simple_command, (name, sort_criteria, charset) + search_criteria)
+ typ, dat = self._simple_command(name, sort_criteria, charset, *search_criteria)
return self._untagged_response(typ, dat, name)
raise self.error('command %s illegal in state %s'
% (command, self.state))
name = 'UID'
- typ, dat = apply(self._simple_command, (name, command) + args)
+ typ, dat = self._simple_command(name, command, *args)
if command in ('SEARCH', 'SORT'):
name = command
else:
# raise self.error('unknown extension command: %s' % name)
if not name in Commands:
Commands[name] = (self.state,)
- return apply(self._simple_command, (name,) + args)
+ return self._simple_command(name, *args)
def _simple_command(self, name, *args):
- return self._command_complete(name, apply(self._command, (name,) + args))
+ return self._command_complete(name, self._command(name, *args))
def _untagged_response(self, typ, dat, name):
i, n = self._cmd_log_idx, self._cmd_log_len
while n:
try:
- apply(self._mesg, self._cmd_log[i])
+ self._mesg(*self._cmd_log[i])
except:
pass
i += 1
def run(cmd, args):
M._mesg('%s %s' % (cmd, args))
- typ, dat = apply(getattr(M, cmd), args)
+ typ, dat = getattr(M, cmd)(*args)
M._mesg('%s => %s %s' % (cmd, typ, dat))
if typ == 'NO': raise dat[0]
return dat
if self.manager.disable >= DEBUG:
return
if DEBUG >= self.getEffectiveLevel():
- apply(self._log, (DEBUG, msg, args), kwargs)
+ self._log(DEBUG, msg, args, **kwargs)
def info(self, msg, *args, **kwargs):
"""
if self.manager.disable >= INFO:
return
if INFO >= self.getEffectiveLevel():
- apply(self._log, (INFO, msg, args), kwargs)
+ self._log(INFO, msg, args, **kwargs)
def warning(self, msg, *args, **kwargs):
"""
if self.manager.disable >= WARNING:
return
if self.isEnabledFor(WARNING):
- apply(self._log, (WARNING, msg, args), kwargs)
+ self._log(WARNING, msg, args, **kwargs)
warn = warning
if self.manager.disable >= ERROR:
return
if self.isEnabledFor(ERROR):
- apply(self._log, (ERROR, msg, args), kwargs)
+ self._log(ERROR, msg, args, **kwargs)
def exception(self, msg, *args):
"""
Convenience method for logging an ERROR with exception information.
"""
- apply(self.error, (msg,) + args, {'exc_info': 1})
+ self.error(msg, exc_info=1, *args)
def critical(self, msg, *args, **kwargs):
"""
if self.manager.disable >= CRITICAL:
return
if CRITICAL >= self.getEffectiveLevel():
- apply(self._log, (CRITICAL, msg, args), kwargs)
+ self._log(CRITICAL, msg, args, **kwargs)
fatal = critical
if self.manager.disable >= level:
return
if self.isEnabledFor(level):
- apply(self._log, (level, msg, args), kwargs)
+ self._log(level, msg, args, **kwargs)
def findCaller(self):
"""
"""
if len(root.handlers) == 0:
basicConfig()
- apply(root.critical, (msg,)+args, kwargs)
+ root.critical(msg, *args, **kwargs)
fatal = critical
"""
if len(root.handlers) == 0:
basicConfig()
- apply(root.error, (msg,)+args, kwargs)
+ root.error(msg, *args, **kwargs)
def exception(msg, *args):
"""
Log a message with severity 'ERROR' on the root logger,
with exception information.
"""
- apply(error, (msg,)+args, {'exc_info': 1})
+ error(msg, exc_info=1, *args)
def warning(msg, *args, **kwargs):
"""
"""
if len(root.handlers) == 0:
basicConfig()
- apply(root.warning, (msg,)+args, kwargs)
+ root.warning(msg, *args, **kwargs)
warn = warning
"""
if len(root.handlers) == 0:
basicConfig()
- apply(root.info, (msg,)+args, kwargs)
+ root.info(msg, *args, **kwargs)
def debug(msg, *args, **kwargs):
"""
"""
if len(root.handlers) == 0:
basicConfig()
- apply(root.debug, (msg,)+args, kwargs)
+ root.debug(msg, *args, **kwargs)
def disable(level):
"""
klass = eval(klass, vars(logging))
args = cp.get(sectname, "args")
args = eval(args, vars(logging))
- h = apply(klass, args)
+ h = klass(*args)
if "level" in opts:
level = cp.get(sectname, "level")
h.setLevel(logging._levelNames[level])
def error(self, *args):
"""Error message handler."""
- apply(self.mh.error, args)
+ self.mh.error(*args)
def getfullname(self):
"""Return the full pathname of the folder."""
encode_args = (args[0], args[1])
if decode_base64:
encode_args = encode_args + (decode_base64,)
- apply(encode, encode_args)
+ encode(*encode_args)
level = args[0]
if level <= self.debug:
self.indent = self.indent + 1
- apply(self.msg, args)
+ self.msg(*args)
def msgout(self, *args):
level = args[0]
if level <= self.debug:
self.indent = self.indent - 1
- apply(self.msg, args)
+ self.msg(*args)
def run_script(self, pathname):
self.msg(2, "run_script", pathname)
class NNTPError(Exception):
"""Base class for all nntplib exceptions"""
def __init__(self, *args):
- apply(Exception.__init__, (self,)+args)
+ Exception.__init__(self, *args)
try:
self.response = args[0]
except IndexError:
head, tail = path.split(file)
if head:
- apply(func, (file,) + argrest)
+ func(file, *argrest)
return
if 'PATH' in env:
envpath = env['PATH']
for dir in PATH:
fullname = path.join(dir, file)
try:
- apply(func, (fullname,) + argrest)
+ func(fullname, *argrest)
except error, e:
tb = sys.exc_info()[2]
if (e.errno != ENOENT and e.errno != ENOTDIR
run(statement, globals, locals)
def runcall(*args):
- return apply(Pdb().runcall, args)
+ return Pdb().runcall(*args)
def set_trace():
Pdb().set_trace()
self.pattern = statetuple[0]
self.flags = statetuple[1]
self.groupindex = statetuple[2]
- self.code = apply(pcre_compile, statetuple)
+ self.code = pcre_compile(*statetuple)
class _Dummy:
# Dummy class used by _subn_string(). Has 'group' to avoid core dump.
self.set_cmd(`func`)
sys.setprofile(self.dispatcher)
try:
- return apply(func, args, kw)
+ return func(*args, **kw)
finally:
sys.setprofile(None)
arg = args[0]
args = args[1:]
self.init(arg)
- apply(self.add, args)
+ self.add(*args)
def init(self, arg):
self.all_callees = None # calc only if needed
def add(self, *arg_list):
if not arg_list: return self
- if len(arg_list) > 1: apply(self.add, arg_list[1:])
+ if len(arg_list) > 1: self.add(*arg_list[1:])
other = arg_list[0]
if type(self) != type(other) or self.__class__ != other.__class__:
other = Stats(other)
pass
processed.append(term)
if self.stats:
- apply(getattr(self.stats, fn), processed)
+ getattr(self.stats, fn)(*processed)
else:
print "No statistics object is loaded."
return 0
def do_sort(self, line):
abbrevs = self.stats.get_sort_arg_defs()
if line and not filter(lambda x,a=abbrevs: x not in a,line.split()):
- apply(self.stats.sort_stats, line.split())
+ self.stats.sort_stats(*line.split())
else:
print "Valid sort keys (unique prefixes are accepted):"
for (key, value) in Stats.sort_arg_dict_default.iteritems():
argv = (argv,)
pid, master_fd = fork()
if pid == CHILD:
- apply(os.execlp, (argv[0],) + argv)
+ os.execlp(argv[0], *argv)
try:
mode = tty.tcgetattr(STDIN_FILENO)
tty.setraw(STDIN_FILENO)
def document(self, object, name=None, *args):
"""Generate documentation for an object."""
args = (object, name) + args
- if inspect.ismodule(object): return apply(self.docmodule, args)
- if inspect.isclass(object): return apply(self.docclass, args)
- if inspect.isroutine(object): return apply(self.docroutine, args)
- return apply(self.docother, args)
+ if inspect.ismodule(object): return self.docmodule(*args)
+ if inspect.isclass(object): return self.docclass(*args)
+ if inspect.isroutine(object): returnself.docroutine(*args)
+ return self.docother(*args)
def fail(self, object, name=None, *args):
"""Raise an exception for unimplemented types."""
def bigsection(self, title, *args):
"""Format a section with a big heading."""
title = '<big><strong>%s</strong></big>' % title
- return apply(self.section, (title,) + args)
+ return self.section(title, *args)
def preformat(self, text):
"""Format literal preformatted text."""
TEMPLATE = """
def %s(self, *args):
- return apply(getattr(self.mod, self.name).%s, args)
+ return getattr(self.mod, self.name).%s(*args)
"""
class FileDelegate(FileBase):
sys.stdout = self.save_stdout
sys.stderr = self.save_stderr
- def s_apply(self, func, args=(), kw=None):
+ def s_apply(self, func, args=(), kw={}):
self.save_files()
try:
self.set_files()
- if kw:
- r = apply(func, args, kw)
- else:
- r = apply(func, args)
+ r = func(*args, **kw)
finally:
self.restore_files()
return r
class URLopener(urllib.FancyURLopener):
def __init__(self, *args):
- apply(urllib.FancyURLopener.__init__, (self,) + args)
+ urllib.FancyURLopener.__init__(self, *args)
self.errcode = 200
def http_error_default(self, url, fp, errcode, errmsg, headers):
self.delayfunc(time - now)
else:
del q[0]
- void = apply(action, argument)
+ void = action(*argument)
self.delayfunc(0) # Let other threads run
s_args = s_reply
if timeout is not None:
s_args = s_args + (timeout,)
- while not self.eof and apply(select.select, s_args) == s_reply:
+ while not self.eof and select.select(*s_args) == s_reply:
i = max(0, len(self.cookedq)-n)
self.fill_rawq()
self.process_rawq()
kwdict = {}
for k in kwargs: kwdict[k] = k + k
print func.func_name, args, sortdict(kwdict), '->',
- try: apply(func, args, kwdict)
+ try: func(*args, **kwdict)
except TypeError, err: print err
from test.test_support import TestFailed, verbose, verify
import struct
-## import pdb
import sys
ISBIGENDIAN = sys.byteorder == "big"
def simple_err(func, *args):
try:
- apply(func, args)
+ func(*args)
except struct.error:
pass
else:
raise TestFailed, "%s%s did not raise struct.error" % (
func.__name__, args)
-## pdb.set_trace()
def any_err(func, *args):
try:
- apply(func, args)
+ func(*args)
except (struct.error, OverflowError, TypeError):
pass
else:
raise TestFailed, "%s%s did not raise error" % (
func.__name__, args)
-## pdb.set_trace()
simple_err(struct.calcsize, 'Z')
Lock = _allocate_lock
def RLock(*args, **kwargs):
- return apply(_RLock, args, kwargs)
+ return _RLock(*args, **kwargs)
class _RLock(_Verbose):
def Condition(*args, **kwargs):
- return apply(_Condition, args, kwargs)
+ return _Condition(*args, **kwargs)
class _Condition(_Verbose):
def Semaphore(*args, **kwargs):
- return apply(_Semaphore, args, kwargs)
+ return _Semaphore(*args, **kwargs)
class _Semaphore(_Verbose):
def BoundedSemaphore(*args, **kwargs):
- return apply(_BoundedSemaphore, args, kwargs)
+ return _BoundedSemaphore(*args, **kwargs)
class _BoundedSemaphore(_Semaphore):
"""Semaphore that checks that # releases is <= # acquires"""
def Event(*args, **kwargs):
- return apply(_Event, args, kwargs)
+ return _Event(*args, **kwargs)
class _Event(_Verbose):
def run(self):
if self.__target:
- apply(self.__target, self.__args, self.__kwargs)
+ self.__target(*self.__args, **self.__kwargs)
def __bootstrap(self):
try:
N_TOKENS += 2
def group(*choices): return '(' + '|'.join(choices) + ')'
-def any(*choices): return apply(group, choices) + '*'
-def maybe(*choices): return apply(group, choices) + '?'
+def any(*choices): return group(*choices) + '*'
+def maybe(*choices): return group(*choices) + '?'
Whitespace = r'[ \f\t]*'
Comment = r'#[^\r\n]*'
# backwards compatible interface
def tokenize_loop(readline, tokeneater):
for token_info in generate_tokens(readline):
- apply(tokeneater, token_info)
+ tokeneater(*token_info)
def generate_tokens(readline):
"""
if not self.donothing:
sys.settrace(self.globaltrace)
try:
- result = apply(func, args, kw)
+ result = func(*args, **kw)
finally:
if not self.donothing:
sys.settrace(None)
def _exc_info_to_string(self, err):
"""Converts a sys.exc_info()-style tuple of values into a string."""
- return string.join(apply(traceback.format_exception, err), '')
+ return string.join(traceback.format_exception(*err), '')
def __repr__(self):
return "<%s run=%i errors=%i failures=%i>" % \
unexpected exception.
"""
try:
- apply(callableObj, args, kwargs)
+ callableObj(*args, **kwargs)
except excClass:
return
else:
return getattr(self.stream,attr)
def writeln(self, *args):
- if args: apply(self.write, args)
+ if args: self.write(*args)
self.write('\n') # text-mode streams translate to \r\n if needed
h.putrequest('GET', selector)
if auth: h.putheader('Authorization', 'Basic %s' % auth)
if realhost: h.putheader('Host', realhost)
- for args in self.addheaders: apply(h.putheader, args)
+ for args in self.addheaders: h.putheader(*args)
h.endheaders()
if data is not None:
h.send(data)
h.putrequest('GET', selector)
if auth: h.putheader('Authorization: Basic %s' % auth)
if realhost: h.putheader('Host', realhost)
- for args in self.addheaders: apply(h.putheader, args)
+ for args in self.addheaders: h.putheader(*args)
h.endheaders()
if data is not None:
h.send(data)
"""Derived class with handlers for errors we can handle (perhaps)."""
def __init__(self, *args, **kwargs):
- apply(URLopener.__init__, (self,) + args, kwargs)
+ URLopener.__init__(self, *args, **kwargs)
self.auth_cache = {}
self.tries = 0
self.maxtries = 10
def close(self):
addbase.close(self)
if self.closehook:
- apply(self.closehook, self.hookargs)
+ self.closehook(*self.hookargs)
self.closehook = None
self.hookargs = None
for method, args in packtest:
print 'pack test', count,
try:
- apply(method, args)
+ method(*args)
print 'succeeded'
except ConversionError, var:
print 'ConversionError:', var.msg
print 'unpack test', count,
try:
if succeedlist[count]:
- x = apply(method, args)
+ x = method(*args)
print pred(x) and 'succeeded' or 'failed', ':', x
else:
print 'skipping'
if self.__class__ is DOMException:
raise RuntimeError(
"DOMException should not be instantiated directly")
- apply(Exception.__init__, (self,) + args, kw)
+ Exception.__init__(self, *args, **kw)
def _get_code(self):
return self.code
# ---
def create_parser(*args, **kwargs):
- return apply(ExpatParser, args, kwargs)
+ return ExpatParser(*args, **kwargs)
# ---
def __init__(self, **kw):
self.testdata = ""
- apply(XMLParser.__init__, (self,), kw)
+ XMLParser.__init__(self, **kw)
def handle_xml(self, encoding, standalone):
self.flush()
if self._type is None or self._marks:
raise ResponseError()
if self._type == "fault":
- raise apply(Fault, (), self._stack[0])
+ raise Fault(**self._stack[0])
return tuple(self._stack)
def getmethodname(self):
"your version of httplib doesn't support HTTPS"
)
else:
- return apply(HTTPS, (host, None), x509 or {})
+ return HTTPS(host, None, **(x509 or {}))
##
# Standard server proxy. This class establishes a virtual connection