Die lokale Funktion kann beim Senden von Cellable -Filterobjekten über Multiprocessing -Warteschlange nicht einsetzt werPython

Python-Programme
Anonymous
 Die lokale Funktion kann beim Senden von Cellable -Filterobjekten über Multiprocessing -Warteschlange nicht einsetzt wer

Post by Anonymous »

Problembeschreibung
Ich entwickle eine Filterbroker -Klass, die aufrufbare Filter für Abonnentenprozesse verwaltet. Der Broker empfängt Funktionen, die in einem Filter über eine Nachrichtenwarteschlange eingewickelt sind. Ich stoße jedoch auf einen Wickelfehler, wenn ich versuche, eine lokal definierte Funktion zu senden: < /p>

Code: Select all

AttributeError: Can't get local object 'task..function'
< /code>
Der Fehler tritt auf, da die Funktion in einer anderen Funktion definiert ist ((task()
) und ist daher nicht sackisch, aber ich muss das Senden von Lambda und lokal definierten Funktionen unterstützen.from threading import Thread
from multiprocessing import Queue, Manager, Process
from dataclasses import dataclass
from typing import Optional
import logging
import inspect

@dataclass
class Service:
id: Optional[int] = None
name: str = ""
port: int = 0

class Filter:
def __init__(self, filter_function: callable):
self.filter_function: callable = filter_function
self.subscribers: list[Service] = []

def __call__(self, *args, **kwds):
return self.filter_function(*args, **kwds)

class FilterBroker(Thread):
def __init__(self, queue: Queue) -> None:
super().__init__()
self.queue = queue
self.filters: dict[str, Filter] = {}

def add_filter(self, name: str, filter: Filter):
if len(inspect.signature(filter).parameters) != 2:
raise TypeError("Invalid Filter: must have exactly two parameters")
self.filters[name] = filter

def run(self):
class_name = self.__class__.__name__
logging.info(f"[{class_name}]: Process started")
while True:
try:
task = self.queue.get()
logging.debug(f"[{class_name}]: Task received: {task}")
if task is None:
break
if not isinstance(task, tuple) or not callable(task[0]) or not isinstance(task[1], Queue):
continue
response_queue, method, *args = task
response = method(self, *args)
except Exception:
response = None
finally:
response_queue.put_nowait(response)

@staticmethod
def ask(fb: 'FilterBroker', *task):
response_queue = Manager().Queue()
fb.queue.put((response_queue, *task))
print("I put in queue")
result = response_queue.get()
print("I got result")
response_queue.close()
return result

manager = Manager()
broker = FilterBroker(manager.Queue())
broker.start()

def task(broker):
def function(x):
return x > 0

f = Filter(function)
print(f(2))
FilterBroker.ask(broker, FilterBroker.add_filter, 'test', f)
logging.debug(f"Filter added")

process = Process(target=task, args=(broker,))

process.start()
process.join()

print("Process finished")
< /code>
Voller Fehler Traceback < /h2>
Traceback (most recent call last):
File "/usr/lib64/python3.13/multiprocessing/process.py", line 313, in _bootstrap
self.run()
~~~~~~~~^^
File "/usr/lib64/python3.13/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/****/Scrivania/github/ctf_proxy/refactoring/test.py", line 22, in task
fb.ask(broker, fb.add_filter, 'test', f)
~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//****/Scrivania/github/ctf_proxy/refactoring/proxy/multiprocess/FilterBroker.py", line 299, in ask
fb.queue.put((response_queue, *task))
~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
File "", line 2, in put
File "/usr/lib64/python3.13/multiprocessing/managers.py", line 830, in _callmethod
conn.send((self._id, methodname, args, kwds))
~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib64/python3.13/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
~~~~~~~~~~~~~~~~~~~~~^^^^^
File "/usr/lib64/python3.13/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
~~~~~~~~~~~~~~~~~~~~~~~^^^^^
AttributeError: Can't get local object 'task..function'
< /code>
Frage < /h2>
Wie kann ich meinen Code ändern, um das Senden lokal definierter Funktionen und Lambdas per Multiprocessing zu unterstützen? Ich möchte>

Quick Reply

Change Text Case: 
   
  • Similar Topics
    Replies
    Views
    Last post