The module contains the following public classes:
- Socket -- The netlink socket
- Object -- Abstract object (based on struct nl_obect in libnl) used as
- base class for all object types which can be put into a Cache
+ base class for all object types which can be put into a Cache
- Cache -- A collection of objects which are derived from the base
- class Object. Used for netlink protocols which maintain a list
- or tree of objects.
+ class Object. Used for netlink protocols which maintain a list
+ or tree of objects.
- DumpParams --
The following exceptions are defined:
- NetlinkError -- Base exception for all general purpose exceptions raised.
- KernelError -- Raised when the kernel returns an error as response to a
- request.
+ request.
All other classes or functions in this module are considered implementation
details.
import socket
__all__ = ['Message', 'Socket', 'DumpParams', 'Object', 'Cache', 'KernelError',
- 'NetlinkError']
+ 'NetlinkError']
__version__ = "0.1"
# netlink protocols
NLM_F_APPEND = 0x800
class NetlinkError(Exception):
- def __init__(self, error):
- self._error = error
- self._msg = capi.nl_geterror(error)
+ def __init__(self, error):
+ self._error = error
+ self._msg = capi.nl_geterror(error)
- def __str__(self):
- return self._msg
+ def __str__(self):
+ return self._msg
class KernelError(NetlinkError):
- def __str__(self):
- return "Kernel returned: {0}".format(self._msg)
+ def __str__(self):
+ return "Kernel returned: {0}".format(self._msg)
class ImmutableError(NetlinkError):
- def __init__(self, msg):
- self._msg = msg
+ def __init__(self, msg):
+ self._msg = msg
- def __str__(self):
- return "Immutable attribute: {0}".format(self._msg)
+ def __str__(self):
+ return "Immutable attribute: {0}".format(self._msg)
class Message(object):
- """Netlink message"""
+ """Netlink message"""
- def __init__(self, size=0):
- if size == 0:
- self._msg = capi.nlmsg_alloc()
- else:
- self._msg = capi.nlmsg_alloc_size(size)
+ def __init__(self, size=0):
+ if size == 0:
+ self._msg = capi.nlmsg_alloc()
+ else:
+ self._msg = capi.nlmsg_alloc_size(size)
- if self._msg is None:
- raise Exception("Message allocation returned NULL")
+ if self._msg is None:
+ raise Exception("Message allocation returned NULL")
- def __del__(self):
- capi.nlmsg_free(self._msg)
+ def __del__(self):
+ capi.nlmsg_free(self._msg)
- def __len__(self):
- return capi.nlmsg_len(nlmsg_hdr(self._msg))
+ def __len__(self):
+ return capi.nlmsg_len(nlmsg_hdr(self._msg))
- @property
- def protocol(self):
- return capi.nlmsg_get_proto(self._msg)
+ @property
+ def protocol(self):
+ return capi.nlmsg_get_proto(self._msg)
- @protocol.setter
- def protocol(self, value):
- capi.nlmsg_set_proto(self._msg, value)
+ @protocol.setter
+ def protocol(self, value):
+ capi.nlmsg_set_proto(self._msg, value)
- @property
- def maxSize(self):
- return capi.nlmsg_get_max_size(self._msg)
+ @property
+ def maxSize(self):
+ return capi.nlmsg_get_max_size(self._msg)
- @property
- def hdr(self):
- return capi.nlmsg_hdr(self._msg)
+ @property
+ def hdr(self):
+ return capi.nlmsg_hdr(self._msg)
- @property
- def data(self):
- return capi.nlmsg_data(self._msg)
+ @property
+ def data(self):
+ return capi.nlmsg_data(self._msg)
- @property
- def attrs(self):
- return capi.nlmsg_attrdata(self._msg)
+ @property
+ def attrs(self):
+ return capi.nlmsg_attrdata(self._msg)
- def send(self, socket):
- socket.send(self)
+ def send(self, socket):
+ socket.send(self)
class Socket(object):
- """Netlink socket"""
+ """Netlink socket"""
- def __init__(self, cb=None):
- if cb is None:
- self._sock = capi.nl_socket_alloc()
- else:
- self._sock = capi.nl_socket_alloc_cb(cb)
+ def __init__(self, cb=None):
+ if cb is None:
+ self._sock = capi.nl_socket_alloc()
+ else:
+ self._sock = capi.nl_socket_alloc_cb(cb)
- if self._sock is None:
- raise Exception("NULL pointer returned while allocating socket")
+ if self._sock is None:
+ raise Exception("NULL pointer returned while allocating socket")
- def __del__(self):
- capi.nl_socket_free(self._sock)
+ def __del__(self):
+ capi.nl_socket_free(self._sock)
- def __str__(self):
- return "nlsock<{0}>".format(self.localPort)
+ def __str__(self):
+ return "nlsock<{0}>".format(self.localPort)
- @property
- def local_port(self):
- return capi.nl_socket_get_local_port(self._sock)
+ @property
+ def local_port(self):
+ return capi.nl_socket_get_local_port(self._sock)
- @local_port.setter
- def local_port(self, value):
- capi.nl_socket_set_local_port(self._sock, int(value))
+ @local_port.setter
+ def local_port(self, value):
+ capi.nl_socket_set_local_port(self._sock, int(value))
- @property
- def peer_port(self):
- return capi.nl_socket_get_peer_port(self._sock)
+ @property
+ def peer_port(self):
+ return capi.nl_socket_get_peer_port(self._sock)
- @peer_port.setter
- def peer_port(self, value):
- capi.nl_socket_set_peer_port(self._sock, int(value))
+ @peer_port.setter
+ def peer_port(self, value):
+ capi.nl_socket_set_peer_port(self._sock, int(value))
- @property
- def peer_groups(self):
- return capi.nl_socket_get_peer_groups(self._sock)
+ @property
+ def peer_groups(self):
+ return capi.nl_socket_get_peer_groups(self._sock)
- @peer_groups.setter
- def peer_groups(self, value):
- capi.nl_socket_set_peer_groups(self._sock, value)
+ @peer_groups.setter
+ def peer_groups(self, value):
+ capi.nl_socket_set_peer_groups(self._sock, value)
- def set_bufsize(self, rx, tx):
- capi.nl_socket_set_buffer_size(self._sock, rx, tx)
+ def set_bufsize(self, rx, tx):
+ capi.nl_socket_set_buffer_size(self._sock, rx, tx)
- def connect(self, proto):
- capi.nl_connect(self._sock, proto)
- return self
+ def connect(self, proto):
+ capi.nl_connect(self._sock, proto)
+ return self
- def disconnect(self):
- capi.nl_close(self._sock)
+ def disconnect(self):
+ capi.nl_close(self._sock)
- def sendto(self, buf):
- ret = capi.nl_sendto(self._sock, buf, len(buf))
- if ret < 0:
- raise Exception("Failed to send")
- else:
- return ret
+ def sendto(self, buf):
+ ret = capi.nl_sendto(self._sock, buf, len(buf))
+ if ret < 0:
+ raise Exception("Failed to send")
+ else:
+ return ret
_sockets = {}
def lookup_socket(protocol):
- try:
- sock = _sockets[protocol]
- except KeyError:
- sock = Socket()
- sock.connect(protocol)
- _sockets[protocol] = sock
+ try:
+ sock = _sockets[protocol]
+ except KeyError:
+ sock = Socket()
+ sock.connect(protocol)
+ _sockets[protocol] = sock
- return sock
+ return sock
class DumpParams(object):
- """Dumping parameters"""
+ """Dumping parameters"""
- def __init__(self, type=NL_DUMP_LINE):
- self._dp = capi.alloc_dump_params()
- if not self._dp:
- raise Exception("Unable to allocate struct nl_dump_params")
+ def __init__(self, type=NL_DUMP_LINE):
+ self._dp = capi.alloc_dump_params()
+ if not self._dp:
+ raise Exception("Unable to allocate struct nl_dump_params")
- self._dp.dp_type = type
+ self._dp.dp_type = type
- def __del__(self):
- capi.free_dump_params(self._dp)
+ def __del__(self):
+ capi.free_dump_params(self._dp)
- @property
- def type(self):
- return self._dp.dp_type
+ @property
+ def type(self):
+ return self._dp.dp_type
- @type.setter
- def type(self, value):
- self._dp.dp_type = value
+ @type.setter
+ def type(self, value):
+ self._dp.dp_type = value
- @property
- def prefix(self):
- return self._dp.dp_prefix
+ @property
+ def prefix(self):
+ return self._dp.dp_prefix
- @prefix.setter
- def prefix(self, value):
- self._dp.dp_prefix = value
+ @prefix.setter
+ def prefix(self, value):
+ self._dp.dp_prefix = value
# underscore this to make sure it is deleted first upon module deletion
_defaultDumpParams = DumpParams(type=NL_DUMP_LINE)
###########################################################################
# Cacheable Object (Base Class)
class Object(object):
- """Cacheable object (base class)"""
+ """Cacheable object (base class)"""
- def __init__(self, obj_name, name, obj=None):
- self._obj_name = obj_name
- self._name = name
- self._modules = []
+ def __init__(self, obj_name, name, obj=None):
+ self._obj_name = obj_name
+ self._name = name
+ self._modules = []
- if not obj:
- obj = capi.object_alloc_name(self._obj_name)
-
- self._nl_object = obj
-
- # Create a clone which stores the original state to notice
- # modifications
- clone_obj = capi.nl_object_clone(self._nl_object)
- self._orig = self._obj2type(clone_obj)
-
- def __del__(self):
- if not self._nl_object:
- raise ValueError()
-
- capi.nl_object_put(self._nl_object)
-
- def __str__(self):
- if hasattr(self, 'format'):
- return self.format()
- else:
- return capi.nl_object_dump_buf(self._nl_object, 4096).rstrip()
-
- def _new_instance(self):
- raise NotImplementedError()
-
- def clone(self):
- """Clone object"""
- return self._new_instance(capi.nl_object_clone(self._nl_object))
-
- def _module_lookup(self, path, constructor=None):
- """Lookup object specific module and load it
-
- Object implementations consisting of multiple types may
- offload some type specific code to separate modules which
- are loadable on demand, e.g. a VLAN link or a specific
- queueing discipline implementation.
-
- Loads the module `path` and calls the constructor if
- supplied or `module`.init()
-
- The constructor/init function typically assigns a new
- object covering the type specific implementation aspects
- to the new object, e.g. link.vlan = VLANLink()
- """
- try:
- tmp = __import__(path)
- except ImportError:
- return
-
- module = sys.modules[path]
-
- if constructor:
- ret = getattr(module, constructor)(self)
- else:
- ret = module.init(self)
-
- if ret:
- self._modules.append(ret)
-
- def _module_brief(self):
- ret = ''
-
- for module in self._modules:
- if hasattr(module, 'brief'):
- ret += module.brief()
-
- return ret
-
- def dump(self, params=None):
- """Dump object as human readable text"""
- if params is None:
- params = _defaultDumpParams
-
- capi.nl_object_dump(self._nl_object, params._dp)
-
-
- #####################################################################
- # mark
- @property
- def mark(self):
- if capi.nl_object_is_marked(self.obj):
- return True
- else:
- return False
-
- @mark.setter
- def mark(self, value):
- if value:
- capi.nl_object_mark(self._nl_object)
- else:
- capi.nl_object_unmark(self._nl_object)
-
- #####################################################################
- # shared
- @property
- def shared(self):
- return capi.nl_object_shared(self._nl_object) != 0
-
- #####################################################################
- # attrs
- @property
- def attrs(self):
- attr_list = capi.nl_object_attr_list(self._nl_object, 1024)
- return re.split('\W+', attr_list[0])
-
- #####################################################################
- # refcnt
- @property
- def refcnt(self):
- return capi.nl_object_get_refcnt(self._nl_object)
-
- # this method resolves multiple levels of sub types to allow
- # accessing properties of subclass/subtypes (e.g. link.vlan.id)
- def _resolve(self, attr):
- obj = self
- l = attr.split('.')
- while len(l) > 1:
- obj = getattr(obj, l.pop(0))
- return (obj, l.pop(0))
-
- def _setattr(self, attr, val):
- obj, attr = self._resolve(attr)
- return setattr(obj, attr, val)
-
- def _hasattr(self, attr):
- obj, attr = self._resolve(attr)
- return hasattr(obj, attr)
-
- def apply(self, attr, val):
- try:
- d = attrs[self._name + "." + attr]
- except KeyError:
- raise KeyError("Unknown " + self._name +
- " attribute: " + attr)
-
- if 'immutable' in d:
- raise ImmutableError(attr)
-
- if not self._hasattr(attr):
- raise KeyError("Invalid " + self._name +
- " attribute: " + attr)
- self._setattr(attr, val)
+ if not obj:
+ obj = capi.object_alloc_name(self._obj_name)
-class ObjIterator(object):
- def __init__(self, cache, obj):
- self._cache = cache
- self._nl_object = None
-
- if not obj:
- self._end = 1
- else:
- capi.nl_object_get(obj)
- self._nl_object = obj
- self._first = 1
- self._end = 0
-
- def __del__(self):
- if self._nl_object:
- capi.nl_object_put(self._nl_object)
-
- def __iter__(self):
- return self
-
- def get_next(self):
- return capi.nl_cache_get_next(self._nl_object)
-
- def next(self):
- if self._end:
- raise StopIteration()
-
- if self._first:
- ret = self._nl_object
- self._first = 0
- else:
- ret = self.get_next()
- if not ret:
- self._end = 1
- raise StopIteration()
-
- # return ref of previous element and acquire ref of current
- # element to have object stay around until we fetched the
- # next ptr
- capi.nl_object_put(self._nl_object)
- capi.nl_object_get(ret)
- self._nl_object = ret
-
- # reference used inside object
- capi.nl_object_get(ret)
- return self._cache._new_object(ret)
+ self._nl_object = obj
+ # Create a clone which stores the original state to notice
+ # modifications
+ clone_obj = capi.nl_object_clone(self._nl_object)
+ self._orig = self._obj2type(clone_obj)
-class ReverseObjIterator(ObjIterator):
- def get_next(self):
- return capi.nl_cache_get_prev(self._nl_object)
+ def __del__(self):
+ if not self._nl_object:
+ raise ValueError()
-###########################################################################
-# Cache
-class Cache(object):
- """Collection of netlink objects"""
- def __init__(self):
- raise NotImplementedError()
-
- def __del(self):
- capi.nl_cache_free(self._nl_cache)
+ capi.nl_object_put(self._nl_object)
- def __len__(self):
- return capi.nl_cache_nitems(self._nl_cache)
-
- def __iter__(self):
- obj = capi.nl_cache_get_first(self._nl_cache)
- return ObjIterator(self, obj)
+ def __str__(self):
+ if hasattr(self, 'format'):
+ return self.format()
+ else:
+ return capi.nl_object_dump_buf(self._nl_object, 4096).rstrip()
- def __reversed__(self):
- obj = capi.nl_cache_get_last(self._nl_cache)
- return ReverseObjIterator(self, obj)
+ def _new_instance(self):
+ raise NotImplementedError()
- def __contains__(self, item):
- obj = capi.nl_cache_search(self._nl_cache, item._nl_object)
- if obj is None:
- return False
- else:
- capi.nl_object_put(obj)
- return True
+ def clone(self):
+ """Clone object"""
+ return self._new_instance(capi.nl_object_clone(self._nl_object))
- # called by sub classes to allocate type specific caches by name
- def _alloc_cache_name(self, name):
- return capi.alloc_cache_name(name)
+ def _module_lookup(self, path, constructor=None):
+ """Lookup object specific module and load it
- # implemented by sub classes, must return new instasnce of cacheable
- # object
- def _new_object(self, obj):
- raise NotImplementedError()
+ Object implementations consisting of multiple types may
+ offload some type specific code to separate modules which
+ are loadable on demand, e.g. a VLAN link or a specific
+ queueing discipline implementation.
- # implemented by sub classes, must return instance of sub class
- def _new_cache(self, cache):
- raise NotImplementedError()
+ Loads the module `path` and calls the constructor if
+ supplied or `module`.init()
- def subset(self, filter):
- """Return new cache containing subset of cache
+ The constructor/init function typically assigns a new
+ object covering the type specific implementation aspects
+ to the new object, e.g. link.vlan = VLANLink()
+ """
+ try:
+ tmp = __import__(path)
+ except ImportError:
+ return
+
+ module = sys.modules[path]
+
+ if constructor:
+ ret = getattr(module, constructor)(self)
+ else:
+ ret = module.init(self)
+
+ if ret:
+ self._modules.append(ret)
+
+ def _module_brief(self):
+ ret = ''
+
+ for module in self._modules:
+ if hasattr(module, 'brief'):
+ ret += module.brief()
+
+ return ret
+
+ def dump(self, params=None):
+ """Dump object as human readable text"""
+ if params is None:
+ params = _defaultDumpParams
+
+ capi.nl_object_dump(self._nl_object, params._dp)
+
+
+ #####################################################################
+ # mark
+ @property
+ def mark(self):
+ if capi.nl_object_is_marked(self.obj):
+ return True
+ else:
+ return False
+
+ @mark.setter
+ def mark(self, value):
+ if value:
+ capi.nl_object_mark(self._nl_object)
+ else:
+ capi.nl_object_unmark(self._nl_object)
+
+ #####################################################################
+ # shared
+ @property
+ def shared(self):
+ return capi.nl_object_shared(self._nl_object) != 0
+
+ #####################################################################
+ # attrs
+ @property
+ def attrs(self):
+ attr_list = capi.nl_object_attr_list(self._nl_object, 1024)
+ return re.split('\W+', attr_list[0])
+
+ #####################################################################
+ # refcnt
+ @property
+ def refcnt(self):
+ return capi.nl_object_get_refcnt(self._nl_object)
+
+ # this method resolves multiple levels of sub types to allow
+ # accessing properties of subclass/subtypes (e.g. link.vlan.id)
+ def _resolve(self, attr):
+ obj = self
+ l = attr.split('.')
+ while len(l) > 1:
+ obj = getattr(obj, l.pop(0))
+ return (obj, l.pop(0))
+
+ def _setattr(self, attr, val):
+ obj, attr = self._resolve(attr)
+ return setattr(obj, attr, val)
+
+ def _hasattr(self, attr):
+ obj, attr = self._resolve(attr)
+ return hasattr(obj, attr)
+
+ def apply(self, attr, val):
+ try:
+ d = attrs[self._name + "." + attr]
+ except KeyError:
+ raise KeyError("Unknown " + self._name +
+ " attribute: " + attr)
- Cretes a new cache containing all objects which match the
- specified filter.
- """
- if not filter:
- raise ValueError()
+ if 'immutable' in d:
+ raise ImmutableError(attr)
- c = capi.nl_cache_subset(self._nl_cache, filter._nl_object)
- return self._new_cache(cache=c)
+ if not self._hasattr(attr):
+ raise KeyError("Invalid " + self._name +
+ " attribute: " + attr)
+ self._setattr(attr, val)
- def dump(self, params=None, filter=None):
- """Dump (print) cache as human readable text"""
- if not params:
- params = _defaultDumpParams
+class ObjIterator(object):
+ def __init__(self, cache, obj):
+ self._cache = cache
+ self._nl_object = None
+
+ if not obj:
+ self._end = 1
+ else:
+ capi.nl_object_get(obj)
+ self._nl_object = obj
+ self._first = 1
+ self._end = 0
+
+ def __del__(self):
+ if self._nl_object:
+ capi.nl_object_put(self._nl_object)
+
+ def __iter__(self):
+ return self
+
+ def get_next(self):
+ return capi.nl_cache_get_next(self._nl_object)
+
+ def next(self):
+ if self._end:
+ raise StopIteration()
+
+ if self._first:
+ ret = self._nl_object
+ self._first = 0
+ else:
+ ret = self.get_next()
+ if not ret:
+ self._end = 1
+ raise StopIteration()
+
+ # return ref of previous element and acquire ref of current
+ # element to have object stay around until we fetched the
+ # next ptr
+ capi.nl_object_put(self._nl_object)
+ capi.nl_object_get(ret)
+ self._nl_object = ret
+
+ # reference used inside object
+ capi.nl_object_get(ret)
+ return self._cache._new_object(ret)
- if filter:
- filter = filter._nl_object
- capi.nl_cache_dump_filter(self._nl_cache, params._dp, filter)
+class ReverseObjIterator(ObjIterator):
+ def get_next(self):
+ return capi.nl_cache_get_prev(self._nl_object)
- def clear(self):
- """Remove all cache entries"""
- capi.nl_cache_clear(self._nl_cache)
+###########################################################################
+# Cache
+class Cache(object):
+ """Collection of netlink objects"""
+ def __init__(self):
+ raise NotImplementedError()
+
+ def __del(self):
+ capi.nl_cache_free(self._nl_cache)
+
+ def __len__(self):
+ return capi.nl_cache_nitems(self._nl_cache)
+
+ def __iter__(self):
+ obj = capi.nl_cache_get_first(self._nl_cache)
+ return ObjIterator(self, obj)
+
+ def __reversed__(self):
+ obj = capi.nl_cache_get_last(self._nl_cache)
+ return ReverseObjIterator(self, obj)
+
+ def __contains__(self, item):
+ obj = capi.nl_cache_search(self._nl_cache, item._nl_object)
+ if obj is None:
+ return False
+ else:
+ capi.nl_object_put(obj)
+ return True
+
+ # called by sub classes to allocate type specific caches by name
+ def _alloc_cache_name(self, name):
+ return capi.alloc_cache_name(name)
+
+ # implemented by sub classes, must return new instasnce of cacheable
+ # object
+ def _new_object(self, obj):
+ raise NotImplementedError()
+
+ # implemented by sub classes, must return instance of sub class
+ def _new_cache(self, cache):
+ raise NotImplementedError()
+
+ def subset(self, filter):
+ """Return new cache containing subset of cache
+
+ Cretes a new cache containing all objects which match the
+ specified filter.
+ """
+ if not filter:
+ raise ValueError()
- # Called by sub classes to set first cache argument
- def _set_arg1(self, arg):
- self.arg1 = arg
- capi.nl_cache_set_arg1(self._nl_cache, arg)
+ c = capi.nl_cache_subset(self._nl_cache, filter._nl_object)
+ return self._new_cache(cache=c)
- # Called by sub classes to set second cache argument
- def _set_arg2(self, arg):
- self.arg2 = arg
- capi.nl_cache_set_arg2(self._nl_cache, arg)
+ def dump(self, params=None, filter=None):
+ """Dump (print) cache as human readable text"""
+ if not params:
+ params = _defaultDumpParams
- def refill(self, socket=None):
- """Clear cache and refill it"""
- if socket is None:
- socket = lookup_socket(self._protocol)
+ if filter:
+ filter = filter._nl_object
- capi.nl_cache_refill(socket._sock, self._nl_cache)
- return self
+ capi.nl_cache_dump_filter(self._nl_cache, params._dp, filter)
- def resync(self, socket=None, cb=None):
- """Synchronize cache with content in kernel"""
- if socket is None:
- socket = lookup_socket(self._protocol)
+ def clear(self):
+ """Remove all cache entries"""
+ capi.nl_cache_clear(self._nl_cache)
- capi.nl_cache_resync(socket._sock, self._nl_cache, cb)
+ # Called by sub classes to set first cache argument
+ def _set_arg1(self, arg):
+ self.arg1 = arg
+ capi.nl_cache_set_arg1(self._nl_cache, arg)
- def provide(self):
- """Provide this cache to others
+ # Called by sub classes to set second cache argument
+ def _set_arg2(self, arg):
+ self.arg2 = arg
+ capi.nl_cache_set_arg2(self._nl_cache, arg)
- Caches which have been "provided" are made available
- to other users (of the same application context) which
- "require" it. F.e. a link cache is generally provided
- to allow others to translate interface indexes to
- link names
- """
+ def refill(self, socket=None):
+ """Clear cache and refill it"""
+ if socket is None:
+ socket = lookup_socket(self._protocol)
- capi.nl_cache_mngt_provide(self._nl_cache)
+ capi.nl_cache_refill(socket._sock, self._nl_cache)
+ return self
- def unprovide(self):
- """Unprovide this cache
+ def resync(self, socket=None, cb=None):
+ """Synchronize cache with content in kernel"""
+ if socket is None:
+ socket = lookup_socket(self._protocol)
- No longer make the cache available to others. If the cache
- has been handed out already, that reference will still
- be valid.
- """
- capi.nl_cache_mngt_unprovide(self._nl_cache)
+ capi.nl_cache_resync(socket._sock, self._nl_cache, cb)
+
+ def provide(self):
+ """Provide this cache to others
+
+ Caches which have been "provided" are made available
+ to other users (of the same application context) which
+ "require" it. F.e. a link cache is generally provided
+ to allow others to translate interface indexes to
+ link names
+ """
+
+ capi.nl_cache_mngt_provide(self._nl_cache)
+
+ def unprovide(self):
+ """Unprovide this cache
+
+ No longer make the cache available to others. If the cache
+ has been handed out already, that reference will still
+ be valid.
+ """
+ capi.nl_cache_mngt_unprovide(self._nl_cache)
###########################################################################
# Cache Manager (Work in Progress)
NL_AUTO_PROVIDE = 1
class CacheManager(object):
- def __init__(self, protocol, flags=None):
+ def __init__(self, protocol, flags=None):
- self._sock = Socket()
- self._sock.connect(protocol)
+ self._sock = Socket()
+ self._sock.connect(protocol)
- if not flags:
- flags = NL_AUTO_PROVIDE
+ if not flags:
+ flags = NL_AUTO_PROVIDE
- self._mngr = cache_mngr_alloc(self._sock._sock, protocol, flags)
+ self._mngr = cache_mngr_alloc(self._sock._sock, protocol, flags)
- def __del__(self):
- if self._sock:
- self._sock.disconnect()
+ def __del__(self):
+ if self._sock:
+ self._sock.disconnect()
- if self._mngr:
- capi.nl_cache_mngr_free(self._mngr)
+ if self._mngr:
+ capi.nl_cache_mngr_free(self._mngr)
- def add(self, name):
- capi.cache_mngr_add(self._mngr, name, None, None)
+ def add(self, name):
+ capi.cache_mngr_add(self._mngr, name, None, None)
###########################################################################
# Address Family
class AddressFamily(object):
- """Address family representation
+ """Address family representation
- af = AddressFamily('inet6')
- # raises:
- # - ValueError if family name is not known
- # - TypeError if invalid type is specified for family
+ af = AddressFamily('inet6')
+ # raises:
+ # - ValueError if family name is not known
+ # - TypeError if invalid type is specified for family
- print af # => 'inet6' (string representation)
- print int(af) # => 10 (numeric representation)
- print repr(af) # => AddressFamily('inet6')
- """
- def __init__(self, family=socket.AF_UNSPEC):
- if isinstance(family, str):
- family = capi.nl_str2af(family)
- if family < 0:
- raise ValueError('Unknown family name')
- elif not isinstance(family, int):
- raise TypeError()
+ print af # => 'inet6' (string representation)
+ print int(af) # => 10 (numeric representation)
+ print repr(af) # => AddressFamily('inet6')
+ """
+ def __init__(self, family=socket.AF_UNSPEC):
+ if isinstance(family, str):
+ family = capi.nl_str2af(family)
+ if family < 0:
+ raise ValueError('Unknown family name')
+ elif not isinstance(family, int):
+ raise TypeError()
- self._family = family
+ self._family = family
- def __str__(self):
- return capi.nl_af2str(self._family, 32)[0]
+ def __str__(self):
+ return capi.nl_af2str(self._family, 32)[0]
- def __int__(self):
- return self._family
+ def __int__(self):
+ return self._family
- def __repr__(self):
- return 'AddressFamily({0!r})'.format(str(self))
+ def __repr__(self):
+ return 'AddressFamily({0!r})'.format(str(self))
###########################################################################
# Abstract Address
class AbstractAddress(object):
- """Abstract address object
-
- addr = AbstractAddress('127.0.0.1/8')
- print addr # => '127.0.0.1/8'
- print addr.prefixlen # => '8'
- print addr.family # => 'inet'
- print len(addr) # => '4' (32bit ipv4 address)
-
- a = AbstractAddress('10.0.0.1/24')
- b = AbstractAddress('10.0.0.2/24')
- print a == b # => False
+ """Abstract address object
+
+ addr = AbstractAddress('127.0.0.1/8')
+ print addr # => '127.0.0.1/8'
+ print addr.prefixlen # => '8'
+ print addr.family # => 'inet'
+ print len(addr) # => '4' (32bit ipv4 address)
+
+ a = AbstractAddress('10.0.0.1/24')
+ b = AbstractAddress('10.0.0.2/24')
+ print a == b # => False
+
+
+ """
+ def __init__(self, addr):
+ self._nl_addr = None
+
+ if isinstance(addr, str):
+ addr = capi.addr_parse(addr, socket.AF_UNSPEC)
+ if addr is None:
+ raise ValueError('Invalid address format')
+ elif addr:
+ capi.nl_addr_get(addr)
+
+ self._nl_addr = addr
+
+ def __del__(self):
+ if self._nl_addr:
+ capi.nl_addr_put(self._nl_addr)
+
+ def __cmp__(self, other):
+ if isinstance(other, str):
+ other = AbstractAddress(other)
+
+ diff = self.prefixlen - other.prefixlen
+ if diff == 0:
+ diff = capi.nl_addr_cmp(self._nl_addr, other._nl_addr)
+
+ return diff
+
+ def contains(self, item):
+ diff = int(self.family) - int(item.family)
+ if diff:
+ return False
+
+ if item.prefixlen < self.prefixlen:
+ return False
+
+ diff = capi.nl_addr_cmp_prefix(self._nl_addr, item._nl_addr)
+ return diff == 0
+
+ def __nonzero__(self):
+ if self._nl_addr:
+ return not capi.nl_addr_iszero(self._nl_addr)
+ else:
+ return False
+
+ def __len__(self):
+ if self._nl_addr:
+ return capi.nl_addr_get_len(self._nl_addr)
+ else:
+ return 0
+
+ def __str__(self):
+ if self._nl_addr:
+ return capi.nl_addr2str(self._nl_addr, 64)[0]
+ else:
+ return "none"
+
+ @property
+ def shared(self):
+ """True if address is shared (multiple users)"""
+ if self._nl_addr:
+ return capi.nl_addr_shared(self._nl_addr) != 0
+ else:
+ return False
+
+ @property
+ def prefixlen(self):
+ """Length of prefix (number of bits)"""
+ if self._nl_addr:
+ return capi.nl_addr_get_prefixlen(self._nl_addr)
+ else:
+ return 0
+
+ @prefixlen.setter
+ def prefixlen(self, value):
+ if not self._nl_addr:
+ raise TypeError()
+
+ capi.nl_addr_set_prefixlen(self._nl_addr, int(value))
+
+ @property
+ def family(self):
+ """Address family"""
+ f = 0
+ if self._nl_addr:
+ f = capi.nl_addr_get_family(self._nl_addr)
+
+ return AddressFamily(f)
+
+ @family.setter
+ def family(self, value):
+ if not self._nl_addr:
+ raise TypeError()
-
- """
- def __init__(self, addr):
- self._nl_addr = None
-
- if isinstance(addr, str):
- addr = capi.addr_parse(addr, socket.AF_UNSPEC)
- if addr is None:
- raise ValueError('Invalid address format')
- elif addr:
- capi.nl_addr_get(addr)
-
- self._nl_addr = addr
-
- def __del__(self):
- if self._nl_addr:
- capi.nl_addr_put(self._nl_addr)
-
- def __cmp__(self, other):
- if isinstance(other, str):
- other = AbstractAddress(other)
-
- diff = self.prefixlen - other.prefixlen
- if diff == 0:
- diff = capi.nl_addr_cmp(self._nl_addr, other._nl_addr)
-
- return diff
-
- def contains(self, item):
- diff = int(self.family) - int(item.family)
- if diff:
- return False
-
- if item.prefixlen < self.prefixlen:
- return False
-
- diff = capi.nl_addr_cmp_prefix(self._nl_addr, item._nl_addr)
- return diff == 0
-
- def __nonzero__(self):
- if self._nl_addr:
- return not capi.nl_addr_iszero(self._nl_addr)
- else:
- return False
-
- def __len__(self):
- if self._nl_addr:
- return capi.nl_addr_get_len(self._nl_addr)
- else:
- return 0
-
- def __str__(self):
- if self._nl_addr:
- return capi.nl_addr2str(self._nl_addr, 64)[0]
- else:
- return "none"
-
- @property
- def shared(self):
- """True if address is shared (multiple users)"""
- if self._nl_addr:
- return capi.nl_addr_shared(self._nl_addr) != 0
- else:
- return False
-
- @property
- def prefixlen(self):
- """Length of prefix (number of bits)"""
- if self._nl_addr:
- return capi.nl_addr_get_prefixlen(self._nl_addr)
- else:
- return 0
-
- @prefixlen.setter
- def prefixlen(self, value):
- if not self._nl_addr:
- raise TypeError()
-
- capi.nl_addr_set_prefixlen(self._nl_addr, int(value))
-
- @property
- def family(self):
- """Address family"""
- f = 0
- if self._nl_addr:
- f = capi.nl_addr_get_family(self._nl_addr)
-
- return AddressFamily(f)
-
- @family.setter
- def family(self, value):
- if not self._nl_addr:
- raise TypeError()
-
- if not isinstance(value, AddressFamily):
- value = AddressFamily(value)
-
- capi.nl_addr_set_family(self._nl_addr, int(value))
+ if not isinstance(value, AddressFamily):
+ value = AddressFamily(value)
+
+ capi.nl_addr_set_family(self._nl_addr, int(value))
# global dictionay for all object attributes
attrs = {}
def add_attr(name, **kwds):
- attrs[name] = {}
- for k in kwds:
- attrs[name][k] = kwds[k]
+ attrs[name] = {}
+ for k in kwds:
+ attrs[name][k] = kwds[k]
def nlattr(name, **kwds):
- """netlink object attribute decorator
+ """netlink object attribute decorator
- decorator used to mark mutable and immutable properties
- of netlink objects. All properties marked as such are
- regarded to be accessable.
+ decorator used to mark mutable and immutable properties
+ of netlink objects. All properties marked as such are
+ regarded to be accessable.
- @netlink.nlattr('my_type.my_attr', type=int)
- @property
- def my_attr(self):
- return self._my_attr
+ @netlink.nlattr('my_type.my_attr', type=int)
+ @property
+ def my_attr(self):
+ return self._my_attr
- """
+ """
- attrs[name] = {}
- for k in kwds:
- attrs[name][k] = kwds[k]
+ attrs[name] = {}
+ for k in kwds:
+ attrs[name][k] = kwds[k]
- def wrap_fn(func):
- return func
+ def wrap_fn(func):
+ return func
- return wrap_fn
+ return wrap_fn
__version__ = "1.0"
__all__ = [
- 'AddressCache',
- 'Address']
+ 'AddressCache',
+ 'Address']
import datetime
from .. import core as netlink
###########################################################################
# Address Cache
class AddressCache(netlink.Cache):
- """Cache containing network addresses"""
+ """Cache containing network addresses"""
- def __init__(self, cache=None):
- if not cache:
- cache = self._alloc_cache_name("route/addr")
+ def __init__(self, cache=None):
+ if not cache:
+ cache = self._alloc_cache_name("route/addr")
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
+ self._protocol = netlink.NETLINK_ROUTE
+ self._nl_cache = cache
- def __getitem__(self, key):
- # Using ifindex=0 here implies that the local address itself
- # is unique, otherwise the first occurence is returned.
- return self.lookup(0, key)
+ def __getitem__(self, key):
+ # Using ifindex=0 here implies that the local address itself
+ # is unique, otherwise the first occurence is returned.
+ return self.lookup(0, key)
- def lookup(self, ifindex, local):
- if type(local) is str:
- local = netlink.AbstractAddress(local)
+ def lookup(self, ifindex, local):
+ if type(local) is str:
+ local = netlink.AbstractAddress(local)
- addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
- local._nl_addr)
- if addr is None:
- raise KeyError()
+ addr = capi.rtnl_addr_get(self._nl_cache, ifindex,
+ local._nl_addr)
+ if addr is None:
+ raise KeyError()
- return Address._from_capi(addr)
+ return Address._from_capi(addr)
- def _new_object(self, obj):
- return Address(obj)
+ def _new_object(self, obj):
+ return Address(obj)
- def _new_cache(self, cache):
- return AddressCache(cache=cache)
+ def _new_cache(self, cache):
+ return AddressCache(cache=cache)
###########################################################################
# Address Object
class Address(netlink.Object):
- """Network address"""
-
- def __init__(self, obj=None):
- netlink.Object.__init__(self, "route/addr", "address", obj)
- self._rtnl_addr = self._obj2type(self._nl_object)
-
- @classmethod
- def _from_capi(cls, obj):
- return cls(capi.addr2obj(obj))
-
- def _obj2type(self, obj):
- return capi.obj2addr(obj)
-
- def __cmp__(self, other):
- # sort by:
- # 1. network link
- # 2. address family
- # 3. local address (including prefixlen)
- diff = self.ifindex - other.ifindex
-
- if diff == 0:
- diff = self.family - other.family
- if diff == 0:
- diff = capi.nl_addr_cmp(self.local, other.local)
-
- return diff
-
- def _new_instance(self, obj):
- return Address(obj)
-
- #####################################################################
- # ifindex
- @netlink.nlattr('address.ifindex', type=int, immutable=True,
- fmt=util.num)
- @property
- def ifindex(self):
- """interface index"""
- return capi.rtnl_addr_get_ifindex(self._rtnl_addr)
-
- @ifindex.setter
- def ifindex(self, value):
- link = Link.resolve(value)
- if not link:
- raise ValueError()
-
- self.link = link
-
- #####################################################################
- # link
- @netlink.nlattr('address.link', type=str, fmt=util.string)
- @property
- def link(self):
- link = capi.rtnl_addr_get_link(self._rtnl_addr)
- if not link:
- return None
-
- return Link.Link.from_capi(link)
-
- @link.setter
- def link(self, value):
- if type(value) is str:
- try:
- value = Link.resolve(value)
- except KeyError:
- raise ValueError()
-
- capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)
-
- # ifindex is immutable but we assume that if _orig does not
- # have an ifindex specified, it was meant to be given here
- if capi.rtnl_addr_get_ifindex(self._orig) == 0:
- capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)
-
- #####################################################################
- # label
- @netlink.nlattr('address.label', type=str, fmt=util.string)
- @property
- def label(self):
- """address label"""
- return capi.rtnl_addr_get_label(self._rtnl_addr)
-
- @label.setter
- def label(self, value):
- capi.rtnl_addr_set_label(self._rtnl_addr, value)
-
- #####################################################################
- # flags
- @netlink.nlattr('address.flags', type=str, fmt=util.string)
- @property
- def flags(self):
- """Flags
-
- Setting this property will *Not* reset flags to value you supply in
-
- Examples:
- addr.flags = '+xxx' # add xxx flag
- addr.flags = 'xxx' # exactly the same
- addr.flags = '-xxx' # remove xxx flag
- addr.flags = [ '+xxx', '-yyy' ] # list operation
- """
- flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
- return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')
-
- def _set_flag(self, flag):
- if flag.startswith('-'):
- i = capi.rtnl_addr_str2flags(flag[1:])
- capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
- elif flag.startswith('+'):
- i = capi.rtnl_addr_str2flags(flag[1:])
- capi.rtnl_addr_set_flags(self._rtnl_addr, i)
- else:
- i = capi.rtnl_addr_str2flags(flag)
- capi.rtnl_addr_set_flags(self._rtnl_addr, i)
-
- @flags.setter
- def flags(self, value):
- if type(value) is list:
- for flag in value:
- self._set_flag(flag)
- else:
- self._set_flag(value)
-
- #####################################################################
- # family
- @netlink.nlattr('address.family', type=int, immutable=True,
- fmt=util.num)
- @property
- def family(self):
- """Address family"""
- fam = capi.rtnl_addr_get_family(self._rtnl_addr)
- return netlink.AddressFamily(fam)
-
- @family.setter
- def family(self, value):
- if not isinstance(value, AddressFamily):
- value = AddressFamily(value)
-
- capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
-
- #####################################################################
- # scope
- @netlink.nlattr('address.scope', type=int, fmt=util.num)
- @property
- def scope(self):
- """Address scope"""
- scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
- return capi.rtnl_scope2str(scope, 32)[0]
-
- @scope.setter
- def scope(self, value):
- if type(value) is str:
- value = capi.rtnl_str2scope(value)
- capi.rtnl_addr_set_scope(self._rtnl_addr, value)
-
- #####################################################################
- # local address
- @netlink.nlattr('address.local', type=str, immutable=True,
- fmt=util.addr)
- @property
- def local(self):
- """Local address"""
- a = capi.rtnl_addr_get_local(self._rtnl_addr)
- return netlink.AbstractAddress(a)
-
- @local.setter
- def local(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)
-
- # local is immutable but we assume that if _orig does not
- # have a local address specified, it was meant to be given here
- if capi.rtnl_addr_get_local(self._orig) is None:
- capi.rtnl_addr_set_local(self._orig, a._nl_addr)
-
- #####################################################################
- # Peer address
- @netlink.nlattr('address.peer', type=str, fmt=util.addr)
- @property
- def peer(self):
- """Peer address"""
- a = capi.rtnl_addr_get_peer(self._rtnl_addr)
- return netlink.AbstractAddress(a)
-
- @peer.setter
- def peer(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)
-
- #####################################################################
- # Broadcast address
- @netlink.nlattr('address.broadcast', type=str, fmt=util.addr)
- @property
- def broadcast(self):
- """Broadcast address"""
- a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
- return netlink.AbstractAddress(a)
-
- @broadcast.setter
- def broadcast(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)
-
- #####################################################################
- # Multicast address
- @netlink.nlattr('address.multicast', type=str, fmt=util.addr)
- @property
- def multicast(self):
- """multicast address"""
- a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
- return netlink.AbstractAddress(a)
-
- @multicast.setter
- def multicast(self, value):
- try:
- a = netlink.AbstractAddress(value)
- except ValueError as err:
- raise AttributeError('multicast', err)
-
- capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)
-
- #####################################################################
- # Anycast address
- @netlink.nlattr('address.anycast', type=str, fmt=util.addr)
- @property
- def anycast(self):
- """anycast address"""
- a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
- return netlink.AbstractAddress(a)
-
- @anycast.setter
- def anycast(self, value):
- a = netlink.AbstractAddress(value)
- capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)
-
- #####################################################################
- # Valid lifetime
- @netlink.nlattr('address.valid_lifetime', type=int, immutable=True,
- fmt=util.num)
- @property
- def valid_lifetime(self):
- """Valid lifetime"""
- msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
- if msecs == 0xFFFFFFFF:
- return None
- else:
- return datetime.timedelta(seconds=msecs)
-
- @valid_lifetime.setter
- def valid_lifetime(self, value):
- capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))
-
- #####################################################################
- # Preferred lifetime
- @netlink.nlattr('address.preferred_lifetime', type=int,
- immutable=True, fmt=util.num)
- @property
- def preferred_lifetime(self):
- """Preferred lifetime"""
- msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
- if msecs == 0xFFFFFFFF:
- return None
- else:
- return datetime.timedelta(seconds=msecs)
-
- @preferred_lifetime.setter
- def preferred_lifetime(self, value):
- capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))
-
- #####################################################################
- # Creation Time
- @netlink.nlattr('address.create_time', type=int, immutable=True,
- fmt=util.num)
- @property
- def create_time(self):
- """Creation time"""
- hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
- return datetime.timedelta(milliseconds=10*hsec)
-
- #####################################################################
- # Last Update
- @netlink.nlattr('address.last_update', type=int, immutable=True,
- fmt=util.num)
- @property
- def last_update(self):
- """Last update"""
- hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
- return datetime.timedelta(milliseconds=10*hsec)
-
- #####################################################################
- # add()
- def add(self, socket=None, flags=None):
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
-
- if not flags:
- flags = netlink.NLM_F_CREATE
-
- ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
-
- #####################################################################
- # delete()
- def delete(self, socket, flags=0):
- """Attempt to delete this address in the kernel"""
- ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
-
- ###################################################################
- # private properties
- #
- # Used for formatting output. USE AT OWN RISK
- @property
- def _flags(self):
- return ','.join(self.flags)
-
- ###################################################################
- #
- # format(details=False, stats=False)
- #
- def format(self, details=False, stats=False, nodev=False, indent=''):
- """Return address as formatted text"""
- fmt = util.MyFormatter(self, indent)
-
- buf = fmt.format('{a|local!b}')
-
- if not nodev:
- buf += fmt.format(' {a|ifindex}')
-
- buf += fmt.format(' {a|scope}')
-
- if self.label:
- buf += fmt.format(' "{a|label}"')
-
- buf += fmt.format(' <{a|_flags}>')
-
- if details:
- buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
- + fmt.nl('\t{t|peer} {t|anycast}')
-
- if self.valid_lifetime:
- buf += fmt.nl('\t{s|valid-lifetime!k} '\
- '{a|valid_lifetime}')
-
- if self.preferred_lifetime:
- buf += fmt.nl('\t{s|preferred-lifetime!k} '\
- '{a|preferred_lifetime}')
-
- if stats and (self.create_time or self.last_update):
- buf += self.nl('\t{s|created!k} {a|create_time}'\
- ' {s|last-updated!k} {a|last_update}')
-
- return buf
+ """Network address"""
+
+ def __init__(self, obj=None):
+ netlink.Object.__init__(self, "route/addr", "address", obj)
+ self._rtnl_addr = self._obj2type(self._nl_object)
+
+ @classmethod
+ def _from_capi(cls, obj):
+ return cls(capi.addr2obj(obj))
+
+ def _obj2type(self, obj):
+ return capi.obj2addr(obj)
+
+ def __cmp__(self, other):
+ # sort by:
+ # 1. network link
+ # 2. address family
+ # 3. local address (including prefixlen)
+ diff = self.ifindex - other.ifindex
+
+ if diff == 0:
+ diff = self.family - other.family
+ if diff == 0:
+ diff = capi.nl_addr_cmp(self.local, other.local)
+
+ return diff
+
+ def _new_instance(self, obj):
+ return Address(obj)
+
+ #####################################################################
+ # ifindex
+ @netlink.nlattr('address.ifindex', type=int, immutable=True,
+ fmt=util.num)
+ @property
+ def ifindex(self):
+ """interface index"""
+ return capi.rtnl_addr_get_ifindex(self._rtnl_addr)
+
+ @ifindex.setter
+ def ifindex(self, value):
+ link = Link.resolve(value)
+ if not link:
+ raise ValueError()
+
+ self.link = link
+
+ #####################################################################
+ # link
+ @netlink.nlattr('address.link', type=str, fmt=util.string)
+ @property
+ def link(self):
+ link = capi.rtnl_addr_get_link(self._rtnl_addr)
+ if not link:
+ return None
+
+ return Link.Link.from_capi(link)
+
+ @link.setter
+ def link(self, value):
+ if type(value) is str:
+ try:
+ value = Link.resolve(value)
+ except KeyError:
+ raise ValueError()
+
+ capi.rtnl_addr_set_link(self._rtnl_addr, value._rtnl_link)
+
+ # ifindex is immutable but we assume that if _orig does not
+ # have an ifindex specified, it was meant to be given here
+ if capi.rtnl_addr_get_ifindex(self._orig) == 0:
+ capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)
+
+ #####################################################################
+ # label
+ @netlink.nlattr('address.label', type=str, fmt=util.string)
+ @property
+ def label(self):
+ """address label"""
+ return capi.rtnl_addr_get_label(self._rtnl_addr)
+
+ @label.setter
+ def label(self, value):
+ capi.rtnl_addr_set_label(self._rtnl_addr, value)
+
+ #####################################################################
+ # flags
+ @netlink.nlattr('address.flags', type=str, fmt=util.string)
+ @property
+ def flags(self):
+ """Flags
+
+ Setting this property will *Not* reset flags to value you supply in
+
+ Examples:
+ addr.flags = '+xxx' # add xxx flag
+ addr.flags = 'xxx' # exactly the same
+ addr.flags = '-xxx' # remove xxx flag
+ addr.flags = [ '+xxx', '-yyy' ] # list operation
+ """
+ flags = capi.rtnl_addr_get_flags(self._rtnl_addr)
+ return capi.rtnl_addr_flags2str(flags, 256)[0].split(',')
+
+ def _set_flag(self, flag):
+ if flag.startswith('-'):
+ i = capi.rtnl_addr_str2flags(flag[1:])
+ capi.rtnl_addr_unset_flags(self._rtnl_addr, i)
+ elif flag.startswith('+'):
+ i = capi.rtnl_addr_str2flags(flag[1:])
+ capi.rtnl_addr_set_flags(self._rtnl_addr, i)
+ else:
+ i = capi.rtnl_addr_str2flags(flag)
+ capi.rtnl_addr_set_flags(self._rtnl_addr, i)
+
+ @flags.setter
+ def flags(self, value):
+ if type(value) is list:
+ for flag in value:
+ self._set_flag(flag)
+ else:
+ self._set_flag(value)
+
+ #####################################################################
+ # family
+ @netlink.nlattr('address.family', type=int, immutable=True,
+ fmt=util.num)
+ @property
+ def family(self):
+ """Address family"""
+ fam = capi.rtnl_addr_get_family(self._rtnl_addr)
+ return netlink.AddressFamily(fam)
+
+ @family.setter
+ def family(self, value):
+ if not isinstance(value, AddressFamily):
+ value = AddressFamily(value)
+
+ capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
+
+ #####################################################################
+ # scope
+ @netlink.nlattr('address.scope', type=int, fmt=util.num)
+ @property
+ def scope(self):
+ """Address scope"""
+ scope = capi.rtnl_addr_get_scope(self._rtnl_addr)
+ return capi.rtnl_scope2str(scope, 32)[0]
+
+ @scope.setter
+ def scope(self, value):
+ if type(value) is str:
+ value = capi.rtnl_str2scope(value)
+ capi.rtnl_addr_set_scope(self._rtnl_addr, value)
+
+ #####################################################################
+ # local address
+ @netlink.nlattr('address.local', type=str, immutable=True,
+ fmt=util.addr)
+ @property
+ def local(self):
+ """Local address"""
+ a = capi.rtnl_addr_get_local(self._rtnl_addr)
+ return netlink.AbstractAddress(a)
+
+ @local.setter
+ def local(self, value):
+ a = netlink.AbstractAddress(value)
+ capi.rtnl_addr_set_local(self._rtnl_addr, a._nl_addr)
+
+ # local is immutable but we assume that if _orig does not
+ # have a local address specified, it was meant to be given here
+ if capi.rtnl_addr_get_local(self._orig) is None:
+ capi.rtnl_addr_set_local(self._orig, a._nl_addr)
+
+ #####################################################################
+ # Peer address
+ @netlink.nlattr('address.peer', type=str, fmt=util.addr)
+ @property
+ def peer(self):
+ """Peer address"""
+ a = capi.rtnl_addr_get_peer(self._rtnl_addr)
+ return netlink.AbstractAddress(a)
+
+ @peer.setter
+ def peer(self, value):
+ a = netlink.AbstractAddress(value)
+ capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)
+
+ #####################################################################
+ # Broadcast address
+ @netlink.nlattr('address.broadcast', type=str, fmt=util.addr)
+ @property
+ def broadcast(self):
+ """Broadcast address"""
+ a = capi.rtnl_addr_get_broadcast(self._rtnl_addr)
+ return netlink.AbstractAddress(a)
+
+ @broadcast.setter
+ def broadcast(self, value):
+ a = netlink.AbstractAddress(value)
+ capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)
+
+ #####################################################################
+ # Multicast address
+ @netlink.nlattr('address.multicast', type=str, fmt=util.addr)
+ @property
+ def multicast(self):
+ """multicast address"""
+ a = capi.rtnl_addr_get_multicast(self._rtnl_addr)
+ return netlink.AbstractAddress(a)
+
+ @multicast.setter
+ def multicast(self, value):
+ try:
+ a = netlink.AbstractAddress(value)
+ except ValueError as err:
+ raise AttributeError('multicast', err)
+
+ capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)
+
+ #####################################################################
+ # Anycast address
+ @netlink.nlattr('address.anycast', type=str, fmt=util.addr)
+ @property
+ def anycast(self):
+ """anycast address"""
+ a = capi.rtnl_addr_get_anycast(self._rtnl_addr)
+ return netlink.AbstractAddress(a)
+
+ @anycast.setter
+ def anycast(self, value):
+ a = netlink.AbstractAddress(value)
+ capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)
+
+ #####################################################################
+ # Valid lifetime
+ @netlink.nlattr('address.valid_lifetime', type=int, immutable=True,
+ fmt=util.num)
+ @property
+ def valid_lifetime(self):
+ """Valid lifetime"""
+ msecs = capi.rtnl_addr_get_valid_lifetime(self._rtnl_addr)
+ if msecs == 0xFFFFFFFF:
+ return None
+ else:
+ return datetime.timedelta(seconds=msecs)
+
+ @valid_lifetime.setter
+ def valid_lifetime(self, value):
+ capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))
+
+ #####################################################################
+ # Preferred lifetime
+ @netlink.nlattr('address.preferred_lifetime', type=int,
+ immutable=True, fmt=util.num)
+ @property
+ def preferred_lifetime(self):
+ """Preferred lifetime"""
+ msecs = capi.rtnl_addr_get_preferred_lifetime(self._rtnl_addr)
+ if msecs == 0xFFFFFFFF:
+ return None
+ else:
+ return datetime.timedelta(seconds=msecs)
+
+ @preferred_lifetime.setter
+ def preferred_lifetime(self, value):
+ capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))
+
+ #####################################################################
+ # Creation Time
+ @netlink.nlattr('address.create_time', type=int, immutable=True,
+ fmt=util.num)
+ @property
+ def create_time(self):
+ """Creation time"""
+ hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
+ return datetime.timedelta(milliseconds=10*hsec)
+
+ #####################################################################
+ # Last Update
+ @netlink.nlattr('address.last_update', type=int, immutable=True,
+ fmt=util.num)
+ @property
+ def last_update(self):
+ """Last update"""
+ hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
+ return datetime.timedelta(milliseconds=10*hsec)
+
+ #####################################################################
+ # add()
+ def add(self, socket=None, flags=None):
+ if not socket:
+ socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+
+ if not flags:
+ flags = netlink.NLM_F_CREATE
+
+ ret = capi.rtnl_addr_add(socket._sock, self._rtnl_addr, flags)
+ if ret < 0:
+ raise netlink.KernelError(ret)
+
+ #####################################################################
+ # delete()
+ def delete(self, socket, flags=0):
+ """Attempt to delete this address in the kernel"""
+ ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
+ if ret < 0:
+ raise netlink.KernelError(ret)
+
+ ###################################################################
+ # private properties
+ #
+ # Used for formatting output. USE AT OWN RISK
+ @property
+ def _flags(self):
+ return ','.join(self.flags)
+
+ ###################################################################
+ #
+ # format(details=False, stats=False)
+ #
+ def format(self, details=False, stats=False, nodev=False, indent=''):
+ """Return address as formatted text"""
+ fmt = util.MyFormatter(self, indent)
+
+ buf = fmt.format('{a|local!b}')
+
+ if not nodev:
+ buf += fmt.format(' {a|ifindex}')
+
+ buf += fmt.format(' {a|scope}')
+
+ if self.label:
+ buf += fmt.format(' "{a|label}"')
+
+ buf += fmt.format(' <{a|_flags}>')
+
+ if details:
+ buf += fmt.nl('\t{t|broadcast} {t|multicast}') \
+ + fmt.nl('\t{t|peer} {t|anycast}')
+
+ if self.valid_lifetime:
+ buf += fmt.nl('\t{s|valid-lifetime!k} '\
+ '{a|valid_lifetime}')
+
+ if self.preferred_lifetime:
+ buf += fmt.nl('\t{s|preferred-lifetime!k} '\
+ '{a|preferred_lifetime}')
+
+ if stats and (self.create_time or self.last_update):
+ buf += self.nl('\t{s|created!k} {a|create_time}'\
+ ' {s|last-updated!k} {a|last_update}')
+
+ return buf
modify them and to add and delete virtual network links.
The following is a basic example:
- import netlink.core as netlink
- import netlink.route.link as link
+ import netlink.core as netlink
+ import netlink.route.link as link
- sock = netlink.Socket()
- sock.connect(netlink.NETLINK_ROUTE)
+ sock = netlink.Socket()
+ sock.connect(netlink.NETLINK_ROUTE)
- cache = link.LinkCache() # create new empty link cache
- cache.refill(sock) # fill cache with all configured links
- eth0 = cache['eth0'] # lookup link "eth0"
- print eth0 # print basic configuration
+ cache = link.LinkCache() # create new empty link cache
+ cache.refill(sock) # fill cache with all configured links
+ eth0 = cache['eth0'] # lookup link "eth0"
+ print eth0 # print basic configuration
The module contains the following public classes:
- Link -- Represents a network link. Instances can be created directly
- via the constructor (empty link objects) or via the refill()
- method of a LinkCache.
+ via the constructor (empty link objects) or via the refill()
+ method of a LinkCache.
- LinkCache -- Derived from netlink.Cache, holds any number of
- network links (Link instances). Main purpose is to keep
- a local list of all network links configured in the
- kernel.
+ network links (Link instances). Main purpose is to keep
+ a local list of all network links configured in the
+ kernel.
The following public functions exist:
- get_from_kernel(socket, name)
__version__ = "0.1"
__all__ = [
- 'LinkCache',
- 'Link',
- 'get_from_kernel']
+ 'LinkCache',
+ 'Link',
+ 'get_from_kernel']
import socket
import sys
###########################################################################
# Link Cache
class LinkCache(netlink.Cache):
- """Cache of network links"""
+ """Cache of network links"""
- def __init__(self, family=socket.AF_UNSPEC, cache=None):
- if not cache:
- cache = self._alloc_cache_name("route/link")
+ def __init__(self, family=socket.AF_UNSPEC, cache=None):
+ if not cache:
+ cache = self._alloc_cache_name("route/link")
- self._info_module = None
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
- self._set_arg1(family)
+ self._info_module = None
+ self._protocol = netlink.NETLINK_ROUTE
+ self._nl_cache = cache
+ self._set_arg1(family)
- def __getitem__(self, key):
- if type(key) is int:
- link = capi.rtnl_link_get(self._nl_cache, key)
- else:
- link = capi.rtnl_link_get_by_name(self._nl_cache, key)
+ def __getitem__(self, key):
+ if type(key) is int:
+ link = capi.rtnl_link_get(self._nl_cache, key)
+ else:
+ link = capi.rtnl_link_get_by_name(self._nl_cache, key)
- if link is None:
- raise KeyError()
- else:
- return Link.from_capi(link)
+ if link is None:
+ raise KeyError()
+ else:
+ return Link.from_capi(link)
- def _new_object(self, obj):
- return Link(obj)
+ def _new_object(self, obj):
+ return Link(obj)
- def _new_cache(self, cache):
- return LinkCache(family=self.arg1, cache=cache)
+ def _new_cache(self, cache):
+ return LinkCache(family=self.arg1, cache=cache)
###########################################################################
# Link Object
class Link(netlink.Object):
- """Network link"""
-
- def __init__(self, obj=None):
- netlink.Object.__init__(self, "route/link", "link", obj)
- self._rtnl_link = self._obj2type(self._nl_object)
-
- if self.type:
- self._module_lookup('netlink.route.links.' + self.type)
-
- self.inet = inet.InetLink(self)
- self.af = {'inet' : self.inet }
-
- @classmethod
- def from_capi(cls, obj):
- return cls(capi.link2obj(obj))
-
- def _obj2type(self, obj):
- return capi.obj2link(obj)
-
- def __cmp__(self, other):
- return self.ifindex - other.ifindex
-
- def _new_instance(self, obj):
- if not obj:
- raise ValueError()
-
- return Link(obj)
-
- #####################################################################
- # ifindex
- @netlink.nlattr('link.ifindex', type=int, immutable=True, fmt=util.num)
- @property
- def ifindex(self):
- """interface index"""
- return capi.rtnl_link_get_ifindex(self._rtnl_link)
-
- @ifindex.setter
- def ifindex(self, value):
- capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))
-
- # ifindex is immutable but we assume that if _orig does not
- # have an ifindex specified, it was meant to be given here
- if capi.rtnl_link_get_ifindex(self._orig) == 0:
- capi.rtnl_link_set_ifindex(self._orig, int(value))
-
- #####################################################################
- # name
- @netlink.nlattr('link.name', type=str, fmt=util.bold)
- @property
- def name(self):
- """Name of link"""
- return capi.rtnl_link_get_name(self._rtnl_link)
-
- @name.setter
- def name(self, value):
- capi.rtnl_link_set_name(self._rtnl_link, value)
-
- # name is the secondary identifier, if _orig does not have
- # the name specified yet, assume it was meant to be specified
- # here. ifindex will always take priority, therefore if ifindex
- # is specified as well, this will be ignored automatically.
- if capi.rtnl_link_get_name(self._orig) is None:
- capi.rtnl_link_set_name(self._orig, value)
-
- #####################################################################
- # flags
- @netlink.nlattr('link.flags', type=str, fmt=util.string)
- @property
- def flags(self):
- """Flags
- Setting this property will *Not* reset flags to value you supply in
- Examples:
- link.flags = '+xxx' # add xxx flag
- link.flags = 'xxx' # exactly the same
- link.flags = '-xxx' # remove xxx flag
- link.flags = [ '+xxx', '-yyy' ] # list operation
- """
- flags = capi.rtnl_link_get_flags(self._rtnl_link)
- return capi.rtnl_link_flags2str(flags, 256)[0].split(',')
-
- def _set_flag(self, flag):
- if flag.startswith('-'):
- i = capi.rtnl_link_str2flags(flag[1:])
- capi.rtnl_link_unset_flags(self._rtnl_link, i)
- elif flag.startswith('+'):
- i = capi.rtnl_link_str2flags(flag[1:])
- capi.rtnl_link_set_flags(self._rtnl_link, i)
- else:
- i = capi.rtnl_link_str2flags(flag)
- capi.rtnl_link_set_flags(self._rtnl_link, i)
-
- @flags.setter
- def flags(self, value):
- if not (type(value) is str):
- for flag in value:
- self._set_flag(flag)
- else:
- self._set_flag(value)
-
- #####################################################################
- # mtu
- @netlink.nlattr('link.mtu', type=int, fmt=util.num)
- @property
- def mtu(self):
- """Maximum Transmission Unit"""
- return capi.rtnl_link_get_mtu(self._rtnl_link)
-
- @mtu.setter
- def mtu(self, value):
- capi.rtnl_link_set_mtu(self._rtnl_link, int(value))
-
- #####################################################################
- # family
- @netlink.nlattr('link.family', type=int, immutable=True, fmt=util.num)
- @property
- def family(self):
- """Address family"""
- return capi.rtnl_link_get_family(self._rtnl_link)
-
- @family.setter
- def family(self, value):
- capi.rtnl_link_set_family(self._rtnl_link, value)
-
- #####################################################################
- # address
- @netlink.nlattr('link.address', type=str, fmt=util.addr)
- @property
- def address(self):
- """Hardware address (MAC address)"""
- a = capi.rtnl_link_get_addr(self._rtnl_link)
- return netlink.AbstractAddress(a)
-
- @address.setter
- def address(self, value):
- capi.rtnl_link_set_addr(self._rtnl_link, value._addr)
-
- #####################################################################
- # broadcast
- @netlink.nlattr('link.broadcast', type=str, fmt=util.addr)
- @property
- def broadcast(self):
- """Hardware broadcast address"""
- a = capi.rtnl_link_get_broadcast(self._rtnl_link)
- return netlink.AbstractAddress(a)
-
- @broadcast.setter
- def broadcast(self, value):
- capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)
-
- #####################################################################
- # qdisc
- @netlink.nlattr('link.qdisc', type=str, immutable=True, fmt=util.string)
- @property
- def qdisc(self):
- """Name of qdisc (cannot be changed)"""
- return capi.rtnl_link_get_qdisc(self._rtnl_link)
-
- @qdisc.setter
- def qdisc(self, value):
- capi.rtnl_link_set_qdisc(self._rtnl_link, value)
-
- #####################################################################
- # txqlen
- @netlink.nlattr('link.txqlen', type=int, fmt=util.num)
- @property
- def txqlen(self):
- """Length of transmit queue"""
- return capi.rtnl_link_get_txqlen(self._rtnl_link)
-
- @txqlen.setter
- def txqlen(self, value):
- capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))
-
- #####################################################################
- # weight
- @netlink.nlattr('link.weight', type=str, fmt=util.string)
- @property
- def weight(self):
- """Weight"""
- v = capi.rtnl_link_get_weight(self._rtnl_link)
- if v == 4294967295:
- return 'max'
- else:
- return str(v)
-
- @weight.setter
- def weight(self, value):
- if value == 'max':
- v = 4294967295
- else:
- v = int(value)
- capi.rtnl_link_set_weight(self._rtnl_link, v)
-
- #####################################################################
- # arptype
- @netlink.nlattr('link.arptype', type=str, immutable=True, fmt=util.string)
- @property
- def arptype(self):
- """Type of link (cannot be changed)"""
- type = capi.rtnl_link_get_arptype(self._rtnl_link)
- return core_capi.nl_llproto2str(type, 64)[0]
-
- @arptype.setter
- def arptype(self, value):
- i = core_capi.nl_str2llproto(value)
- capi.rtnl_link_set_arptype(self._rtnl_link, i)
-
- #####################################################################
- # operstate
- @netlink.nlattr('link.operstate', type=str, immutable=True,
- fmt=util.string, title='state')
- @property
- def operstate(self):
- """Operational status"""
- operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
- return capi.rtnl_link_operstate2str(operstate, 32)[0]
-
- @operstate.setter
- def operstate(self, value):
- i = capi.rtnl_link_str2operstate(flag)
- capi.rtnl_link_set_operstate(self._rtnl_link, i)
-
- #####################################################################
- # mode
- @netlink.nlattr('link.mode', type=str, immutable=True, fmt=util.string)
- @property
- def mode(self):
- """Link mode"""
- mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
- return capi.rtnl_link_mode2str(mode, 32)[0]
-
- @mode.setter
- def mode(self, value):
- i = capi.rtnl_link_str2mode(flag)
- capi.rtnl_link_set_linkmode(self._rtnl_link, i)
-
- #####################################################################
- # alias
- @netlink.nlattr('link.alias', type=str, fmt=util.string)
- @property
- def alias(self):
- """Interface alias (SNMP)"""
- return capi.rtnl_link_get_ifalias(self._rtnl_link)
-
- @alias.setter
- def alias(self, value):
- capi.rtnl_link_set_ifalias(self._rtnl_link, value)
-
- #####################################################################
- # type
- @netlink.nlattr('link.type', type=str, fmt=util.string)
- @property
- def type(self):
- """Link type"""
- return capi.rtnl_link_get_type(self._rtnl_link)
-
- @type.setter
- def type(self, value):
- if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
- raise NameError("unknown info type")
-
- self._module_lookup('netlink.route.links.' + value)
-
- #####################################################################
- # get_stat()
- def get_stat(self, stat):
- """Retrieve statistical information"""
- if type(stat) is str:
- stat = capi.rtnl_link_str2stat(stat)
- if stat < 0:
- raise NameError("unknown name of statistic")
-
- return capi.rtnl_link_get_stat(self._rtnl_link, stat)
-
- #####################################################################
- # add()
- def add(self, socket=None, flags=None):
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
-
- if not flags:
- flags = netlink.NLM_F_CREATE
-
- ret = capi.rtnl_link_add(socket._sock, self._rtnl_link, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
-
- #####################################################################
- # change()
- def change(self, socket=None, flags=0):
- """Commit changes made to the link object"""
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
-
- if not self._orig:
- raise NetlinkError("Original link not available")
- ret = capi.rtnl_link_change(socket._sock, self._orig, self._rtnl_link, flags)
- if ret < 0:
- raise netlink.KernelError(ret)
-
- #####################################################################
- # delete()
- def delete(self, socket=None):
- """Attempt to delete this link in the kernel"""
- if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
-
- ret = capi.rtnl_link_delete(socket._sock, self._rtnl_link)
- if ret < 0:
- raise netlink.KernelError(ret)
-
- ###################################################################
- # private properties
- #
- # Used for formatting output. USE AT OWN RISK
- @property
- def _state(self):
- if 'up' in self.flags:
- buf = util.good('up')
- if 'lowerup' not in self.flags:
- buf += ' ' + util.bad('no-carrier')
- else:
- buf = util.bad('down')
- return buf
-
- @property
- def _brief(self):
- return self._module_brief() + self._foreach_af('brief')
-
- @property
- def _flags(self):
- ignore = ['up', 'running', 'lowerup']
- return ','.join([flag for flag in self.flags if flag not in ignore])
-
- def _foreach_af(self, name, args=None):
- buf = ''
- for af in self.af:
- try:
- func = getattr(self.af[af], name)
- s = str(func(args))
- if len(s) > 0:
- buf += ' ' + s
- except AttributeError:
- pass
- return buf
-
- ###################################################################
- #
- # format(details=False, stats=False)
- #
- def format(self, details=False, stats=False, indent=''):
- """Return link as formatted text"""
- fmt = util.MyFormatter(self, indent)
-
- buf = fmt.format('{a|ifindex} {a|name} {a|arptype} {a|address} '\
- '{a|_state} <{a|_flags}> {a|_brief}')
-
- if details:
- buf += fmt.nl('\t{t|mtu} {t|txqlen} {t|weight} '\
- '{t|qdisc} {t|operstate}')
- buf += fmt.nl('\t{t|broadcast} {t|alias}')
-
- buf += self._foreach_af('details', fmt)
-
- if stats:
- l = [['Packets', RX_PACKETS, TX_PACKETS],
- ['Bytes', RX_BYTES, TX_BYTES],
- ['Errors', RX_ERRORS, TX_ERRORS],
- ['Dropped', RX_DROPPED, TX_DROPPED],
- ['Compressed', RX_COMPRESSED, TX_COMPRESSED],
- ['FIFO Errors', RX_FIFO_ERR, TX_FIFO_ERR],
- ['Length Errors', RX_LEN_ERR, None],
- ['Over Errors', RX_OVER_ERR, None],
- ['CRC Errors', RX_CRC_ERR, None],
- ['Frame Errors', RX_FRAME_ERR, None],
- ['Missed Errors', RX_MISSED_ERR, None],
- ['Abort Errors', None, TX_ABORT_ERR],
- ['Carrier Errors', None, TX_CARRIER_ERR],
- ['Heartbeat Errors', None, TX_HBEAT_ERR],
- ['Window Errors', None, TX_WIN_ERR],
- ['Collisions', None, COLLISIONS],
- ['Multicast', None, MULTICAST],
- ['', None, None],
- ['Ipv6:', None, None],
- ['Packets', IP6_INPKTS, IP6_OUTPKTS],
- ['Bytes', IP6_INOCTETS, IP6_OUTOCTETS],
- ['Discards', IP6_INDISCARDS, IP6_OUTDISCARDS],
- ['Multicast Packets', IP6_INMCASTPKTS, IP6_OUTMCASTPKTS],
- ['Multicast Bytes', IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS],
- ['Broadcast Packets', IP6_INBCASTPKTS, IP6_OUTBCASTPKTS],
- ['Broadcast Bytes', IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS],
- ['Delivers', IP6_INDELIVERS, None],
- ['Forwarded', None, IP6_OUTFORWDATAGRAMS],
- ['No Routes', IP6_INNOROUTES, IP6_OUTNOROUTES],
- ['Header Errors', IP6_INHDRERRORS, None],
- ['Too Big Errors', IP6_INTOOBIGERRORS, None],
- ['Address Errors', IP6_INADDRERRORS, None],
- ['Unknown Protocol', IP6_INUNKNOWNPROTOS, None],
- ['Truncated Packets', IP6_INTRUNCATEDPKTS, None],
- ['Reasm Timeouts', IP6_REASMTIMEOUT, None],
- ['Reasm Requests', IP6_REASMREQDS, None],
- ['Reasm Failures', IP6_REASMFAILS, None],
- ['Reasm OK', IP6_REASMOKS, None],
- ['Frag Created', None, IP6_FRAGCREATES],
- ['Frag Failures', None, IP6_FRAGFAILS],
- ['Frag OK', None, IP6_FRAGOKS],
- ['', None, None],
- ['ICMPv6:', None, None],
- ['Messages', ICMP6_INMSGS, ICMP6_OUTMSGS],
- ['Errors', ICMP6_INERRORS, ICMP6_OUTERRORS]]
-
- buf += '\n\t%s%s%s%s\n' % (33 * ' ', util.title('RX'),
- 15 * ' ', util.title('TX'))
-
- for row in l:
- row[0] = util.kw(row[0])
- row[1] = self.get_stat(row[1]) if row[1] else ''
- row[2] = self.get_stat(row[2]) if row[2] else ''
- buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
-
- buf += self._foreach_af('stats')
-
- return buf
+ """Network link"""
+
+ def __init__(self, obj=None):
+ netlink.Object.__init__(self, "route/link", "link", obj)
+ self._rtnl_link = self._obj2type(self._nl_object)
+
+ if self.type:
+ self._module_lookup('netlink.route.links.' + self.type)
+
+ self.inet = inet.InetLink(self)
+ self.af = {'inet' : self.inet }
+
+ @classmethod
+ def from_capi(cls, obj):
+ return cls(capi.link2obj(obj))
+
+ def _obj2type(self, obj):
+ return capi.obj2link(obj)
+
+ def __cmp__(self, other):
+ return self.ifindex - other.ifindex
+
+ def _new_instance(self, obj):
+ if not obj:
+ raise ValueError()
+
+ return Link(obj)
+
+ #####################################################################
+ # ifindex
+ @netlink.nlattr('link.ifindex', type=int, immutable=True, fmt=util.num)
+ @property
+ def ifindex(self):
+ """interface index"""
+ return capi.rtnl_link_get_ifindex(self._rtnl_link)
+
+ @ifindex.setter
+ def ifindex(self, value):
+ capi.rtnl_link_set_ifindex(self._rtnl_link, int(value))
+
+ # ifindex is immutable but we assume that if _orig does not
+ # have an ifindex specified, it was meant to be given here
+ if capi.rtnl_link_get_ifindex(self._orig) == 0:
+ capi.rtnl_link_set_ifindex(self._orig, int(value))
+
+ #####################################################################
+ # name
+ @netlink.nlattr('link.name', type=str, fmt=util.bold)
+ @property
+ def name(self):
+ """Name of link"""
+ return capi.rtnl_link_get_name(self._rtnl_link)
+
+ @name.setter
+ def name(self, value):
+ capi.rtnl_link_set_name(self._rtnl_link, value)
+
+ # name is the secondary identifier, if _orig does not have
+ # the name specified yet, assume it was meant to be specified
+ # here. ifindex will always take priority, therefore if ifindex
+ # is specified as well, this will be ignored automatically.
+ if capi.rtnl_link_get_name(self._orig) is None:
+ capi.rtnl_link_set_name(self._orig, value)
+
+ #####################################################################
+ # flags
+ @netlink.nlattr('link.flags', type=str, fmt=util.string)
+ @property
+ def flags(self):
+ """Flags
+ Setting this property will *Not* reset flags to value you supply in
+ Examples:
+ link.flags = '+xxx' # add xxx flag
+ link.flags = 'xxx' # exactly the same
+ link.flags = '-xxx' # remove xxx flag
+ link.flags = [ '+xxx', '-yyy' ] # list operation
+ """
+ flags = capi.rtnl_link_get_flags(self._rtnl_link)
+ return capi.rtnl_link_flags2str(flags, 256)[0].split(',')
+
+ def _set_flag(self, flag):
+ if flag.startswith('-'):
+ i = capi.rtnl_link_str2flags(flag[1:])
+ capi.rtnl_link_unset_flags(self._rtnl_link, i)
+ elif flag.startswith('+'):
+ i = capi.rtnl_link_str2flags(flag[1:])
+ capi.rtnl_link_set_flags(self._rtnl_link, i)
+ else:
+ i = capi.rtnl_link_str2flags(flag)
+ capi.rtnl_link_set_flags(self._rtnl_link, i)
+
+ @flags.setter
+ def flags(self, value):
+ if not (type(value) is str):
+ for flag in value:
+ self._set_flag(flag)
+ else:
+ self._set_flag(value)
+
+ #####################################################################
+ # mtu
+ @netlink.nlattr('link.mtu', type=int, fmt=util.num)
+ @property
+ def mtu(self):
+ """Maximum Transmission Unit"""
+ return capi.rtnl_link_get_mtu(self._rtnl_link)
+
+ @mtu.setter
+ def mtu(self, value):
+ capi.rtnl_link_set_mtu(self._rtnl_link, int(value))
+
+ #####################################################################
+ # family
+ @netlink.nlattr('link.family', type=int, immutable=True, fmt=util.num)
+ @property
+ def family(self):
+ """Address family"""
+ return capi.rtnl_link_get_family(self._rtnl_link)
+
+ @family.setter
+ def family(self, value):
+ capi.rtnl_link_set_family(self._rtnl_link, value)
+
+ #####################################################################
+ # address
+ @netlink.nlattr('link.address', type=str, fmt=util.addr)
+ @property
+ def address(self):
+ """Hardware address (MAC address)"""
+ a = capi.rtnl_link_get_addr(self._rtnl_link)
+ return netlink.AbstractAddress(a)
+
+ @address.setter
+ def address(self, value):
+ capi.rtnl_link_set_addr(self._rtnl_link, value._addr)
+
+ #####################################################################
+ # broadcast
+ @netlink.nlattr('link.broadcast', type=str, fmt=util.addr)
+ @property
+ def broadcast(self):
+ """Hardware broadcast address"""
+ a = capi.rtnl_link_get_broadcast(self._rtnl_link)
+ return netlink.AbstractAddress(a)
+
+ @broadcast.setter
+ def broadcast(self, value):
+ capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)
+
+ #####################################################################
+ # qdisc
+ @netlink.nlattr('link.qdisc', type=str, immutable=True, fmt=util.string)
+ @property
+ def qdisc(self):
+ """Name of qdisc (cannot be changed)"""
+ return capi.rtnl_link_get_qdisc(self._rtnl_link)
+
+ @qdisc.setter
+ def qdisc(self, value):
+ capi.rtnl_link_set_qdisc(self._rtnl_link, value)
+
+ #####################################################################
+ # txqlen
+ @netlink.nlattr('link.txqlen', type=int, fmt=util.num)
+ @property
+ def txqlen(self):
+ """Length of transmit queue"""
+ return capi.rtnl_link_get_txqlen(self._rtnl_link)
+
+ @txqlen.setter
+ def txqlen(self, value):
+ capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))
+
+ #####################################################################
+ # weight
+ @netlink.nlattr('link.weight', type=str, fmt=util.string)
+ @property
+ def weight(self):
+ """Weight"""
+ v = capi.rtnl_link_get_weight(self._rtnl_link)
+ if v == 4294967295:
+ return 'max'
+ else:
+ return str(v)
+
+ @weight.setter
+ def weight(self, value):
+ if value == 'max':
+ v = 4294967295
+ else:
+ v = int(value)
+ capi.rtnl_link_set_weight(self._rtnl_link, v)
+
+ #####################################################################
+ # arptype
+ @netlink.nlattr('link.arptype', type=str, immutable=True, fmt=util.string)
+ @property
+ def arptype(self):
+ """Type of link (cannot be changed)"""
+ type = capi.rtnl_link_get_arptype(self._rtnl_link)
+ return core_capi.nl_llproto2str(type, 64)[0]
+
+ @arptype.setter
+ def arptype(self, value):
+ i = core_capi.nl_str2llproto(value)
+ capi.rtnl_link_set_arptype(self._rtnl_link, i)
+
+ #####################################################################
+ # operstate
+ @netlink.nlattr('link.operstate', type=str, immutable=True,
+ fmt=util.string, title='state')
+ @property
+ def operstate(self):
+ """Operational status"""
+ operstate = capi.rtnl_link_get_operstate(self._rtnl_link)
+ return capi.rtnl_link_operstate2str(operstate, 32)[0]
+
+ @operstate.setter
+ def operstate(self, value):
+ i = capi.rtnl_link_str2operstate(flag)
+ capi.rtnl_link_set_operstate(self._rtnl_link, i)
+
+ #####################################################################
+ # mode
+ @netlink.nlattr('link.mode', type=str, immutable=True, fmt=util.string)
+ @property
+ def mode(self):
+ """Link mode"""
+ mode = capi.rtnl_link_get_linkmode(self._rtnl_link)
+ return capi.rtnl_link_mode2str(mode, 32)[0]
+
+ @mode.setter
+ def mode(self, value):
+ i = capi.rtnl_link_str2mode(flag)
+ capi.rtnl_link_set_linkmode(self._rtnl_link, i)
+
+ #####################################################################
+ # alias
+ @netlink.nlattr('link.alias', type=str, fmt=util.string)
+ @property
+ def alias(self):
+ """Interface alias (SNMP)"""
+ return capi.rtnl_link_get_ifalias(self._rtnl_link)
+
+ @alias.setter
+ def alias(self, value):
+ capi.rtnl_link_set_ifalias(self._rtnl_link, value)
+
+ #####################################################################
+ # type
+ @netlink.nlattr('link.type', type=str, fmt=util.string)
+ @property
+ def type(self):
+ """Link type"""
+ return capi.rtnl_link_get_type(self._rtnl_link)
+
+ @type.setter
+ def type(self, value):
+ if capi.rtnl_link_set_type(self._rtnl_link, value) < 0:
+ raise NameError("unknown info type")
+
+ self._module_lookup('netlink.route.links.' + value)
+
+ #####################################################################
+ # get_stat()
+ def get_stat(self, stat):
+ """Retrieve statistical information"""
+ if type(stat) is str:
+ stat = capi.rtnl_link_str2stat(stat)
+ if stat < 0:
+ raise NameError("unknown name of statistic")
+
+ return capi.rtnl_link_get_stat(self._rtnl_link, stat)
+
+ #####################################################################
+ # add()
+ def add(self, socket=None, flags=None):
+ if not socket:
+ socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
-def get(name, socket=None):
- """Lookup Link object directly from kernel"""
- if not name:
- raise ValueError()
+ if not flags:
+ flags = netlink.NLM_F_CREATE
+ ret = capi.rtnl_link_add(socket._sock, self._rtnl_link, flags)
+ if ret < 0:
+ raise netlink.KernelError(ret)
+
+ #####################################################################
+ # change()
+ def change(self, socket=None, flags=0):
+ """Commit changes made to the link object"""
+ if not socket:
+ socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+
+ if not self._orig:
+ raise NetlinkError("Original link not available")
+ ret = capi.rtnl_link_change(socket._sock, self._orig, self._rtnl_link, flags)
+ if ret < 0:
+ raise netlink.KernelError(ret)
+
+ #####################################################################
+ # delete()
+ def delete(self, socket=None):
+ """Attempt to delete this link in the kernel"""
if not socket:
- socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+ socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
+
+ ret = capi.rtnl_link_delete(socket._sock, self._rtnl_link)
+ if ret < 0:
+ raise netlink.KernelError(ret)
+
+ ###################################################################
+ # private properties
+ #
+ # Used for formatting output. USE AT OWN RISK
+ @property
+ def _state(self):
+ if 'up' in self.flags:
+ buf = util.good('up')
+ if 'lowerup' not in self.flags:
+ buf += ' ' + util.bad('no-carrier')
+ else:
+ buf = util.bad('down')
+ return buf
+
+ @property
+ def _brief(self):
+ return self._module_brief() + self._foreach_af('brief')
+
+ @property
+ def _flags(self):
+ ignore = ['up', 'running', 'lowerup']
+ return ','.join([flag for flag in self.flags if flag not in ignore])
+
+ def _foreach_af(self, name, args=None):
+ buf = ''
+ for af in self.af:
+ try:
+ func = getattr(self.af[af], name)
+ s = str(func(args))
+ if len(s) > 0:
+ buf += ' ' + s
+ except AttributeError:
+ pass
+ return buf
+
+ ###################################################################
+ #
+ # format(details=False, stats=False)
+ #
+ def format(self, details=False, stats=False, indent=''):
+ """Return link as formatted text"""
+ fmt = util.MyFormatter(self, indent)
+
+ buf = fmt.format('{a|ifindex} {a|name} {a|arptype} {a|address} '\
+ '{a|_state} <{a|_flags}> {a|_brief}')
+
+ if details:
+ buf += fmt.nl('\t{t|mtu} {t|txqlen} {t|weight} '\
+ '{t|qdisc} {t|operstate}')
+ buf += fmt.nl('\t{t|broadcast} {t|alias}')
+
+ buf += self._foreach_af('details', fmt)
+
+ if stats:
+ l = [['Packets', RX_PACKETS, TX_PACKETS],
+ ['Bytes', RX_BYTES, TX_BYTES],
+ ['Errors', RX_ERRORS, TX_ERRORS],
+ ['Dropped', RX_DROPPED, TX_DROPPED],
+ ['Compressed', RX_COMPRESSED, TX_COMPRESSED],
+ ['FIFO Errors', RX_FIFO_ERR, TX_FIFO_ERR],
+ ['Length Errors', RX_LEN_ERR, None],
+ ['Over Errors', RX_OVER_ERR, None],
+ ['CRC Errors', RX_CRC_ERR, None],
+ ['Frame Errors', RX_FRAME_ERR, None],
+ ['Missed Errors', RX_MISSED_ERR, None],
+ ['Abort Errors', None, TX_ABORT_ERR],
+ ['Carrier Errors', None, TX_CARRIER_ERR],
+ ['Heartbeat Errors', None, TX_HBEAT_ERR],
+ ['Window Errors', None, TX_WIN_ERR],
+ ['Collisions', None, COLLISIONS],
+ ['Multicast', None, MULTICAST],
+ ['', None, None],
+ ['Ipv6:', None, None],
+ ['Packets', IP6_INPKTS, IP6_OUTPKTS],
+ ['Bytes', IP6_INOCTETS, IP6_OUTOCTETS],
+ ['Discards', IP6_INDISCARDS, IP6_OUTDISCARDS],
+ ['Multicast Packets', IP6_INMCASTPKTS, IP6_OUTMCASTPKTS],
+ ['Multicast Bytes', IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS],
+ ['Broadcast Packets', IP6_INBCASTPKTS, IP6_OUTBCASTPKTS],
+ ['Broadcast Bytes', IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS],
+ ['Delivers', IP6_INDELIVERS, None],
+ ['Forwarded', None, IP6_OUTFORWDATAGRAMS],
+ ['No Routes', IP6_INNOROUTES, IP6_OUTNOROUTES],
+ ['Header Errors', IP6_INHDRERRORS, None],
+ ['Too Big Errors', IP6_INTOOBIGERRORS, None],
+ ['Address Errors', IP6_INADDRERRORS, None],
+ ['Unknown Protocol', IP6_INUNKNOWNPROTOS, None],
+ ['Truncated Packets', IP6_INTRUNCATEDPKTS, None],
+ ['Reasm Timeouts', IP6_REASMTIMEOUT, None],
+ ['Reasm Requests', IP6_REASMREQDS, None],
+ ['Reasm Failures', IP6_REASMFAILS, None],
+ ['Reasm OK', IP6_REASMOKS, None],
+ ['Frag Created', None, IP6_FRAGCREATES],
+ ['Frag Failures', None, IP6_FRAGFAILS],
+ ['Frag OK', None, IP6_FRAGOKS],
+ ['', None, None],
+ ['ICMPv6:', None, None],
+ ['Messages', ICMP6_INMSGS, ICMP6_OUTMSGS],
+ ['Errors', ICMP6_INERRORS, ICMP6_OUTERRORS]]
+
+ buf += '\n\t%s%s%s%s\n' % (33 * ' ', util.title('RX'),
+ 15 * ' ', util.title('TX'))
+
+ for row in l:
+ row[0] = util.kw(row[0])
+ row[1] = self.get_stat(row[1]) if row[1] else ''
+ row[2] = self.get_stat(row[2]) if row[2] else ''
+ buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
+
+ buf += self._foreach_af('stats')
+
+ return buf
+
+def get(name, socket=None):
+ """Lookup Link object directly from kernel"""
+ if not name:
+ raise ValueError()
+
+ if not socket:
+ socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
- link = capi.get_from_kernel(socket._sock, 0, name)
- if not link:
- return None
+ link = capi.get_from_kernel(socket._sock, 0, name)
+ if not link:
+ return None
- return Link.from_capi(link)
+ return Link.from_capi(link)
_link_cache = LinkCache()
def resolve(name):
- _link_cache.refill()
- return _link_cache[name]
+ _link_cache.refill()
+ return _link_cache[name]
from ... import core as netlink
from .. import capi as capi
class DummyLink(object):
- def __init__(self, link):
- self._rtnl_link = link
+ def __init__(self, link):
+ self._rtnl_link = link
- def brief(self):
- return 'dummy'
+ def brief(self):
+ return 'dummy'
def init(link):
- link.dummy = DummyLink(link._rtnl_link)
- return link.dummy
+ link.dummy = DummyLink(link._rtnl_link)
+ return link.dummy
DEVCONF_MAX = DEVCONF_PROXY_ARP_PVLAN
def _resolve(id):
- if type(id) is str:
- id = capi.rtnl_link_inet_str2devconf(id)[0]
- if id < 0:
- raise NameError("unknown configuration id")
- return id
+ if type(id) is str:
+ id = capi.rtnl_link_inet_str2devconf(id)[0]
+ if id < 0:
+ raise NameError("unknown configuration id")
+ return id
class InetLink(object):
- def __init__(self, link):
- self._link = link
-
- def details(self, fmt):
- buf = fmt.nl('\n\t{0}\n\t'.format(util.title('Configuration:')))
-
- for i in range(DEVCONF_FORWARDING,DEVCONF_MAX+1):
- if i & 1 and i > 1:
- buf += fmt.nl('\t')
- txt = util.kw(capi.rtnl_link_inet_devconf2str(i, 32)[0])
- buf += fmt.format('{0:28s} {1:12} ', txt,
- self.get_conf(i))
-
-
- return buf
-
- def get_conf(self, id):
- return capi.inet_get_conf(self._link._rtnl_link, _resolve(id))
-
- def set_conf(self, id, value):
- return capi.rtnl_link_inet_set_conf(self._link._rtnl_link,
- _resolve(id), int(value))
-
- @netlink.nlattr('link.inet.forwarding', type=bool, fmt=util.bool)
- @property
- def forwarding(self):
- return bool(self.get_conf(DEVCONF_FORWARDING))
-
- @forwarding.setter
- def forwarding(self, value):
- self.set_conf(DEVCONF_FORWARDING, int(value))
-
- @netlink.nlattr('link.inet.mc_forwarding', type=bool, fmt=util.bool)
- @property
- def mc_forwarding(self):
- return bool(self.get_conf(DEVCONF_MC_FORWARDING))
-
- @mc_forwarding.setter
- def mc_forwarding(self, value):
- self.set_conf(DEVCONF_MC_FORWARDING, int(value))
-
- @netlink.nlattr('link.inet.proxy_arp', type=bool, fmt=util.bool)
- @property
- def proxy_arp(self):
- return bool(self.get_conf(DEVCONF_PROXY_ARP))
-
- @proxy_arp.setter
- def proxy_arp(self, value):
- self.set_conf(DEVCONF_PROXY_ARP, int(value))
-
- @netlink.nlattr('link.inet.accept_redirects', type=bool, fmt=util.bool)
- @property
- def accept_redirects(self):
- return bool(self.get_conf(DEVCONF_ACCEPT_REDIRECTS))
-
- @accept_redirects.setter
- def accept_redirects(self, value):
- self.set_conf(DEVCONF_ACCEPT_REDIRECTS, int(value))
-
- @netlink.nlattr('link.inet.secure_redirects', type=bool, fmt=util.bool)
- @property
- def secure_redirects(self):
- return bool(self.get_conf(DEVCONF_SECURE_REDIRECTS))
-
- @secure_redirects.setter
- def secure_redirects(self, value):
- self.set_conf(DEVCONF_SECURE_REDIRECTS, int(value))
-
- @netlink.nlattr('link.inet.send_redirects', type=bool, fmt=util.bool)
- @property
- def send_redirects(self):
- return bool(self.get_conf(DEVCONF_SEND_REDIRECTS))
-
- @send_redirects.setter
- def send_redirects(self, value):
- self.set_conf(DEVCONF_SEND_REDIRECTS, int(value))
-
- @netlink.nlattr('link.inet.shared_media', type=bool, fmt=util.bool)
- @property
- def shared_media(self):
- return bool(self.get_conf(DEVCONF_SHARED_MEDIA))
-
- @shared_media.setter
- def shared_media(self, value):
- self.set_conf(DEVCONF_SHARED_MEDIA, int(value))
+ def __init__(self, link):
+ self._link = link
+
+ def details(self, fmt):
+ buf = fmt.nl('\n\t{0}\n\t'.format(util.title('Configuration:')))
+
+ for i in range(DEVCONF_FORWARDING,DEVCONF_MAX+1):
+ if i & 1 and i > 1:
+ buf += fmt.nl('\t')
+ txt = util.kw(capi.rtnl_link_inet_devconf2str(i, 32)[0])
+ buf += fmt.format('{0:28s} {1:12} ', txt,
+ self.get_conf(i))
+
+
+ return buf
+
+ def get_conf(self, id):
+ return capi.inet_get_conf(self._link._rtnl_link, _resolve(id))
+
+ def set_conf(self, id, value):
+ return capi.rtnl_link_inet_set_conf(self._link._rtnl_link,
+ _resolve(id), int(value))
+
+ @netlink.nlattr('link.inet.forwarding', type=bool, fmt=util.bool)
+ @property
+ def forwarding(self):
+ return bool(self.get_conf(DEVCONF_FORWARDING))
+
+ @forwarding.setter
+ def forwarding(self, value):
+ self.set_conf(DEVCONF_FORWARDING, int(value))
+
+ @netlink.nlattr('link.inet.mc_forwarding', type=bool, fmt=util.bool)
+ @property
+ def mc_forwarding(self):
+ return bool(self.get_conf(DEVCONF_MC_FORWARDING))
+
+ @mc_forwarding.setter
+ def mc_forwarding(self, value):
+ self.set_conf(DEVCONF_MC_FORWARDING, int(value))
+
+ @netlink.nlattr('link.inet.proxy_arp', type=bool, fmt=util.bool)
+ @property
+ def proxy_arp(self):
+ return bool(self.get_conf(DEVCONF_PROXY_ARP))
+
+ @proxy_arp.setter
+ def proxy_arp(self, value):
+ self.set_conf(DEVCONF_PROXY_ARP, int(value))
+
+ @netlink.nlattr('link.inet.accept_redirects', type=bool, fmt=util.bool)
+ @property
+ def accept_redirects(self):
+ return bool(self.get_conf(DEVCONF_ACCEPT_REDIRECTS))
+
+ @accept_redirects.setter
+ def accept_redirects(self, value):
+ self.set_conf(DEVCONF_ACCEPT_REDIRECTS, int(value))
+
+ @netlink.nlattr('link.inet.secure_redirects', type=bool, fmt=util.bool)
+ @property
+ def secure_redirects(self):
+ return bool(self.get_conf(DEVCONF_SECURE_REDIRECTS))
+
+ @secure_redirects.setter
+ def secure_redirects(self, value):
+ self.set_conf(DEVCONF_SECURE_REDIRECTS, int(value))
+
+ @netlink.nlattr('link.inet.send_redirects', type=bool, fmt=util.bool)
+ @property
+ def send_redirects(self):
+ return bool(self.get_conf(DEVCONF_SEND_REDIRECTS))
+
+ @send_redirects.setter
+ def send_redirects(self, value):
+ self.set_conf(DEVCONF_SEND_REDIRECTS, int(value))
+
+ @netlink.nlattr('link.inet.shared_media', type=bool, fmt=util.bool)
+ @property
+ def shared_media(self):
+ return bool(self.get_conf(DEVCONF_SHARED_MEDIA))
+
+ @shared_media.setter
+ def shared_media(self, value):
+ self.set_conf(DEVCONF_SHARED_MEDIA, int(value))
# IPV4_DEVCONF_RP_FILTER,
# IPV4_DEVCONF_ACCEPT_SOURCE_ROUTE,
from ... import core as netlink
from .. import capi as capi
class VLANLink(object):
- def __init__(self, link):
- self._link = link
+ def __init__(self, link):
+ self._link = link
- ###################################################################
- # id
- @netlink.nlattr('link.vlan.id', type=int)
- @property
- def id(self):
- """vlan identifier"""
- return capi.rtnl_link_vlan_get_id(self._link)
+ ###################################################################
+ # id
+ @netlink.nlattr('link.vlan.id', type=int)
+ @property
+ def id(self):
+ """vlan identifier"""
+ return capi.rtnl_link_vlan_get_id(self._link)
- @id.setter
- def id(self, value):
- capi.rtnl_link_vlan_set_id(self._link, int(value))
+ @id.setter
+ def id(self, value):
+ capi.rtnl_link_vlan_set_id(self._link, int(value))
- ###################################################################
- # flags
- @netlink.nlattr('link.vlan.flags', type=str)
- @property
- def flags(self):
- """ VLAN flags
- Setting this property will *Not* reset flags to value you supply in
- Examples:
- link.flags = '+xxx' # add xxx flag
- link.flags = 'xxx' # exactly the same
- link.flags = '-xxx' # remove xxx flag
- link.flags = [ '+xxx', '-yyy' ] # list operation
- """
- flags = capi.rtnl_link_vlan_get_flags(self._link)
- return capi.rtnl_link_vlan_flags2str(flags, 256)[0].split(',')
+ ###################################################################
+ # flags
+ @netlink.nlattr('link.vlan.flags', type=str)
+ @property
+ def flags(self):
+ """ VLAN flags
+ Setting this property will *Not* reset flags to value you supply in
+ Examples:
+ link.flags = '+xxx' # add xxx flag
+ link.flags = 'xxx' # exactly the same
+ link.flags = '-xxx' # remove xxx flag
+ link.flags = [ '+xxx', '-yyy' ] # list operation
+ """
+ flags = capi.rtnl_link_vlan_get_flags(self._link)
+ return capi.rtnl_link_vlan_flags2str(flags, 256)[0].split(',')
- def _set_flag(self, flag):
- if flag.startswith('-'):
- i = capi.rtnl_link_vlan_str2flags(flag[1:])
- capi.rtnl_link_vlan_unset_flags(self._link, i)
- elif flag.startswith('+'):
- i = capi.rtnl_link_vlan_str2flags(flag[1:])
- capi.rtnl_link_vlan_set_flags(self._link, i)
- else:
- i = capi.rtnl_link_vlan_str2flags(flag)
- capi.rtnl_link_vlan_set_flags(self._link, i)
+ def _set_flag(self, flag):
+ if flag.startswith('-'):
+ i = capi.rtnl_link_vlan_str2flags(flag[1:])
+ capi.rtnl_link_vlan_unset_flags(self._link, i)
+ elif flag.startswith('+'):
+ i = capi.rtnl_link_vlan_str2flags(flag[1:])
+ capi.rtnl_link_vlan_set_flags(self._link, i)
+ else:
+ i = capi.rtnl_link_vlan_str2flags(flag)
+ capi.rtnl_link_vlan_set_flags(self._link, i)
- @flags.setter
- def flags(self, value):
- if type(value) is list:
- for flag in value:
- self._set_flag(flag)
- else:
- self._set_flag(value)
+ @flags.setter
+ def flags(self, value):
+ if type(value) is list:
+ for flag in value:
+ self._set_flag(flag)
+ else:
+ self._set_flag(value)
- ###################################################################
- # TODO:
- # - ingress map
- # - egress map
+ ###################################################################
+ # TODO:
+ # - ingress map
+ # - egress map
- def brief(self):
- return 'vlan-id {0}'.format(self.id)
+ def brief(self):
+ return 'vlan-id {0}'.format(self.id)
def init(link):
- link.vlan = VLANLink(link._link)
- return link.vlan
+ link.vlan = VLANLink(link._link)
+ return link.vlan
from .. import tc as tc
class HTBQdisc(object):
- def __init__(self, qdisc):
- self._qdisc = qdisc
+ def __init__(self, qdisc):
+ self._qdisc = qdisc
- ###################################################################
- # default class
- @netlink.nlattr('qdisc.htb.default_class', type=int)
- @property
- def default_class(self):
- return tc.Handle(capi.rtnl_htb_get_defcls(self._qdisc._rtnl_qdisc))
+ ###################################################################
+ # default class
+ @netlink.nlattr('qdisc.htb.default_class', type=int)
+ @property
+ def default_class(self):
+ return tc.Handle(capi.rtnl_htb_get_defcls(self._qdisc._rtnl_qdisc))
- @default_class.setter
- def default_class(self, value):
- capi.rtnl_htb_set_defcls(self._qdisc._rtnl_qdisc, int(value))
+ @default_class.setter
+ def default_class(self, value):
+ capi.rtnl_htb_set_defcls(self._qdisc._rtnl_qdisc, int(value))
- #####################################################################
- # r2q
- @netlink.nlattr('qdisc.htb.r2q', type=int)
- @property
- def r2q(self):
- return capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc)
+ #####################################################################
+ # r2q
+ @netlink.nlattr('qdisc.htb.r2q', type=int)
+ @property
+ def r2q(self):
+ return capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc)
- @r2q.setter
- def r2q(self, value):
- capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc,
- int(value))
+ @r2q.setter
+ def r2q(self, value):
+ capi.rtnl_htb_get_rate2quantum(self._qdisc._rtnl_qdisc,
+ int(value))
- def brief(self):
- fmt = util.MyFormatter(self)
+ def brief(self):
+ fmt = util.MyFormatter(self)
- ret = ' {s|default-class!k} {a|default_class}'
+ ret = ' {s|default-class!k} {a|default_class}'
- if self.r2q:
- ret += ' {s|r2q!k} {a|r2q}'
+ if self.r2q:
+ ret += ' {s|r2q!k} {a|r2q}'
- return fmt.format(ret)
+ return fmt.format(ret)
class HTBClass(object):
- def __init__(self, cl):
- self._class = cl
-
- #####################################################################
- # rate
- @netlink.nlattr('class.htb.rate', type=str)
- @property
- def rate(self):
- rate = capi.rtnl_htb_get_rate(self._class._rtnl_class)
- return util.Rate(rate)
-
- @rate.setter
- def rate(self, value):
- capi.rtnl_htb_set_rate(self._class._rtnl_class, int(value))
-
- #####################################################################
- # ceil
- @netlink.nlattr('class.htb.ceil', type=str)
- @property
- def ceil(self):
- ceil = capi.rtnl_htb_get_ceil(self._class._rtnl_class)
- return util.Rate(ceil)
-
- @ceil.setter
- def ceil(self, value):
- capi.rtnl_htb_set_ceil(self._class._rtnl_class, int(value))
-
- #####################################################################
- # burst
- @netlink.nlattr('class.htb.burst', type=str)
- @property
- def burst(self):
- burst = capi.rtnl_htb_get_rbuffer(self._class._rtnl_class)
- return util.Size(burst)
-
- @burst.setter
- def burst(self, value):
- capi.rtnl_htb_set_rbuffer(self._class._rtnl_class, int(value))
-
- #####################################################################
- # ceil burst
- @netlink.nlattr('class.htb.ceil_burst', type=str)
- @property
- def ceil_burst(self):
- burst = capi.rtnl_htb_get_cbuffer(self._class._rtnl_class)
- return util.Size(burst)
-
- @ceil_burst.setter
- def ceil_burst(self, value):
- capi.rtnl_htb_set_cbuffer(self._class._rtnl_class, int(value))
-
- #####################################################################
- # priority
- @netlink.nlattr('class.htb.prio', type=int)
- @property
- def prio(self):
- return capi.rtnl_htb_get_prio(self._class._rtnl_class)
-
- @prio.setter
- def prio(self, value):
- capi.rtnl_htb_set_prio(self._class._rtnl_class, int(value))
-
- #####################################################################
- # quantum
- @netlink.nlattr('class.htb.quantum', type=int)
- @property
- def quantum(self):
- return capi.rtnl_htb_get_quantum(self._class._rtnl_class)
-
- @quantum.setter
- def quantum(self, value):
- capi.rtnl_htb_set_quantum(self._class._rtnl_class, int(value))
-
- #####################################################################
- # level
- @netlink.nlattr('class.htb.level', type=int)
- @property
- def level(self):
- level = capi.rtnl_htb_get_level(self._class._rtnl_class)
-
- @level.setter
- def level(self, value):
- capi.rtnl_htb_set_level(self._class._rtnl_class, int(value))
-
- def brief(self):
- fmt = util.MyFormatter(self)
-
- ret = ' {t|prio} {t|rate}'
-
- if self.rate != self.ceil:
- ret += ' {s|borrow-up-to!k} {a|ceil}'
-
- ret += ' {t|burst}'
-
- return fmt.format(ret)
-
- def details(self):
- fmt = util.MyFormatter(self)
-
- return fmt.nl('\t{t|level} {t|quantum}')
+ def __init__(self, cl):
+ self._class = cl
+
+ #####################################################################
+ # rate
+ @netlink.nlattr('class.htb.rate', type=str)
+ @property
+ def rate(self):
+ rate = capi.rtnl_htb_get_rate(self._class._rtnl_class)
+ return util.Rate(rate)
+
+ @rate.setter
+ def rate(self, value):
+ capi.rtnl_htb_set_rate(self._class._rtnl_class, int(value))
+
+ #####################################################################
+ # ceil
+ @netlink.nlattr('class.htb.ceil', type=str)
+ @property
+ def ceil(self):
+ ceil = capi.rtnl_htb_get_ceil(self._class._rtnl_class)
+ return util.Rate(ceil)
+
+ @ceil.setter
+ def ceil(self, value):
+ capi.rtnl_htb_set_ceil(self._class._rtnl_class, int(value))
+
+ #####################################################################
+ # burst
+ @netlink.nlattr('class.htb.burst', type=str)
+ @property
+ def burst(self):
+ burst = capi.rtnl_htb_get_rbuffer(self._class._rtnl_class)
+ return util.Size(burst)
+
+ @burst.setter
+ def burst(self, value):
+ capi.rtnl_htb_set_rbuffer(self._class._rtnl_class, int(value))
+
+ #####################################################################
+ # ceil burst
+ @netlink.nlattr('class.htb.ceil_burst', type=str)
+ @property
+ def ceil_burst(self):
+ burst = capi.rtnl_htb_get_cbuffer(self._class._rtnl_class)
+ return util.Size(burst)
+
+ @ceil_burst.setter
+ def ceil_burst(self, value):
+ capi.rtnl_htb_set_cbuffer(self._class._rtnl_class, int(value))
+
+ #####################################################################
+ # priority
+ @netlink.nlattr('class.htb.prio', type=int)
+ @property
+ def prio(self):
+ return capi.rtnl_htb_get_prio(self._class._rtnl_class)
+
+ @prio.setter
+ def prio(self, value):
+ capi.rtnl_htb_set_prio(self._class._rtnl_class, int(value))
+
+ #####################################################################
+ # quantum
+ @netlink.nlattr('class.htb.quantum', type=int)
+ @property
+ def quantum(self):
+ return capi.rtnl_htb_get_quantum(self._class._rtnl_class)
+
+ @quantum.setter
+ def quantum(self, value):
+ capi.rtnl_htb_set_quantum(self._class._rtnl_class, int(value))
+
+ #####################################################################
+ # level
+ @netlink.nlattr('class.htb.level', type=int)
+ @property
+ def level(self):
+ level = capi.rtnl_htb_get_level(self._class._rtnl_class)
+
+ @level.setter
+ def level(self, value):
+ capi.rtnl_htb_set_level(self._class._rtnl_class, int(value))
+
+ def brief(self):
+ fmt = util.MyFormatter(self)
+
+ ret = ' {t|prio} {t|rate}'
+
+ if self.rate != self.ceil:
+ ret += ' {s|borrow-up-to!k} {a|ceil}'
+
+ ret += ' {t|burst}'
+
+ return fmt.format(ret)
+
+ def details(self):
+ fmt = util.MyFormatter(self)
+
+ return fmt.nl('\t{t|level} {t|quantum}')
def init_qdisc(qdisc):
- qdisc.htb = HTBQdisc(qdisc)
- return qdisc.htb
+ qdisc.htb = HTBQdisc(qdisc)
+ return qdisc.htb
def init_class(cl):
- cl.htb = HTBClass(cl)
- return cl.htb
+ cl.htb = HTBClass(cl)
+ return cl.htb
#extern void rtnl_htb_set_quantum(struct rtnl_class *, uint32_t quantum);
from __future__ import absolute_import
__all__ = [
- 'TcCache',
- 'Tc',
- 'QdiscCache',
- 'Qdisc',
- 'TcClassCache',
- 'TcClass']
+ 'TcCache',
+ 'Tc',
+ 'QdiscCache',
+ 'Qdisc',
+ 'TcClassCache',
+ 'TcClass']
import socket
import sys
###########################################################################
# Handle
class Handle(object):
- """ Traffic control handle
+ """ Traffic control handle
- Representation of a traffic control handle which uniquely identifies
- each traffic control object in its link namespace.
+ Representation of a traffic control handle which uniquely identifies
+ each traffic control object in its link namespace.
- handle = tc.Handle('10:20')
- handle = tc.handle('root')
- print int(handle)
- print str(handle)
- """
- def __init__(self, val=None):
- if type(val) is str:
- val = capi.tc_str2handle(val)
- elif not val:
- val = 0
+ handle = tc.Handle('10:20')
+ handle = tc.handle('root')
+ print int(handle)
+ print str(handle)
+ """
+ def __init__(self, val=None):
+ if type(val) is str:
+ val = capi.tc_str2handle(val)
+ elif not val:
+ val = 0
- self._val = int(val)
+ self._val = int(val)
- def __cmp__(self, other):
- if other is None:
- other = 0
+ def __cmp__(self, other):
+ if other is None:
+ other = 0
- if isinstance(other, Handle):
- return int(self) - int(other)
- elif isinstance(other, int):
- return int(self) - other
- else:
- raise TypeError()
+ if isinstance(other, Handle):
+ return int(self) - int(other)
+ elif isinstance(other, int):
+ return int(self) - other
+ else:
+ raise TypeError()
- def __int__(self):
- return self._val
+ def __int__(self):
+ return self._val
- def __str__(self):
- return capi.rtnl_tc_handle2str(self._val, 64)[0]
+ def __str__(self):
+ return capi.rtnl_tc_handle2str(self._val, 64)[0]
- def isroot(self):
- return self._val == TC_H_ROOT or self._val == TC_H_INGRESS
+ def isroot(self):
+ return self._val == TC_H_ROOT or self._val == TC_H_INGRESS
###########################################################################
# TC Cache
class TcCache(netlink.Cache):
- """Cache of traffic control object"""
+ """Cache of traffic control object"""
- def __getitem__(self, key):
- raise NotImplementedError()
+ def __getitem__(self, key):
+ raise NotImplementedError()
###########################################################################
# Tc Object
class Tc(netlink.Object):
- def __cmp__(self, other):
- diff = self.ifindex - other.ifindex
- if diff == 0:
- diff = int(self.handle) - int(other.handle)
- return diff
-
- def _tc_module_lookup(self):
- self._module_lookup(self._module_path + self.kind,
- 'init_' + self._name)
-
- @property
- def root(self):
- """True if tc object is a root object"""
- return self.parent.isroot()
-
- #####################################################################
- # ifindex
- @property
- def ifindex(self):
- """interface index"""
- return capi.rtnl_tc_get_ifindex(self._rtnl_tc)
-
- @ifindex.setter
- def ifindex(self, value):
- capi.rtnl_tc_set_ifindex(self._rtnl_tc, int(value))
-
- #####################################################################
- # link
- @property
- def link(self):
- link = capi.rtnl_tc_get_link(self._rtnl_tc)
- if not link:
- return None
-
- return Link.Link.from_capi(link)
-
- @link.setter
- def link(self, value):
- capi.rtnl_tc_set_link(self._rtnl_tc, value._link)
-
- #####################################################################
- # mtu
- @property
- def mtu(self):
- return capi.rtnl_tc_get_mtu(self._rtnl_tc)
-
- @mtu.setter
- def mtu(self, value):
- capi.rtnl_tc_set_mtu(self._rtnl_tc, int(value))
-
- #####################################################################
- # mpu
- @property
- def mpu(self):
- return capi.rtnl_tc_get_mpu(self._rtnl_tc)
-
- @mpu.setter
- def mpu(self, value):
- capi.rtnl_tc_set_mpu(self._rtnl_tc, int(value))
-
- #####################################################################
- # overhead
- @property
- def overhead(self):
- return capi.rtnl_tc_get_overhead(self._rtnl_tc)
-
- @overhead.setter
- def overhead(self, value):
- capi.rtnl_tc_set_overhead(self._rtnl_tc, int(value))
-
- #####################################################################
- # linktype
- @property
- def linktype(self):
- return capi.rtnl_tc_get_linktype(self._rtnl_tc)
-
- @linktype.setter
- def linktype(self, value):
- capi.rtnl_tc_set_linktype(self._rtnl_tc, int(value))
-
- #####################################################################
- # handle
- @property
- def handle(self):
- return Handle(capi.rtnl_tc_get_handle(self._rtnl_tc))
-
- @handle.setter
- def handle(self, value):
- capi.rtnl_tc_set_handle(self._rtnl_tc, int(value))
-
- #####################################################################
- # parent
- @property
- def parent(self):
- return Handle(capi.rtnl_tc_get_parent(self._rtnl_tc))
-
- @parent.setter
- def parent(self, value):
- capi.rtnl_tc_set_parent(self._rtnl_tc, int(value))
-
- #####################################################################
- # kind
- @property
- def kind(self):
- return capi.rtnl_tc_get_kind(self._rtnl_tc)
-
- @kind.setter
- def kind(self, value):
- capi.rtnl_tc_set_kind(self._rtnl_tc, value)
- self._tc_module_lookup()
-
- def get_stat(self, id):
- return capi.rtnl_tc_get_stat(self._rtnl_tc, id)
-
- @property
- def _dev(self):
- buf = util.kw('dev') + ' '
-
- if self.link:
- return buf + util.string(self.link.name)
- else:
- return buf + util.num(self.ifindex)
-
- def brief(self, title, nodev=False, noparent=False):
- ret = title + ' {a|kind} {a|handle}'
-
- if not nodev:
- ret += ' {a|_dev}'
-
- if not noparent:
- ret += ' {t|parent}'
-
- return ret + self._module_brief()
+ def __cmp__(self, other):
+ diff = self.ifindex - other.ifindex
+ if diff == 0:
+ diff = int(self.handle) - int(other.handle)
+ return diff
+
+ def _tc_module_lookup(self):
+ self._module_lookup(self._module_path + self.kind,
+ 'init_' + self._name)
+
+ @property
+ def root(self):
+ """True if tc object is a root object"""
+ return self.parent.isroot()
+
+ #####################################################################
+ # ifindex
+ @property
+ def ifindex(self):
+ """interface index"""
+ return capi.rtnl_tc_get_ifindex(self._rtnl_tc)
+
+ @ifindex.setter
+ def ifindex(self, value):
+ capi.rtnl_tc_set_ifindex(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # link
+ @property
+ def link(self):
+ link = capi.rtnl_tc_get_link(self._rtnl_tc)
+ if not link:
+ return None
+
+ return Link.Link.from_capi(link)
+
+ @link.setter
+ def link(self, value):
+ capi.rtnl_tc_set_link(self._rtnl_tc, value._link)
+
+ #####################################################################
+ # mtu
+ @property
+ def mtu(self):
+ return capi.rtnl_tc_get_mtu(self._rtnl_tc)
+
+ @mtu.setter
+ def mtu(self, value):
+ capi.rtnl_tc_set_mtu(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # mpu
+ @property
+ def mpu(self):
+ return capi.rtnl_tc_get_mpu(self._rtnl_tc)
+
+ @mpu.setter
+ def mpu(self, value):
+ capi.rtnl_tc_set_mpu(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # overhead
+ @property
+ def overhead(self):
+ return capi.rtnl_tc_get_overhead(self._rtnl_tc)
+
+ @overhead.setter
+ def overhead(self, value):
+ capi.rtnl_tc_set_overhead(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # linktype
+ @property
+ def linktype(self):
+ return capi.rtnl_tc_get_linktype(self._rtnl_tc)
+
+ @linktype.setter
+ def linktype(self, value):
+ capi.rtnl_tc_set_linktype(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # handle
+ @property
+ def handle(self):
+ return Handle(capi.rtnl_tc_get_handle(self._rtnl_tc))
+
+ @handle.setter
+ def handle(self, value):
+ capi.rtnl_tc_set_handle(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # parent
+ @property
+ def parent(self):
+ return Handle(capi.rtnl_tc_get_parent(self._rtnl_tc))
+
+ @parent.setter
+ def parent(self, value):
+ capi.rtnl_tc_set_parent(self._rtnl_tc, int(value))
+
+ #####################################################################
+ # kind
+ @property
+ def kind(self):
+ return capi.rtnl_tc_get_kind(self._rtnl_tc)
+
+ @kind.setter
+ def kind(self, value):
+ capi.rtnl_tc_set_kind(self._rtnl_tc, value)
+ self._tc_module_lookup()
+
+ def get_stat(self, id):
+ return capi.rtnl_tc_get_stat(self._rtnl_tc, id)
+
+ @property
+ def _dev(self):
+ buf = util.kw('dev') + ' '
+
+ if self.link:
+ return buf + util.string(self.link.name)
+ else:
+ return buf + util.num(self.ifindex)
+
+ def brief(self, title, nodev=False, noparent=False):
+ ret = title + ' {a|kind} {a|handle}'
+
+ if not nodev:
+ ret += ' {a|_dev}'
+
+ if not noparent:
+ ret += ' {t|parent}'
+
+ return ret + self._module_brief()
- def details(self):
- return '{t|mtu} {t|mpu} {t|overhead} {t|linktype}'
-
- @property
- def packets(self):
- return self.get_stat(STAT_PACKETS)
-
- @property
- def bytes(self):
- return self.get_stat(STAT_BYTES)
-
- @property
- def qlen(self):
- return self.get_stat(STAT_QLEN)
-
- def stats(self, fmt):
- return fmt.nl('{t|packets} {t|bytes} {t|qlen}')
+ def details(self):
+ return '{t|mtu} {t|mpu} {t|overhead} {t|linktype}'
+
+ @property
+ def packets(self):
+ return self.get_stat(STAT_PACKETS)
+
+ @property
+ def bytes(self):
+ return self.get_stat(STAT_BYTES)
+
+ @property
+ def qlen(self):
+ return self.get_stat(STAT_QLEN)
+
+ def stats(self, fmt):
+ return fmt.nl('{t|packets} {t|bytes} {t|qlen}')
###########################################################################
# Queueing discipline cache
class QdiscCache(netlink.Cache):
- """Cache of qdiscs"""
+ """Cache of qdiscs"""
- def __init__(self, cache=None):
- if not cache:
- cache = self._alloc_cache_name("route/qdisc")
+ def __init__(self, cache=None):
+ if not cache:
+ cache = self._alloc_cache_name("route/qdisc")
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
+ self._protocol = netlink.NETLINK_ROUTE
+ self._nl_cache = cache
# def __getitem__(self, key):
# if type(key) is int:
# else:
# return Qdisc._from_capi(capi.qdisc2obj(qdisc))
- def _new_object(self, obj):
- return Qdisc(obj)
+ def _new_object(self, obj):
+ return Qdisc(obj)
- def _new_cache(self, cache):
- return QdiscCache(cache=cache)
+ def _new_cache(self, cache):
+ return QdiscCache(cache=cache)
###########################################################################
# Qdisc Object
class Qdisc(Tc):
- """Queueing discipline"""
+ """Queueing discipline"""
- def __init__(self, obj=None):
- netlink.Object.__init__(self, "route/qdisc", "qdisc", obj)
- self._module_path = 'netlink.route.qdisc.'
- self._rtnl_qdisc = self._obj2type(self._nl_object)
- self._rtnl_tc = capi.obj2tc(self._nl_object)
+ def __init__(self, obj=None):
+ netlink.Object.__init__(self, "route/qdisc", "qdisc", obj)
+ self._module_path = 'netlink.route.qdisc.'
+ self._rtnl_qdisc = self._obj2type(self._nl_object)
+ self._rtnl_tc = capi.obj2tc(self._nl_object)
- netlink.add_attr('qdisc.handle', fmt=util.handle)
- netlink.add_attr('qdisc.parent', fmt=util.handle)
- netlink.add_attr('qdisc.kind', fmt=util.bold)
+ netlink.add_attr('qdisc.handle', fmt=util.handle)
+ netlink.add_attr('qdisc.parent', fmt=util.handle)
+ netlink.add_attr('qdisc.kind', fmt=util.bold)
- if self.kind:
- self._tc_module_lookup()
+ if self.kind:
+ self._tc_module_lookup()
- @classmethod
- def from_capi(cls, obj):
- return cls(capi.qdisc2obj(obj))
+ @classmethod
+ def from_capi(cls, obj):
+ return cls(capi.qdisc2obj(obj))
- def _obj2type(self, obj):
- return capi.obj2qdisc(obj)
+ def _obj2type(self, obj):
+ return capi.obj2qdisc(obj)
- def _new_instance(self, obj):
- if not obj:
- raise ValueError()
+ def _new_instance(self, obj):
+ if not obj:
+ raise ValueError()
- return Qdisc(obj)
+ return Qdisc(obj)
- @property
- def childs(self):
- ret = []
+ @property
+ def childs(self):
+ ret = []
- if int(self.handle):
- ret += get_cls(self.ifindex, parent=self.handle)
+ if int(self.handle):
+ ret += get_cls(self.ifindex, parent=self.handle)
- if self.root:
- ret += get_class(self.ifindex, parent=TC_H_ROOT)
+ if self.root:
+ ret += get_class(self.ifindex, parent=TC_H_ROOT)
- ret += get_class(self.ifindex, parent=self.handle)
+ ret += get_class(self.ifindex, parent=self.handle)
- return ret
+ return ret
# #####################################################################
# # add()
# if ret < 0:
# raise netlink.KernelError(ret)
- ###################################################################
- #
- # format(details=False, stats=False)
- #
- def format(self, details=False, stats=False, nodev=False,
- noparent=False, indent=''):
- """Return qdisc as formatted text"""
- fmt = util.MyFormatter(self, indent)
+ ###################################################################
+ #
+ # format(details=False, stats=False)
+ #
+ def format(self, details=False, stats=False, nodev=False,
+ noparent=False, indent=''):
+ """Return qdisc as formatted text"""
+ fmt = util.MyFormatter(self, indent)
- buf = fmt.format(self.brief('qdisc', nodev, noparent))
+ buf = fmt.format(self.brief('qdisc', nodev, noparent))
- if details:
- buf += fmt.nl('\t' + self.details())
+ if details:
+ buf += fmt.nl('\t' + self.details())
- if stats:
- buf += self.stats(fmt)
+ if stats:
+ buf += self.stats(fmt)
# if stats:
# l = [['Packets', RX_PACKETS, TX_PACKETS],
# row[2] = self.get_stat(row[2]) if row[2] else ''
# buf += '\t{0:27} {1:>16} {2:>16}\n'.format(*row)
- return buf
+ return buf
###########################################################################
# Traffic class cache
class TcClassCache(netlink.Cache):
- """Cache of traffic classes"""
+ """Cache of traffic classes"""
- def __init__(self, ifindex, cache=None):
- if not cache:
- cache = self._alloc_cache_name("route/class")
+ def __init__(self, ifindex, cache=None):
+ if not cache:
+ cache = self._alloc_cache_name("route/class")
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
- self._set_arg1(ifindex)
+ self._protocol = netlink.NETLINK_ROUTE
+ self._nl_cache = cache
+ self._set_arg1(ifindex)
- def _new_object(self, obj):
- return TcClass(obj)
+ def _new_object(self, obj):
+ return TcClass(obj)
- def _new_cache(self, cache):
- return TcClassCache(self.arg1, cache=cache)
+ def _new_cache(self, cache):
+ return TcClassCache(self.arg1, cache=cache)
###########################################################################
# Traffic Class Object
class TcClass(Tc):
- """Traffic Class"""
+ """Traffic Class"""
- def __init__(self, obj=None):
- netlink.Object.__init__(self, "route/class", "class", obj)
- self._module_path = 'netlink.route.qdisc.'
- self._rtnl_class = self._obj2type(self._nl_object)
- self._rtnl_tc = capi.obj2tc(self._nl_object)
+ def __init__(self, obj=None):
+ netlink.Object.__init__(self, "route/class", "class", obj)
+ self._module_path = 'netlink.route.qdisc.'
+ self._rtnl_class = self._obj2type(self._nl_object)
+ self._rtnl_tc = capi.obj2tc(self._nl_object)
- netlink.add_attr('class.handle', fmt=util.handle)
- netlink.add_attr('class.parent', fmt=util.handle)
- netlink.add_attr('class.kind', fmt=util.bold)
+ netlink.add_attr('class.handle', fmt=util.handle)
+ netlink.add_attr('class.parent', fmt=util.handle)
+ netlink.add_attr('class.kind', fmt=util.bold)
- if self.kind:
- self._tc_module_lookup()
+ if self.kind:
+ self._tc_module_lookup()
- @classmethod
- def from_capi(cls, obj):
- return cls(capi.class2obj(obj))
+ @classmethod
+ def from_capi(cls, obj):
+ return cls(capi.class2obj(obj))
- def _obj2type(self, obj):
- return capi.obj2class(obj)
+ def _obj2type(self, obj):
+ return capi.obj2class(obj)
- def _new_instance(self, obj):
- if not obj:
- raise ValueError()
+ def _new_instance(self, obj):
+ if not obj:
+ raise ValueError()
- return TcClass(obj)
+ return TcClass(obj)
- @property
- def childs(self):
- ret = []
+ @property
+ def childs(self):
+ ret = []
- # classes can have classifiers, child classes and leaf
- # qdiscs
- ret += get_cls(self.ifindex, parent=self.handle)
- ret += get_class(self.ifindex, parent=self.handle)
- ret += get_qdisc(self.ifindex, parent=self.handle)
+ # classes can have classifiers, child classes and leaf
+ # qdiscs
+ ret += get_cls(self.ifindex, parent=self.handle)
+ ret += get_class(self.ifindex, parent=self.handle)
+ ret += get_qdisc(self.ifindex, parent=self.handle)
- return ret
+ return ret
- ###################################################################
- #
- # format(details=False, stats=False)
- #
- def format(self, details=False, stats=False, nodev=False,
- noparent=False, indent=''):
- """Return class as formatted text"""
- fmt = util.MyFormatter(self, indent)
+ ###################################################################
+ #
+ # format(details=False, stats=False)
+ #
+ def format(self, details=False, stats=False, nodev=False,
+ noparent=False, indent=''):
+ """Return class as formatted text"""
+ fmt = util.MyFormatter(self, indent)
- buf = fmt.format(self.brief('class', nodev, noparent))
+ buf = fmt.format(self.brief('class', nodev, noparent))
- if details:
- buf += fmt.nl('\t' + self.details())
+ if details:
+ buf += fmt.nl('\t' + self.details())
- return buf
+ return buf
###########################################################################
# Classifier Cache
class ClassifierCache(netlink.Cache):
- """Cache of traffic classifiers objects"""
+ """Cache of traffic classifiers objects"""
- def __init__(self, ifindex, parent, cache=None):
- if not cache:
- cache = self._alloc_cache_name("route/cls")
+ def __init__(self, ifindex, parent, cache=None):
+ if not cache:
+ cache = self._alloc_cache_name("route/cls")
- self._protocol = netlink.NETLINK_ROUTE
- self._nl_cache = cache
- self._set_arg1(ifindex)
- self._set_arg2(int(parent))
+ self._protocol = netlink.NETLINK_ROUTE
+ self._nl_cache = cache
+ self._set_arg1(ifindex)
+ self._set_arg2(int(parent))
- def _new_object(self, obj):
- return Classifier(obj)
+ def _new_object(self, obj):
+ return Classifier(obj)
- def _new_cache(self, cache):
- return ClassifierCache(self.arg1, self.arg2, cache=cache)
+ def _new_cache(self, cache):
+ return ClassifierCache(self.arg1, self.arg2, cache=cache)
###########################################################################
# Classifier Object
class Classifier(Tc):
- """Classifier"""
+ """Classifier"""
- def __init__(self, obj=None):
- netlink.Object.__init__(self, "route/cls", "cls", obj)
- self._module_path = 'netlink.route.cls.'
- self._rtnl_cls = self._obj2type(self._nl_object)
- self._rtnl_tc = capi.obj2tc(self._nl_object)
+ def __init__(self, obj=None):
+ netlink.Object.__init__(self, "route/cls", "cls", obj)
+ self._module_path = 'netlink.route.cls.'
+ self._rtnl_cls = self._obj2type(self._nl_object)
+ self._rtnl_tc = capi.obj2tc(self._nl_object)
- netlink.add_attr('cls.handle', fmt=util.handle)
- netlink.add_attr('cls.parent', fmt=util.handle)
- netlink.add_attr('cls.kind', fmt=util.bold)
+ netlink.add_attr('cls.handle', fmt=util.handle)
+ netlink.add_attr('cls.parent', fmt=util.handle)
+ netlink.add_attr('cls.kind', fmt=util.bold)
- @classmethod
- def from_capi(cls, obj):
- return cls(capi.cls2obj(obj))
+ @classmethod
+ def from_capi(cls, obj):
+ return cls(capi.cls2obj(obj))
- def _obj2type(self, obj):
- return capi.obj2cls(obj)
+ def _obj2type(self, obj):
+ return capi.obj2cls(obj)
- def _new_instance(self, obj):
- if not obj:
- raise ValueError()
+ def _new_instance(self, obj):
+ if not obj:
+ raise ValueError()
- return Classifier(obj)
+ return Classifier(obj)
- #####################################################################
- # priority
- @property
- def priority(self):
- return capi.rtnl_cls_get_prio(self._rtnl_cls)
+ #####################################################################
+ # priority
+ @property
+ def priority(self):
+ return capi.rtnl_cls_get_prio(self._rtnl_cls)
- @priority.setter
- def priority(self, value):
- capi.rtnl_cls_set_prio(self._rtnl_cls, int(value))
+ @priority.setter
+ def priority(self, value):
+ capi.rtnl_cls_set_prio(self._rtnl_cls, int(value))
- #####################################################################
- # protocol
- @property
- def protocol(self):
- return capi.rtnl_cls_get_protocol(self._rtnl_cls)
+ #####################################################################
+ # protocol
+ @property
+ def protocol(self):
+ return capi.rtnl_cls_get_protocol(self._rtnl_cls)
- @protocol.setter
- def protocol(self, value):
- capi.rtnl_cls_set_protocol(self._rtnl_cls, int(value))
+ @protocol.setter
+ def protocol(self, value):
+ capi.rtnl_cls_set_protocol(self._rtnl_cls, int(value))
- @property
- def childs(self):
- return []
+ @property
+ def childs(self):
+ return []
- ###################################################################
- #
- # format(details=False, stats=False)
- #
- def format(self, details=False, stats=False, nodev=False,
- noparent=False, indent=''):
- """Return class as formatted text"""
- fmt = util.MyFormatter(self, indent)
+ ###################################################################
+ #
+ # format(details=False, stats=False)
+ #
+ def format(self, details=False, stats=False, nodev=False,
+ noparent=False, indent=''):
+ """Return class as formatted text"""
+ fmt = util.MyFormatter(self, indent)
- buf = fmt.format(self.brief('classifier', nodev, noparent))
- buf += fmt.format(' {t|priority} {t|protocol}')
+ buf = fmt.format(self.brief('classifier', nodev, noparent))
+ buf += fmt.format(' {t|priority} {t|protocol}')
- if details:
- buf += fmt.nl('\t' + self.details())
+ if details:
+ buf += fmt.nl('\t' + self.details())
- return buf
+ return buf
_qdisc_cache = QdiscCache()
def get_qdisc(ifindex, handle=None, parent=None):
- l = []
+ l = []
- _qdisc_cache.refill()
+ _qdisc_cache.refill()
- for qdisc in _qdisc_cache:
- if qdisc.ifindex == ifindex and \
- (handle == None or qdisc.handle == handle) and \
- (parent == None or qdisc.parent == parent):
- l.append(qdisc)
+ for qdisc in _qdisc_cache:
+ if qdisc.ifindex == ifindex and \
+ (handle == None or qdisc.handle == handle) and \
+ (parent == None or qdisc.parent == parent):
+ l.append(qdisc)
- return l
+ return l
_class_cache = {}
def get_class(ifindex, parent, handle=None):
- l = []
+ l = []
- try:
- cache = _class_cache[ifindex]
- except KeyError:
- cache = TcClassCache(ifindex)
- _class_cache[ifindex] = cache
+ try:
+ cache = _class_cache[ifindex]
+ except KeyError:
+ cache = TcClassCache(ifindex)
+ _class_cache[ifindex] = cache
- cache.refill()
+ cache.refill()
- for cl in cache:
- if (parent == None or cl.parent == parent) and \
- (handle == None or cl.handle == handle):
- l.append(cl)
+ for cl in cache:
+ if (parent == None or cl.parent == parent) and \
+ (handle == None or cl.handle == handle):
+ l.append(cl)
- return l
+ return l
_cls_cache = {}
def get_cls(ifindex, parent, handle=None):
- l = []
+ l = []
- try:
- chain = _cls_cache[ifindex]
- except KeyError:
- _cls_cache[ifindex] = {}
+ try:
+ chain = _cls_cache[ifindex]
+ except KeyError:
+ _cls_cache[ifindex] = {}
- try:
- cache = _cls_cache[ifindex][parent]
- except KeyError:
- cache = ClassifierCache(ifindex, parent)
- _cls_cache[ifindex][parent] = cache
+ try:
+ cache = _cls_cache[ifindex][parent]
+ except KeyError:
+ cache = ClassifierCache(ifindex, parent)
+ _cls_cache[ifindex][parent] = cache
- cache.refill()
+ cache.refill()
- for cls in cache:
- if handle == None or cls.handle == handle:
- l.append(cls)
+ for cls in cache:
+ if handle == None or cls.handle == handle:
+ l.append(cls)
- return l
+ return l
__version__ = "1.0"
def _color(t, c):
- return b'{esc}[{color}m{text}{esc}[0m'.format(esc=b'\x1b', color=c, text=t)
+ return b'{esc}[{color}m{text}{esc}[0m'.format(esc=b'\x1b', color=c, text=t)
def black(t):
- return _color(t, 30)
+ return _color(t, 30)
def red(t):
- return _color(t, 31)
+ return _color(t, 31)
def green(t):
- return _color(t, 32)
+ return _color(t, 32)
def yellow(t):
- return _color(t, 33)
+ return _color(t, 33)
def blue(t):
- return _color(t, 34)
+ return _color(t, 34)
def magenta(t):
- return _color(t, 35)
+ return _color(t, 35)
def cyan(t):
- return _color(t, 36)
+ return _color(t, 36)
def white(t):
- return _color(t, 37)
+ return _color(t, 37)
def bold(t):
- return _color(t, 1)
+ return _color(t, 1)
def kw(t):
- return yellow(t)
+ return yellow(t)
def num(t):
- return str(t)
+ return str(t)
def string(t):
- return t
+ return t
def addr(t):
- return str(t)
+ return str(t)
def bad(t):
- return red(t)
+ return red(t)
def good(t):
- return green(t)
+ return green(t)
def title(t):
- return t
+ return t
def bool(t):
- return str(t)
+ return str(t)
def handle(t):
- return str(t)
+ return str(t)
class MyFormatter(Formatter):
- def __init__(self, obj, indent=''):
- self._obj = obj
- self._indent = indent
-
- def _nlattr(self, key):
- value = getattr(self._obj, key)
- title = None
-
- if isinstance(value, types.MethodType):
- value = value()
-
- try:
- d = netlink.attrs[self._obj._name + '.' + key]
-
- if 'fmt' in d:
- value = d['fmt'](value)
-
- if 'title' in d:
- title = d['title']
- except KeyError:
- pass
- except AttributeError:
- pass
-
- return title, str(value)
-
- def get_value(self, key, args, kwds):
- # Let default get_value() handle ints
- if not isinstance(key, str):
- return Formatter.get_value(self, key, args, kwds)
-
- # HACK, we allow defining strings via fields to allow
- # conversions
- if key[:2] == 's|':
- return key[2:]
-
- if key[:2] == 't|':
- # title mode ("TITLE ATTR")
- include_title = True
- elif key[:2] == 'a|':
- # plain attribute mode ("ATTR")
- include_title = False
- else:
- # No special field, have default get_value() get it
- return Formatter.get_value(self, key, args, kwds)
-
- key = key[2:]
- (title, value) = self._nlattr(key)
-
- if include_title:
- if not title:
- title = key # fall back to key as title
- value = '{0} {1}'.format(kw(title), value)
-
- return value
-
- def convert_field(self, value, conversion):
- if conversion == 'r':
- return repr(value)
- elif conversion == 's':
- return str(value)
- elif conversion == 'k':
- return kw(value)
- elif conversion == 'b':
- return bold(value)
- elif conversion is None:
- return value
-
- raise ValueError("Unknown converion specifier {0!s}".format(conversion))
-
- def nl(self, format_string=''):
- return '\n' + self._indent + self.format(format_string)
+ def __init__(self, obj, indent=''):
+ self._obj = obj
+ self._indent = indent
+
+ def _nlattr(self, key):
+ value = getattr(self._obj, key)
+ title = None
+
+ if isinstance(value, types.MethodType):
+ value = value()
+
+ try:
+ d = netlink.attrs[self._obj._name + '.' + key]
+
+ if 'fmt' in d:
+ value = d['fmt'](value)
+
+ if 'title' in d:
+ title = d['title']
+ except KeyError:
+ pass
+ except AttributeError:
+ pass
+
+ return title, str(value)
+
+ def get_value(self, key, args, kwds):
+ # Let default get_value() handle ints
+ if not isinstance(key, str):
+ return Formatter.get_value(self, key, args, kwds)
+
+ # HACK, we allow defining strings via fields to allow
+ # conversions
+ if key[:2] == 's|':
+ return key[2:]
+
+ if key[:2] == 't|':
+ # title mode ("TITLE ATTR")
+ include_title = True
+ elif key[:2] == 'a|':
+ # plain attribute mode ("ATTR")
+ include_title = False
+ else:
+ # No special field, have default get_value() get it
+ return Formatter.get_value(self, key, args, kwds)
+
+ key = key[2:]
+ (title, value) = self._nlattr(key)
+
+ if include_title:
+ if not title:
+ title = key # fall back to key as title
+ value = '{0} {1}'.format(kw(title), value)
+
+ return value
+
+ def convert_field(self, value, conversion):
+ if conversion == 'r':
+ return repr(value)
+ elif conversion == 's':
+ return str(value)
+ elif conversion == 'k':
+ return kw(value)
+ elif conversion == 'b':
+ return bold(value)
+ elif conversion is None:
+ return value
+
+ raise ValueError("Unknown converion specifier {0!s}".format(conversion))
+
+ def nl(self, format_string=''):
+ return '\n' + self._indent + self.format(format_string)
NL_BYTE_RATE = 0
NL_BIT_RATE = 1
class Rate(object):
- def __init__(self, rate, mode=NL_BYTE_RATE):
- self._rate = rate
- self._mode = mode
+ def __init__(self, rate, mode=NL_BYTE_RATE):
+ self._rate = rate
+ self._mode = mode
- def __str__(self):
- return capi.nl_rate2str(self._rate, self._mode, 32)[1]
+ def __str__(self):
+ return capi.nl_rate2str(self._rate, self._mode, 32)[1]
- def __int__(self):
- return self._rate
+ def __int__(self):
+ return self._rate
- def __cmp__(self, other):
- return int(self) - int(other)
+ def __cmp__(self, other):
+ return int(self) - int(other)
class Size(object):
- def __init__(self, size):
- self._size = size
+ def __init__(self, size):
+ self._size = size
- def __str__(self):
- return capi.nl_size2str(self._size, 32)[0]
+ def __str__(self):
+ return capi.nl_size2str(self._size, 32)[0]
- def __int__(self):
- return self._size
+ def __int__(self):
+ return self._size
- def __cmp__(self, other):
- return int(self) - int(other)
+ def __cmp__(self, other):
+ return int(self) - int(other)