def attrs(self):
return capi.nlmsg_attrdata(self._msg)
- def send(self, socket):
- socket.send(self)
+ def send(self, sock):
+ sock.send(self)
class Socket(object):
"""Netlink socket"""
class DumpParams(object):
"""Dumping parameters"""
- def __init__(self, type=NL_DUMP_LINE):
+ def __init__(self, type_=NL_DUMP_LINE):
self._dp = capi.alloc_dump_params()
if not self._dp:
raise Exception('Unable to allocate struct nl_dump_params')
- self._dp.dp_type = type
+ self._dp.dp_type = type_
def __del__(self):
capi.free_dump_params(self._dp)
self._dp.dp_prefix = value
# underscore this to make sure it is deleted first upon module deletion
-_defaultDumpParams = DumpParams(type=NL_DUMP_LINE)
+_defaultDumpParams = DumpParams(NL_DUMP_LINE)
###########################################################################
# Cacheable Object (Base Class)
to the new object, e.g. link.vlan = VLANLink()
"""
try:
- tmp = __import__(path)
+ __import__(path)
except ImportError:
return
# mark
@property
def mark(self):
- if capi.nl_object_is_marked(self.obj):
- return True
- else:
- return False
+ return bool(capi.nl_object_is_marked(self._nl_object))
@mark.setter
def mark(self, value):
@property
def attrs(self):
attr_list = capi.nl_object_attr_list(self._nl_object, 1024)
- return re.split('\W+', attr_list[0])
+ return attr_list[0].split()
#####################################################################
# refcnt
"""Collection of netlink objects"""
def __init__(self):
raise NotImplementedError()
+ self.arg1 = None
+ self.arg2 = None
def __del(self):
capi.nl_cache_free(self._nl_cache)
return True
# called by sub classes to allocate type specific caches by name
- def _alloc_cache_name(self, name):
+ @staticmethod
+ def _alloc_cache_name(name):
return capi.alloc_cache_name(name)
# implemented by sub classes, must return new instasnce of cacheable
# object
- def _new_object(self, obj):
+ @staticmethod
+ def _new_object(obj):
raise NotImplementedError()
# implemented by sub classes, must return instance of sub class
def _new_cache(self, cache):
raise NotImplementedError()
- def subset(self, filter):
+ def subset(self, filter_):
"""Return new cache containing subset of cache
Cretes a new cache containing all objects which match the
specified filter.
"""
- if not filter:
+ if not filter_:
raise ValueError()
- c = capi.nl_cache_subset(self._nl_cache, filter._nl_object)
+ c = capi.nl_cache_subset(self._nl_cache, filter_._nl_object)
return self._new_cache(cache=c)
- def dump(self, params=None, filter=None):
+ def dump(self, params=None, filter_=None):
"""Dump (print) cache as human readable text"""
if not params:
params = _defaultDumpParams
- if filter:
- filter = filter._nl_object
+ if filter_:
+ filter_ = filter_._nl_object
- capi.nl_cache_dump_filter(self._nl_cache, params._dp, filter)
+ capi.nl_cache_dump_filter(self._nl_cache, params._dp, filter_)
def clear(self):
"""Remove all cache entries"""
if not flags:
flags = NL_AUTO_PROVIDE
- self._mngr = cache_mngr_alloc(self._sock._sock, protocol, flags)
+ self._mngr = capi.cache_mngr_alloc(self._sock._sock, protocol, flags)
def __del__(self):
if self._sock:
import datetime
from .. import core as netlink
-from .. import capi as core_capi
from . import capi as capi
from . import link as Link
from .. import util as util
return Address._from_capi(addr)
- def _new_object(self, obj):
+ @staticmethod
+ def _new_object(obj):
return Address(obj)
- def _new_cache(self, cache):
+ @staticmethod
+ def _new_cache(cache):
return AddressCache(cache=cache)
###########################################################################
def _from_capi(cls, obj):
return cls(capi.addr2obj(obj))
- def _obj2type(self, obj):
+ @staticmethod
+ def _obj2type(obj):
return capi.obj2addr(obj)
def __cmp__(self, other):
return diff
- def _new_instance(self, obj):
+ @staticmethod
+ def _new_instance(obj):
return Address(obj)
#####################################################################
@family.setter
def family(self, value):
- if not isinstance(value, AddressFamily):
- value = AddressFamily(value)
+ if not isinstance(value, netlink.AddressFamily):
+ value = netlink.AddressFamily(value)
capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
]
import socket
-import sys
from .. import core as netlink
from .. import capi as core_capi
from . import capi as capi
else:
return Link.from_capi(link)
- def _new_object(self, obj):
+ @staticmethod
+ def _new_object(obj):
return Link(obj)
def _new_cache(self, cache):
def from_capi(cls, obj):
return cls(capi.link2obj(obj))
- def _obj2type(self, obj):
+ @staticmethod
+ def _obj2type(obj):
return capi.obj2link(obj)
def __cmp__(self, other):
return self.ifindex - other.ifindex
- def _new_instance(self, obj):
+ @staticmethod
+ def _new_instance(obj):
if not obj:
raise ValueError()
@property
def arptype(self):
"""Type of link (cannot be changed)"""
- type = capi.rtnl_link_get_arptype(self._rtnl_link)
- return core_capi.nl_llproto2str(type, 64)[0]
+ type_ = capi.rtnl_link_get_arptype(self._rtnl_link)
+ return core_capi.nl_llproto2str(type_, 64)[0]
@arptype.setter
def arptype(self, value):
@operstate.setter
def operstate(self, value):
- i = capi.rtnl_link_str2operstate(flag)
+ i = capi.rtnl_link_str2operstate(value)
capi.rtnl_link_set_operstate(self._rtnl_link, i)
#####################################################################
@mode.setter
def mode(self, value):
- i = capi.rtnl_link_str2mode(flag)
+ i = capi.rtnl_link_str2mode(value)
capi.rtnl_link_set_linkmode(self._rtnl_link, i)
#####################################################################
#####################################################################
# add()
- def add(self, socket=None, flags=None):
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+ def add(self, sock=None, flags=None):
+ if not sock:
+ sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if not flags:
flags = netlink.NLM_F_CREATE
- ret = capi.rtnl_link_add(socket._sock, self._rtnl_link, flags)
+ ret = capi.rtnl_link_add(sock._sock, self._rtnl_link, flags)
if ret < 0:
raise netlink.KernelError(ret)
#####################################################################
# change()
- def change(self, socket=None, flags=0):
+ def change(self, sock=None, flags=0):
"""Commit changes made to the link object"""
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+ if sock is None:
+ sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if not self._orig:
- raise NetlinkError('Original link not available')
- ret = capi.rtnl_link_change(socket._sock, self._orig, self._rtnl_link, flags)
+ raise netlink.NetlinkError('Original link not available')
+ ret = capi.rtnl_link_change(sock._sock, self._orig, self._rtnl_link, flags)
if ret < 0:
raise netlink.KernelError(ret)
#####################################################################
# delete()
- def delete(self, socket=None):
+ def delete(self, sock=None):
"""Attempt to delete this link in the kernel"""
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+ if sock is None:
+ sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- ret = capi.rtnl_link_delete(socket._sock, self._rtnl_link)
+ ret = capi.rtnl_link_delete(sock._sock, self._rtnl_link)
if ret < 0:
raise netlink.KernelError(ret)
row[0] = util.kw(row[0])
row[1] = self.get_stat(row[1]) if row[1] else ''
row[2] = self.get_stat(row[2]) if row[2] else ''
- buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
+ buf += '\t{0[0]:27} {0[1]:>16} {0[2]:>16}\n'.format(row)
buf += self._foreach_af('stats')
return buf
-def get(name, socket=None):
+def get(name, sock=None):
"""Lookup Link object directly from kernel"""
if not name:
raise ValueError()
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+ if not sock:
+ sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- link = capi.get_from_kernel(socket._sock, 0, name)
+ link = capi.get_from_kernel(sock._sock, 0, name)
if not link:
return None
]
-from ... import core as netlink
-from .. import capi as capi
class DummyLink(object):
def __init__(self, link):
self._rtnl_link = link
- def brief(self):
+ @staticmethod
+ def brief():
return 'dummy'
def init(link):
def details(self, fmt):
buf = fmt.nl('\n\t{0}\n\t'.format(util.title('Configuration:')))
- for i in range(DEVCONF_FORWARDING,DEVCONF_MAX+1):
+ for i in range(DEVCONF_FORWARDING, DEVCONF_MAX+1):
if i & 1 and i > 1:
buf += fmt.nl('\t')
txt = util.kw(capi.rtnl_link_inet_devconf2str(i, 32)[0])
return capi.rtnl_link_inet_set_conf(self._link._rtnl_link,
_resolve(id), int(value))
- @netlink.nlattr('link.inet.forwarding', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.forwarding', type=bool, fmt=util.boolean)
@property
def forwarding(self):
return bool(self.get_conf(DEVCONF_FORWARDING))
def forwarding(self, value):
self.set_conf(DEVCONF_FORWARDING, int(value))
- @netlink.nlattr('link.inet.mc_forwarding', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.mc_forwarding', type=bool, fmt=util.boolean)
@property
def mc_forwarding(self):
return bool(self.get_conf(DEVCONF_MC_FORWARDING))
def mc_forwarding(self, value):
self.set_conf(DEVCONF_MC_FORWARDING, int(value))
- @netlink.nlattr('link.inet.proxy_arp', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.proxy_arp', type=bool, fmt=util.boolean)
@property
def proxy_arp(self):
return bool(self.get_conf(DEVCONF_PROXY_ARP))
def proxy_arp(self, value):
self.set_conf(DEVCONF_PROXY_ARP, int(value))
- @netlink.nlattr('link.inet.accept_redirects', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.accept_redirects', type=bool, fmt=util.boolean)
@property
def accept_redirects(self):
return bool(self.get_conf(DEVCONF_ACCEPT_REDIRECTS))
def accept_redirects(self, value):
self.set_conf(DEVCONF_ACCEPT_REDIRECTS, int(value))
- @netlink.nlattr('link.inet.secure_redirects', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.secure_redirects', type=bool, fmt=util.boolean)
@property
def secure_redirects(self):
return bool(self.get_conf(DEVCONF_SECURE_REDIRECTS))
def secure_redirects(self, value):
self.set_conf(DEVCONF_SECURE_REDIRECTS, int(value))
- @netlink.nlattr('link.inet.send_redirects', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.send_redirects', type=bool, fmt=util.boolean)
@property
def send_redirects(self):
return bool(self.get_conf(DEVCONF_SEND_REDIRECTS))
def send_redirects(self, value):
self.set_conf(DEVCONF_SEND_REDIRECTS, int(value))
- @netlink.nlattr('link.inet.shared_media', type=bool, fmt=util.bool)
+ @netlink.nlattr('link.inet.shared_media', type=bool, fmt=util.boolean)
@property
def shared_media(self):
return bool(self.get_conf(DEVCONF_SHARED_MEDIA))
@netlink.nlattr('class.htb.level', type=int)
@property
def level(self):
- level = capi.rtnl_htb_get_level(self._class._rtnl_class)
+ return capi.rtnl_htb_get_level(self._class._rtnl_class)
@level.setter
def level(self, value):
'TcClass',
]
-import socket
-import sys
from .. import core as netlink
-from .. import capi as core_capi
from . import capi as capi
from .. import util as util
from . import link as Link
return ret + self._module_brief()
- def details(self):
+ @staticmethod
+ def details():
return '{t|mtu} {t|mpu} {t|overhead} {t|linktype}'
@property
def qlen(self):
return self.get_stat(STAT_QLEN)
- def stats(self, fmt):
+ @staticmethod
+ def stats(fmt):
return fmt.nl('{t|packets} {t|bytes} {t|qlen}')
###########################################################################
# else:
# return Qdisc._from_capi(capi.qdisc2obj(qdisc))
- def _new_object(self, obj):
+ @staticmethod
+ def _new_object(obj):
return Qdisc(obj)
- def _new_cache(self, cache):
+ @staticmethod
+ def _new_cache(cache):
return QdiscCache(cache=cache)
###########################################################################
def from_capi(cls, obj):
return cls(capi.qdisc2obj(obj))
- def _obj2type(self, obj):
+ @staticmethod
+ def _obj2type(obj):
return capi.obj2qdisc(obj)
- def _new_instance(self, obj):
+ @staticmethod
+ def _new_instance(obj):
if not obj:
raise ValueError()
self._nl_cache = cache
self._set_arg1(ifindex)
- def _new_object(self, obj):
+ @staticmethod
+ def _new_object(obj):
return TcClass(obj)
def _new_cache(self, cache):
def from_capi(cls, obj):
return cls(capi.class2obj(obj))
- def _obj2type(self, obj):
+ @staticmethod
+ def _obj2type(obj):
return capi.obj2class(obj)
- def _new_instance(self, obj):
+ @staticmethod
+ def _new_instance(obj):
if not obj:
raise ValueError()
#
# format(details=False, stats=False)
#
- def format(self, details=False, stats=False, nodev=False,
+ def format(self, details=False, _stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
fmt = util.MyFormatter(self, indent)
self._set_arg1(ifindex)
self._set_arg2(int(parent))
- def _new_object(self, obj):
+ @staticmethod
+ def _new_object(obj):
return Classifier(obj)
def _new_cache(self, cache):
def from_capi(cls, obj):
return cls(capi.cls2obj(obj))
- def _obj2type(self, obj):
+ @staticmethod
+ def _obj2type(obj):
return capi.obj2cls(obj)
- def _new_instance(self, obj):
+ @staticmethod
+ def _new_instance(obj):
if not obj:
raise ValueError()
#
# format(details=False, stats=False)
#
- def format(self, details=False, stats=False, nodev=False,
+ def format(self, details=False, _stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
fmt = util.MyFormatter(self, indent)
def title(t):
return t
-def bool(t):
+def boolean(t):
return str(t)
def handle(t):
def _nlattr(self, key):
value = getattr(self._obj, key)
- title = None
+ title_ = None
if isinstance(value, types.MethodType):
value = value()
value = d['fmt'](value)
if 'title' in d:
- title = d['title']
+ title_ = d['title']
except KeyError:
pass
except AttributeError:
pass
- return title, str(value)
+ return title_, str(value)
def get_value(self, key, args, kwds):
# Let default get_value() handle ints
return Formatter.get_value(self, key, args, kwds)
key = key[2:]
- (title, value) = self._nlattr(key)
+ (title_, value) = self._nlattr(key)
if include_title:
- if not title:
- title = key # fall back to key as title
- value = '{0} {1}'.format(kw(title), value)
+ if not title_:
+ title_ = key # fall back to key as title
+ value = '{0} {1}'.format(kw(title_), value)
return value