Ich habe an einem Skript gearbeitet, das GStreamer verwendet, um eine Datenpipeline zum Umschalten zwischen Kameras zu erstellen – mit mäßigem Erfolg: Die Software tut, was sie soll, weist aber eine starke Eingabeverzögerung und eine Latenz von bis zu 3 Sekunden von der Kamera bis zum Bildschirm auf. Irgendwelche Ratschläge, wie man das beheben kann?
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GLib
import os
import threading
import time
from urllib.parse import urlparse
import socket
# Process-wide environment for GStreamer, exported at import time.
# GST_DEBUG=2 asks GStreamer to log errors and warnings.
# NOTE(review): GST_BUFFER_SIZE does not appear to be a variable GStreamer
# reads - confirm it has any effect before relying on it.
os.environ.update({
    'GST_DEBUG': '2',
    'GST_BUFFER_SIZE': '1024',
})
class RTSPMultiStreamServer:
    """RTSP server that re-streams one of several RTSP cameras on one mount.

    Every verified source is decoded, scaled to 854x480 and fed into an
    ``input-selector``; the selected branch is re-encoded with x264 and
    served at ``/test``. ``switch_source`` cycles the selector's active pad.

    Args:
        config_file: Text file with one RTSP URL per line. A sample file is
            written if it does not exist.
        port: Service (port) the RTSP server listens on, as a string.
        source_latency: Jitter-buffer latency in milliseconds applied to
            every ``rtspsrc``. Without it rtspsrc falls back to its default
            of 2000 ms, which dominates camera-to-screen delay. Raise it
            (e.g. 100-200) if a lossy network produces stutter at 0.
    """

    def __init__(self, config_file="rtsp_sources.txt", port="8555",
                 source_latency=0):
        Gst.init(None)
        self.config_file = config_file
        self.source_latency = max(0, int(source_latency))
        self.sources = []
        self.active_sources = []
        self.current_source = 0
        self.running = True
        self.pipeline = None
        # Defined up front so the attributes exist even when construction
        # stops early because no source is reachable.
        self.server = None
        self.factory = None
        self.last_switch_time = 0  # Time of last switch
        self.switch_cooldown = 1.0  # Minimum time between switches (seconds)
        self.switching = False  # Flag to prevent simultaneous switches
        self.verify_sources()
        if not self.active_sources:
            print("No active RTSP sources available")
            return
        self.server = GstRtspServer.RTSPServer()
        self.server.set_service(port)
        self.factory = GstRtspServer.RTSPMediaFactory()
        self.factory.set_shared(True)
        self.factory.set_latency(0)
        # Connected exactly once; a second connection would run the handler
        # twice for every media object.
        self.factory.connect('media-configure', self.on_media_configure)
        self.setup_pipeline()
        mounts = self.server.get_mount_points()
        if mounts is None:
            print("Error: Could not get mount points")
            return
        try:
            mounts.remove_factory("/test")
        except Exception:
            pass  # Best effort: nothing to remove on a fresh server
        mounts.add_factory("/test", self.factory)
        print(f"Mount point added: /test on port {port}")

    def verify_rtsp_url(self, url, timeout=2):
        """Return True if a probe pipeline for ``url`` reaches PLAYING.

        Args:
            url: RTSP URL to probe.
            timeout: Seconds to wait for the state change (may be fractional).

        Returns:
            True when the state change completes with SUCCESS within the
            timeout, False otherwise or on any error.
        """
        test_pipeline = None
        try:
            # Minimal probe: connect over TCP and discard the data.
            test_pipeline = Gst.parse_launch(
                f'rtspsrc location="{url}" latency=0 protocols=tcp ! '
                'fakesink sync=false'
            )
            test_pipeline.set_state(Gst.State.PLAYING)
            # get_state expects an integer number of nanoseconds.
            state = test_pipeline.get_state(int(timeout * Gst.SECOND))
            return state[0] == Gst.StateChangeReturn.SUCCESS
        except Exception as e:
            print(f"Error verifying {url}: {str(e)}")
            return False
        finally:
            # Always release the probe, including on the error path.
            if test_pipeline is not None:
                test_pipeline.set_state(Gst.State.NULL)

    def verify_sources(self):
        """Read the config file, probe every URL and fill ``active_sources``.

        If the config file is missing, a sample file is written and no
        source is probed.
        """
        if not os.path.exists(self.config_file):
            with open(self.config_file, "w", encoding="utf-8") as f:
                f.write("rtsp://example1.com/stream1\nrtsp://example2.com/stream2")
            print(f"File {self.config_file} created. Please add valid RTSP URLs.")
            return
        with open(self.config_file, "r", encoding="utf-8") as f:
            self.sources = [line.strip() for line in f if line.strip()]
        # Start from a clean slate so a repeated call does not add duplicates.
        self.active_sources = []
        print("\nVerifying RTSP sources...")
        for url in self.sources:
            print(f"\nVerifying {url}...", end=" ", flush=True)
            if self.verify_rtsp_url(url):
                self.active_sources.append(url)
                print("✓ ACTIVE")
            else:
                print("✗ INACTIVE")
        inactive_count = len(self.sources) - len(self.active_sources)
        print(f"\nSources summary:")
        print(f"Total sources: {len(self.sources)}")
        print(f"Active sources: {len(self.active_sources)}")
        print(f"Inactive sources: {inactive_count}")
        if self.active_sources:
            print("\nActive sources:")
            for i, url in enumerate(self.active_sources, 1):
                print(f"{i}. {url}")
        if inactive_count > 0:
            print("\nInactive sources:")
            active = set(self.active_sources)
            # dict.fromkeys de-duplicates while keeping config-file order.
            inactive = [u for u in dict.fromkeys(self.sources) if u not in active]
            for i, url in enumerate(inactive, 1):
                print(f"{i}. {url}")
        print()

    def _build_launch_string(self):
        """Return the gst-launch description for all active sources."""
        branches = [
            # latency=... replaces rtspsrc's 2000 ms default jitter buffer.
            f'rtspsrc location="{url}" latency={self.source_latency} ! '
            'decodebin ! videoconvert ! '
            'videoscale ! video/x-raw,width=854,height=480 ! '
            # Leaky queue on raw frames: when a branch is not being drained
            # the oldest frame is dropped instead of stalling upstream, so a
            # switch shows a current picture rather than a backlog.
            f'queue max-size-buffers=1 leaky=downstream ! selector.sink_{i}'
            for i, url in enumerate(self.active_sources)
        ]
        output = (
            'input-selector name=selector sync-mode=0 ! '
            'x264enc tune=zerolatency bitrate=1000 speed-preset=ultrafast '
            'key-int-max=30 vbv-buf-capacity=1000 ! '
            'rtph264pay name=pay0 pt=96 config-interval=1'
        )
        return " ".join(branches + [output])

    def setup_pipeline(self):
        """Install the launch description for the active sources on the factory."""
        if not self.active_sources:
            print("No active sources to configure pipeline")
            return
        full_pipeline = self._build_launch_string()
        try:
            self.factory.set_launch(full_pipeline)
            print("Pipeline configured successfully")
        except Exception as e:
            print(f"Error configuring pipeline: {str(e)}")

    def on_media_configure(self, factory, media):
        """Remember the media pipeline and activate the selected input.

        The currently selected source is re-applied so that a rebuilt media
        pipeline stays consistent with ``current_source``; if its pad does
        not exist the first input is used instead.
        """
        self.pipeline = media.get_element()
        selector = self.pipeline.get_by_name("selector")
        if selector:
            pad = selector.get_static_pad(f"sink_{self.current_source}")
            if not pad:
                self.current_source = 0
                pad = selector.get_static_pad("sink_0")
            selector.set_property("active-pad", pad)
        return True

    def switch_source(self):
        """Switch to the next source, honouring the switch cooldown.

        Returns:
            True if the selector was switched, False if there is no pipeline
            yet, a switch is in progress, the cooldown has not elapsed, the
            selector or pad is missing, or an error occurred.
        """
        if not self.active_sources or not self.pipeline:
            return False
        # Check if switch is in progress
        if self.switching:
            print("\nWait... switch in progress")
            return False
        # Check cooldown
        current_time = time.time()
        time_since_last_switch = current_time - self.last_switch_time
        if time_since_last_switch < self.switch_cooldown:
            remaining = round(self.switch_cooldown - time_since_last_switch, 1)
            print(f"\nWait {remaining}s to switch again")
            return False
        self.switching = True
        try:
            next_source = (self.current_source + 1) % len(self.active_sources)
            selector = self.pipeline.get_by_name("selector")
            if not selector:
                print("\nError: Selector not found")
                return False
            next_pad = selector.get_static_pad(f"sink_{next_source}")
            if not next_pad:
                print("\nError: Pad not found")
                return False
            selector.set_property("active-pad", next_pad)
            self.current_source = next_source
            self.last_switch_time = current_time
            print(f"\nSwitching to source {self.current_source + 1}/{len(self.active_sources)}: "
                  f"{self.active_sources[self.current_source]}")
            return True
        except Exception as e:
            print(f"\nError switching source: {str(e)}")
            return False
        finally:
            self.switching = False

    def source_switcher(self, interval=None):
        """Cycle through the sources until ``running`` becomes False.

        Args:
            interval: Seconds to sleep between switches. Defaults to the
                switch cooldown, so the loop no longer spins and floods the
                console with cooldown messages.
        """
        delay = self.switch_cooldown if interval is None else interval
        while self.running:
            time.sleep(delay)
            if not self.running:
                break
            self.switch_source()

    def handle_commands(self):
        """Read commands from stdin and dispatch them until asked to quit."""
        stdin_open = True
        while self.running:
            try:
                if not stdin_open:
                    # stdin is gone: idle instead of re-raising EOFError in a
                    # tight loop, and keep serving until interrupted.
                    time.sleep(0.5)
                    continue
                command = input().strip().lower()
                if command in ('n', 'next'):
                    self.switch_source()
                elif command in ('l', 'list'):
                    self.list_sources()
                elif command in ('c', 'cooldown'):
                    self.show_cooldown()
                elif command in ('h', 'help'):
                    self.print_help()
                elif command in ('q', 'quit'):
                    self.running = False
                    print("\nStopping server...")
                    break
            except EOFError:
                stdin_open = False
                print("\nInput closed; server keeps running (Ctrl+C to stop)")
            except KeyboardInterrupt:
                self.running = False
                print("\nStopping server...")
                break

    def list_sources(self):
        """Print the active sources, marking the one currently selected."""
        print("\nActive sources:")
        for i, url in enumerate(self.active_sources, 1):
            if i - 1 == self.current_source:
                print(f"→ {i}. {url} (CURRENT)")
            else:
                print(f" {i}. {url}")

    def print_help(self):
        """Print the list of available commands."""
        print("\nAvailable commands:")
        print(" n, next - Switch to next stream")
        print(" l, list - List active streams")
        print(" c, cooldown - Show remaining time to switch")
        print(" h, help - Show this help")
        print(" q, quit - Exit program")

    def start(self):
        """Attach the server, run the GLib loop in a thread and read commands.

        Returns:
            True after a normal shutdown, False if there are no active
            sources, the server could not be attached, or an error occurred.
        """
        if not self.active_sources:
            print("Error: No active RTSP sources")
            return False
        try:
            if not self.server.attach(None):
                print("Error: Could not attach server to context")
                return False
            print(f"\nRTSP Server started:")
            print(f"URL: rtsp://localhost:{self.server.get_service()}/test")
            print(f"Active sources in use: {len(self.active_sources)}")
            self.list_sources()
            self.print_help()
            # The GLib main loop runs in a daemon thread so the main thread
            # stays free for the blocking command prompt.
            loop = GLib.MainLoop()
            loop_thread = threading.Thread(target=loop.run, daemon=True)
            loop_thread.start()
            try:
                self.handle_commands()
            finally:
                # Stop the loop even if command handling raised.
                loop.quit()
        except Exception as e:
            print(f"Error starting server: {str(e)}")
            return False
        return True

    def show_cooldown(self):
        """Print how long until the next switch is allowed."""
        time_since_last = time.time() - self.last_switch_time
        if time_since_last < self.switch_cooldown:
            remaining = round(self.switch_cooldown - time_since_last, 1)
            print(f"\nRemaining time to switch: {remaining}s")
        else:
            print("\nYou can switch sources now")
def main():
    """Create the server on port 8555 and run it until the user quits.

    Ctrl+C and any other exception are reported on stdout rather than
    propagated.
    """
    try:
        RTSPMultiStreamServer(port="8555").start()
    except KeyboardInterrupt:
        print("\nProgram terminated by user")
    except Exception as e:
        print(f"Error: {str(e)}")


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Die Idee hinter dem Code besteht darin, über einen einzigen Ausgang zwischen mehreren Streams wechseln zu können, aber angesichts dieser großen Verzögerung ist es schwierig, dies in einer realen Anwendung einzusetzen. Ich verwende FPV-Kameras von SIYI und einen Ethernet-Switch, um den Stream an QGroundControl auf einem PC im selben Netzwerk zu übertragen.
Ich habe an einem Skript gearbeitet, das GStreamer verwendet, um eine Datenpipeline zum Umschalten zwischen Kameras zu erstellen – mit mäßigem Erfolg: Die Software tut, was sie soll, weist aber eine starke Eingabeverzögerung und eine Latenz von bis zu 3 Sekunden von der Kamera bis zum Bildschirm auf. Irgendwelche Ratschläge, wie man das beheben kann? [code] import gi gi.require_version('Gst', '1.0') gi.require_version('GstRtspServer', '1.0') from gi.repository import Gst, GstRtspServer, GLib import os import threading import time from urllib.parse import urlparse import socket
# Try to play ret = test_pipeline.set_state(Gst.State.PLAYING) #time.sleep(1)
# Check state state = test_pipeline.get_state(timeout * Gst.SECOND)
# Cleanup test_pipeline.set_state(Gst.State.NULL)
return state[0] == Gst.StateChangeReturn.SUCCESS
except Exception as e: print(f"Error verifying {url}: {str(e)}") return False
def verify_sources(self): """Reads and verifies all RTSP sources""" if not os.path.exists(self.config_file): with open(self.config_file, "w") as f: f.write("rtsp://example1.com/stream1\nrtsp://example2.com/stream2") print(f"File {self.config_file} created. Please add valid RTSP URLs.") return
with open(self.config_file, "r") as f: self.sources = [line.strip() for line in f if line.strip()]
print("\nVerifying RTSP sources...") for url in self.sources: print(f"\nVerifying {url}...", end=" ", flush=True) if self.verify_rtsp_url(url): self.active_sources.append(url) print("✓ ACTIVE") else: print("✗ INACTIVE")
if self.active_sources: print("\nActive sources:") for i, url in enumerate(self.active_sources, 1): print(f"{i}. {url}")
if len(self.sources) - len(self.active_sources) > 0: print("\nInactive sources:") inactive = set(self.sources) - set(self.active_sources) for i, url in enumerate(inactive, 1): print(f"{i}. {url}") print()
def setup_pipeline(self): """Sets up the complete pipeline with active sources""" if not self.active_sources: print("No active sources to configure pipeline") return
# Create pipelines for each source source_pipelines = [] for i, url in enumerate(self.active_sources): source_pipeline = f""" rtspsrc location="{url}" ! decodebin ! videoconvert ! videoscale ! video/x-raw,width=854,height=480 ! queue max-size-buffers=1 ! selector.sink_{i} """ source_pipelines.append(source_pipeline)
# Switch to next source selector = self.pipeline.get_by_name("selector") if selector: # Prepare switch next_pad = selector.get_static_pad(f"sink_{next_source}") if not next_pad: print("\nError: Pad not found") return False
print(f"\nSwitching to source {self.current_source + 1}/{len(self.active_sources)}: " f"{self.active_sources[self.current_source]}")
return True
except Exception as e: print(f"\nError switching source: {str(e)}") return False
finally: self.switching = False
def source_switcher(self): while self.running: #time.sleep(10) if not self.running: break self.switch_source()
def handle_commands(self): """Handles user input commands""" while self.running: try: command = input().strip().lower() if command == 'n' or command == 'next': self.switch_source() elif command == 'l' or command == 'list': self.list_sources() elif command == 'c' or command == 'cooldown': self.show_cooldown() elif command == 'h' or command == 'help': self.print_help() elif command == 'q' or command == 'quit': self.running = False print("\nStopping server...") break except EOFError: continue except KeyboardInterrupt: self.running = False print("\nStopping server...") break
def list_sources(self): """Shows list of active sources""" print("\nActive sources:") for i, url in enumerate(self.active_sources, 1): if i - 1 == self.current_source: print(f"→ {i}. {url} (CURRENT)") else: print(f" {i}. {url}")
def print_help(self): """Shows available command help""" print("\nAvailable commands:") print(" n, next - Switch to next stream") print(" l, list - List active streams") print(" c, cooldown - Show remaining time to switch") print(" h, help - Show this help") print(" q, quit - Exit program")
def start(self): if not self.active_sources: print("Error: No active RTSP sources") return False
try: if not self.server.attach(None): print("Error: Could not attach server to context") return False
print(f"\nRTSP Server started:") print(f"URL: rtsp://localhost:{self.server.get_service()}/test") print(f"Active sources in use: {len(self.active_sources)}") self.list_sources() self.print_help()
# Start thread for GLib main loop loop = GLib.MainLoop() loop_thread = threading.Thread(target=loop.run) loop_thread.daemon = True loop_thread.start()
# Handle commands in main thread self.handle_commands()
# Cleanup loop.quit()
except Exception as e: print(f"Error starting server: {str(e)}") return False
return True
def show_cooldown(self): """Shows remaining cooldown time""" time_since_last = time.time() - self.last_switch_time if time_since_last < self.switch_cooldown: remaining = round(self.switch_cooldown - time_since_last, 1) print(f"\nRemaining time to switch: {remaining}s") else: print("\nYou can switch sources now")
def main(): try: server = RTSPMultiStreamServer(port="8555") server.start() except KeyboardInterrupt: print("\nProgram terminated by user") except Exception as e: print(f"Error: {str(e)}")
if __name__ == "__main__": main() [/code] Die Idee hinter dem Code besteht darin, über einen einzigen Ausgang zwischen mehreren Streams wechseln zu können, aber angesichts dieser großen Verzögerung ist es schwierig, dies in einer realen Anwendung einzusetzen. Ich verwende FPV-Kameras von SIYI und einen Ethernet-Switch, um den Stream an QGroundControl auf einem PC im selben Netzwerk zu übertragen.
Ich verwende das Paket node-rtsp-stream, um RTSP-Streaming auszuführen. Wie kann ich es in der MVC-Struktur implementieren und zurückrufen?
Controller:
var RTSP = require('./models/rtsp.js');
var...
Ich habe mit einigen ziemlich heimtückischen Hacks sowohl auf meinem Telefon als auch auf meinem Computer zu kämpfen (Fedora 41). Vieles davon hat mit Google zu tun, aber darüber hinaus bin ich immer...
Suche nach Tipps zur Optimierung meines RTSP-Stream-Capture-Projekts.
Das Ziel: Den Stream kontinuierlich in 30-Sekunden-Blöcken erfassen.
Das Problem : Die Gesamtzeit ist inkonsistent, wenn die Last...
Ich erstelle eine Webanwendung, die einen RTSP/RTP-Stream von einem Server wiedergeben soll. Wenn nicht, was wäre die einfachste Lösung? Vielleicht auf ein VLC-Plugin oder Ähnliches zurückgreifen?
Wie kann ich eine RTSP-Verbindung mithilfe einer Android-App starten? Gibt es eine API oder Methode, mit der wir eine RTSP-Verbindung starten und herstellen können?