# underscore this to make sure it is deleted first upon module deletion
_defaultDumpParams = DumpParams(NL_DUMP_LINE)
-###########################################################################
-# Cacheable Object (Base Class)
class Object(object):
"""Cacheable object (base class)"""
capi.nl_object_dump(self._nl_object, params._dp)
- #####################################################################
- # mark
@property
def mark(self):
return bool(capi.nl_object_is_marked(self._nl_object))
else:
capi.nl_object_unmark(self._nl_object)
- #####################################################################
- # shared
@property
def shared(self):
return capi.nl_object_shared(self._nl_object) != 0
- #####################################################################
- # attrs
@property
def attrs(self):
attr_list = capi.nl_object_attr_list(self._nl_object, 1024)
return attr_list[0].split()
- #####################################################################
- # refcnt
@property
def refcnt(self):
return capi.nl_object_get_refcnt(self._nl_object)
def get_next(self):
return capi.nl_cache_get_prev(self._nl_object)
-###########################################################################
-# Cache
class Cache(object):
"""Collection of netlink objects"""
def __init__(self):
"""
capi.nl_cache_mngt_unprovide(self._nl_cache)
-###########################################################################
# Cache Manager (Work in Progress)
NL_AUTO_PROVIDE = 1
class CacheManager(object):
def add(self, name):
capi.cache_mngr_add(self._mngr, name, None, None)
-###########################################################################
-# Address Family
class AddressFamily(object):
"""Address family representation
return 'AddressFamily({0!r})'.format(str(self))
-###########################################################################
-# Abstract Address
class AbstractAddress(object):
"""Abstract address object
from . import link as Link
from .. import util as util
-###########################################################################
-# Address Cache
class AddressCache(netlink.Cache):
"""Cache containing network addresses"""
def _new_cache(cache):
return AddressCache(cache=cache)
-###########################################################################
-# Address Object
class Address(netlink.Object):
"""Network address"""
def _new_instance(obj):
return Address(obj)
- #####################################################################
- # ifindex
@netlink.nlattr('address.ifindex', type=int, immutable=True,
fmt=util.num)
@property
self.link = link
- #####################################################################
- # link
@netlink.nlattr('address.link', type=str, fmt=util.string)
@property
def link(self):
if capi.rtnl_addr_get_ifindex(self._orig) == 0:
capi.rtnl_addr_set_ifindex(self._orig, value.ifindex)
- #####################################################################
- # label
@netlink.nlattr('address.label', type=str, fmt=util.string)
@property
def label(self):
def label(self, value):
capi.rtnl_addr_set_label(self._rtnl_addr, value)
- #####################################################################
- # flags
@netlink.nlattr('address.flags', type=str, fmt=util.string)
@property
def flags(self):
else:
self._set_flag(value)
- #####################################################################
- # family
@netlink.nlattr('address.family', type=int, immutable=True,
fmt=util.num)
@property
capi.rtnl_addr_set_family(self._rtnl_addr, int(value))
- #####################################################################
- # scope
@netlink.nlattr('address.scope', type=int, fmt=util.num)
@property
def scope(self):
value = capi.rtnl_str2scope(value)
capi.rtnl_addr_set_scope(self._rtnl_addr, value)
- #####################################################################
- # local address
@netlink.nlattr('address.local', type=str, immutable=True,
fmt=util.addr)
@property
if capi.rtnl_addr_get_local(self._orig) is None:
capi.rtnl_addr_set_local(self._orig, a._nl_addr)
- #####################################################################
- # Peer address
@netlink.nlattr('address.peer', type=str, fmt=util.addr)
@property
def peer(self):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_peer(self._rtnl_addr, a._nl_addr)
- #####################################################################
- # Broadcast address
@netlink.nlattr('address.broadcast', type=str, fmt=util.addr)
@property
def broadcast(self):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_broadcast(self._rtnl_addr, a._nl_addr)
- #####################################################################
- # Multicast address
@netlink.nlattr('address.multicast', type=str, fmt=util.addr)
@property
def multicast(self):
capi.rtnl_addr_set_multicast(self._rtnl_addr, a._nl_addr)
- #####################################################################
- # Anycast address
@netlink.nlattr('address.anycast', type=str, fmt=util.addr)
@property
def anycast(self):
a = netlink.AbstractAddress(value)
capi.rtnl_addr_set_anycast(self._rtnl_addr, a._nl_addr)
- #####################################################################
- # Valid lifetime
@netlink.nlattr('address.valid_lifetime', type=int, immutable=True,
fmt=util.num)
@property
def valid_lifetime(self, value):
capi.rtnl_addr_set_valid_lifetime(self._rtnl_addr, int(value))
- #####################################################################
- # Preferred lifetime
@netlink.nlattr('address.preferred_lifetime', type=int,
immutable=True, fmt=util.num)
@property
def preferred_lifetime(self, value):
capi.rtnl_addr_set_preferred_lifetime(self._rtnl_addr, int(value))
- #####################################################################
- # Creation Time
@netlink.nlattr('address.create_time', type=int, immutable=True,
fmt=util.num)
@property
hsec = capi.rtnl_addr_get_create_time(self._rtnl_addr)
return datetime.timedelta(milliseconds=10*hsec)
- #####################################################################
- # Last Update
@netlink.nlattr('address.last_update', type=int, immutable=True,
fmt=util.num)
@property
hsec = capi.rtnl_addr_get_last_update_time(self._rtnl_addr)
return datetime.timedelta(milliseconds=10*hsec)
- #####################################################################
- # add()
def add(self, socket=None, flags=None):
if not socket:
socket = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if ret < 0:
raise netlink.KernelError(ret)
- #####################################################################
- # delete()
def delete(self, socket, flags=0):
"""Attempt to delete this address in the kernel"""
ret = capi.rtnl_addr_delete(socket._sock, self._rtnl_addr, flags)
def _flags(self):
return ','.join(self.flags)
- ###################################################################
- #
- # format(details=False, stats=False)
- #
def format(self, details=False, stats=False, nodev=False, indent=''):
"""Return address as formatted text"""
fmt = util.MyFormatter(self, indent)
from .links import inet as inet
from .. import util as util
-###########################################################################
# Link statistics definitions
RX_PACKETS = 0
TX_PACKETS = 1
ICMP6_OUTMSGS = 55
ICMP6_OUTERRORS = 56
-###########################################################################
-# Link Cache
class LinkCache(netlink.Cache):
"""Cache of network links"""
def _new_cache(self, cache):
return LinkCache(family=self.arg1, cache=cache)
-###########################################################################
-# Link Object
class Link(netlink.Object):
"""Network link"""
return Link(obj)
- #####################################################################
- # ifindex
@netlink.nlattr('link.ifindex', type=int, immutable=True, fmt=util.num)
@property
def ifindex(self):
if capi.rtnl_link_get_ifindex(self._orig) == 0:
capi.rtnl_link_set_ifindex(self._orig, int(value))
- #####################################################################
- # name
@netlink.nlattr('link.name', type=str, fmt=util.bold)
@property
def name(self):
if capi.rtnl_link_get_name(self._orig) is None:
capi.rtnl_link_set_name(self._orig, value)
- #####################################################################
- # flags
@netlink.nlattr('link.flags', type=str, fmt=util.string)
@property
def flags(self):
else:
self._set_flag(value)
- #####################################################################
- # mtu
@netlink.nlattr('link.mtu', type=int, fmt=util.num)
@property
def mtu(self):
def mtu(self, value):
capi.rtnl_link_set_mtu(self._rtnl_link, int(value))
- #####################################################################
- # family
@netlink.nlattr('link.family', type=int, immutable=True, fmt=util.num)
@property
def family(self):
def family(self, value):
capi.rtnl_link_set_family(self._rtnl_link, value)
- #####################################################################
- # address
@netlink.nlattr('link.address', type=str, fmt=util.addr)
@property
def address(self):
def address(self, value):
capi.rtnl_link_set_addr(self._rtnl_link, value._addr)
- #####################################################################
- # broadcast
@netlink.nlattr('link.broadcast', type=str, fmt=util.addr)
@property
def broadcast(self):
def broadcast(self, value):
capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr)
- #####################################################################
- # qdisc
@netlink.nlattr('link.qdisc', type=str, immutable=True, fmt=util.string)
@property
def qdisc(self):
def qdisc(self, value):
capi.rtnl_link_set_qdisc(self._rtnl_link, value)
- #####################################################################
- # txqlen
@netlink.nlattr('link.txqlen', type=int, fmt=util.num)
@property
def txqlen(self):
def txqlen(self, value):
capi.rtnl_link_set_txqlen(self._rtnl_link, int(value))
- #####################################################################
- # weight
@netlink.nlattr('link.weight', type=str, fmt=util.string)
@property
def weight(self):
v = int(value)
capi.rtnl_link_set_weight(self._rtnl_link, v)
- #####################################################################
- # arptype
@netlink.nlattr('link.arptype', type=str, immutable=True, fmt=util.string)
@property
def arptype(self):
i = core_capi.nl_str2llproto(value)
capi.rtnl_link_set_arptype(self._rtnl_link, i)
- #####################################################################
- # operstate
@netlink.nlattr('link.operstate', type=str, immutable=True,
fmt=util.string, title='state')
@property
i = capi.rtnl_link_str2operstate(value)
capi.rtnl_link_set_operstate(self._rtnl_link, i)
- #####################################################################
- # mode
@netlink.nlattr('link.mode', type=str, immutable=True, fmt=util.string)
@property
def mode(self):
i = capi.rtnl_link_str2mode(value)
capi.rtnl_link_set_linkmode(self._rtnl_link, i)
- #####################################################################
- # alias
@netlink.nlattr('link.alias', type=str, fmt=util.string)
@property
def alias(self):
def alias(self, value):
capi.rtnl_link_set_ifalias(self._rtnl_link, value)
- #####################################################################
- # type
@netlink.nlattr('link.type', type=str, fmt=util.string)
@property
def type(self):
self._module_lookup('netlink.route.links.' + value)
- #####################################################################
- # get_stat()
def get_stat(self, stat):
"""Retrieve statistical information"""
if type(stat) is str:
return capi.rtnl_link_get_stat(self._rtnl_link, stat)
- #####################################################################
- # add()
def add(self, sock=None, flags=None):
if not sock:
sock = netlink.lookup_socket(netlink.NETLINK_ROUTE)
if ret < 0:
raise netlink.KernelError(ret)
- #####################################################################
- # change()
def change(self, sock=None, flags=0):
"""Commit changes made to the link object"""
if sock is None:
if ret < 0:
raise netlink.KernelError(ret)
- #####################################################################
- # delete()
def delete(self, sock=None):
"""Attempt to delete this link in the kernel"""
if sock is None:
pass
return buf
- ###################################################################
- #
- # format(details=False, stats=False)
- #
def format(self, details=False, stats=False, indent=''):
"""Return link as formatted text"""
fmt = util.MyFormatter(self, indent)
def __init__(self, link):
self._link = link
- ###################################################################
- # id
@netlink.nlattr('link.vlan.id', type=int)
@property
def id(self):
def id(self, value):
capi.rtnl_link_vlan_set_id(self._link, int(value))
- ###################################################################
- # flags
@netlink.nlattr('link.vlan.flags', type=str)
@property
def flags(self):
def __init__(self, qdisc):
self._qdisc = qdisc
- ###################################################################
- # default class
@netlink.nlattr('qdisc.htb.default_class', type=int)
@property
def default_class(self):
def default_class(self, value):
capi.rtnl_htb_set_defcls(self._qdisc._rtnl_qdisc, int(value))
- #####################################################################
- # r2q
@netlink.nlattr('qdisc.htb.r2q', type=int)
@property
def r2q(self):
def __init__(self, cl):
self._class = cl
- #####################################################################
- # rate
@netlink.nlattr('class.htb.rate', type=str)
@property
def rate(self):
def rate(self, value):
capi.rtnl_htb_set_rate(self._class._rtnl_class, int(value))
- #####################################################################
- # ceil
@netlink.nlattr('class.htb.ceil', type=str)
@property
def ceil(self):
def ceil(self, value):
capi.rtnl_htb_set_ceil(self._class._rtnl_class, int(value))
- #####################################################################
- # burst
@netlink.nlattr('class.htb.burst', type=str)
@property
def burst(self):
def burst(self, value):
capi.rtnl_htb_set_rbuffer(self._class._rtnl_class, int(value))
- #####################################################################
- # ceil burst
@netlink.nlattr('class.htb.ceil_burst', type=str)
@property
def ceil_burst(self):
def ceil_burst(self, value):
capi.rtnl_htb_set_cbuffer(self._class._rtnl_class, int(value))
- #####################################################################
- # priority
@netlink.nlattr('class.htb.prio', type=int)
@property
def prio(self):
def prio(self, value):
capi.rtnl_htb_set_prio(self._class._rtnl_class, int(value))
- #####################################################################
- # quantum
@netlink.nlattr('class.htb.quantum', type=int)
@property
def quantum(self):
def quantum(self, value):
capi.rtnl_htb_set_quantum(self._class._rtnl_class, int(value))
- #####################################################################
- # level
@netlink.nlattr('class.htb.level', type=int)
@property
def level(self):
STAT_MAX = STAT_OVERLIMITS
-###########################################################################
-# Handle
class Handle(object):
""" Traffic control handle
def isroot(self):
return self._val == TC_H_ROOT or self._val == TC_H_INGRESS
-###########################################################################
-# TC Cache
class TcCache(netlink.Cache):
"""Cache of traffic control object"""
def __getitem__(self, key):
raise NotImplementedError()
-###########################################################################
-# Tc Object
class Tc(netlink.Object):
def __cmp__(self, other):
diff = self.ifindex - other.ifindex
"""True if tc object is a root object"""
return self.parent.isroot()
- #####################################################################
- # ifindex
@property
def ifindex(self):
"""interface index"""
def ifindex(self, value):
capi.rtnl_tc_set_ifindex(self._rtnl_tc, int(value))
- #####################################################################
- # link
@property
def link(self):
link = capi.rtnl_tc_get_link(self._rtnl_tc)
def link(self, value):
capi.rtnl_tc_set_link(self._rtnl_tc, value._link)
- #####################################################################
- # mtu
@property
def mtu(self):
return capi.rtnl_tc_get_mtu(self._rtnl_tc)
def mtu(self, value):
capi.rtnl_tc_set_mtu(self._rtnl_tc, int(value))
- #####################################################################
- # mpu
@property
def mpu(self):
return capi.rtnl_tc_get_mpu(self._rtnl_tc)
def mpu(self, value):
capi.rtnl_tc_set_mpu(self._rtnl_tc, int(value))
- #####################################################################
- # overhead
@property
def overhead(self):
return capi.rtnl_tc_get_overhead(self._rtnl_tc)
def overhead(self, value):
capi.rtnl_tc_set_overhead(self._rtnl_tc, int(value))
- #####################################################################
- # linktype
@property
def linktype(self):
return capi.rtnl_tc_get_linktype(self._rtnl_tc)
def linktype(self, value):
capi.rtnl_tc_set_linktype(self._rtnl_tc, int(value))
- #####################################################################
- # handle
@property
def handle(self):
return Handle(capi.rtnl_tc_get_handle(self._rtnl_tc))
def handle(self, value):
capi.rtnl_tc_set_handle(self._rtnl_tc, int(value))
- #####################################################################
- # parent
@property
def parent(self):
return Handle(capi.rtnl_tc_get_parent(self._rtnl_tc))
def parent(self, value):
capi.rtnl_tc_set_parent(self._rtnl_tc, int(value))
- #####################################################################
- # kind
@property
def kind(self):
return capi.rtnl_tc_get_kind(self._rtnl_tc)
def stats(fmt):
return fmt.nl('{t|packets} {t|bytes} {t|qlen}')
-###########################################################################
-# Queueing discipline cache
class QdiscCache(netlink.Cache):
"""Cache of qdiscs"""
def _new_cache(cache):
return QdiscCache(cache=cache)
-###########################################################################
-# Qdisc Object
class Qdisc(Tc):
"""Queueing discipline"""
return ret
-# #####################################################################
-# # add()
# def add(self, socket, flags=None):
# if not flags:
# flags = netlink.NLM_F_CREATE
# if ret < 0:
# raise netlink.KernelError(ret)
#
-# #####################################################################
-# # change()
# def change(self, socket, flags=0):
# """Commit changes made to the link object"""
# if not self._orig:
# if ret < 0:
# raise netlink.KernelError(ret)
#
-# #####################################################################
-# # delete()
# def delete(self, socket):
# """Attempt to delete this link in the kernel"""
# ret = capi.rtnl_link_delete(socket._sock, self._link)
# if ret < 0:
# raise netlink.KernelError(ret)
- ###################################################################
- #
- # format(details=False, stats=False)
- #
def format(self, details=False, stats=False, nodev=False,
noparent=False, indent=''):
"""Return qdisc as formatted text"""
return buf
-###########################################################################
-# Traffic class cache
class TcClassCache(netlink.Cache):
"""Cache of traffic classes"""
def _new_cache(self, cache):
return TcClassCache(self.arg1, cache=cache)
-###########################################################################
-# Traffic Class Object
class TcClass(Tc):
"""Traffic Class"""
return ret
- ###################################################################
- #
- # format(details=False, stats=False)
- #
def format(self, details=False, _stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""
return buf
-###########################################################################
-# Classifier Cache
class ClassifierCache(netlink.Cache):
"""Cache of traffic classifiers objects"""
def _new_cache(self, cache):
return ClassifierCache(self.arg1, self.arg2, cache=cache)
-###########################################################################
-# Classifier Object
class Classifier(Tc):
"""Classifier"""
return Classifier(obj)
- #####################################################################
- # priority
@property
def priority(self):
return capi.rtnl_cls_get_prio(self._rtnl_cls)
def priority(self, value):
capi.rtnl_cls_set_prio(self._rtnl_cls, int(value))
- #####################################################################
- # protocol
@property
def protocol(self):
return capi.rtnl_cls_get_protocol(self._rtnl_cls)
def childs(self):
return []
- ###################################################################
- #
- # format(details=False, stats=False)
- #
def format(self, details=False, _stats=False, nodev=False,
noparent=False, indent=''):
"""Return class as formatted text"""