Code: Select all
import multiprocessing
import queue
import time
import numpy as np
class CameraSimulator(multiprocessing.Process):
    """Simulated camera that produces random frames in a child process.

    The process is controlled through two ``multiprocessing.Event`` flags:

    * ``connected`` -- the child process keeps running while this is set.
    * ``acquiring`` -- frames are generated and put on the queue while this
      is set.

    Frames are ``uint8`` NumPy arrays of shape ``FRAME_SHAPE``.
    """

    # Shape of every generated frame (rows, columns, channels).
    FRAME_SHAPE = (480, 640, 3)
    # Pause, in seconds, after each frame that was queued successfully.
    FRAME_INTERVAL = 0.1
    # Upper bound, in seconds, on how long the loop blocks before it
    # re-checks the ``connected`` flag.
    POLL_INTERVAL = 0.1

    def __init__(self, queue):
        """Create the simulator.

        Args:
            queue: Queue-like object with a ``put(item, timeout=...)`` method
                that raises ``queue.Full`` on timeout (for example a
                ``multiprocessing.Queue``). The parameter name shadows the
                ``queue`` module inside this method only.
        """
        super().__init__()
        self.queue = queue
        self.connected = multiprocessing.Event()
        self.acquiring = multiprocessing.Event()

    def run(self):
        """Produce frames until ``connected`` is cleared.

        The loop is deliberately flat so that ``connected`` is re-checked on
        every iteration: clearing it ends the process even if ``acquiring``
        is still set.
        """
        while self.connected.is_set():
            # Block (bounded) until acquisition is requested instead of
            # spinning at full CPU while the camera is idle.
            if not self.acquiring.wait(timeout=self.POLL_INTERVAL):
                continue
            # randint's upper bound is exclusive; 256 covers the full
            # uint8 range 0..255.
            image = np.random.randint(0, 256, self.FRAME_SHAPE, dtype=np.uint8)
            try:
                # A bounded put keeps the loop responsive when the queue is
                # full; a plain put() would block forever and never raise.
                self.queue.put(image, timeout=self.POLL_INTERVAL)
            except queue.Full:
                # Frame dropped; go round again and re-check the flags.
                continue
            time.sleep(self.FRAME_INTERVAL)

    def connect(self):
        """Set ``connected`` and start the child process.

        The flag is set before ``start()`` so that ``run()`` sees it on its
        first check.
        """
        self.connected.set()
        self.start()

    def disconnect(self):
        """Clear ``connected`` so that ``run()`` returns."""
        self.connected.clear()

    def acquire(self):
        """Set ``acquiring`` so that frames are generated."""
        self.acquiring.set()

    def stop(self):
        """Clear ``acquiring`` so that frame generation pauses."""
        self.acquiring.clear()
if __name__ == "__main__":
    # Demo: connect the simulated camera, run two acquisition bursts,
    # then shut the child process down cleanly.
    image_queue = multiprocessing.Queue(maxsize=1000)
    camera = CameraSimulator(image_queue)

    print("Connect camera")
    camera.connect()

    print("Start camera acquisition")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()
    time.sleep(2)

    print("Start camera acquisition again")
    camera.acquire()
    time.sleep(2)
    print("Stop camera acquisition")
    camera.stop()

    print("Disconnect camera")
    camera.disconnect()

    print("Draining queue")
    # A process that has put items on a multiprocessing.Queue does not exit
    # until its buffered items have been flushed to the pipe, and
    # Queue.empty() can report True while that flush is still in progress.
    # Keep consuming until the child has exited AND nothing is left, so the
    # join() below cannot deadlock.
    while camera.is_alive() or not image_queue.empty():
        try:
            image_queue.get(timeout=0.1)
        except queue.Empty:
            continue
    print("Queue drained")

    camera.join()
    print("Camera process terminated")