Anonymous
Ryu Controller wird nicht ordnungsgemäß benutzerdefinierte Skripte ausgeführt
Post
by Anonymous » 14 Apr 2025, 10:52
Ich versuche, Zero Trust Architecture in 5G SDN mit RYU Controller und Mininet auf Ubuntu VM zu implementieren. Wenn ich es versuche, ohne benutzerdefiniertes Skript wie
auszuführen
Code: Select all
ryu-manager ryu.app.simple_switch_13< /code>
Es funktioniert absolut in Ordnung und alle Pakete werden empfangen. Aber wenn ich ein angepasendes Zero Trust -Skript wie
ausführeryu-manager zero_trust.py< /code>
Alle Pakete werden fallen gelassen und der Zielhost nicht erreichbar. Das Seltsame ist, was ich auf dem Skript schreibe, wird auf dem Bildschirm angemeldet, sodass ich weiß, dass das Skript gut ausführt, aber nur die Pakete fallen und keiner erreicht das Ziel. Ich sende den Code für Topologie und Zero_Trust.py < /p>
5g_topo.py
from mininet.topo import Topo
from mininet.node import OVSSwitch
class FiveGTopology(Topo):
def build(self):
# Hosts
ue1 = self.addHost('h1', ip='10.0.0.1/24') # UE 1
ue2 = self.addHost('h2', ip='10.0.0.2/24') # UE 2
core = self.addHost('h3', ip='10.0.0.3/24') # Core (UPF/server)
# Switches (representing gNBs or access network)
s1 = self.addSwitch('s1', cls=OVSSwitch, protocols='OpenFlow13')
s2 = self.addSwitch('s2', cls=OVSSwitch, protocols='OpenFlow13')
# Links (UEs to access, access to core)
self.addLink(ue1, s1)
self.addLink(ue2, s1)
self.addLink(s1, s2)
self.addLink(s2, core)
topos = { 'my5gtopo': (lambda: FiveGTopology()) }
< /code>
Zero_Trust.py
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.lib.packet import packet, ethernet, ipv4
class ZeroTrust(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]
allowed_flows = [
('10.0.0.1', '10.0.0.3'),
('10.0.0.3', '10.0.0.1'),
('10.0.0.2', '10.0.0.3'),
('10.0.0.3', '10.0.0.2'),
]
def __init__(self, *args, **kwargs):
super(ZeroTrust, self).__init__(*args, **kwargs)
self.mac_to_port = {} # {dpid: {mac: port}}
def add_flow(self, datapath, priority, match, actions):
parser = datapath.ofproto_parser
ofproto = datapath.ofproto
inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)]
flow_mod = parser.OFPFlowMod(datapath=datapath, priority=priority, match=match, instructions=inst)
datapath.send_msg(flow_mod)
self.logger.info("Flow installed on switch %s: %s", datapath.id, match)
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
datapath = ev.msg.datapath
parser = datapath.ofproto_parser
ofproto = datapath.ofproto
match = parser.OFPMatch()
actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)]
self.add_flow(datapath, 0, match, actions)
@set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER)
def packet_in_handler(self, ev):
msg = ev.msg
datapath = msg.datapath
dpid = datapath.id
parser = datapath.ofproto_parser
ofproto = datapath.ofproto
in_port = msg.match['in_port']
pkt = packet.Packet(msg.data)
eth = pkt.get_protocol(ethernet.ethernet)
ip_pkt = pkt.get_protocol(ipv4.ipv4)
if not eth or not ip_pkt:
return
dst = eth.dst
src = eth.src
src_ip = ip_pkt.src
dst_ip = ip_pkt.dst
self.mac_to_port.setdefault(dpid, {})
self.mac_to_port[dpid][src] = in_port
self.logger.info("[ZT] PacketIn on switch %s: %s (%s) → %s (%s)", dpid, src_ip, src, dst_ip, dst)
if (src_ip, dst_ip) not in self.allowed_flows:
self.logger.info("[ZT] BLOCKED: %s → %s", src_ip, dst_ip)
return
out_port = self.mac_to_port[dpid].get(dst)
if out_port is None:
out_port = ofproto.OFPP_FLOOD
actions = [parser.OFPActionOutput(out_port)]
# Install flow for allowed traffic
match = parser.OFPMatch(in_port=in_port, eth_type=0x0800, ipv4_src=src_ip, ipv4_dst=dst_ip)
self.add_flow(datapath, 10, match, actions)
out = parser.OFPPacketOut(
datapath=datapath,
buffer_id=ofproto.OFP_NO_BUFFER,
in_port=in_port,
actions=actions,
data=msg.data
)
datapath.send_msg(out)
Bitte kann jemand helfen?
1744620728
Anonymous
Ich versuche, Zero Trust Architecture in 5G SDN mit RYU Controller und Mininet auf Ubuntu VM zu implementieren. Wenn ich es versuche, ohne benutzerdefiniertes Skript wie auszuführen[code]ryu-manager ryu.app.simple_switch_13< /code> Es funktioniert absolut in Ordnung und alle Pakete werden empfangen. Aber wenn ich ein angepasendes Zero Trust -Skript wie ausführeryu-manager zero_trust.py< /code> Alle Pakete werden fallen gelassen und der Zielhost nicht erreichbar. Das Seltsame ist, was ich auf dem Skript schreibe, wird auf dem Bildschirm angemeldet, sodass ich weiß, dass das Skript gut ausführt, aber nur die Pakete fallen und keiner erreicht das Ziel. Ich sende den Code für Topologie und Zero_Trust.py < /p> 5g_topo.py from mininet.topo import Topo from mininet.node import OVSSwitch class FiveGTopology(Topo): def build(self): # Hosts ue1 = self.addHost('h1', ip='10.0.0.1/24') # UE 1 ue2 = self.addHost('h2', ip='10.0.0.2/24') # UE 2 core = self.addHost('h3', ip='10.0.0.3/24') # Core (UPF/server) # Switches (representing gNBs or access network) s1 = self.addSwitch('s1', cls=OVSSwitch, protocols='OpenFlow13') s2 = self.addSwitch('s2', cls=OVSSwitch, protocols='OpenFlow13') # Links (UEs to access, access to core) self.addLink(ue1, s1) self.addLink(ue2, s1) self.addLink(s1, s2) self.addLink(s2, core) topos = { 'my5gtopo': (lambda: FiveGTopology()) } < /code> Zero_Trust.py from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.ofproto import ofproto_v1_3 from ryu.lib.packet import packet, ethernet, ipv4 class ZeroTrust(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION] allowed_flows = [ ('10.0.0.1', '10.0.0.3'), ('10.0.0.3', '10.0.0.1'), ('10.0.0.2', '10.0.0.3'), ('10.0.0.3', '10.0.0.2'), ] def __init__(self, *args, **kwargs): super(ZeroTrust, self).__init__(*args, **kwargs) self.mac_to_port = {} # {dpid: {mac: port}} def add_flow(self, datapath, priority, match, actions): parser = datapath.ofproto_parser ofproto = datapath.ofproto inst = [parser.OFPInstructionActions(ofproto.OFPIT_APPLY_ACTIONS, actions)] flow_mod = parser.OFPFlowMod(datapath=datapath, priority=priority, match=match, instructions=inst) datapath.send_msg(flow_mod) self.logger.info("Flow installed on switch %s: %s", datapath.id, match) @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER) def switch_features_handler(self, ev): datapath = ev.msg.datapath parser = datapath.ofproto_parser ofproto = datapath.ofproto match = parser.OFPMatch() actions = [parser.OFPActionOutput(ofproto.OFPP_CONTROLLER, ofproto.OFPCML_NO_BUFFER)] self.add_flow(datapath, 0, match, actions) @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) def packet_in_handler(self, ev): msg = ev.msg datapath = msg.datapath dpid = datapath.id parser = datapath.ofproto_parser ofproto = datapath.ofproto in_port = msg.match['in_port'] pkt = packet.Packet(msg.data) eth = pkt.get_protocol(ethernet.ethernet) ip_pkt = pkt.get_protocol(ipv4.ipv4) if not eth or not ip_pkt: return dst = eth.dst src = eth.src src_ip = ip_pkt.src dst_ip = ip_pkt.dst self.mac_to_port.setdefault(dpid, {}) self.mac_to_port[dpid][src] = in_port self.logger.info("[ZT] PacketIn on switch %s: %s (%s) → %s (%s)", dpid, src_ip, src, dst_ip, dst) if (src_ip, dst_ip) not in self.allowed_flows: self.logger.info("[ZT] BLOCKED: %s → %s", src_ip, dst_ip) return out_port = self.mac_to_port[dpid].get(dst) if out_port is None: out_port = ofproto.OFPP_FLOOD actions = [parser.OFPActionOutput(out_port)] # Install flow for allowed traffic match = parser.OFPMatch(in_port=in_port, eth_type=0x0800, ipv4_src=src_ip, ipv4_dst=dst_ip) self.add_flow(datapath, 10, match, actions) out = parser.OFPPacketOut( datapath=datapath, buffer_id=ofproto.OFP_NO_BUFFER, in_port=in_port, actions=actions, data=msg.data ) datapath.send_msg(out) [/code] Bitte kann jemand helfen?