Ich entwickle eine FilterBroker-Klasse, die aufrufbare Filter für Abonnentenprozesse verwaltet. Der Broker empfängt über eine Nachrichtenwarteschlange Funktionen, die in ein Filter-Objekt eingewickelt sind. Ich stoße jedoch auf einen Pickling-Fehler, wenn ich versuche, eine lokal definierte Funktion zu senden:
Code: Select all
AttributeError: Can't get local object 'task..function'
< /code>
Der Fehler tritt auf, da die Funktion innerhalb einer anderen Funktion (task()) definiert ist.
import inspect
import logging
from collections.abc import Callable
from dataclasses import dataclass
from multiprocessing import Manager, Process, Queue
from threading import Thread
from typing import Any, Optional
@dataclass
class Service:
    """Plain record for a service; used as the element type of ``Filter.subscribers``.

    All fields have defaults, so ``Service()`` is a valid empty record.
    """

    # Optional integer identifier; None when not set.
    id: Optional[int] = None
    # Service name; empty string by default.
    name: str = ""
    # Integer port; 0 by default.
    port: int = 0
class Filter:
    """Callable wrapper around a filter function plus its list of subscribers.

    Calling the instance forwards all positional and keyword arguments to
    the wrapped function and returns its result.

    NOTE: a ``Filter`` holds a direct reference to ``filter_function``.
    When an instance is pickled (e.g. to cross a multiprocessing queue) the
    function is pickled by reference, so functions defined inside another
    function fail with "Can't get local object ...".
    """

    def __init__(self, filter_function: Callable[..., Any]):
        """Store *filter_function* and start with no subscribers."""
        self.filter_function: Callable[..., Any] = filter_function
        self.subscribers: list[Service] = []

    def __call__(self, *args, **kwds) -> Any:
        """Invoke the wrapped function with the given arguments."""
        return self.filter_function(*args, **kwds)
class FilterBroker(Thread):
    """Thread that keeps a registry of named filters and serves queued requests.

    A request is a tuple ``(response_queue, method, *args)`` as built by
    :meth:`ask`. The broker calls ``method(self, *args)`` and puts the result
    on ``response_queue``; ``None`` is sent back when the call raises.
    Putting ``None`` on the request queue stops the loop.
    """

    def __init__(self, queue: Queue) -> None:
        """Create a broker that reads its requests from *queue*."""
        super().__init__()
        self.queue = queue
        self.filters: dict[str, "Filter"] = {}

    def add_filter(self, name: str, filter: "Filter") -> None:
        """Register *filter* under *name*.

        Raises:
            TypeError: if the filter function does not take exactly two
                parameters.
        """
        # Inspect the wrapped function, not the wrapper: a wrapper whose
        # __call__ is (*args, **kwds) always reports two parameters, which
        # would make this check pass for any function.
        target = getattr(filter, "filter_function", filter)
        if len(inspect.signature(target).parameters) != 2:
            raise TypeError("Invalid Filter: must have exactly two parameters")
        self.filters[name] = filter

    def run(self) -> None:
        """Serve requests from the queue until ``None`` is received."""
        class_name = self.__class__.__name__
        logging.info(f"[{class_name}]: Process started")
        while True:
            task = self.queue.get()
            logging.debug(f"[{class_name}]: Task received: {task}")
            if task is None:
                break
            # The reply channel is duck-typed: multiprocessing.Queue is a
            # factory function, not a class, so isinstance() against it
            # raises TypeError, and manager queues arrive as proxies anyway.
            if not (
                isinstance(task, tuple)
                and task
                and hasattr(task[0], "put_nowait")
            ):
                logging.warning(
                    f"[{class_name}]: Ignoring malformed task: {task!r}"
                )
                continue
            response_queue, *call = task
            response = self._dispatch(call)
            # Reply only once a valid response queue is known; a failed
            # delivery must not stop the broker.
            try:
                response_queue.put_nowait(response)
            except Exception:
                logging.exception(
                    f"[{class_name}]: Could not deliver response"
                )

    def _dispatch(self, call):
        """Run ``call[0](self, *call[1:])``; return None if invalid or raising."""
        class_name = self.__class__.__name__
        if not call or not callable(call[0]):
            logging.warning(f"[{class_name}]: Task has no callable method")
            return None
        method, *args = call
        try:
            return method(self, *args)
        except Exception:
            logging.exception(f"[{class_name}]: Task raised an exception")
            return None

    @staticmethod
    def ask(fb: 'FilterBroker', *task):
        """Send ``task`` (method followed by its arguments) to *fb* and wait.

        A temporary manager provides the response queue. It is shut down
        when the call returns; manager queue proxies expose no ``close()``.

        Returns:
            The value the broker put on the response queue.
        """
        with Manager() as manager:
            response_queue = manager.Queue()
            fb.queue.put((response_queue, *task))
            print("I put in queue")
            result = response_queue.get()
            print("I got result")
            return result
# Module-level setup: the broker reads its requests from a manager-backed queue.
manager = Manager()
broker = FilterBroker(manager.Queue())
# FilterBroker subclasses Thread, so start() runs its run() loop in a new thread.
broker.start()
def task(broker):
    """Build a Filter around a local function and ask *broker* to register it.

    This is the reproduction case from the report: sending the filter through
    the broker's queue requires pickling it.
    """

    # Defined inside task(), so its qualified name is a "<locals>" path; per
    # the reported traceback, pickling it fails with "Can't get local object".
    def function(x):
        return x > 0

    f = Filter(function)
    print(f(2))
    FilterBroker.ask(broker, FilterBroker.add_filter, 'test', f)
    logging.debug("Filter added")
# Run task() in a child process, passing it the broker, and wait for it to exit.
process = Process(target=task, args=(broker,))
process.start()
process.join()
print("Process finished")
< /code>
Vollständiger Fehler-Traceback
Traceback (most recent call last):
File "/usr/lib64/python3.13/multiprocessing/process.py", line 313, in _bootstrap
self.run()
~~~~~~~~^^
File "/usr/lib64/python3.13/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/****/Scrivania/github/ctf_proxy/refactoring/test.py", line 22, in task
fb.ask(broker, fb.add_filter, 'test', f)
~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//****/Scrivania/github/ctf_proxy/refactoring/proxy/multiprocess/FilterBroker.py", line 299, in ask
fb.queue.put((response_queue, *task))
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
File "", line 2, in put
File "/usr/lib64/python3.13/multiprocessing/managers.py", line 830, in _callmethod
conn.send((self._id, methodname, args, kwds))
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/lib64/python3.13/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
~~~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get local object 'task..function'
< /code>
Frage
Wie kann ich meinen Code ändern, um das Senden lokal definierter Funktionen und Lambdas per Multiprocessing zu unterstützen? Ich möchte …