DPDK logo

Elixir Cross Referencer

#!/usr/bin/env python3
# SPDX-License-Identifier: BSD-3-Clause

import fcntl
import pkg_resources
import socket
import struct
import sys
import unittest


# Refuse to run under Python 2: this module uses Python 3 only constructs
# such as bytes(str, encoding=...).
if sys.version_info[0] < 3:
    print("Python3 is required to run this script")
    sys.exit(1)


# scapy is a third-party dependency: when it is missing, stop with a short
# message and exit status 1 instead of an ImportError traceback.
try:
    # Ether is used by Interface to build and parse Ethernet frames.
    from scapy.all import Ether
except ImportError:
    print("Scapy module is required")
    sys.exit(1)


# Requirement specifiers checked when this file is run with the "check_reqs"
# argument; the list is handed to assert_requirements() as is.
PKTTEST_REQ = ["scapy>=2.4.3"]


def assert_requirements(req):
    """
    Check that the given package requirement(s) are satisfied.

    req is a requirement specifier string, or a list of such strings, and
    is passed unchanged to pkg_resources.require(). When a distribution is
    missing or installed in a conflicting version, the reason is printed
    and the process exits with status 1; otherwise the function returns
    None.
    """
    unmet = (pkg_resources.DistributionNotFound, pkg_resources.VersionConflict)
    try:
        pkg_resources.require(req)
    except unmet as err:
        print("Requirement assertion: {}".format(err))
        sys.exit(1)


# Default interface names PacketXfer opens when the caller does not name any.
TAP_PROTECTED = "dtap0"
TAP_UNPROTECTED = "dtap1"


class Interface(object):
    """
    Raw packet endpoint bound to one network interface.

    Opens an AF_PACKET/SOCK_RAW socket on the named interface, reads the
    interface MAC address and provides helpers to send and receive whole
    Ethernet frames, either as bytes or as scapy packets.
    """

    ETH_P_ALL = 3             # protocol id given to socket(): all protocols
    MAX_PACKET_SIZE = 1280    # upper bound on bytes returned by one recv()
    IOCTL_GET_INFO = 0x8927   # SIOCGIFHWADDR: query the hardware address
    SOCKET_TIMEOUT = 0.5      # seconds before a blocking socket call gives up

    def __init__(self, ifname):
        """
        Open a raw socket on interface ifname and look up its MAC address.

        Any error raised while creating or configuring the socket (for
        example PermissionError when not privileged, or OSError for an
        unknown interface) is propagated; the socket is closed first.
        """
        self.name = ifname
        # Set before anything can fail so that __del__ stays safe on a
        # partially constructed object.
        self.s = None
        self.mac = None

        # create and bind socket to specified interface
        sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                             socket.htons(Interface.ETH_P_ALL))
        try:
            sock.settimeout(Interface.SOCKET_TIMEOUT)
            sock.bind((self.name, 0, socket.PACKET_OTHERHOST))

            # get interface MAC address; the name is cut to 15 characters
            # and zero padded by the '256s' format to form the request
            request = struct.pack('256s',
                                  bytes(ifname[:15], encoding='ascii'))
            info = fcntl.ioctl(sock.fileno(), Interface.IOCTL_GET_INFO,
                               request)
        except Exception:
            # Do not leave the descriptor open until garbage collection.
            sock.close()
            raise

        self.s = sock
        # In the returned struct ifreq the 16-byte name is followed by a
        # sockaddr whose 2-byte family precedes the 6 address bytes.
        self.mac = ':'.join('%02x' % octet for octet in info[18:24])

    def __del__(self):
        """
        Close the socket, if one was ever opened.
        """
        sock = getattr(self, 's', None)
        if sock is not None:
            sock.close()

    def send_l3packet(self, pkt, mac):
        """
        Prepend an Ethernet header to pkt and send the resulting frame.

        The source address is this interface's MAC, the destination is mac.
        """
        e = Ether(src=self.mac, dst=mac)
        self.send_packet(e/pkt)

    def send_packet(self, pkt):
        """
        Serialize pkt with bytes() and send it.
        """
        self.send_bytes(bytes(pkt))

    def send_bytes(self, bytedata):
        """
        Send bytedata unchanged on the socket.
        """
        self.s.send(bytedata)

    def recv_packet(self):
        """
        Receive one frame and return it parsed as a scapy Ether packet.
        """
        return Ether(self.recv_bytes())

    def recv_bytes(self):
        """
        Receive and return at most MAX_PACKET_SIZE bytes from the socket.

        Raises socket.timeout when nothing arrives within SOCKET_TIMEOUT.
        """
        return self.s.recv(Interface.MAX_PACKET_SIZE)

    def get_mac(self):
        """
        Return the interface MAC address as a 'xx:xx:xx:xx:xx:xx' string.
        """
        return self.mac


class PacketXfer(object):
    """
    Pair of Interface objects, one called protected and one unprotected.

    A packet is sent on one interface and the next packet is read from
    the other one.
    """

    def __init__(self, protected_iface=TAP_PROTECTED,
                 unprotected_iface=TAP_UNPROTECTED):
        """
        Open both interfaces; the protected one is opened first.
        """
        self.protected_port = Interface(protected_iface)
        self.unprotected_port = Interface(unprotected_iface)

    def send_to_protected_port(self, pkt, remote_mac=None):
        """
        Send pkt on the protected interface.

        The Ethernet destination is remote_mac, or the MAC address of the
        unprotected interface when remote_mac is None.
        """
        dst = self.unprotected_port.get_mac() if remote_mac is None else remote_mac
        self.protected_port.send_l3packet(pkt, dst)

    def send_to_unprotected_port(self, pkt, remote_mac=None):
        """
        Send pkt on the unprotected interface.

        The Ethernet destination is remote_mac, or the MAC address of the
        protected interface when remote_mac is None.
        """
        dst = self.protected_port.get_mac() if remote_mac is None else remote_mac
        self.unprotected_port.send_l3packet(pkt, dst)

    def xfer_unprotected(self, pkt):
        """
        Send pkt on the unprotected interface and return the next packet
        received on the protected interface.
        """
        self.send_to_unprotected_port(pkt)
        received = self.protected_port.recv_packet()
        return received

    def xfer_protected(self, pkt):
        """
        Send pkt on the protected interface and return the next packet
        received on the unprotected interface.
        """
        self.send_to_protected_port(pkt)
        received = self.unprotected_port.recv_packet()
        return received


def pkttest():
    """
    Command-line entry point for test scripts built on this module.

    The behaviour is selected by sys.argv:

    * no argument: run unittest.main() with verbosity 2, which by default
      exits the process with the test result;
    * "config": print the value returned by the config() function of the
      __main__ module; exit with status 1 when __main__ defines none;
    * anything else: report the misuse on stderr and exit with status 1.
    """
    if len(sys.argv) == 1:
        sys.exit(unittest.main(verbosity=2))

    if sys.argv[1:] != ["config"]:
        # An unknown argument must not look like a successful run.
        sys.stderr.write("Unsupported arguments: expected none or \"config\"\n")
        sys.exit(1)

    module = __import__('__main__')
    # Look the function up first, so that an AttributeError raised inside
    # config() itself is not mistaken for a missing config().
    config = getattr(module, 'config', None)
    if config is None:
        sys.stderr.write("Cannot find \"config()\" in a test\n")
        sys.exit(1)
    print(config())


if __name__ == "__main__":
    # Run directly, this file only knows the "check_reqs" command; any other
    # invocation prints the usage line.
    requested = sys.argv[1:]
    if requested != ["check_reqs"]:
        print("Usage: " + sys.argv[0] + " check_reqs")
    else:
        assert_requirements(PKTTEST_REQ)