# Code: Select all
import itertools
import multiprocessing
def process_data(data_chunk):
    """Return a new list with every element of ``data_chunk`` doubled.

    Args:
        data_chunk: An iterable of values that support ``* 2``. It must be
            iterable; a bare scalar such as an ``int`` raises ``TypeError``.

    Returns:
        A list holding ``x * 2`` for each ``x`` in ``data_chunk``, in order.
    """
    return [x * 2 for x in data_chunk]
def data_generator(count=1000000):
    """Lazily yield the integers ``0`` through ``count - 1``.

    Items are produced one at a time (individual values, not lists), so a
    consumer that needs chunks has to batch them itself.

    Args:
        count: How many consecutive integers to yield. Defaults to
            1,000,000.

    Yields:
        Each integer from ``0`` up to, but not including, ``count``.
    """
    yield from range(count)
def worker(input_queue, output_queue):
    """Process chunks from ``input_queue`` until a ``None`` sentinel arrives.

    Each item taken from ``input_queue`` is passed to ``process_data`` and
    the result is put on ``output_queue``. The loop ends when ``None`` is
    received, so the producer must enqueue one ``None`` per worker.

    Args:
        input_queue: Queue supplying data chunks, terminated by ``None``.
        output_queue: Queue that receives one processed result per chunk.
    """
    while True:
        data_chunk = input_queue.get()  # blocks until an item is available
        if data_chunk is None:
            # Shutdown sentinel: stop consuming and let the process exit.
            break
        output_queue.put(process_data(data_chunk))
if __name__ == "__main__":
    # Number of items batched into each task sent to a worker. Batching keeps
    # the per-message queue overhead small and gives process_data() the
    # iterable it expects.
    CHUNK_SIZE = 10000

    input_queue = multiprocessing.Queue()
    output_queue = multiprocessing.Queue()
    num_processes = multiprocessing.cpu_count()
    processes = [
        multiprocessing.Process(target=worker, args=(input_queue, output_queue))
        for _ in range(num_processes)
    ]
    for p in processes:
        p.start()

    # data_generator() yields single items, but process_data() iterates over
    # its argument; sending a bare int makes every worker fail with
    # "TypeError: 'int' object is not iterable". Group the stream into lists.
    items = iter(data_generator())
    num_chunks = 0
    while True:
        data_chunk = list(itertools.islice(items, CHUNK_SIZE))
        if not data_chunk:
            break
        input_queue.put(data_chunk)
        num_chunks += 1

    # One sentinel per worker so that each of them leaves its loop.
    for _ in range(num_processes):
        input_queue.put(None)

    # Drain the results BEFORE joining: a child that has put items on a
    # multiprocessing.Queue cannot terminate until that data is consumed, so
    # joining first can deadlock. Each worker puts one result per chunk, so
    # exactly num_chunks results are expected. Results arrive in completion
    # order, which is not necessarily submission order.
    # NOTE: if a worker dies before producing its result, this get() blocks.
    results = [output_queue.get() for _ in range(num_chunks)]

    for p in processes:
        p.join()