I am developing a program to simulate a P2P network using the `multiprocessing` package. Specifically, I have created a `Node` class that inherits from `multiprocessing.Process` and contains all the basic functionality, while the `SpecializedNode` class implements specific features. `SpecializedNode` performs an operation for `max_iter` iterations and, at each iteration, sends the result to its peers (`broadcast_message(self, msg)`) and collects all received messages before executing the task again (`collect_messages(self)`).

To handle inter-process communication, each `Node` has a `multiprocessing.Queue` where it receives messages from other peers.

Unfortunately, all processes seem to get stuck when calling `collect_messages(self)`, without raising any errors or exceptions. Since this is my first time using the `multiprocessing` package, I wonder if I am using it correctly. Moreover, does this implementation allow a node to correctly receive messages from its peers while it is performing the task in `run()`?
```python
from multiprocessing import Process, Queue

class Node(Process):
    def __init__(self, node_id):
        super(Node, self).__init__()
        self.node_id = node_id
        # Incoming messages (e.g., models) queue
        self.message_queue = Queue()

    def set_peers(self, peers):
        self.peers = peers

    def collect_messages(self):
        messages = []
        while not self.message_queue.empty():
            msg = self.message_queue.get()
            messages.append(msg)
        return messages

    def broadcast_message(self, msg):
        for peer in self.peers:
            peer.message_queue.put(msg)
```
Then I have my specialized Node class:
```python
class SpecializedNode(Node):
    def __init__(self, node_id, max_iter):
        super(node_id)
        self.max_iter = max_iter

    def run(self):
        for current_iter in range(self.max_iter):
            #do stuff
            msg = #build my message
            self.broadcast_msg(msg)
            incoming_messages = self.collect_messages()
            # do stuff based on the received messages
```
```python
if __name__ == "__main__":
    n_nodes = 30
    # Create the nodes
    nodes = [SpecializedNode(i, 100) for i in range(n_nodes)
    # Simulate a fully-connected network
    for node in nodes:
        node.set_peers(peers=[n for n in nodes if n.node_id != node.node_id])
    # Start the nodes (i.e., processes)
    for node in nodes:
        node.start()
    # Wait for completion
    for node in nodes:
        node.join()
```
asked Mar 21 at 11:53 by Mattia Campana
1 Answer
In addition to the issue with your code not being compilable, you have a major race condition: a node puts a message on each of the queues belonging to its peers and then proceeds to call `collect_messages`, wherein it loops getting peer responses from its own queue until the queue is empty. But not all of the other processes (or perhaps none of them) may have had a chance to put their responses on the broadcaster's queue by the time `collect_messages` is called, and the empty condition will occur prematurely. Also, if you read the docs you will see that in general the `multiprocessing.Queue.empty` method does not return reliable results. Moreover, trying to join a process that has put messages on a `multiprocessing.Queue` instance will hang until all of those messages have been retrieved with `get`, so because of the race condition I mentioned, your code might also hang for that reason. From the docs:
> Warning: As mentioned above, if a child process has put items on a queue (and it has not used `JoinableQueue.cancel_join_thread`), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
>
> Note that a queue created using a manager does not have this issue. See Programming guidelines.
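To illustrate that last note, here is a minimal sketch (the names `worker` and `q` are mine, not from the question) showing that a manager-created queue does not make `join()` wait on unconsumed items the way a plain `multiprocessing.Queue` can:

```python
from multiprocessing import Manager, Process

def worker(q):
    # Put an item and exit without anyone having consumed it yet.
    q.put("result")

if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue()  # a proxy; the real queue lives in the manager process
        p = Process(target=worker, args=(q,))
        p.start()
        p.join()        # does not deadlock, unlike a plain multiprocessing.Queue
        print(q.get())  # -> 'result'
```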
One way for your code to work is for each node to know how many responses there should be (i.e. how many peers it has) and then just loop until one message has been retrieved from each peer.
Another problem will arise if your code creates its child processes using the spawn method (which is always the case on platforms such as Windows). In this case the instances of your class `SpecializedNode` must be serializable with `pickle`, which will not be the case if the class inherits from `multiprocessing.Process`. So your code could never work under Windows. (By the way, when you post a question tagged with multiprocessing, you are supposed to also tag it with the platform you are running under, since the answer usually depends on the platform.)
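You can see the serialization constraint directly with a small sketch (the exact error text may vary by Python version): pickling a `Process` subclass instance outside of the spawn machinery is rejected.

```python
import pickle
from multiprocessing import Process

class Node(Process):
    pass

try:
    pickle.dumps(Node())
except TypeError as e:
    # e.g. "Pickling an AuthenticationString object is disallowed
    # for security reasons"
    print(e)
```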
In general, I find very little reason to ever derive a class from `Process`, since doing so locks you too tightly into a particular implementation. Here is my approach:
```python
from multiprocessing import Process, Queue

N_NODES = 4  # For testing we don't need too large a number

class Node:
    def __init__(self, node_id):
        self.node_id = node_id
        # Incoming messages (e.g., models) queue
        self.message_queue = Queue()

    def set_peers(self, peers):
        self.peers = peers

    def collect_messages(self):
        # We expect N_NODES - 1 messages:
        return [self.message_queue.get() for _ in range(N_NODES - 1)]

    def broadcast_message(self, msg):
        for peer in self.peers:
            peer.message_queue.put(msg)

class SpecializedNode(Node):
    def __init__(self, node_id, max_iter):
        super().__init__(node_id)
        self.max_iter = max_iter

    def run(self):
        for current_iter in range(self.max_iter):
            msg = f'Message {current_iter} from node {self.node_id}'
            self.broadcast_message(msg)
            incoming_messages = self.collect_messages()
            if self.node_id == 0:
                print(incoming_messages)

if __name__ == "__main__":
    n_iter = 3  # For testing we don't need 100
    # Create the nodes
    nodes = [SpecializedNode(i, n_iter) for i in range(N_NODES)]
    # Simulate a fully-connected network
    for node in nodes:
        node.set_peers(peers=[n for n in nodes if n.node_id != node.node_id])
    # Start the nodes (i.e., processes)
    processes = [
        Process(target=node.run)
        for node in nodes
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
```
Prints:

```
['Message 0 from node 1', 'Message 0 from node 3', 'Message 0 from node 2']
['Message 1 from node 1', 'Message 1 from node 3', 'Message 1 from node 2']
['Message 2 from node 1', 'Message 2 from node 3', 'Message 2 from node 2']
```
Comments:

- Replace `queue.get` with `queue.get_nowait`. – Ahmed AEK, Mar 21 at 12:53
- `peers` does not work. The lists do not get sent to different processes. – quamrana, Mar 21 at 18:11
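Ahmed AEK's suggestion would look roughly like the sketch below (a drop-in version of the question's `collect_messages` method). Note that it only makes the drain non-blocking; it does not fix the race condition described in the answer.

```python
import queue  # for the queue.Empty exception

def collect_messages(self):
    messages = []
    while True:
        try:
            # get_nowait() raises queue.Empty instead of blocking
            messages.append(self.message_queue.get_nowait())
        except queue.Empty:
            break
    return messages
```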