Reduzieren Sie die Latenz eines Software-RTSP-Switches – Python

Python-Programme
Guest
 Reduzieren Sie die Latenz eines Software-RTSP-Switches

Post by Guest »

Ich habe an einem Skript gearbeitet, das GStreamer verwendet, um eine Datenpipeline zum Umschalten zwischen Kameras zu erstellen – mit mäßigem Erfolg: Die Software tut, was sie soll, weist aber eine starke Eingabeverzögerung und eine Latenz von bis zu 3 Sekunden von der Kamera bis zum Bildschirm auf. Irgendwelche Ratschläge, wie man das beheben kann?

Code: Select all

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GLib
import os
import threading
import time
from urllib.parse import urlparse
import socket

# Configure GStreamer through the process environment before Gst.init() runs.
# GST_DEBUG is set to '2' (presumably the warning level — confirm against the
# GStreamer debugging documentation).
# NOTE(review): GST_BUFFER_SIZE does not look like a documented GStreamer
# environment variable — confirm that it has any effect.
os.environ.update({
    'GST_DEBUG': '2',
    'GST_BUFFER_SIZE': '1024',
})

class RTSPMultiStreamServer:
    """Re-serve one of several RTSP cameras on a single RTSP mount point.

    Every source listed in the config file is probed at start-up; the ones
    that respond are decoded, scaled and fed into an ``input-selector``.
    The selected branch is re-encoded as H.264 and published at ``/test``.
    The active branch can be changed at runtime from the console.
    """

    def __init__(self, config_file="rtsp_sources.txt", port="8555",
                 source_latency_ms=0):
        """Probe the configured sources and prepare the RTSP server.

        Args:
            config_file: Text file with one RTSP URL per line.
            port: TCP port (service name) the RTSP server listens on.
            source_latency_ms: Jitterbuffer latency applied to every
                ``rtspsrc`` in the served pipeline.  ``0`` matches the
                probe pipeline and the factory latency.
        """
        Gst.init(None)

        self.config_file = config_file
        self.source_latency_ms = source_latency_ms
        self.sources = []
        self.active_sources = []
        self.current_source = 0
        self.running = True
        self.pipeline = None
        # Defined up front so the attributes exist even when we bail out
        # early because no source is reachable.
        self.server = None
        self.factory = None
        self.last_switch_time = 0  # Time of last switch
        self.switch_cooldown = 1.0  # Minimum time between switches (seconds)
        self.switching = False  # Flag to prevent simultaneous switches

        self.verify_sources()

        if not self.active_sources:
            print("No active RTSP sources available")
            return

        self.server = GstRtspServer.RTSPServer()
        self.server.set_service(str(port))

        self.factory = GstRtspServer.RTSPMediaFactory()
        self.factory.set_shared(True)
        self.factory.set_latency(0)
        # Connected exactly once: a second connection would run the
        # callback twice for every configured media.
        self.factory.connect('media-configure', self.on_media_configure)

        self.setup_pipeline()
        mounts = self.server.get_mount_points()
        if mounts is None:
            print("Error: Could not get mount points")
            return

        # add_factory() replaces any factory already mounted on the path,
        # so no explicit remove_factory() is needed beforehand.
        mounts.add_factory("/test", self.factory)
        print(f"Mount point added: /test on port {port}")

    def verify_rtsp_url(self, url, timeout=2):
        """Return True if a throw-away pipeline for *url* reaches PLAYING.

        Args:
            url: RTSP URL to probe.
            timeout: Seconds to wait for the state change to complete.
        """
        test_pipeline = None
        try:
            test_pipeline = Gst.parse_launch(
                f'rtspsrc location="{url}" latency=0 protocols=tcp ! '
                'fakesink sync=false'
            )
            test_pipeline.set_state(Gst.State.PLAYING)
            state = test_pipeline.get_state(timeout * Gst.SECOND)
            return state[0] == Gst.StateChangeReturn.SUCCESS
        except Exception as e:
            print(f"Error verifying {url}: {str(e)}")
            return False
        finally:
            # Always release the probe pipeline, including on errors.
            if test_pipeline is not None:
                test_pipeline.set_state(Gst.State.NULL)

    def verify_sources(self):
        """Read the config file and sort its URLs into active / inactive.

        If the config file does not exist, a template is written and no
        source is loaded.
        """
        if not os.path.exists(self.config_file):
            with open(self.config_file, "w") as f:
                f.write("rtsp://example1.com/stream1\nrtsp://example2.com/stream2")
            print(f"File {self.config_file} created.  Please add valid RTSP URLs.")
            return

        with open(self.config_file, "r") as f:
            self.sources = [line.strip() for line in f if line.strip()]

        # Start from a clean list so a repeated call does not duplicate entries.
        self.active_sources = []

        print("\nVerifying RTSP sources...")
        for url in self.sources:
            print(f"\nVerifying {url}...", end=" ", flush=True)
            if self.verify_rtsp_url(url):
                self.active_sources.append(url)
                print("✓ ACTIVE")
            else:
                print("✗ INACTIVE")

        print("\nSources summary:")
        print(f"Total sources: {len(self.sources)}")
        print(f"Active sources: {len(self.active_sources)}")
        print(f"Inactive sources: {len(self.sources) - len(self.active_sources)}")

        if self.active_sources:
            print("\nActive sources:")
            for i, url in enumerate(self.active_sources, 1):
                print(f"{i}. {url}")

        # Keep config-file order (a plain set difference would shuffle it).
        active = set(self.active_sources)
        inactive = [u for u in dict.fromkeys(self.sources) if u not in active]
        if inactive:
            print("\nInactive sources:")
            for i, url in enumerate(inactive, 1):
                print(f"{i}. {url}")
        print()

    def _build_launch_string(self):
        """Return the gst-launch description for all active sources."""
        latency = int(self.source_latency_ms)
        branches = [
            # Without an explicit latency rtspsrc falls back to its default
            # jitterbuffer size, which adds delay on every camera branch.
            f'rtspsrc location="{url}" latency={latency} ! '
            'decodebin ! videoconvert ! '
            'videoscale ! video/x-raw,width=854,height=480 ! '
            f'queue max-size-buffers=1 ! selector.sink_{i}'
            for i, url in enumerate(self.active_sources)
        ]
        output = (
            'input-selector name=selector sync-mode=0 ! '
            'x264enc tune=zerolatency bitrate=1000 speed-preset=ultrafast '
            'key-int-max=30 vbv-buf-capacity=1000 ! '
            'rtph264pay name=pay0 pt=96 config-interval=1'
        )
        return " ".join(branches + [output])

    def setup_pipeline(self):
        """Install the launch description on the media factory."""
        if not self.active_sources:
            print("No active sources to configure pipeline")
            return

        try:
            self.factory.set_launch(self._build_launch_string())
            print("Pipeline configured successfully")
        except Exception as e:
            print(f"Error configuring pipeline: {str(e)}")

    def on_media_configure(self, factory, media):
        """Remember the media's pipeline and select the current source.

        Selecting ``current_source`` (instead of always ``sink_0``) keeps the
        selector and the bookkeeping in step when the media is re-created.
        """
        self.pipeline = media.get_element()

        selector = self.pipeline.get_by_name("selector")
        if selector is not None:
            pad = selector.get_static_pad(f"sink_{self.current_source}")
            if pad is None:
                # Fall back to the first branch and realign the index.
                pad = selector.get_static_pad("sink_0")
                self.current_source = 0
            if pad is not None:
                selector.set_property("active-pad", pad)
        return True

    def switch_source(self):
        """Activate the next source, honouring the cooldown.

        Returns:
            True if the selector was switched, False otherwise.
        """
        if not self.active_sources or not self.pipeline:
            return False

        # Check if switch is in progress
        if self.switching:
            print("\nWait...  switch in progress")
            return False

        # Check cooldown
        current_time = time.time()
        time_since_last_switch = current_time - self.last_switch_time
        if time_since_last_switch < self.switch_cooldown:
            remaining = round(self.switch_cooldown - time_since_last_switch, 1)
            print(f"\nWait {remaining}s to switch again")
            return False

        self.switching = True
        try:
            next_source = (self.current_source + 1) % len(self.active_sources)

            selector = self.pipeline.get_by_name("selector")
            if selector is None:
                # Nothing was switched, so do not report success.
                return False

            next_pad = selector.get_static_pad(f"sink_{next_source}")
            if not next_pad:
                print("\nError: Pad not found")
                return False

            selector.set_property("active-pad", next_pad)
            self.current_source = next_source
            self.last_switch_time = current_time

            print(f"\nSwitching to source {self.current_source + 1}/{len(self.active_sources)}: "
                  f"{self.active_sources[self.current_source]}")
            return True

        except Exception as e:
            print(f"\nError switching source: {str(e)}")
            return False

        finally:
            self.switching = False

    def source_switcher(self):
        """Cycle through the sources until ``running`` is cleared.

        Sleeps for the cooldown between attempts; without the pause the loop
        would spin at full speed and flood the console with cooldown notices.
        """
        while self.running:
            time.sleep(self.switch_cooldown)
            if not self.running:
                break
            self.switch_source()

    def handle_commands(self):
        """Read console commands until quit, Ctrl-C or end of input."""
        actions = {
            'n': self.switch_source, 'next': self.switch_source,
            'l': self.list_sources, 'list': self.list_sources,
            'c': self.show_cooldown, 'cooldown': self.show_cooldown,
            'h': self.print_help, 'help': self.print_help,
        }
        while self.running:
            try:
                command = input().strip().lower()
                if command in ('q', 'quit'):
                    self.running = False
                    print("\nStopping server...")
                    break
                action = actions.get(command)
                if action is not None:
                    action()
            except (EOFError, KeyboardInterrupt):
                # A closed stdin raises EOFError on every read, so retrying
                # would spin forever; treat it like a quit request.
                self.running = False
                print("\nStopping server...")
                break

    def list_sources(self):
        """Print the active sources, marking the selected one."""
        print("\nActive sources:")
        for i, url in enumerate(self.active_sources, 1):
            if i - 1 == self.current_source:
                print(f"→ {i}. {url} (CURRENT)")
            else:
                print(f"  {i}.  {url}")

    def print_help(self):
        """Print the available console commands."""
        print("\nAvailable commands:")
        print("  n, next     - Switch to next stream")
        print("  l, list     - List active streams")
        print("  c, cooldown - Show remaining time to switch")
        print("  h, help     - Show this help")
        print("  q, quit     - Exit program")

    def start(self):
        """Attach the server and run the command loop.

        Returns:
            True after a clean shutdown, False if the server could not start.
        """
        if not self.active_sources:
            print("Error: No active RTSP sources")
            return False

        try:
            if not self.server.attach(None):
                print("Error: Could not attach server to context")
                return False

            print("\nRTSP Server started:")
            print(f"URL: rtsp://localhost:{self.server.get_service()}/test")
            print(f"Active sources in use: {len(self.active_sources)}")
            self.list_sources()
            self.print_help()

            # The GLib main loop serves RTSP clients in the background while
            # the main thread handles console input.
            loop = GLib.MainLoop()
            loop_thread = threading.Thread(target=loop.run)
            loop_thread.daemon = True
            loop_thread.start()

            try:
                self.handle_commands()
            finally:
                # Stop the loop even if the command handler raised.
                loop.quit()

        except Exception as e:
            print(f"Error starting server: {str(e)}")
            return False

        return True

    def show_cooldown(self):
        """Print how long until the next switch is allowed."""
        time_since_last = time.time() - self.last_switch_time
        if time_since_last < self.switch_cooldown:
            remaining = round(self.switch_cooldown - time_since_last, 1)
            print(f"\nRemaining time to switch: {remaining}s")
        else:
            print("\nYou can switch sources now")

def main():
    """Build the server on port 8555 and run it until the user quits.

    Ctrl-C and any other exception are reported on stdout instead of
    propagating to the interpreter.
    """
    try:
        RTSPMultiStreamServer(port="8555").start()
    except KeyboardInterrupt:
        print("\nProgram terminated by user")
    except Exception as exc:
        details = str(exc)
        print(f"Error: {details}")


# Only start the server when this file is executed as a script.
if __name__ == "__main__":
    main()
Die Idee hinter dem Code besteht darin, über einen einzigen Ausgang zwischen mehreren Streams wechseln zu können, aber angesichts dieser großen Verzögerung ist es schwierig, dies in einer realen Anwendung einzusetzen. Ich verwende FPV-Kameras von SIYI und einen Ethernet-Switch, um den Stream an QGroundControl auf einem PC im selben Netzwerk zu übertragen.

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post
  • RTSP-Streaming mit Node js?
    by Guest » » in HTML
    0 Replies
    6 Views
    Last post by Guest
  • Vanilla-Android-Emulator spammt RTSP in Wireshark
    by Guest » » in Android
    0 Replies
    9 Views
    Last post by Guest
  • Timing-Problem beim Aufzeichnen mehrerer RTSP-Streams über 720p
    by Guest » » in C++
    0 Replies
    7 Views
    Last post by Guest
  • Streaming über RTSP oder RTP in HTML5
    by Anonymous » » in HTML
    0 Replies
    10 Views
    Last post by Anonymous
  • Wie starte ich eine RTSP-Verbindung mithilfe der Android-App?
    by Anonymous » » in Android
    0 Replies
    1 Views
    Last post by Anonymous