Ryu Controller wird nicht ordnungsgemäß benutzerdefinierte Skripte ausgeführt
Posted: 14 Apr 2025, 10:52
Ich versuche, Zero Trust Architecture in 5G SDN mit RYU Controller und Mininet auf Ubuntu VM zu implementieren. Wenn ich es versuche, ohne benutzerdefiniertes Skript wie
auszuführen
Bitte kann jemand helfen?
auszuführen
Code: Select all
ryu-manager ryu.app.simple_switch_13< /code>
Es funktioniert absolut in Ordnung und alle Pakete werden empfangen. Aber wenn ich ein angepasendes Zero Trust -Skript wie
ausführeryu-manager zero_trust.py< /code>
Alle Pakete werden fallen gelassen und der Zielhost nicht erreichbar. Das Seltsame ist, was ich auf dem Skript schreibe, wird auf dem Bildschirm angemeldet, sodass ich weiß, dass das Skript gut ausführt, aber nur die Pakete fallen und keiner erreicht das Ziel. Ich sende den Code für Topologie und Zero_Trust.py < /p>
5g_topo.py
from mininet.topo import Topo
from mininet.node import OVSSwitch
class FiveGTopology(Topo):
def build(self):
# Hosts
ue1 = self.addHost('h1', ip='10.0.0.1/24') # UE 1
ue2 = self.addHost('h2', ip='10.0.0.2/24') # UE 2
core = self.addHost('h3', ip='10.0.0.3/24') # Core (UPF/server)
# Switches (representing gNBs or access network)
s1 = self.addSwitch('s1', cls=OVSSwitch, protocols='OpenFlow13')
s2 = self.addSwitch('s2', cls=OVSSwitch, protocols='OpenFlow13')
# Links (UEs to access, access to core)
self.addLink(ue1, s1)
self.addLink(ue2, s1)
self.addLink(s1, s2)
self.addLink(s2, core)
topos = { 'my5gtopo': (lambda: FiveGTopology()) }
< /code>
Zero_Trust.py
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet, ethernet, ipv4
class ZeroTrust(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
allowed_flows = [
('10.0.0.1', '10.0.0.3'),
('10.0.0.3', '10.0.0.1'),
('10.0.0.2', '10.0.0.3'),
('10.0.0.3', '10.0.0.2'),
]
def __init__(self, *args, **kwargs):
super(ZeroTrust, self).__init__(*args, **kwargs)
self.mac_to_port = {} # {dpid: {mac: port}}
def add_flow(self, datapath, priority, match, actions):
parser = datapath.ofproto_parser
ofproto = datapath.ofproto
inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]
flow_mod = parser.OFPFlowMod(datapath=datapath, priority=priority, match=match, instructions=inst)
datapath.send_msg(flow_mod)
self.logger.info("Flow installed on switch %s: %s", datapath.id, match)
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
datapath = ev.msg.datapath
parser = datapath.ofproto_parser
ofproto = datapath.ofproto
match = parser.OFPMatch()
actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)]
self.add_flow(datapath, 0, match, actions)
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
dpid = datapath.id
parser = datapath.ofproto_parser
ofproto = datapath.ofproto
in_port = msg.match['in_port']
pkt = packet.Packet(msg.data)
eth = pkt.get_protocol(ethernet.ethernet)
ip_pkt = pkt.get_protocol(ipv4.ipv4)
if not eth or not ip_pkt:
return
dst = eth.dst
src = eth.src
src_ip = ip_pkt.src
dst_ip = ip_pkt.dst
self.mac_to_port.setdefault(dpid, {})
self.mac_to_port[dpid][src] = in_port
self.logger.info("[ZT] PacketIn on switch %s: %s (%s) → %s (%s)", dpid, src_ip, src, dst_ip, dst)
if (src_ip, dst_ip) not in self.allowed_flows:
self.logger.info("[ZT] BLOCKED: %s → %s", src_ip, dst_ip)
return
out_port = self.mac_to_port[dpid].get(dst)
if out_port is None:
out_port = ofproto.OFPP_FLOOD
actions = [parser.OFPActionOutput(out_port)]
# Install flow for allowed traffic
match = parser.OFPMatch(in_port=in_port, eth_type=0x0800, ipv4_src=src_ip, ipv4_dst=dst_ip)
self.add_flow(datapath, 10, match, actions)
out = parser.OFPPacketOut(
datapath=datapath,
buffer_id=ofproto.OFP_NO_BUFFER,
in_port=in_port,
actions=actions,
data=msg.data
)
datapath.send_msg(out)