From 4521d8ab496027a08c3086e700827c5b7d77e195 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 24 Sep 2020 18:39:36 +0200 Subject: [PATCH] Implement global & extensible interfaces (#2707) * Implement global & extensible interfaces * Simplify detection of valid interfaces * Linux: handle interfaces with no IPv4 * Reimplement get_working_ifaces * Remove 'main' IPv6 in interfaces * Don't show invalid interfaces by default * Update error message --- .config/ci/install.sh | 6 +- .config/codespell_ignore.txt | 1 + .travis.yml | 2 +- doc/scapy/routing.rst | 12 + doc/scapy/usage.rst | 2 +- run_scapy.bat | 4 +- scapy/all.py | 1 + scapy/arch/__init__.py | 21 +- scapy/arch/bpf/core.py | 186 ++++++----- scapy/arch/bpf/supersocket.py | 6 +- scapy/arch/common.py | 40 ++- scapy/arch/{pcapdnet.py => libpcap.py} | 133 ++++++-- scapy/arch/linux.py | 76 +++-- scapy/arch/solaris.py | 2 +- scapy/arch/windows/__init__.py | 412 ++++++++----------------- scapy/arch/windows/native.py | 3 +- scapy/compat.py | 8 + scapy/config.py | 53 ++-- scapy/fields.py | 6 +- scapy/interfaces.py | 362 ++++++++++++++++++++++ scapy/layers/usb.py | 44 ++- scapy/libs/winpcapy.py | 42 ++- scapy/route.py | 33 +- scapy/route6.py | 12 +- scapy/sendrecv.py | 87 ++++-- scapy/supersocket.py | 11 +- scapy/themes.py | 8 +- scapy/tools/UTscapy.py | 4 +- scapy/utils.py | 37 ++- scapy/utils6.py | 2 +- test/bpf.uts | 37 --- test/linux.uts | 12 +- test/regression.uts | 111 +++++-- test/windows.uts | 26 +- tox.ini | 2 +- 35 files changed, 1155 insertions(+), 649 deletions(-) rename scapy/arch/{pcapdnet.py => libpcap.py} (79%) create mode 100644 scapy/interfaces.py diff --git a/.config/ci/install.sh b/.config/ci/install.sh index e28bbc0908d..251b1cf0d8d 100755 --- a/.config/ci/install.sh +++ b/.config/ci/install.sh @@ -2,10 +2,10 @@ # Install on osx if [ "$OSTYPE" = "darwin"* ] || [ "$TRAVIS_OS_NAME" = "osx" ] then - if [ ! -z $SCAPY_USE_PCAPDNET ] + if [ ! -z $SCAPY_USE_LIBPCAP ] then brew update - brew install libdnet libpcap + brew install libpcap fi fi @@ -18,7 +18,7 @@ then fi # Make sure libpcap is installed -if [ ! -z $SCAPY_USE_PCAPDNET ] +if [ ! -z $SCAPY_USE_LIBPCAP ] then sudo apt-get -qy install libpcap-dev || exit 1 fi diff --git a/.config/codespell_ignore.txt b/.config/codespell_ignore.txt index 1aaaa05c76c..7c7202d32ad 100644 --- a/.config/codespell_ignore.txt +++ b/.config/codespell_ignore.txt @@ -12,6 +12,7 @@ fo gost iff inout +microsof mitre nd negociate diff --git a/.travis.yml b/.travis.yml index dd12c00a93c..52c0ef4b9f6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -58,7 +58,7 @@ jobs: - os: linux python: 3.8 env: - - SCAPY_USE_PCAPDNET=yes TOXENV=py38-linux_root,codecov + - SCAPY_USE_LIBPCAP=yes TOXENV=py38-linux_root,codecov # warnings/deprecations - os: linux diff --git a/doc/scapy/routing.rst b/doc/scapy/routing.rst index 8e468696daa..a426d9fd93d 100644 --- a/doc/scapy/routing.rst +++ b/doc/scapy/routing.rst @@ -20,6 +20,18 @@ Use ``get_if_list()`` to get the interface list >>> get_if_list() ['lo', 'eth0'] +You can also use the :py:attr:`conf.ifaces ` object to get interfaces. +In this example, the object is first displayed as as column. Then, the :py:attr:`dev_from_index() ` is used to access the interface at index 2. + +.. code-block:: pycon + + >>> conf.ifaces + SRC INDEX IFACE IPv4 IPv6 MAC + sys 2 eth0 10.0.0.5 fe80::10a:2bef:dc12:afae Microsof:12:cb:ef + sys 1 lo 127.0.0.1 ::1 00:00:00:00:00:00 + >>> conf.ifaces.dev_from_index(2) + + IPv4 routes ----------- diff --git a/doc/scapy/usage.rst b/doc/scapy/usage.rst index f7ac1e952ee..af4c7de674e 100644 --- a/doc/scapy/usage.rst +++ b/doc/scapy/usage.rst @@ -1214,7 +1214,7 @@ Provided that your wireless card and driver are correctly configured for frame i On Windows, if using Npcap, the equivalent would be to call:: - >>> # Of course, conf.iface can be replaced by any interfaces accessed through IFACES + >>> # Of course, conf.iface can be replaced by any interfaces accessed through conf.ifaces ... conf.iface.setmonitor(True) you can have a kind of FakeAP:: diff --git a/run_scapy.bat b/run_scapy.bat index cfaa30ad643..11c73621a8d 100644 --- a/run_scapy.bat +++ b/run_scapy.bat @@ -2,10 +2,10 @@ set PYTHONPATH=%~dp0 REM shift will not work with %* set "_args=%*" -IF "%1" == "--2" ( +IF "%1" == "-2" ( set PYTHON=python set "_args=%_args:~3%" -) ELSE IF "%1" == "--3" ( +) ELSE IF "%1" == "-3" ( set PYTHON=python3 set "_args=%_args:~3%" ) diff --git a/scapy/all.py b/scapy/all.py index 4f438461402..b07be8af034 100644 --- a/scapy/all.py +++ b/scapy/all.py @@ -14,6 +14,7 @@ from scapy.error import * from scapy.themes import * from scapy.arch import * +from scapy.interfaces import * from scapy.plist import * from scapy.fields import * diff --git a/scapy/arch/__init__.py b/scapy/arch/__init__.py index eab1ae9a22b..154767d9c74 100644 --- a/scapy/arch/__init__.py +++ b/scapy/arch/__init__.py @@ -18,20 +18,23 @@ from scapy.compat import orb +# Duplicated from scapy/utils.py for import reasons + def str2mac(s): return ("%02x:" * 6)[:-1] % tuple(orb(x) for x in s) -if not WINDOWS: - if not conf.use_pcap: - from scapy.arch.bpf.core import get_if_raw_addr - - def get_if_addr(iff): - return inet_ntop(socket.AF_INET, get_if_raw_addr(iff)) + """ + Returns the IPv4 of an interface or "0.0.0.0" if not available + """ + return inet_ntop(socket.AF_INET, get_if_raw_addr(iff)) # noqa: F405 def get_if_hwaddr(iff): + """ + Returns the MAC (hardware) address of an interface + """ addrfamily, mac = get_if_raw_hwaddr(iff) # noqa: F405 if addrfamily in [ARPHDR_ETHER, ARPHDR_LOOPBACK]: return str2mac(mac) @@ -51,6 +54,8 @@ def get_if_hwaddr(iff): # def get_if(iff,cmd): # def get_if_index(iff): +from scapy.interfaces import get_working_if # noqa F401 + if LINUX: from scapy.arch.linux import * # noqa F403 elif BSD: @@ -58,7 +63,7 @@ def get_if_hwaddr(iff): from scapy.arch.bpf.core import * # noqa F403 if not conf.use_pcap: # Native - from scapy.arch.bpf.supersocket import * # noqa F403 + from scapy.arch.bpf.supersocket import * # noqa F403 conf.use_bpf = True elif SOLARIS: from scapy.arch.solaris import * # noqa F403 @@ -66,8 +71,6 @@ def get_if_hwaddr(iff): from scapy.arch.windows import * # noqa F403 from scapy.arch.windows.native import * # noqa F403 -if conf.iface is None: - conf.iface = conf.loopback_name _set_conf_sockets() # Apply config diff --git a/scapy/arch/bpf/core.py b/scapy/arch/bpf/core.py index 0ebdb53162b..6fbdf447eef 100644 --- a/scapy/arch/bpf/core.py +++ b/scapy/arch/bpf/core.py @@ -7,7 +7,7 @@ from __future__ import absolute_import from ctypes import cdll, cast, pointer -from ctypes import c_int, c_ulong, c_char_p +from ctypes import c_int, c_ulong, c_uint, c_char_p, Structure, POINTER from ctypes.util import find_library import fcntl import os @@ -16,27 +16,53 @@ import struct import subprocess +import scapy from scapy.arch.bpf.consts import BIOCSETF, SIOCGIFFLAGS, BIOCSETIF -from scapy.arch.common import get_if, compile_filter +from scapy.arch.common import get_if, compile_filter, _iff_flags +from scapy.arch.unix import in6_getifaddr from scapy.compat import plain_str from scapy.config import conf from scapy.data import ARPHDR_LOOPBACK, ARPHDR_ETHER from scapy.error import Scapy_Exception, warning +from scapy.interfaces import InterfaceProvider, IFACES, NetworkInterface, \ + network_name +from scapy.pton_ntop import inet_ntop from scapy.modules.six.moves import range # ctypes definitions LIBC = cdll.LoadLibrary(find_library("libc")) + LIBC.ioctl.argtypes = [c_int, c_ulong, c_char_p] LIBC.ioctl.restype = c_int +# The following is implemented as of Python >= 3.3 +# under socket.*. Remember to use them when dropping Py2.7 + +# See https://docs.python.org/3/library/socket.html#socket.if_nameindex + + +class if_nameindex(Structure): + _fields_ = [("if_index", c_uint), + ("if_name", c_char_p)] + + +_ptr_ifnameindex_table = POINTER(if_nameindex * 255) + +LIBC.if_nameindex.argtypes = [] +LIBC.if_nameindex.restype = _ptr_ifnameindex_table +LIBC.if_freenameindex.argtypes = [_ptr_ifnameindex_table] +LIBC.if_freenameindex.restype = None # Addresses manipulation functions + def get_if_raw_addr(ifname): """Returns the IPv4 address configured on 'ifname', packed with inet_pton.""" # noqa: E501 + ifname = network_name(ifname) + # Get ifconfig output subproc = subprocess.Popen( [conf.prog.ifconfig, ifname], @@ -49,7 +75,7 @@ def get_if_raw_addr(ifname): # Get IPv4 addresses addresses = [ - line for line in plain_str(stdout).splitlines() + line.strip() for line in plain_str(stdout).splitlines() if "inet " in line ] @@ -69,6 +95,7 @@ def get_if_raw_hwaddr(ifname): NULL_MAC_ADDRESS = b'\x00' * 6 + ifname = network_name(ifname) # Handle the loopback interface separately if ifname == conf.loopback_name: return (ARPHDR_LOOPBACK, NULL_MAC_ADDRESS) @@ -85,7 +112,7 @@ def get_if_raw_hwaddr(ifname): # Get MAC addresses addresses = [ - line for line in plain_str(stdout).splitlines() if ( + line.strip() for line in plain_str(stdout).splitlines() if ( "ether" in line or "lladdr" in line or "address" in line ) ] @@ -108,7 +135,12 @@ def get_dev_bpf(): try: fd = os.open("/dev/bpf%i" % bpf, os.O_RDWR) return (fd, bpf) - except OSError: + except OSError as ex: + if ex.errno == 13: # Permission denied + raise Scapy_Exception(( + "Permission denied: could not open /dev/bpf%i. " + "Make sure to be running Scapy as root ! (sudo)" + ) % bpf) continue raise Scapy_Exception("No /dev/bpf handle is available !") @@ -125,87 +157,87 @@ def attach_filter(fd, bpf_filter, iface): # Interface manipulation functions -def get_if_list(): - """Returns a list containing all network interfaces.""" - - # Get ifconfig output - subproc = subprocess.Popen( - [conf.prog.ifconfig], - close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) - stdout, stderr = subproc.communicate() - if subproc.returncode: - raise Scapy_Exception("Failed to execute ifconfig: (%s)" % - (plain_str(stderr))) - - interfaces = [ - line[:line.find(':')] for line in plain_str(stdout).splitlines() - if ": flags" in line.lower() - ] - return interfaces +def _get_ifindex_list(): + """ + Returns a list containing (iface, index) + """ + ptr = LIBC.if_nameindex() + ifaces = [] + for i in range(255): + iface = ptr.contents[i] + if not iface.if_name: + break + ifaces.append((plain_str(iface.if_name), iface.if_index)) + LIBC.if_freenameindex(ptr) + return ifaces _IFNUM = re.compile(r"([0-9]*)([ab]?)$") -def get_working_ifaces(): - """ - Returns an ordered list of interfaces that could be used with BPF. - Note: the order mimics pcap_findalldevs() behavior - """ +def _get_if_flags(ifname): + """Internal function to get interface flags""" + # Get interface flags + try: + result = get_if(ifname, SIOCGIFFLAGS) + except IOError: + warning("ioctl(SIOCGIFFLAGS) failed on %s !", ifname) + return None - # Only root is allowed to perform the following ioctl() call - if os.getuid() != 0: - return [] + # Convert flags + ifflags = struct.unpack("16xH14x", result)[0] + return ifflags - # Test all network interfaces - interfaces = [] - for ifname in get_if_list(): - # Unlike pcap_findalldevs(), we do not care of loopback interfaces. - if ifname == conf.loopback_name: - continue +class BPFInterfaceProvider(InterfaceProvider): + name = "BPF" - # Get interface flags + def _is_valid(self, dev): + if not dev.flags & 0x1: # not IFF_UP + return False + # Get a BPF handle try: - result = get_if(ifname, SIOCGIFFLAGS) - except IOError: - warning("ioctl(SIOCGIFFLAGS) failed on %s !", ifname) - continue - - # Convert flags - ifflags = struct.unpack("16xH14x", result)[0] - if ifflags & 0x1: # IFF_UP - - # Get a BPF handle fd = get_dev_bpf()[0] - if fd is None: - raise Scapy_Exception("No /dev/bpf are available !") - - # Check if the interface can be used + except Scapy_Exception: + return True # Can't check if available (non sudo?) + if fd is None: + raise Scapy_Exception("No /dev/bpf are available !") + # Check if the interface can be used + try: + fcntl.ioctl(fd, BIOCSETIF, struct.pack("16s16x", + dev.network_name.encode())) + except IOError: + return False + else: + return True + finally: + # Close the file descriptor + os.close(fd) + + def load(self): + from scapy.fields import FlagValue + data = {} + ips = in6_getifaddr() + for ifname, index in _get_ifindex_list(): try: - fcntl.ioctl(fd, BIOCSETIF, struct.pack("16s16x", - ifname.encode())) - except IOError: - pass - else: - ifnum, ifab = _IFNUM.search(ifname).groups() - interfaces.append((ifname, int(ifnum) if ifnum else -1, ifab)) - finally: - # Close the file descriptor - os.close(fd) - - # Sort to mimic pcap_findalldevs() order - interfaces.sort(key=lambda elt: (elt[1], elt[2], elt[0])) - - return [iface[0] for iface in interfaces] - - -def get_working_if(): - """Returns the first interface than can be used with BPF""" - - ifaces = get_working_ifaces() - if not ifaces: - # A better interface will be selected later using the routing table - return conf.loopback_name - return ifaces[0] + ifflags = _get_if_flags(ifname) + mac = scapy.utils.str2mac(get_if_raw_hwaddr(ifname)[1]) + ip = inet_ntop(socket.AF_INET, get_if_raw_addr(ifname)) + except Scapy_Exception: + continue + ifflags = FlagValue(ifflags, _iff_flags) + if_data = { + "name": ifname, + "network_name": ifname, + "description": ifname, + "flags": ifflags, + "index": index, + "ip": ip, + "ips": [x[0] for x in ips if x[2] == ifname] + [ip], + "mac": mac + } + data[ifname] = NetworkInterface(self, if_data) + return data + + +IFACES.register_provider(BPFInterfaceProvider) diff --git a/scapy/arch/bpf/supersocket.py b/scapy/arch/bpf/supersocket.py index 4087899bb12..f1837c9d1bc 100644 --- a/scapy/arch/bpf/supersocket.py +++ b/scapy/arch/bpf/supersocket.py @@ -21,6 +21,7 @@ from scapy.consts import FREEBSD, NETBSD, DARWIN from scapy.data import ETH_P_ALL from scapy.error import Scapy_Exception, warning +from scapy.interfaces import network_name from scapy.supersocket import SuperSocket from scapy.compat import raw @@ -53,10 +54,7 @@ def __init__(self, iface=None, type=ETH_P_ALL, promisc=None, filter=None, else: self.promisc = promisc - if iface is None: - self.iface = conf.iface - else: - self.iface = iface + self.iface = network_name(iface or conf.iface) # Get the BPF handle (self.ins, self.dev_bpf) = get_dev_bpf() diff --git a/scapy/arch/common.py b/scapy/arch/common.py index 375a1f1bd25..d44d43d156d 100644 --- a/scapy/arch/common.py +++ b/scapy/arch/common.py @@ -16,16 +16,42 @@ from scapy.config import conf from scapy.data import MTU, ARPHRD_TO_DLT from scapy.error import Scapy_Exception +from scapy.interfaces import network_name if not WINDOWS: from fcntl import ioctl +# From if.h +_iff_flags = [ + "UP", + "BROADCAST", + "DEBUG", + "LOOPBACK", + "POINTTOPOINT", + "NOTRAILERS", + "RUNNING", + "NOARP", + "PROMISC", + "NOTRAILERS", + "ALLMULTI", + "MASTER", + "SLAVE", + "MULTICAST", + "PORTSEL", + "AUTOMEDIA", + "DYNAMIC", + "LOWER_UP", + "DORMANT", + "ECHO" +] + # UTILS def get_if(iff, cmd): """Ease SIOCGIF* ioctl calls""" + iff = network_name(iff) sck = socket.socket() try: return ioctl(sck, cmd, struct.pack("16s16x", iff.encode("utf8"))) @@ -33,7 +59,7 @@ def get_if(iff, cmd): sck.close() -def get_if_raw_hwaddr(iff): +def get_if_raw_hwaddr(iff, siocgifhwaddr=None): """Get the raw MAC address of a local interface. This function uses SIOCGIFHWADDR calls, therefore only works @@ -42,8 +68,10 @@ def get_if_raw_hwaddr(iff): :param iff: the network interface name as a string :returns: the corresponding raw MAC address """ - from scapy.arch import SIOCGIFHWADDR - return struct.unpack("16xh6s8x", get_if(iff, SIOCGIFHWADDR)) + if siocgifhwaddr is None: + from scapy.arch import SIOCGIFHWADDR + siocgifhwaddr = SIOCGIFHWADDR + return struct.unpack("16xh6s8x", get_if(iff, siocgifhwaddr)) # SOCKET UTILS @@ -96,10 +124,7 @@ def compile_filter(filter_exp, iface=None, linktype=None, raise Scapy_Exception( "Please provide an interface or linktype!" ) - if WINDOWS: - iface = conf.iface.pcap_name - else: - iface = conf.iface + iface = conf.iface # Try to guess linktype to avoid requiring root try: arphd = get_if_raw_hwaddr(iface)[0] @@ -113,6 +138,7 @@ def compile_filter(filter_exp, iface=None, linktype=None, ) elif iface: err = create_string_buffer(PCAP_ERRBUF_SIZE) + iface = network_name(iface) iface = create_string_buffer(iface.encode("utf8")) pcap = pcap_open_live( iface, MTU, promisc, 0, err diff --git a/scapy/arch/pcapdnet.py b/scapy/arch/libpcap.py similarity index 79% rename from scapy/arch/pcapdnet.py rename to scapy/arch/libpcap.py index 4211d6f2247..8030309eede 100644 --- a/scapy/arch/pcapdnet.py +++ b/scapy/arch/libpcap.py @@ -19,14 +19,22 @@ from scapy.config import conf from scapy.consts import WINDOWS from scapy.data import MTU, ETH_P_ALL +from scapy.error import Scapy_Exception, log_loading, warning +from scapy.interfaces import network_name, InterfaceProvider, NetworkInterface from scapy.pton_ntop import inet_ntop from scapy.supersocket import SuperSocket -from scapy.error import Scapy_Exception, log_loading, warning +from scapy.utils import str2mac + import scapy.consts if not scapy.consts.WINDOWS: from fcntl import ioctl +# AF_LINK is only available and provided on BSD (MAC) +# but because we use its value elsewhere, let's patch it. +if not hasattr(socket, "AF_LINK"): + socket.AF_LINK = 18 + ############ # COMMON # ############ @@ -35,8 +43,20 @@ # BIOCIMMEDIATE = 0x80044270 BIOCIMMEDIATE = -2147204496 +# https://github.com/the-tcpdump-group/libpcap/blob/master/pcap/pcap.h +PCAP_IF_UP = 0x00000002 # interface is up +_pcap_if_flags = [ + "LOOPBACK", + "UP", + "RUNNING", + "WIRELESS", + "OK", + "DISCONNECTED", + "NA" +] + -class _L2pcapdnetSocket(SuperSocket, SelectableObject): +class _L2libpcapSocket(SuperSocket, SelectableObject): nonblocking_socket = True def __init__(self): @@ -94,22 +114,35 @@ def select(sockets, remain=None): # Part of the Winpcapy integration was inspired by phaethon/scapy # but he destroyed the commit history, so there is no link to that try: - from scapy.libs.winpcapy import PCAP_ERRBUF_SIZE, pcap_if_t, \ - sockaddr_in, sockaddr_in6, pcap_findalldevs, pcap_freealldevs, \ - pcap_lib_version, pcap_close, \ - pcap_open_live, pcap_pkthdr, \ - pcap_next_ex, pcap_datalink, \ - pcap_compile, pcap_setfilter, pcap_setnonblock, pcap_sendpacket, \ - bpf_program + from scapy.libs.winpcapy import ( + PCAP_ERRBUF_SIZE, + bpf_program, + pcap_close, + pcap_compile, + pcap_datalink, + pcap_findalldevs, + pcap_freealldevs, + pcap_if_t, + pcap_lib_version, + pcap_next_ex, + pcap_open_live, + pcap_pkthdr, + pcap_sendpacket, + pcap_setfilter, + pcap_setnonblock, + sockaddr_in, + sockaddr_in6, + ) def load_winpcapy(): """This functions calls libpcap ``pcap_findalldevs`` function, and extracts and parse all the data scapy will need to build the Interface List. - The date will be stored in ``conf.cache_iflist``, or accessible - with ``get_if_list()`` + The data will be stored in ``conf.cache_pcapiflist`` """ + from scapy.fields import FlagValue + err = create_string_buffer(PCAP_ERRBUF_SIZE) devs = POINTER(pcap_if_t)() if_list = {} @@ -120,9 +153,12 @@ def load_winpcapy(): # Iterate through the different interfaces while p: name = plain_str(p.contents.name) # GUID - description = plain_str(p.contents.description) # NAME + description = plain_str( + p.contents.description or "" + ) # DESC flags = p.contents.flags # FLAGS ips = [] + mac = "" a = p.contents.addresses while a: # IPv4 address @@ -134,18 +170,26 @@ def load_winpcapy(): elif family == socket.AF_INET6: val = cast(ap, POINTER(sockaddr_in6)) val = val.contents.sin6_addr[:] + elif family == socket.AF_LINK: + # Special case: MAC + # (AF_LINK is mostly BSD specific) + val = ap.contents.sa_data + val = val[:6] + mac = str2mac(bytes(bytearray(val))) + a = a.contents.next + continue else: - # Unknown address family - # (AF_LINK isn't a thing on Windows) + # Unknown AF a = a.contents.next continue addr = inet_ntop(family, bytes(bytearray(val))) if addr != "0.0.0.0": ips.append(addr) a = a.contents.next - if_list[name] = (description, ips, flags) + flags = FlagValue(flags, _pcap_if_flags) + if_list[name] = (description, ips, flags, mac) p = p.contents.next - conf.cache_iflist = if_list + conf.cache_pcapiflist = if_list except Exception: raise finally: @@ -180,16 +224,11 @@ def load_winpcapy(): conf.loopback_name = conf.loopback_name = "Npcap Loopback Adapter" # noqa: E501 if conf.use_pcap: - def get_if_list(): - """Returns all pcap names""" - if not conf.cache_iflist: - load_winpcapy() - return list(conf.cache_iflist) - class _PcapWrapper_libpcap: # noqa: F811 """Wrapper for the libpcap calls""" def __init__(self, device, snaplen, promisc, to_ms, monitor=None): + device = network_name(device) self.errbuf = create_string_buffer(PCAP_ERRBUF_SIZE) self.iface = create_string_buffer(device.encode("utf8")) self.dtl = None @@ -277,9 +316,54 @@ def close(self): pcap_close(self.pcap) open_pcap = _PcapWrapper_libpcap + class LibpcapProvider(InterfaceProvider): + """ + Load interfaces from Libpcap on non-Windows machines + """ + name = "libpcap" + libpcap = True + + def load(self): + if not conf.use_pcap or WINDOWS: + return {} + if not conf.cache_pcapiflist: + load_winpcapy() + data = {} + i = 0 + for ifname, dat in conf.cache_pcapiflist.items(): + description, ips, flags, mac = dat + i += 1 + if not mac: + from scapy.arch import get_if_hwaddr + try: + mac = get_if_hwaddr(ifname) + except Exception: + # There are at least 3 different possible exceptions + continue + if_data = { + 'name': ifname, + 'description': description or ifname, + 'network_name': ifname, + 'index': i, + 'mac': mac or '00:00:00:00:00:00', + 'ips': ips, + 'flags': flags + } + data[ifname] = NetworkInterface(self, if_data) + return data + + def reload(self): + if conf.use_pcap: + from scapy.arch.libpcap import load_winpcapy + load_winpcapy() + return self.load() + + if not WINDOWS: + conf.ifaces.register_provider(LibpcapProvider) + # pcap sockets - class L2pcapListenSocket(_L2pcapdnetSocket): + class L2pcapListenSocket(_L2libpcapSocket): desc = "read packets at layer 2 using libpcap" def __init__(self, iface=None, type=ETH_P_ALL, promisc=None, filter=None, monitor=None): # noqa: E501 @@ -316,7 +400,7 @@ def __init__(self, iface=None, type=ETH_P_ALL, promisc=None, filter=None, monito def send(self, x): raise Scapy_Exception("Can't send anything with L2pcapListenSocket") # noqa: E501 - class L2pcapSocket(_L2pcapdnetSocket): + class L2pcapSocket(_L2libpcapSocket): desc = "read/write packets at layer 2 using only libpcap" def __init__(self, iface=None, type=ETH_P_ALL, promisc=None, filter=None, nofilter=0, # noqa: E501 @@ -389,6 +473,5 @@ def send(self, x): self.outs.send(sx) else: # No libpcap installed - get_if_list = lambda: [] if WINDOWS: NPCAP_PATH = "" diff --git a/scapy/arch/linux.py b/scapy/arch/linux.py index 642e3ebd7fb..53744cdc3c2 100644 --- a/scapy/arch/linux.py +++ b/scapy/arch/linux.py @@ -28,10 +28,13 @@ from scapy.config import conf from scapy.data import MTU, ETH_P_ALL, SOL_PACKET, SO_ATTACH_FILTER, \ SO_TIMESTAMPNS +from scapy.interfaces import IFACES, InterfaceProvider, NetworkInterface, \ + network_name from scapy.supersocket import SuperSocket +from scapy.pton_ntop import inet_ntop from scapy.error import warning, Scapy_Exception, \ ScapyInvalidPlatformException, log_runtime -from scapy.arch.common import get_if, compile_filter +from scapy.arch.common import get_if, compile_filter, _iff_flags import scapy.modules.six as six from scapy.modules.six.moves import range @@ -92,13 +95,20 @@ def get_if_raw_addr(iff): + r""" + Return the raw IPv4 address of an interface. + If unavailable, returns b"\0\0\0\0" + """ try: return get_if(iff, SIOCGIFADDR)[20:24] except IOError: return b"\0\0\0\0" -def get_if_list(): +def _get_if_list(): + """ + Function to read the interfaces from /proc/net/dev + """ try: f = open("/proc/net/dev", "rb") except IOError: @@ -118,19 +128,6 @@ def get_if_list(): return lst -def get_working_if(): - """ - Return the name of the first network interfcace that is up. - """ - for i in get_if_list(): - if i == conf.loopback_name: - continue - ifflags = struct.unpack("16xH14x", get_if(i, SIOCGIFFLAGS))[0] - if ifflags & IFF_UP: - return i - return conf.loopback_name - - def attach_filter(sock, bpf_filter, iface): """ Compile bpf filter and attach it to a socket @@ -256,15 +253,12 @@ def read_routes(): gw_str = scapy.utils.inet_ntoa(struct.pack("I", int(gw, 16))) metric = int(metric) + route = [dst_int, msk_int, gw_str, iff, ifaddr, metric] if ifaddr_int & msk_int != dst_int: tmp_route = get_alias_address(iff, dst_int, gw_str, metric) if tmp_route: - routes.append(tmp_route) - else: - routes.append((dst_int, msk_int, gw_str, iff, ifaddr, metric)) - - else: - routes.append((dst_int, msk_int, gw_str, iff, ifaddr, metric)) + route = tmp_route + routes.append(tuple(route)) f.close() s.close() @@ -360,6 +354,42 @@ def get_if_index(iff): return int(struct.unpack("I", get_if(iff, SIOCGIFINDEX)[16:20])[0]) +class LinuxInterfaceProvider(InterfaceProvider): + name = "sys" + + def _is_valid(self, dev): + return bool(dev.flags & IFF_UP) + + def load(self): + from scapy.fields import FlagValue + data = {} + ips = in6_getifaddr() + for i in _get_if_list(): + ifflags = struct.unpack("16xH14x", get_if(i, SIOCGIFFLAGS))[0] + index = get_if_index(i) + mac = scapy.utils.str2mac( + get_if_raw_hwaddr(i, siocgifhwaddr=SIOCGIFHWADDR)[1] + ) + ip = inet_ntop(socket.AF_INET, get_if_raw_addr(i)) + if ip == "0.0.0.0": + ip = None + ifflags = FlagValue(ifflags, _iff_flags) + if_data = { + "name": i, + "network_name": i, + "description": i, + "flags": ifflags, + "index": index, + "ip": ip, + "ips": [x[0] for x in ips if x[2] == i] + [ip] if ip else [], + "mac": mac + } + data[i] = NetworkInterface(self, if_data) + return data + + +IFACES.register_provider(LinuxInterfaceProvider) + if os.uname()[4] in ['x86_64', 'aarch64']: def get_last_packet_timestamp(sock): ts = ioctl(sock, SIOCGSTAMP, "1234567890123456") @@ -388,7 +418,7 @@ class L2Socket(SuperSocket): def __init__(self, iface=None, type=ETH_P_ALL, promisc=None, filter=None, nofilter=0, monitor=None): - self.iface = conf.iface if iface is None else iface + self.iface = network_name(iface or conf.iface) self.type = type self.promisc = conf.sniff_promisc if promisc is None else promisc if monitor is not None: @@ -586,7 +616,9 @@ def down(self): def __enter__(self): self.setup() self.up() + conf.ifaces.reload() return self def __exit__(self, exc_type, exc_val, exc_tb): self.destroy() + conf.ifaces.reload() diff --git a/scapy/arch/solaris.py b/scapy/arch/solaris.py index 6083af4f9b1..44817039704 100644 --- a/scapy/arch/solaris.py +++ b/scapy/arch/solaris.py @@ -18,7 +18,7 @@ # From sys/sockio.h and net/if.h SIOCGIFHWADDR = 0xc02069b9 # Get hardware address -from scapy.arch.pcapdnet import * # noqa: F401, F403, E402 +from scapy.arch.libpcap import * # noqa: F401, F403, E402 from scapy.arch.unix import * # noqa: F401, F403, E402 from scapy.arch.common import get_if_raw_hwaddr # noqa: F401, F403, E402 diff --git a/scapy/arch/windows/__init__.py b/scapy/arch/windows/__init__.py index 614bd9ecf22..f9f73c1e3b4 100755 --- a/scapy/arch/windows/__init__.py +++ b/scapy/arch/windows/__init__.py @@ -16,6 +16,7 @@ import subprocess as sp from glob import glob import struct +import warnings from scapy.arch.windows.structures import _windows_title, \ GetAdaptersAddresses, GetIpForwardTable, GetIpForwardTable2, \ @@ -23,22 +24,27 @@ from scapy.consts import WINDOWS, WINDOWS_XP from scapy.config import conf, ConfClass from scapy.error import Scapy_Exception, log_loading, log_runtime, warning +from scapy.interfaces import NetworkInterface, InterfaceProvider, \ + dev_from_index, resolve_iface, network_name from scapy.pton_ntop import inet_ntop, inet_pton -from scapy.utils import atol, itom, pretty_list, mac2str, str2mac +from scapy.utils import atol, itom, mac2str, str2mac from scapy.utils6 import construct_source_candidate_set, in6_getscope from scapy.data import ARPHDR_ETHER, load_manuf import scapy.modules.six as six -from scapy.modules.six.moves import input, winreg, UserDict +from scapy.modules.six.moves import input, winreg from scapy.compat import plain_str from scapy.supersocket import SuperSocket conf.use_pcap = True # These import must appear after setting conf.use_* variables -from scapy.arch import pcapdnet # noqa: E402 -from scapy.arch.pcapdnet import NPCAP_PATH, get_if_list # noqa: E402 +from scapy.arch import libpcap # noqa: E402 +from scapy.arch.libpcap import ( # noqa: E402 + NPCAP_PATH, + PCAP_IF_UP, +) -# Detection happens after pcapdnet import (NPcap detection) +# Detection happens after libpcap import (NPcap detection) NPCAP_LOOPBACK_NAME = r"\Device\NPF_Loopback" if conf.use_npcap: conf.loopback_name = NPCAP_LOOPBACK_NAME @@ -269,7 +275,7 @@ def _resolve_ips(y): return [ { "name": _str_decode(x["friendly_name"]), - "win_index": x["interface_index"], + "index": x["interface_index"], "description": _str_decode(x["description"]), "guid": _str_decode(x["adapter_name"]), "mac": _get_mac(x), @@ -280,28 +286,6 @@ def _resolve_ips(y): ] -def get_ips(v6=False): - """Returns all available IPs matching to interfaces, using the windows system. - Should only be used as a WinPcapy fallback.""" - res = {} - for iface in six.itervalues(IFACES): - ips = [] - for ip in iface.ips: - if v6 and ":" in ip: - ips.append(ip) - elif not v6 and ":" not in ip: - ips.append(ip) - res[iface] = ips - return res - - -def get_ip_from_name(ifname, v6=False): - """Backward compatibility: indirectly calls get_ips - Deprecated.""" - iface = IFACES.dev_from_name(ifname) - return get_ips(v6=v6).get(iface, [""])[0] - - def _pcapname_to_guid(pcap_name): """Converts a Winpcap/Npcap pcpaname to its guid counterpart. e.g. \\DEVICE\\NPF_{...} => {...} @@ -311,79 +295,39 @@ def _pcapname_to_guid(pcap_name): return pcap_name -class NetworkInterface(object): +class NetworkInterface_Win(NetworkInterface): """A network interface of your local host""" - def __init__(self, data=None): - self.name = None - self.ip = None - self.ip6 = None - self.mac = None - self.pcap_name = None - self.description = None - self.invalid = False - self.raw80211 = None + def __init__(self, provider, data=None): self.cache_mode = None self.ipv4_metric = None self.ipv6_metric = None - self.ips = None - self.flags = None - if data is not None: - self.update(data) + self.guid = None + self.raw80211 = None + super(NetworkInterface_Win, self).__init__(provider, data) def update(self, data): """Update info about a network interface according to a given dictionary. Such data is provided by get_windows_if_list """ - self.data = data - self.name = data['name'] - self.pcap_name = data['pcap_name'] - self.description = data['description'] - self.win_index = data['win_index'] + # Populated early because used below + self.network_name = data['network_name'] + # Windows specific self.guid = data['guid'] - self.mac = data['mac'] self.ipv4_metric = data['ipv4_metric'] self.ipv6_metric = data['ipv6_metric'] - self.ips = data['ips'] - self.flags = data['flags'] - self.invalid = data['invalid'] try: # Npcap loopback interface - if conf.use_npcap and self.pcap_name == NPCAP_LOOPBACK_NAME: + if conf.use_npcap and self.network_name == NPCAP_LOOPBACK_NAME: # https://nmap.org/npcap/guide/npcap-devguide.html - self.mac = "00:00:00:00:00:00" - self.ip = "127.0.0.1" - self.ip6 = "::1" - return + data["mac"] = "00:00:00:00:00:00" + data["ip"] = "127.0.0.1" + data["ip6"] = "::1" + data["ips"] = ["127.0.0.1", "::1"] except KeyError: pass - - # Chose main IPv4 - if self.ips: - try: - self.ip = next(x for x in self.ips if ":" not in x) - except StopIteration: - pass - try: - self.ip6 = next(x for x in self.ips if ":" in x) - except StopIteration: - pass - if not self.ip and not self.ip6: - self.invalid = True - - def __hash__(self): - return hash(self.guid) - - def __eq__(self, other): - if isinstance(other, str): - return self.name == other or self.pcap_name == other - if isinstance(other, NetworkInterface): - return self.data == other.data - return object.__eq__(self, other) - - def is_invalid(self): - return self.invalid + super(NetworkInterface_Win, self).update(data) def _check_npcap_requirement(self): if not conf.use_npcap: @@ -392,7 +336,7 @@ def _check_npcap_requirement(self): val = _get_npcap_config("Dot11Support") self.raw80211 = bool(int(val)) if val else False if not self.raw80211: - raise Scapy_Exception("This interface does not support raw 802.11") + raise Scapy_Exception("Npcap 802.11 support is NOT enabled !") def _npcap_set(self, key, val): """Internal function. Set a [key] parameter to [value]""" @@ -551,51 +495,16 @@ def setmodulation(self, modu): m = _modus.get(modu, "unknown") if isinstance(modu, int) else modu return self._npcap_set("modu", str(m)) - def __repr__(self): - return "<%s [%s] %s>" % (self.__class__.__name__, - self.description, - self.guid) +class WindowsInterfacesProvider(InterfaceProvider): + name = "libpcap" + libpcap = True -def get_if_raw_addr(iff): - """Return the raw IPv4 address of interface""" - if not iff.ip: - return None - return inet_pton(socket.AF_INET, iff.ip) - - -def pcap_service_name(): - """Return the pcap adapter service's name""" - return "npcap" if conf.use_npcap else "npf" - - -def pcap_service_status(): - """Returns whether the windows pcap adapter is running or not""" - status = get_service_status(pcap_service_name()) - return status["dwCurrentState"] == 4 - - -def _pcap_service_control(action, askadmin=True): - """Internal util to run pcap control command""" - command = action + ' ' + pcap_service_name() - res, code = _exec_cmd(_encapsulate_admin(command) if askadmin else command) - if code != 0: - warning(res.decode("utf8", errors="ignore")) - return (code == 0) - - -def pcap_service_start(askadmin=True): - """Starts the pcap adapter. Will ask for admin. Returns True if success""" - return _pcap_service_control('sc start', askadmin=askadmin) - - -def pcap_service_stop(askadmin=True): - """Stops the pcap adapter. Will ask for admin. Returns True if success""" - return _pcap_service_control('sc stop', askadmin=askadmin) - - -class NetworkInterfaceDict(UserDict): - """Store information about network interfaces and convert between names""" + def _is_valid(self, dev): + # Winpcap (and old Npcap) have no support for PCAP_IF_UP :( + if dev.flags == 0: + return True + return dev.flags & PCAP_IF_UP @classmethod def _pcap_check(cls): @@ -640,10 +549,11 @@ def _ask_user(): "Scapy might help. Check your winpcap/npcap installation " "and access rights.") - def load(self): - if not get_if_list(): + def load(self, NetworkInterface_Win=NetworkInterface_Win): + results = {} + if not conf.cache_pcapiflist: # Try a restart - NetworkInterfaceDict._pcap_check() + WindowsInterfacesProvider._pcap_check() windows_interfaces = dict() for i in get_windows_if_list(): @@ -656,26 +566,24 @@ def load(self): windows_interfaces[i['guid']] = i index = 0 - for pcap_name, if_data in six.iteritems(conf.cache_iflist): - name, ips, flags = if_data - guid = _pcapname_to_guid(pcap_name) + for netw, if_data in six.iteritems(conf.cache_pcapiflist): + name, ips, flags, _ = if_data + guid = _pcapname_to_guid(netw) data = windows_interfaces.get(guid, None) if data: # Exists in Windows registry - data['pcap_name'] = pcap_name - data['ips'].extend(ips) + data['network_name'] = netw + data['ips'] = list(set(data['ips'] + ips)) data['flags'] = flags - data['invalid'] = False else: # Only in [Wi]npcap index -= 1 data = { 'name': name, - 'pcap_name': pcap_name, 'description': name, - 'win_index': index, + 'index': index, 'guid': guid, - 'invalid': False, + 'network_name': netw, 'mac': '00:00:00:00:00:00', 'ipv4_metric': 0, 'ipv6_metric': 0, @@ -683,124 +591,95 @@ def load(self): 'flags': flags } # No KeyError will happen here, as we get it from cache - self.data[guid] = NetworkInterface(data) - - def dev_from_name(self, name): - """Return the first pcap device name for a given Windows - device name. - """ - try: - return next(iface for iface in six.itervalues(self) - if (iface.name == name or iface.description == name)) - except (StopIteration, RuntimeError): - raise ValueError("Unknown network interface %r" % name) - - def dev_from_pcapname(self, pcap_name): - """Return Windows device name for given pcap device name.""" - try: - return next(iface for iface in six.itervalues(self) - if iface.pcap_name == pcap_name) - except (StopIteration, RuntimeError): - raise ValueError("Unknown pypcap network interface %r" % pcap_name) - - def dev_from_index(self, if_index): - """Return interface name from interface index""" - try: - if_index = int(if_index) # Backward compatibility - return next(iface for iface in six.itervalues(self) - if iface.win_index == if_index) - except (StopIteration, RuntimeError): - if str(if_index) == "1": - return IFACES.dev_from_pcapname(conf.loopback_name) - raise ValueError("Unknown network interface index %r" % if_index) + results[guid] = NetworkInterface_Win(self, data) + return results def reload(self): """Reload interface list""" self.restarted_adapter = False - self.data.clear() if conf.use_pcap: # Reload from Winpcapy - from scapy.arch.pcapdnet import load_winpcapy + from scapy.arch.libpcap import load_winpcapy load_winpcapy() - self.load() - # Reload conf.iface - conf.iface = get_working_if() - - def show(self, resolve_mac=True, print_result=True): - """Print list of available network interfaces in human readable form""" - res = [] - for iface_name in sorted(self.data): - dev = self.data[iface_name] - mac = dev.mac - if resolve_mac and conf.manufdb: - mac = conf.manufdb._resolve_MAC(mac) - validity_color = lambda x: conf.color_theme.red if x else \ - conf.color_theme.green - description = validity_color(dev.is_invalid())( - str(dev.description) - ) - index = str(dev.win_index) - res.append((index, description, str(dev.ip), str(dev.ip6), mac)) + return self.load() - res = pretty_list( - res, - [("INDEX", "IFACE", "IPv4", "IPv6", "MAC")], - sortBy=2 - ) - if print_result: - print(res) + +# Register provider +conf.ifaces.register_provider(WindowsInterfacesProvider) + + +def get_ips(v6=False): + """Returns all available IPs matching to interfaces, using the windows system. + Should only be used as a WinPcapy fallback.""" + res = {} + for iface in six.itervalues(conf.ifaces): + if v6: + res[iface] = iface.ips[6] else: - return res + res[iface] = iface.ips[4] + return res - def __repr__(self): - return self.show(print_result=False) +def get_if_raw_addr(iff): + """Return the raw IPv4 address of interface""" + iff = resolve_iface(iff) + if not iff.ip: + return None + return inet_pton(socket.AF_INET, iff.ip) -IFACES = ifaces = NetworkInterfaceDict() -IFACES.load() +def get_ip_from_name(ifname, v6=False): + """Backward compatibility: indirectly calls get_ips + Deprecated.""" + warnings.warn( + "get_ip_from_name is deprecated. Use the `ip` attribute of the iface " + "or use get_ips() to get all ips per interface.", + DeprecationWarning + ) + iface = conf.ifaces.dev_from_name(ifname) + return get_ips(v6=v6).get(iface, [""])[0] -def pcapname(dev): - """Get the device pcap name by device name or Scapy NetworkInterface - """ - if isinstance(dev, NetworkInterface): - if dev.is_invalid(): - return None - return dev.pcap_name - try: - return IFACES.dev_from_name(dev).pcap_name - except ValueError: - return IFACES.dev_from_pcapname(dev).pcap_name +def pcap_service_name(): + """Return the pcap adapter service's name""" + return "npcap" if conf.use_npcap else "npf" -def dev_from_pcapname(pcap_name): - """Return Scapy device name for given pcap device name""" - return IFACES.dev_from_pcapname(pcap_name) +def pcap_service_status(): + """Returns whether the windows pcap adapter is running or not""" + status = get_service_status(pcap_service_name()) + return status["dwCurrentState"] == 4 -def dev_from_index(if_index): - """Return Windows adapter name for given Windows interface index""" - return IFACES.dev_from_index(if_index) +def _pcap_service_control(action, askadmin=True): + """Internal util to run pcap control command""" + command = action + ' ' + pcap_service_name() + res, code = _exec_cmd(_encapsulate_admin(command) if askadmin else command) + if code != 0: + warning(res.decode("utf8", errors="ignore")) + return (code == 0) + + +def pcap_service_start(askadmin=True): + """Starts the pcap adapter. Will ask for admin. Returns True if success""" + return _pcap_service_control('sc start', askadmin=askadmin) -def show_interfaces(resolve_mac=True): - """Print list of available network interfaces""" - return IFACES.show(resolve_mac) +def pcap_service_stop(askadmin=True): + """Stops the pcap adapter. Will ask for admin. Returns True if success""" + return _pcap_service_control('sc stop', askadmin=askadmin) if conf.use_pcap: - _orig_open_pcap = pcapdnet.open_pcap + _orig_open_pcap = libpcap.open_pcap def open_pcap(iface, *args, **kargs): """open_pcap: Windows routine for creating a pcap from an interface. This function is also responsible for detecting monitor mode. """ - iface_pcap_name = pcapname(iface) - if not isinstance(iface, NetworkInterface) and \ - iface_pcap_name is not None: - iface = IFACES.dev_from_name(iface) - if iface is None or iface.is_invalid(): + iface = resolve_iface(iface) + iface_network_name = iface.network_name + if not iface: raise Scapy_Exception( "Interface is invalid (no pcap match found) !" ) @@ -814,12 +693,13 @@ def open_pcap(iface, *args, **kargs): # The monitor param is specified, and not matching the current # interface state iface.setmonitor(kw_monitor) - return _orig_open_pcap(iface_pcap_name, *args, **kargs) - pcapdnet.open_pcap = open_pcap + return _orig_open_pcap(iface_network_name, *args, **kargs) + libpcap.open_pcap = open_pcap -get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr = lambda iface, *args, **kargs: ( # noqa: E501 - ARPHDR_ETHER, mac2str(IFACES.dev_from_pcapname(pcapname(iface)).mac) -) + +def get_if_raw_hwaddr(iface): + iface = resolve_iface(iface) + return ARPHDR_ETHER, mac2str(iface.mac) def _read_routes_c_v1(): @@ -843,9 +723,10 @@ def _extract_ip(obj): except ValueError: continue ip = iface.ip + netw = network_name(iface) # RouteMetric + InterfaceMetric metric = metric + iface.ipv4_metric - routes.append((dest, netmask, nexthop, iface, ip, metric)) + routes.append((dest, netmask, nexthop, netw, ip, metric)) return routes @@ -883,14 +764,15 @@ def _extract_ip(obj): except ValueError: continue ip = iface.ip + netw = network_name(iface) # RouteMetric + InterfaceMetric metric = metric + getattr(iface, metric_name) if ipv6: _append_route6(routes, dest, netmask, nexthop, - iface, lifaddr, metric) + netw, lifaddr, metric) else: routes.append((atol(dest), itom(int(netmask)), - nexthop, iface, ip, metric)) + nexthop, netw, ip, metric)) return routes @@ -933,7 +815,7 @@ def in6_getifaddr(): def _append_route6(routes, dpref, dp, nh, iface, lifaddr, metric): cset = [] # candidate set (possible source addresses) - if iface.name == conf.loopback_name: + if iface == conf.loopback_name: if dpref == '::': return cset = ['::1'] @@ -957,32 +839,6 @@ def read_routes6(): return routes6 -def get_working_if(): - """Return an interface that works""" - try: - # return the interface associated with the route with smallest - # mask (route by default if it exists) - iface = min(conf.route.routes, key=lambda x: x[1])[3] - except ValueError: - # no route - iface = conf.loopback_name - if isinstance(iface, NetworkInterface) and iface.is_invalid(): - # Backup mode: try them all - for iface in six.itervalues(IFACES): - if not iface.is_invalid(): - return iface - return None - return iface - - -def _get_valid_guid(): - if conf.loopback_name: - return conf.loopback_name.guid - else: - return next((i.guid for i in six.itervalues(IFACES) - if not i.is_invalid()), None) - - def _route_add_loopback(routes=None, ipv6=False, iflist=None): """Add a route to 127.0.0.1 and ::1 to simplify unit tests on Windows""" if not WINDOWS: @@ -996,37 +852,25 @@ def _route_add_loopback(routes=None, ipv6=False, iflist=None): else: if not conf.route.routes: return - data = { - 'name': conf.loopback_name, - 'pcap_name': "\\Device\\NPF_{0XX00000-X000-0X0X-X00X-00XXXX000XXX}", - 'description': "Loopback", - 'win_index': -1, - 'guid': "{0XX00000-X000-0X0X-X00X-00XXXX000XXX}", - 'invalid': True, - 'mac': '00:00:00:00:00:00', - 'ipv4_metric': 0, - 'ipv6_metric': 0, - 'ips': ["127.0.0.1", "::"], - 'flags': 0 - } - adapter = NetworkInterface(data) + conf.ifaces._add_fake_iface(conf.loopback_name) + adapter = conf.ifaces.dev_from_name(conf.loopback_name) if iflist: - iflist.append(adapter.pcap_name) + iflist.append(adapter.network_name) return # Remove all conf.loopback_name routes for route in list(conf.route.routes): iface = route[3] - if iface.pcap_name == conf.loopback_name: + if iface == conf.loopback_name: conf.route.routes.remove(route) # Remove conf.loopback_name interface - for devname, iface in list(IFACES.items()): - if iface.pcap_name == conf.loopback_name: - IFACES.pop(devname) + for devname, iface in list(conf.ifaces.items()): + if iface == conf.loopback_name: + conf.ifaces.pop(devname) # Inject interface - IFACES["{0XX00000-X000-0X0X-X00X-00XXXX000XXX}"] = adapter - conf.loopback_name = adapter.pcap_name + conf.ifaces["{0XX00000-X000-0X0X-X00X-00XXXX000XXX}"] = adapter + conf.loopback_name = adapter.network_name if isinstance(conf.iface, NetworkInterface): - if conf.iface.pcap_name == conf.loopback_name: + if conf.iface.network_name == conf.loopback_name: conf.iface = adapter conf.netcache.arp_cache["127.0.0.1"] = "ff:ff:ff:ff:ff:ff" conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff" diff --git a/scapy/arch/windows/native.py b/scapy/arch/windows/native.py index c0c353bbc45..37815c2c2a6 100644 --- a/scapy/arch/windows/native.py +++ b/scapy/arch/windows/native.py @@ -56,6 +56,7 @@ from scapy.config import conf from scapy.data import MTU from scapy.error import Scapy_Exception, warning +from scapy.interfaces import resolve_iface from scapy.supersocket import SuperSocket # Watch out for import loops (inet...) @@ -115,7 +116,7 @@ def __init__(self, iface=None, proto=socket.IPPROTO_IP, self.ins.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) self.outs.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, ttl) # Bind on all ports - iface = iface or conf.iface + iface = resolve_iface(iface) or conf.iface host = iface.ip if iface.ip else socket.gethostname() self.ins.bind((host, 0)) self.ins.setblocking(False) diff --git a/scapy/compat.py b/scapy/compat.py index 421f6779c04..72592cb7070 100644 --- a/scapy/compat.py +++ b/scapy/compat.py @@ -109,6 +109,14 @@ def bytes_base64(x): return base64.encodebytes(bytes_encode(x)).replace(b'\n', b'') +if six.PY2: + import cgi + html_escape = cgi.escape +else: + import html + html_escape = html.escape + + if six.PY2: from StringIO import StringIO diff --git a/scapy/config.py b/scapy/config.py index e5820ec75aa..9afeddf5e71 100755 --- a/scapy/config.py +++ b/scapy/config.py @@ -72,8 +72,9 @@ def set_from_hook(obj, name, val): setattr(obj, int_name, val) def __set__(self, obj, val): + old = getattr(obj, self.intname, self.default) + val = self.hook(self.name, val, old, *self.args, **self.kargs) setattr(obj, self.intname, val) - self.hook(self.name, val, *self.args, **self.kargs) def _readonly(name): @@ -441,8 +442,9 @@ def isPyPy(): return False -def _prompt_changer(attr, val): +def _prompt_changer(attr, val, old): """Change the current prompt theme""" + Interceptor.set_from_hook(conf, attr, val) try: sys.ps1 = conf.color_theme.prompt(conf.prompt) except Exception: @@ -451,13 +453,13 @@ def _prompt_changer(attr, val): apply_ipython_style(get_ipython()) except NameError: pass + return getattr(conf, attr, old) def _set_conf_sockets(): """Populate the conf.L2Socket and conf.L3Socket according to the various use_* parameters """ - from scapy.main import _load if conf.use_bpf and not BSD: Interceptor.set_from_hook(conf, "use_bpf", False) raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !") @@ -469,7 +471,7 @@ def _set_conf_sockets(): # we are already in an Interceptor hook, use Interceptor.set_from_hook if conf.use_pcap: try: - from scapy.arch.pcapdnet import L2pcapListenSocket, L2pcapSocket, \ + from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \ L3pcapSocket except (OSError, ImportError): warning("No libpcap provider available ! pcap won't be used") @@ -479,9 +481,7 @@ def _set_conf_sockets(): conf.L3socket6 = functools.partial(L3pcapSocket, filter="ip6") conf.L2socket = L2pcapSocket conf.L2listen = L2pcapListenSocket - if conf.interactive: - # Update globals - _load("scapy.arch.pcapdnet") + conf.ifaces.reload() return if conf.use_bpf: from scapy.arch.bpf.supersocket import L2bpfListenSocket, \ @@ -490,9 +490,7 @@ def _set_conf_sockets(): conf.L3socket6 = functools.partial(L3bpfSocket, filter="ip6") conf.L2socket = L2bpfSocket conf.L2listen = L2bpfListenSocket - if conf.interactive: - # Update globals - _load("scapy.arch.bpf") + conf.ifaces.reload() return if LINUX: from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket @@ -500,9 +498,7 @@ def _set_conf_sockets(): conf.L3socket6 = functools.partial(L3PacketSocket, filter="ip6") conf.L2socket = L2Socket conf.L2listen = L2ListenSocket - if conf.interactive: - # Update globals - _load("scapy.arch.linux") + conf.ifaces.reload() return if WINDOWS: from scapy.arch.windows import _NotAvailableSocket @@ -511,6 +507,7 @@ def _set_conf_sockets(): conf.L3socket6 = L3WinSocket6 conf.L2socket = _NotAvailableSocket conf.L2listen = _NotAvailableSocket + conf.ifaces.reload() # No need to update globals on Windows return from scapy.supersocket import L3RawSocket @@ -519,9 +516,10 @@ def _set_conf_sockets(): conf.L3socket6 = L3RawSocket6 -def _socket_changer(attr, val): +def _socket_changer(attr, val, old): if not isinstance(val, bool): raise TypeError("This argument should be a boolean") + Interceptor.set_from_hook(conf, attr, val) dependencies = { # Things that will be turned off "use_pcap": ["use_bpf"], "use_bpf": ["use_pcap"], @@ -538,11 +536,27 @@ def _socket_changer(attr, val): Interceptor.set_from_hook(conf, key, value) if isinstance(e, ScapyInvalidPlatformException): raise + return getattr(conf, attr) -def _loglevel_changer(attr, val): +def _loglevel_changer(attr, val, old): """Handle a change of conf.logLevel""" log_scapy.setLevel(val) + return val + + +def _iface_changer(attr, val, old): + """Resolves the interface in conf.iface""" + if isinstance(val, str): + from scapy.interfaces import resolve_iface + iface = resolve_iface(val) + if old and iface.dummy: + warning( + "This interface is not specified in any provider ! " + "See conf.ifaces output" + ) + return iface + return val class Conf(ConfClass): @@ -557,7 +571,7 @@ class Conf(ConfClass): #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...) stealth = "not implemented" #: selects the default output interface for srp() and sendp(). - iface = None + iface = Interceptor("iface", None, _iface_changer) layers = LayersList() commands = CommandsList() ASN1_default_codec = None #: Codec used by default for ASN1 objects @@ -616,7 +630,10 @@ class Conf(ConfClass): #: When 1, print some TLS session secrets when they are computed. debug_tls = False wepkey = "" - cache_iflist = {} + #: holds the Scapy interface list and manager + ifaces = None + #: holds the cache of interfaces loaded from Libpcap + cache_pcapiflist = {} #: holds the Scapy IPv4 routing table and provides methods to #: manipulate it route = None # Filed by route.py @@ -642,7 +659,7 @@ class Conf(ConfClass): #: the conf.L[2/3] sockets use_pcap = Interceptor( "use_pcap", - os.getenv("SCAPY_USE_PCAPDNET", "").lower().startswith("y"), + os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"), _socket_changer ) use_bpf = Interceptor("use_bpf", False, _socket_changer) diff --git a/scapy/fields.py b/scapy/fields.py index 1dcafc08cf7..db85b695ee0 100644 --- a/scapy/fields.py +++ b/scapy/fields.py @@ -2120,7 +2120,11 @@ def __str__(self): x = int(self) while x: if x & 1: - r.append(self.names[i]) + try: + name = self.names[i] + except IndexError: + name = "?" + r.append(name) i += 1 x >>= 1 return ("+" if self.multi else "").join(r) diff --git a/scapy/interfaces.py b/scapy/interfaces.py new file mode 100644 index 00000000000..829e0799277 --- /dev/null +++ b/scapy/interfaces.py @@ -0,0 +1,362 @@ +# This file is part of Scapy +# See http://www.secdev.org/projects/scapy for more information +# Copyright (C) Philippe Biondi +# Copyright (C) Gabriel Potter +# This program is published under a GPLv2 license + +""" +Interfaces management +""" + +import itertools +import uuid +from collections import defaultdict + +from scapy.config import conf +from scapy.consts import WINDOWS +from scapy.utils import pretty_list +from scapy.utils6 import in6_isvalid + +from scapy.modules.six.moves import UserDict +import scapy.modules.six as six + + +class InterfaceProvider(object): + name = "Unknown" + headers = ("Index", "Name", "MAC", "IPv4", "IPv6") + header_sort = 1 + libpcap = False + + def load(self): + """Returns a dictionary of the loaded interfaces, by their + name.""" + raise NotImplementedError + + def reload(self): + """Same than load() but for reloads. By default calls load""" + return self.load() + + def l2socket(self): + """Return L2 socket used by interfaces of this provider""" + return conf.L2socket + + def l2listen(self): + """Return L2listen socket used by interfaces of this provider""" + return conf.L2listen + + def l3socket(self): + """Return L3 socket used by interfaces of this provider""" + return conf.L3socket + + def _is_valid(self, dev): + """Returns whether an interface is valid or not""" + return bool((dev.ips[4] or dev.ips[6]) and dev.mac) + + def _format(self, dev, **kwargs): + """Returns the elements used by show() + + If a tuple is returned, this consist of the strings that will be + inlined along with the interface. + If a list of tuples is returned, they will be appended one above the + other and should all be part of a single interface. + """ + mac = dev.mac + resolve_mac = kwargs.get("resolve_mac", True) + if resolve_mac and conf.manufdb: + mac = conf.manufdb._resolve_MAC(mac) + index = str(dev.index) + return (index, dev.description, mac, dev.ips[4], dev.ips[6]) + + +class NetworkInterface(object): + def __init__(self, provider, data=None): + self.provider = provider + self.name = "" + self.description = "" + self.network_name = "" + self.index = -1 + self.ip = None + self.ips = defaultdict(list) + self.mac = None + self.dummy = False + if data is not None: + self.update(data) + + def update(self, data): + """Update info about a network interface according + to a given dictionary. Such data is provided by providers + """ + self.name = data.get('name', "") + self.description = data.get('description', "") + self.network_name = data.get('network_name', "") + self.index = data.get('index', 0) + self.ip = data.get('ip', "") + self.mac = data.get('mac', "") + self.flags = data.get('flags', 0) + self.dummy = data.get('dummy', False) + + for ip in data.get('ips', []): + if in6_isvalid(ip): + self.ips[6].append(ip) + else: + self.ips[4].append(ip) + + # An interface often has multiple IPv6 so we don't store + # a "main" one, unlike IPv4. + if self.ips[4] and not self.ip: + self.ip = self.ips[4][0] + + def __eq__(self, other): + if isinstance(other, str): + return other in [self.name, self.network_name, self.description] + if isinstance(other, NetworkInterface): + return self.__dict__ == other.__dict__ + return False + + def __ne__(self, other): + return not self.__eq__(other) + + def __hash__(self): + return hash(self.network_name) + + def is_valid(self): + if self.dummy: + return False + return self.provider._is_valid(self) + + def l2socket(self): + return self.provider.l2socket() + + def l2listen(self): + return self.provider.l2listen() + + def l3socket(self): + return self.provider.l3socket() + + def __repr__(self): + return "<%s %s [%s]>" % (self.__class__.__name__, + self.description, + self.dummy and "dummy" or (self.flags or "")) + + def __str__(self): + return self.network_name + + def __add__(self, other): + return self.network_name + other + + def __radd__(self, other): + return other + self.network_name + + +class NetworkInterfaceDict(UserDict): + """Store information about network interfaces and convert between names""" + + def __init__(self): + self.providers = {} + UserDict.__init__(self) + + def _load(self, dat, prov): + for ifname, iface in six.iteritems(dat): + if ifname in self.data: + # Handle priorities: keep except if libpcap + if prov.libpcap: + self.data[ifname] = iface + else: + self.data[ifname] = iface + + def register_provider(self, provider): + prov = provider() + self.providers[provider] = prov + + def load_confiface(self): + """ + Reload conf.iface + """ + # Can only be called after conf.route is populated + if not conf.route: + raise ValueError("Error: conf.route isn't populated !") + conf.iface = get_working_if() + + def _reload_provs(self): + self.clear() + for prov in self.providers.values(): + self._load(prov.reload(), prov) + + def reload(self): + self._reload_provs() + if conf.route: + self.load_confiface() + + def dev_from_name(self, name): + """Return the first network device name for a given + device name. + """ + try: + return next(iface for iface in six.itervalues(self) + if (iface.name == name or iface.description == name)) + except (StopIteration, RuntimeError): + raise ValueError("Unknown network interface %r" % name) + + def dev_from_networkname(self, network_name): + """Return interface for a given network device name.""" + try: + return next(iface for iface in six.itervalues(self) + if iface.network_name == network_name) + except (StopIteration, RuntimeError): + raise ValueError( + "Unknown network interface %r" % + network_name) + + def dev_from_index(self, if_index): + """Return interface name from interface index""" + try: + if_index = int(if_index) # Backward compatibility + return next(iface for iface in six.itervalues(self) + if iface.index == if_index) + except (StopIteration, RuntimeError): + if str(if_index) == "1": + # Test if the loopback interface is set up + return self.dev_from_networkname(conf.loopback_name) + raise ValueError("Unknown network interface index %r" % if_index) + + def _add_fake_iface(self, ifname): + """Internal function used for a testing purpose""" + data = { + 'name': ifname, + 'description': ifname, + 'network_name': ifname, + 'index': -1000, + 'dummy': True, + 'mac': '00:00:00:00:00:00', + 'flags': 0, + 'ips': ["127.0.0.1", "::"], + # Windows only + 'guid': "{%s}" % uuid.uuid1(), + 'ipv4_metric': 0, + 'ipv6_metric': 0, + } + if WINDOWS: + from scapy.arch.windows import NetworkInterface_Win, \ + WindowsInterfacesProvider + + class FakeProv(WindowsInterfacesProvider): + name = "fake" + + self.data[ifname] = NetworkInterface_Win( + FakeProv(), + data + ) + else: + self.data[ifname] = NetworkInterface(InterfaceProvider(), data) + + def show(self, print_result=True, hidden=False, **kwargs): + """ + Print list of available network interfaces in human readable form + + :param print_result: print the results if True, else return it + :param hidden: if True, also displays invalid interfaces + """ + res = defaultdict(list) + for iface_name in sorted(self.data): + dev = self.data[iface_name] + if not hidden and not dev.is_valid(): + continue + prov = dev.provider + res[prov].append( + (prov.name,) + prov._format(dev, **kwargs) + ) + output = "" + for provider in res: + output += pretty_list( + res[provider], + [("Source",) + provider.headers], + sortBy=provider.header_sort + ) + "\n" + output = output[:-1] + if print_result: + print(output) + else: + return output + + def __repr__(self): + return self.show(print_result=False) + + +conf.ifaces = IFACES = ifaces = NetworkInterfaceDict() + + +def get_if_list(): + """Return a list of interface names""" + return list(conf.ifaces.keys()) + + +def get_working_if(): + """Return an interface that works""" + # return the interface associated with the route with smallest + # mask (route by default if it exists) + routes = conf.route.routes[:] + routes.sort(key=lambda x: x[1]) + ifaces = (x[3] for x in routes) + # First check the routing ifaces from best to worse, + # then check all the available ifaces as backup. + for iface in itertools.chain(ifaces, conf.ifaces.values()): + iface = resolve_iface(iface) + if iface and iface.is_valid(): + return iface + # There is no hope left + return conf.loopback_name + + +def get_working_ifaces(): + """Return all interfaces that work""" + return [iface for iface in conf.ifaces.values() if iface.is_valid()] + + +def dev_from_networkname(network_name): + """Return Scapy device name for given network device name""" + return conf.ifaces.dev_from_networkname(network_name) + + +def dev_from_index(if_index): + """Return interface for a given interface index""" + return conf.ifaces.dev_from_index(if_index) + + +def resolve_iface(dev): + """ + Resolve an interface name into the interface + """ + if isinstance(dev, NetworkInterface): + return dev + try: + return conf.ifaces.dev_from_name(dev) + except ValueError: + try: + return dev_from_networkname(dev) + except ValueError: + pass + # Return a dummy interface + return NetworkInterface( + InterfaceProvider(), + data={ + "name": dev, + "description": dev, + "network_name": dev, + "dummy": True + } + ) + + +def network_name(dev): + """ + Resolves the device network name of a device or Scapy NetworkInterface + """ + iface = resolve_iface(dev) + if iface: + return iface.network_name + return dev + + +def show_interfaces(resolve_mac=True): + """Print list of available network interfaces""" + return conf.ifaces.show(resolve_mac) diff --git a/scapy/layers/usb.py b/scapy/layers/usb.py index 736b2fd28b6..f30d7ae88e7 100644 --- a/scapy/layers/usb.py +++ b/scapy/layers/usb.py @@ -22,6 +22,8 @@ from scapy.fields import ByteField, XByteField, ByteEnumField, LEShortField, \ LEShortEnumField, LEIntField, LEIntEnumField, XLELongField, \ LenField +from scapy.interfaces import NetworkInterface, InterfaceProvider, \ + network_name, IFACES from scapy.packet import Packet, bind_top_down from scapy.supersocket import SuperSocket from scapy.utils import PcapReader @@ -172,7 +174,7 @@ def _extcap_call(prog, args, keyword, values): if not ifa.startswith(keyword): continue res.append(tuple([re.search(r"{%s=([^}]*)}" % val, ifa).group(1) - for val in values])) + for val in values])) return res @@ -191,6 +193,45 @@ def get_usbpcap_interfaces(): ["value", "display"] ) + class UsbpcapInterfaceProvider(InterfaceProvider): + name = "USBPcap" + headers = ("Index", "Name", "Address") + header_sort = 1 + + def load(self): + data = {} + try: + interfaces = get_usbpcap_interfaces() + except OSError: + return {} + for netw_name, name in interfaces: + index = re.search(r".*(\d+)", name) + if index: + index = int(index.group(1)) + 100 + else: + index = 100 + if_data = { + "name": name, + "network_name": netw_name, + "description": name, + "index": index, + } + data[netw_name] = NetworkInterface(self, if_data) + return data + + def l2socket(self): + return conf.USBsocket + l2listen = l2socket + + def l3socket(self): + raise ValueError("No L3 available for USBpcap !") + + def _format(self, dev, **kwargs): + """Returns a tuple of the elements used by show()""" + return (str(dev.index), dev.name, dev.network_name) + + IFACES.register_provider(UsbpcapInterfaceProvider) + def get_usbpcap_devices(iface, enabled=True): """Return a list of devices on an USBpcap interface""" _usbpcap_check() @@ -226,6 +267,7 @@ def __init__(self, iface=None, *args, **karg): " ".join(x[0] for x in get_usbpcap_interfaces())) raise NameError("No interface specified !" " See get_usbpcap_interfaces()") + iface = network_name(iface) self.outs = None args = ['-d', iface, '-b', '134217728', '-A', '-o', '-'] self.usbpcap_proc = subprocess.Popen( diff --git a/scapy/libs/winpcapy.py b/scapy/libs/winpcapy.py index c246f0dcea9..b6ae5aa96ff 100644 --- a/scapy/libs/winpcapy.py +++ b/scapy/libs/winpcapy.py @@ -12,7 +12,7 @@ import os from scapy.libs.structures import bpf_program -from scapy.consts import WINDOWS +from scapy.consts import WINDOWS, BSD if WINDOWS: # Try to load Npcap, or Winpcap @@ -66,23 +66,11 @@ class timeval(Structure): # sockaddr is used by pcap_addr. # For example if sa_family==socket.AF_INET then we need cast # with sockaddr_in -if WINDOWS: - class sockaddr(Structure): - _fields_ = [("sa_family", c_ushort), - ("sa_data", c_ubyte * 14)] - - class sockaddr_in(Structure): - _fields_ = [("sin_family", c_ushort), - ("sin_port", c_uint16), - ("sin_addr", 4 * c_ubyte)] - class sockaddr_in6(Structure): - _fields_ = [("sin6_family", c_ushort), - ("sin6_port", c_uint16), - ("sin6_flowinfo", c_uint32), - ("sin6_addr", 16 * c_ubyte), - ("sin6_scope", c_uint32)] -else: +# sockaddr has a different structure depending on the OS +if BSD: + # https://github.com/freebsd/freebsd/blob/master/sys/sys/socket.h + # https://opensource.apple.com/source/xnu/xnu-201/bsd/sys/socket.h.auto.html class sockaddr(Structure): _fields_ = [("sa_len", c_ubyte), ("sa_family", c_ubyte), @@ -112,6 +100,26 @@ class sockaddr_dl(Structure): ("sdl_alen", c_ubyte), ("sdl_slen", c_ubyte), ("sdl_data", 46 * c_ubyte)] + +else: + # https://github.com/torvalds/linux/blob/master/include/linux/socket.h + # https://docs.microsoft.com/en-us/windows/win32/winsock/sockaddr-2 + class sockaddr(Structure): + _fields_ = [("sa_family", c_ushort), + ("sa_data", c_ubyte * 14)] + + class sockaddr_in(Structure): + _fields_ = [("sin_family", c_ushort), + ("sin_port", c_uint16), + ("sin_addr", 4 * c_ubyte)] + + class sockaddr_in6(Structure): + _fields_ = [("sin6_family", c_ushort), + ("sin6_port", c_uint16), + ("sin6_flowinfo", c_uint32), + ("sin6_addr", 16 * c_ubyte), + ("sin6_scope", c_uint32)] + ## # END misc ## diff --git a/scapy/route.py b/scapy/route.py index ce2343b7afb..3e8031f7b8e 100644 --- a/scapy/route.py +++ b/scapy/route.py @@ -10,11 +10,9 @@ from __future__ import absolute_import - -import scapy.consts from scapy.config import conf from scapy.error import Scapy_Exception, warning -from scapy.modules import six +from scapy.interfaces import resolve_iface from scapy.utils import atol, ltoa, itom, plain_str, pretty_list @@ -37,10 +35,11 @@ def resync(self): def __repr__(self): rtlst = [] for net, msk, gw, iface, addr, metric in self.routes: + if_repr = resolve_iface(iface).description rtlst.append((ltoa(net), ltoa(msk), gw, - (iface.description if not isinstance(iface, six.string_types) else iface), # noqa: E501 + if_repr, addr, str(metric))) @@ -94,10 +93,7 @@ def ifchange(self, iff, addr): for i, route in enumerate(self.routes): net, msk, gw, iface, addr, metric = route - if scapy.consts.WINDOWS: - if iff.guid != iface.guid: - continue - elif iff != iface: + if iff != iface: continue if gw == '0.0.0.0': self.routes[i] = (the_net, the_msk, gw, iface, the_addr, metric) # noqa: E501 @@ -109,10 +105,7 @@ def ifdel(self, iff): self.invalidate_cache() new_routes = [] for rt in self.routes: - if scapy.consts.WINDOWS: - if iff.guid == rt[3].guid: - continue - elif iff == rt[3]: + if iff == rt[3]: continue new_routes.append(rt) self.routes = new_routes @@ -184,10 +177,7 @@ def get_if_bcast(self, iff): continue # Ignore default route "0.0.0.0" elif msk == 0xffffffff: continue # Ignore host-specific routes - if scapy.consts.WINDOWS: - if iff.guid != iface.guid: - continue - elif iff != iface: + if iff != iface: continue bcast = net | (~msk & 0xffffffff) bcast_list.append(ltoa(bcast)) @@ -198,12 +188,5 @@ def get_if_bcast(self, iff): conf.route = Route() -iface = conf.route.route(None, verbose=0)[0] - -if getattr(iface, "name", iface) == conf.loopback_name: - from scapy.arch import get_working_if - conf.iface = get_working_if() -else: - conf.iface = iface - -del iface +# Load everything, update conf.iface +conf.ifaces.reload() diff --git a/scapy/route6.py b/scapy/route6.py index 939df4be7e8..081d9fea487 100644 --- a/scapy/route6.py +++ b/scapy/route6.py @@ -17,6 +17,7 @@ from __future__ import absolute_import import socket from scapy.config import conf +from scapy.interfaces import resolve_iface from scapy.utils6 import in6_ptop, in6_cidr2mask, in6_and, \ in6_islladdr, in6_ismlladdr, in6_isincluded, in6_isgladdr, \ in6_isaddr6to4, in6_ismaddr, construct_source_candidate_set, \ @@ -24,7 +25,6 @@ from scapy.arch import read_routes6, in6_getifaddr from scapy.pton_ntop import inet_pton, inet_ntop from scapy.error import warning, log_loading -import scapy.modules.six as six from scapy.utils import pretty_list @@ -57,11 +57,11 @@ def __repr__(self): rtlst = [] for net, msk, gw, iface, cset, metric in self.routes: + if_repr = resolve_iface(iface).description rtlst.append(('%s/%i' % (net, msk), gw, - (iface if isinstance(iface, six.string_types) - else iface.description), - ", ".join(cset) if len(cset) > 0 else "", + if_repr, + cset, str(metric))) return pretty_list(rtlst, @@ -255,7 +255,7 @@ def route(self, dst=None, dev=None, verbose=conf.verb): # Deal with dev-specific request for cache search k = dst if dev is not None: - k = dst + "%%" + (dev if isinstance(dev, six.string_types) else dev.pcap_name) # noqa: E501 + k = dst + "%%" + dev if k in self.cache: return self.cache[k] @@ -324,7 +324,7 @@ def route(self, dst=None, dev=None, verbose=conf.verb): # Fill the cache (including dev-specific request) k = dst if dev is not None: - k = dst + "%%" + (dev if isinstance(dev, six.string_types) else dev.pcap_name) # noqa: E501 + k = dst + "%%" + dev self.cache[k] = res[0][2] return res[0][2] diff --git a/scapy/sendrecv.py b/scapy/sendrecv.py index 628984566e4..1486434b4e4 100644 --- a/scapy/sendrecv.py +++ b/scapy/sendrecv.py @@ -20,6 +20,7 @@ from scapy.data import ETH_P_ALL from scapy.config import conf from scapy.error import warning +from scapy.interfaces import network_name, resolve_iface from scapy.packet import Gen, Packet from scapy.utils import get_temp_file, tcpdump, wrpcap, \ ContextManagerSubprocess, PcapReader @@ -30,8 +31,9 @@ from scapy.modules.six.moves import map from scapy.sessions import DefaultSession from scapy.supersocket import SuperSocket + if conf.route is None: - # unused import, only to initialize conf.route + # unused import, only to initialize conf.route and conf.iface* import scapy.route # noqa: F401 ################# @@ -322,9 +324,23 @@ def __gen_send(s, x, inter=0, loop=0, count=None, verbose=None, realtime=None, r return sent_packets +def _send(x, _func, inter=0, loop=0, iface=None, count=None, + verbose=None, realtime=None, + return_packets=False, socket=None, **kargs): + """Internal function used by send and sendp""" + need_closing = socket is None + iface = resolve_iface(iface or conf.iface) + socket = socket or _func(iface)(iface=iface, **kargs) + results = __gen_send(socket, x, inter=inter, loop=loop, + count=count, verbose=verbose, + realtime=realtime, return_packets=return_packets) + if need_closing: + socket.close() + return results + + @conf.commands.register -def send(x, inter=0, loop=0, count=None, verbose=None, realtime=None, - return_packets=False, socket=None, iface=None, *args, **kargs): +def send(x, iface=None, *args, **kargs): """ Send packets at layer 3 @@ -340,21 +356,16 @@ def send(x, inter=0, loop=0, count=None, verbose=None, realtime=None, :param monitor: (not on linux) send in monitor mode :returns: None """ - need_closing = socket is None - kargs["iface"] = _interface_selection(iface, x) - socket = socket or conf.L3socket(*args, **kargs) - results = __gen_send(socket, x, inter=inter, loop=loop, - count=count, verbose=verbose, - realtime=realtime, return_packets=return_packets) - if need_closing: - socket.close() - return results + iface = _interface_selection(iface, x) + return _send( + x, + lambda iface: iface.l3socket(), iface=iface, + *args, **kargs + ) @conf.commands.register -def sendp(x, inter=0, loop=0, iface=None, iface_hint=None, count=None, - verbose=None, realtime=None, - return_packets=False, socket=None, *args, **kargs): +def sendp(x, iface=None, iface_hint=None, socket=None, *args, **kargs): """ Send packets at layer 2 @@ -372,14 +383,14 @@ def sendp(x, inter=0, loop=0, iface=None, iface_hint=None, count=None, """ if iface is None and iface_hint is not None and socket is None: iface = conf.route.route(iface_hint)[0] - need_closing = socket is None - socket = socket or conf.L2socket(iface=iface, *args, **kargs) - results = __gen_send(socket, x, inter=inter, loop=loop, - count=count, verbose=verbose, - realtime=realtime, return_packets=return_packets) - if need_closing: - socket.close() - return results + return _send( + x, + lambda iface: iface.l2socket(), + *args, + iface=iface, + socket=socket, + **kargs + ) @conf.commands.register @@ -401,7 +412,7 @@ def sendpfast(x, pps=None, mbps=None, realtime=None, loop=0, file_cache=False, i """ if iface is None: iface = conf.iface - argv = [conf.prog.tcpreplay, "--intf1=%s" % iface] + argv = [conf.prog.tcpreplay, "--intf1=%s" % network_name(iface)] if pps is not None: argv.append("--pps=%i" % pps) elif mbps is not None: @@ -549,8 +560,9 @@ def srp(x, promisc=None, iface=None, iface_hint=None, filter=None, """ if iface is None and iface_hint is not None: iface = conf.route.route(iface_hint)[0] - s = conf.L2socket(promisc=promisc, iface=iface, - filter=filter, nofilter=nofilter, type=type) + iface = resolve_iface(iface or conf.iface) + s = iface.l2socket()(promisc=promisc, iface=iface, + filter=filter, nofilter=nofilter, type=type) result = sndrcv(s, x, *args, **kargs) s.close() return result @@ -681,7 +693,8 @@ def srflood(x, promisc=None, filter=None, iface=None, nofilter=None, *args, **ka :param filter: provide a BPF filter :param iface: listen answers only on the given interface """ - s = conf.L3socket(promisc=promisc, filter=filter, iface=iface, nofilter=nofilter) # noqa: E501 + iface = resolve_iface(iface or conf.iface) + s = iface.l3socket()(promisc=promisc, filter=filter, iface=iface, nofilter=nofilter) # noqa: E501 r = sndrcvflood(s, x, *args, **kargs) s.close() return r @@ -697,7 +710,8 @@ def sr1flood(x, promisc=None, filter=None, iface=None, nofilter=0, *args, **karg :param filter: provide a BPF filter :param iface: listen answers only on the given interface """ - s = conf.L3socket(promisc=promisc, filter=filter, nofilter=nofilter, iface=iface) # noqa: E501 + iface = resolve_iface(iface or conf.iface) + s = iface.l3socket()(promisc=promisc, filter=filter, nofilter=nofilter, iface=iface) # noqa: E501 ans, _ = sndrcvflood(s, x, *args, **kargs) s.close() if len(ans) > 0: @@ -716,7 +730,8 @@ def srpflood(x, promisc=None, filter=None, iface=None, iface_hint=None, nofilter """ if iface is None and iface_hint is not None: iface = conf.route.route(iface_hint)[0] - s = conf.L2socket(promisc=promisc, filter=filter, iface=iface, nofilter=nofilter) # noqa: E501 + iface = resolve_iface(iface or conf.iface) + s = iface.l2socket()(promisc=promisc, filter=filter, iface=iface, nofilter=nofilter) # noqa: E501 r = sndrcvflood(s, x, *args, **kargs) s.close() return r @@ -732,7 +747,8 @@ def srp1flood(x, promisc=None, filter=None, iface=None, nofilter=0, *args, **kar :param filter: provide a BPF filter :param iface: listen answers only on the given interface """ - s = conf.L2socket(promisc=promisc, filter=filter, nofilter=nofilter, iface=iface) # noqa: E501 + iface = resolve_iface(iface or conf.iface) + s = iface.l2socket()(promisc=promisc, filter=filter, nofilter=nofilter, iface=iface) # noqa: E501 ans, _ = sndrcvflood(s, x, *args, **kargs) s.close() if len(ans) > 0: @@ -803,6 +819,7 @@ class AsyncSniffer(object): >>> print("nice weather today") >>> t.stop() """ + def __init__(self, *args, **kwargs): # Store keyword arguments self.args = args @@ -888,8 +905,9 @@ def _write_to_pcap(packets_list): quiet=quiet) )] = offline if not sniff_sockets or iface is not None: + iface = resolve_iface(iface or conf.iface) if L2socket is None: - L2socket = conf.L2listen + L2socket = iface.l2listen() if isinstance(iface, list): sniff_sockets.update( (L2socket(type=ETH_P_ALL, iface=ifname, *arg, **karg), @@ -1065,11 +1083,14 @@ def bridge_and_sniff(if1, if2, xfrm12=None, xfrm21=None, prn=None, L2socket=None "bridge_and_sniff() -- ignoring it.", arg) del kargs[arg] - def _init_socket(iface, count): + def _init_socket(iface, count, L2socket=L2socket): if isinstance(iface, SuperSocket): return iface, "iface%d" % count else: - return (L2socket or conf.L2socket)(iface=iface), iface + if not L2socket: + iface = resolve_iface(iface or conf.iface) + L2socket = iface.l2socket() + return L2socket(iface=iface), iface sckt1, if1 = _init_socket(if1, 1) sckt2, if2 = _init_socket(if2, 2) peers = {if1: sckt2, if2: sckt1} diff --git a/scapy/supersocket.py b/scapy/supersocket.py index ff80b756757..177b6563bd5 100644 --- a/scapy/supersocket.py +++ b/scapy/supersocket.py @@ -21,6 +21,7 @@ from scapy.data import MTU, ETH_P_IP, SOL_PACKET, SO_TIMESTAMPNS from scapy.compat import raw, bytes_encode from scapy.error import warning, log_runtime +from scapy.interfaces import network_name import scapy.modules.six as six import scapy.packet from scapy.utils import PcapReader, tcpdump @@ -222,7 +223,8 @@ def __init__(self, type=ETH_P_IP, filter=None, iface=None, promisc=None, nofilte self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501 self.iface = iface if iface is not None: - self.ins.bind((self.iface, type)) + iface = network_name(iface) + self.ins.bind((iface, type)) if not six.PY2: try: # Receive Auxiliary Data (VLAN tags) @@ -361,14 +363,9 @@ def __init__(self, iface=None, promisc=None, filter=None, nofilter=False, args = ['-w', '-', '-s', '65535'] if iface is None and (WINDOWS or DARWIN): iface = conf.iface - if WINDOWS: - try: - iface = iface.pcap_name - except AttributeError: - pass self.iface = iface if iface is not None: - args.extend(['-i', self.iface]) + args.extend(['-i', network_name(iface)]) if not promisc: args.append('-p') if not nofilter: diff --git a/scapy/themes.py b/scapy/themes.py index f9cbc300648..583eccb329f 100644 --- a/scapy/themes.py +++ b/scapy/themes.py @@ -11,7 +11,6 @@ # Color themes # ################## -import cgi import sys @@ -44,6 +43,7 @@ class ColorTable: "blink": ("\033[5m", ""), "invert": ("\033[7m", ""), } + inv_map = {v[0]: v[1] for k, v in colors.items()} def __repr__(self): return "" @@ -52,8 +52,7 @@ def __getattr__(self, attr): return self.colors.get(attr, [""])[0] def ansi_to_pygments(self, x): # Transform ansi encoded text to Pygments text # noqa: E501 - inv_map = {v[0]: v[1] for k, v in self.colors.items()} - for k, v in inv_map.items(): + for k, v in self.inv_map.items(): x = x.replace(k, " " + v) return x.strip() @@ -363,7 +362,8 @@ def apply_ipython_style(shell): if isinstance(conf.color_theme, (FormatTheme, NoTheme)): # Formatable if isinstance(conf.color_theme, HTMLTheme): - prompt = cgi.escape(conf.prompt) + from scapy.compat import html_escape + prompt = html_escape(conf.prompt) elif isinstance(conf.color_theme, LatexTheme): from scapy.utils import tex_escape prompt = tex_escape(conf.prompt) diff --git a/scapy/tools/UTscapy.py b/scapy/tools/UTscapy.py index af292acb113..5f861bcd328 100644 --- a/scapy/tools/UTscapy.py +++ b/scapy/tools/UTscapy.py @@ -515,9 +515,9 @@ def import_UTscapy_tools(ses): ses["retry_test"] = retry_test ses["Bunch"] = Bunch if WINDOWS: - from scapy.arch.windows import _route_add_loopback, IFACES + from scapy.arch.windows import _route_add_loopback _route_add_loopback() - ses["IFACES"] = IFACES + ses["conf"].ifaces = conf.ifaces ses["conf"].route.routes = conf.route.routes ses["conf"].route6.routes = conf.route6.routes diff --git a/scapy/utils.py b/scapy/utils.py index 6eab9cc267c..5b792347ed2 100644 --- a/scapy/utils.py +++ b/scapy/utils.py @@ -26,7 +26,7 @@ import threading import scapy.modules.six as six -from scapy.modules.six.moves import range, input +from scapy.modules.six.moves import range, input, zip_longest from scapy.config import conf from scapy.consts import DARWIN, WINDOWS, WINDOWS_XP, OPENBSD @@ -1940,20 +1940,46 @@ def get_terminal_width(): def pretty_list(rtlst, header, sortBy=0, borders=False): - """Pretty list to fit the terminal, and add header""" + """ + Pretty list to fit the terminal, and add header. + + :param rtlst: a list of tuples. each tuple contains a value which can + be either a string or a list of string. + :param sortBy: the column id (starting with 0) which whill be used for + ordering + :param borders: whether to put borders on the table or not + """ if borders: _space = "|" else: _space = " " + cols = len(header[0]) # Windows has a fat terminal border - _spacelen = len(_space) * (len(header) - 1) + (10 if WINDOWS else 0) + _spacelen = len(_space) * (cols - 1) + int(WINDOWS) _croped = False # Sort correctly rtlst.sort(key=lambda x: x[sortBy]) + # Resolve multi-values + for i, line in enumerate(rtlst): + ids = [] + values = [] + for j, val in enumerate(line): + if isinstance(val, list): + ids.append(j) + values.append(val or " ") + if values: + del rtlst[i] + k = 0 + for ex_vals in zip_longest(*values, fillvalue=" "): + extra_line = ([" "] * cols) if k else list(line) + for j, h in enumerate(ids): + extra_line[h] = ex_vals[j] + rtlst.insert(i + k, tuple(extra_line)) + k += 1 # Append tag rtlst = header + rtlst # Detect column's width - colwidth = [max([len(y) for y in x]) for x in zip(*rtlst)] + colwidth = [max(len(y) for y in x) for x in zip(*rtlst)] # Make text fit in box (if required) width = get_terminal_width() if conf.auto_crop_tables and width: @@ -1982,8 +2008,7 @@ def pretty_list(rtlst, header, sortBy=0, borders=False): if borders: rtlst.insert(1, tuple("-" * x for x in colwidth)) # Compile - rt = "\n".join(((fmt % x).strip() for x in rtlst)) - return rt + return "\n".join(fmt % x for x in rtlst) def __make_table(yfmtfunc, fmtfunc, endline, data, fxyz, sortx=None, sorty=None, seplinefunc=None, dump=False): # noqa: E501 diff --git a/scapy/utils6.py b/scapy/utils6.py index 1e0375968cd..c2c68c941f1 100644 --- a/scapy/utils6.py +++ b/scapy/utils6.py @@ -832,7 +832,7 @@ def in6_isvalid(address): otherwise.""" try: - socket.inet_pton(socket.AF_INET6, address) + inet_pton(socket.AF_INET6, address) return True except Exception: return False diff --git a/test/bpf.uts b/test/bpf.uts index a0dd954deb5..af8ad3dd7e7 100644 --- a/test/bpf.uts +++ b/test/bpf.uts @@ -43,43 +43,6 @@ attach_filter(fd, "arp or icmp", conf.iface) iflist = get_if_list() len(iflist) > 0 - -= Get working network interfaces -~ needs_root - -from scapy.arch.bpf.core import get_working_if, get_working_ifaces -ifworking = get_working_ifaces() -assert len(ifworking) -assert get_working_if() == ifworking[0] - - -= Get working network interfaces order - -import mock -from scapy.arch.bpf.core import get_working_ifaces - -@mock.patch("scapy.arch.bpf.core.os.close") -@mock.patch("scapy.arch.bpf.core.fcntl.ioctl") -@mock.patch("scapy.arch.bpf.core.get_dev_bpf") -@mock.patch("scapy.arch.bpf.core.get_if") -@mock.patch("scapy.arch.bpf.core.get_if_list") -@mock.patch("scapy.arch.bpf.core.os.getuid") -def test_get_working_ifaces(mock_getuid, mock_get_if_list, mock_get_if, - mock_get_dev_bpf, mock_ioctl, mock_close): - mock_getuid.return_value = 0 - mock_get_if_list.return_value = ['igb0', 'em0', 'msk0', 'epair0a', 'igb1', - 'vlan20', 'igb10', 'igb2'] - mock_get_if.return_value = (b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00' - b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') - mock_get_dev_bpf.return_value = (31337,) - mock_ioctl.return_value = 0 - mock_close.return_value = 0 - return get_working_ifaces() - -assert test_get_working_ifaces() == ['em0', 'igb0', 'msk0', 'epair0a', 'igb1', - 'igb2', 'igb10', 'vlan20'] - = Misc functions ~ needs_root diff --git a/test/linux.uts b/test/linux.uts index 12f080e80c6..fabfde913a1 100644 --- a/test/linux.uts +++ b/test/linux.uts @@ -76,14 +76,6 @@ from mock import patch with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x'): routes = read_routes() -= catch get_working_if only loopback -~ linux - -from mock import patch - -with patch('scapy.arch.linux.get_if_list', side_effect=lambda: [conf.loopback_name]): - assert get_working_if() == conf.loopback_name - = catch loopback device no address assigned ~ linux needs_root @@ -123,6 +115,8 @@ assert(exit_status == 0) = IPv6 link-local address selection +IFACES._add_fake_iface("scapy0") + from mock import patch conf.route6.routes = [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)] conf.route6.ipv6_ifaces = set(['scapy0']) @@ -238,6 +232,7 @@ except subprocess.CalledProcessError: except Exception: assert False +conf.ifaces.reload() if_list = get_if_list() assert ('veth_scapy_0' in if_list) assert ('veth_scapy_1' in if_list) @@ -265,6 +260,7 @@ try: veth.down() veth.destroy() + conf.ifaces.reload() if_list = get_if_list() assert ('veth_scapy_0' not in if_list) assert ('veth_scapy_1' not in if_list) diff --git a/test/regression.uts b/test/regression.uts index 038596bac56..1348ab407c1 100644 --- a/test/regression.uts +++ b/test/regression.uts @@ -127,15 +127,6 @@ except: assert not conf.use_bpf -if not conf.use_pcap: - a = six.moves.builtins.__dict__ - conf.interactive = True - conf.use_pcap = True - assert a["get_if_list"] == scapy.arch.pcapdnet.get_if_list - conf.use_pcap = False - assert a["get_if_list"] == scapy.arch.linux.get_if_list - conf.interactive = False - = Configuration conf.use_* WINDOWS ~ windows @@ -147,6 +138,18 @@ except: assert not conf.use_bpf += Configuration conf.use_pcap +~ linux + +if not conf.use_pcap: + assert not conf.iface.provider.libpcap + conf.use_pcap = True + assert conf.iface.provider.libpcap + for iface in conf.ifaces.values(): + assert iface.provider.libpcap + conf.use_pcap = False + assert not conf.iface.provider.libpcap + = Test layer filtering ~ filter @@ -182,22 +185,8 @@ bytes_hex(get_if_raw_addr(conf.iface)) def get_dummy_interface(): """Returns a dummy network interface""" - if WINDOWS: - data = {} - data["name"] = "dummy0" - data["description"] = "Does not exist" - data["win_index"] = -1 - data["guid"] = "{1XX00000-X000-0X0X-X00X-00XXXX000XXX}" - data["invalid"] = True - data["ipv4_metric"] = 1 - data["ipv6_metric"] = 1 - data["mac"] = "00:00:00:00:00:00" - data["ips"] = ["127.0.0.1", "::1"] - data["pcap_name"] = "\\Device\\NPF_" + data["guid"] - data["flags"] = 0 - return NetworkInterface(data) - else: - return "dummy0" + IFACES._add_fake_iface("dummy0") + return "dummy0" get_if_raw_addr(get_dummy_interface()) @@ -207,6 +196,56 @@ get_working_if() get_if_raw_addr6(conf.iface) += More Interfaces related functions + +# Test name resolution +old = conf.iface +conf.iface = conf.iface.name +assert conf.iface == old + +assert isinstance(conf.iface, NetworkInterface) +assert conf.iface.is_valid() + +import mock +@mock.patch("scapy.interfaces.conf.route.routes", []) +@mock.patch("scapy.interfaces.conf.ifaces", {}) +def _test_get_working_if(): + assert get_working_if() == conf.loopback_name + +assert conf.iface + "a" # left + +assert "hey! are you, ready to go ? %s" % conf.iface # format +assert "cuz you know the way to go" + conf.iface # right + + +_test_get_working_if() + += Test conf.ifaces + +conf.iface +conf.ifaces + +assert conf.iface in conf.ifaces.values() +assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface +assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface + +conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})} + +with ContextManagerCaptureOutput() as cmco: + conf.ifaces.show() + output = cmco.get_output() + +data = """ +Source Index Name MAC IPv4 IPv6 +Unknown 0 a aa:aa:aa:aa:aa:aa 127.0.0.1 ::1 + 127.0.0.2 ::2 +""".strip() + +output = [x.strip() for x in output.strip().split("\n")] +data = [x.strip() for x in data.strip().split("\n")] + +assert output == data + +conf.ifaces.reload() + = Test read_routes6() - default output routes6 = read_routes6() @@ -1675,6 +1714,7 @@ asrm = AS_resolver_multi(MockAS_resolver()) assert len(asrm.resolve(["8.8.8.8", "8.8.4.4"])) == 0 = sendpfast +~ tcpreplay old_interactive = conf.interactive conf.interactive = False @@ -4336,6 +4376,11 @@ assert defragment6(pkts).plen == 1508 ############ + Test Route6 class += Fake interfaces +IFACES._add_fake_iface("eth0") +IFACES._add_fake_iface("lo") +IFACES._add_fake_iface("scapy0") + = Route6 - Route6 flushing conf_iface = conf.iface conf.iface = "eth0" @@ -4346,6 +4391,7 @@ conf.route6.flush() not conf.route6.routes = Route6 - Route6.route + conf.route6.flush() conf.route6.ipv6_ifaces = set(['lo', 'eth0']) conf.route6.routes=[ @@ -4356,7 +4402,12 @@ conf.route6.routes=[ ( '2001:db8:0:4444::', 64, '::', 'eth0', ['2001:db8:0:4444:20f:1fff:feca:4650'], 1), ( '::', 0, 'fe80::20f:34ff:fe8a:8aa1', 'eth0', ['2001:db8:0:4444:20f:1fff:feca:4650', '2002:db8:0:4444:20f:1fff:feca:4650'], 1) ] -conf.route6.route("2002::1") == ('eth0', '2002:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') and conf.route6.route("2001::1") == ('eth0', '2001:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') and conf.route6.route("fe80::20f:1fff:feab:4870") == ('eth0', 'fe80::20f:1fff:feca:4650', '::') and conf.route6.route("::1") == ('lo', '::1', '::') and conf.route6.route("::") == ('eth0', '2001:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') and conf.route6.route('ff00::') == ('eth0', '2001:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') +assert conf.route6.route("2002::1") == ('eth0', '2002:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') +assert conf.route6.route("2001::1") == ('eth0', '2001:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') +assert conf.route6.route("fe80::20f:1fff:feab:4870") == ('eth0', 'fe80::20f:1fff:feca:4650', '::') +assert conf.route6.route("::1") == ('lo', '::1', '::') +assert conf.route6.route("::") == ('eth0', '2001:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') +assert conf.route6.route('ff00::') == ('eth0', '2001:db8:0:4444:20f:1fff:feca:4650', 'fe80::20f:34ff:fe8a:8aa1') conf.iface = conf_iface conf.route6.resync() if not len(conf.route6.routes): @@ -9095,6 +9146,9 @@ test_netbsd_7_0() import scapy +IFACES._add_fake_iface("enp3s0") +IFACES._add_fake_iface("lo") + old_routes = conf.route.routes old_iface = conf.iface old_loopback = conf.loopback_name @@ -9124,10 +9178,15 @@ finally: conf.iface = old_iface conf.route.routes = old_routes conf.route.invalidate_cache() + IFACES.reload() = Mocked IPv6 routes calls +IFACES._add_fake_iface("enp3s0") +IFACES._add_fake_iface("lo") + +old_routes = conf.route6.routes old_iface = conf.iface old_loopback = conf.loopback_name try: diff --git a/test/windows.uts b/test/windows.uts index 4754ad1867b..49db64e768c 100644 --- a/test/windows.uts +++ b/test/windows.uts @@ -25,31 +25,19 @@ assert select_objects([TimeOutSelector()], 1) == [] ############ + Windows arch unit tests -= Test pcapname += Test network_name iface = conf.iface -assert pcapname(iface.name) == iface.pcap_name -assert pcapname(iface.description) == iface.pcap_name -assert pcapname(iface.pcap_name) == iface.pcap_name +assert network_name(iface.name) == iface.network_name +assert network_name(iface.description) == iface.network_name +assert network_name(iface.network_name) == iface.network_name -= show_interfaces - -from scapy.arch import show_interfaces - -with ContextManagerCaptureOutput() as cmco: - show_interfaces() - lines = cmco.get_output().split("\n")[1:] - for l in lines: - if not l.strip(): - continue - int(l[:2]) - -= dev_from_pcapname += dev_from_networkname from scapy.config import conf -assert dev_from_pcapname(conf.iface.pcap_name).guid == conf.iface.guid +assert dev_from_networkname(conf.iface.network_name).guid == conf.iface.guid = test pcap_service_status @@ -74,7 +62,7 @@ assert pcap_service_status()[2] == True def _test_autostart_ui(mocked_getiflist): mocked_getiflist.side_effect = lambda: [] IFACES.reload() - assert all(x.win_index < 0 for x in IFACES.data.values()) + assert all(x.index < 0 for x in IFACES.data.values()) try: old_ifaces = IFACES.data.copy() diff --git a/tox.ini b/tox.ini index ff7e5769750..75834c6a963 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,7 @@ description = "Scapy unit tests" whitelist_externals = sudo passenv = PATH PWD PROGRAMFILES WINDIR SYSTEMROOT # Used by scapy - SCAPY_USE_PCAPDNET + SCAPY_USE_LIBPCAP deps = mock # cryptography requirements setuptools>=18.5