python - Deadlock in Multiprocessing Queue - Stack Overflow


I am developing a program to simulate a P2P network using the multiprocessing package. Specifically, I have created a Node class that inherits from multiprocessing.Process and contains the basic functionality, while the SpecializedNode class implements the specific features.

SpecializedNode performs an operation for max_iter iterations and, at each iteration, sends the result to its peers (broadcast_message(self, msg)) and collects all received messages before executing the task again (collect_messages(self)).

To handle inter-process communication, each Node has a multiprocessing.Queue where it receives messages from other peers.

Unfortunately, it seems that all processes get stuck when calling collect_messages(self), without raising any errors or exceptions. Since this is my first time using the multiprocessing package, I wonder if I am using it correctly.

Moreover, does this implementation allow the node to correctly receive messages from peers while it is performing the task in run()?


from multiprocessing import Process, Queue

class Node(Process):

    def __init__(self, node_id):

        super(Node, self).__init__()

        self.node_id = node_id

        # Incoming messages (e.g., models) queue
        self.message_queue = Queue()

    def set_peers(self, peers):
        self.peers = peers

    def collect_messages(self):
        messages = []

        while not self.message_queue.empty():
            msg = self.message_queue.get()
            messages.append(msg)

        return messages

    def broadcast_message(self, msg):
        for peer in self.peers:
            peer.message_queue.put(msg)

Then I have my specialized Node class:


class SpecializedNode(Node):

    def __init__(self, node_id, max_iter):
        super(node_id)

        self.max_iter = max_iter


    def run(self):

        for current_iter in range(self.max_iter):
            #do stuff
        
            msg = #build my message

            self.broadcast_msg(msg)

            incoming_messages = self.collect_messages()

            # do stuff based on the received messages

if __name__ == "__main__":
    n_nodes = 30

    # Create the nodes
    nodes = [SpecializedNode(i, 100) for i in range(n_nodes)

    # Simulate a fully-connected network
    for node in nodes:
        node.set_peers(peers=[n for n in nodes if n.node_id != node.node_id])

    # Start the nodes (i.e., processes)
    for node in nodes:
        node.start()

    # Wait for completion
    for node in nodes:
        node.join()

Asked Mar 21 at 11:53 by Mattia Campana

  • You need to implement your own locking scheme to ensure that, if a process sees a queue is non-empty, it is still non-empty when it calls get. – chepner, Mar 21 at 12:19
  • Replace queue.get with queue.get_nowait. – Ahmed AEK, Mar 21 at 12:53
  • There are too many things wrong here: 1. Your code does not compile. 2. The program will not do what you want, since none of the instances wait around for the others to send their messages. 3. Your idea of peers does not work: the lists do not get sent to the different processes. – quamrana, Mar 21 at 18:11
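
As a minimal illustration of the get_nowait suggestion above, here is a sketch of a drop-in replacement for Node.collect_messages. Note that this only avoids blocking; it does not by itself fix the race condition described in the answer below, since peers may not have sent their messages yet.

import queue  # get_nowait raises queue.Empty when the queue is drained

def collect_messages(self):
    messages = []
    while True:
        try:
            messages.append(self.message_queue.get_nowait())
        except queue.Empty:
            break  # nothing currently left in the queue
    return messages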

1 Answer


In addition to your code not being compilable, you have a major race condition: a node puts a message on each of the queues belonging to its peers and then proceeds to call collect_messages, in which it loops getting peer responses from its own queue until the queue is empty. But some of the other node processes (perhaps all of them) may not have had a chance to put their responses on the broadcaster's queue by the time collect_messages is called, so the empty condition will occur prematurely. Also, if you read the docs you will see that, in general, the multiprocessing.Queue.empty method does not return reliable results.

Moreover, trying to join a process that has put messages on a multiprocessing.Queue instance will hang until all of those messages have been retrieved with get, so your code, because of the race condition I mentioned, might also hang on join. From the docs:

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Note that a queue created using a manager does not have this issue. See Programming guidelines.
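
For illustration, here is a minimal sketch of a manager-backed queue (the producer function and message are hypothetical, not from the question):

from multiprocessing import Manager, Process

def producer(q):
    q.put("hello")  # the proxy forwards the call to the manager process

if __name__ == "__main__":
    with Manager() as manager:
        q = manager.Queue()  # proxy queue served by the manager process
        p = Process(target=producer, args=(q,))
        p.start()
        p.join()        # safe: no feeder thread needs to flush buffered items
        print(q.get())  # -> hello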

One way for your code to work is for each peer to know how many responses there should be (i.e. how many peers it has) and then just loop until one message has been retrieved for each peer.
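
A minimal sketch of that idea (the timeout is an added safeguard, not part of the original code; it turns a permanently missing message into a queue.Empty exception instead of a silent hang):

import queue  # Queue.get raises queue.Empty if the timeout expires

def collect_messages(self):
    # Block until exactly one message per peer has arrived.
    return [self.message_queue.get(timeout=30) for _ in self.peers]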

Another problem will arise if your code creates its child processes using the spawn start method (which is always the case on platforms such as Windows). In this case the instances of your SpecializedNode class must be serializable with pickle, which will not be the case if the class inherits from multiprocessing.Process. So your code could never work under Windows. (By the way, when you post a question tagged multiprocessing, you are supposed to also tag it with the platform you are running under, since the answer usually depends on the platform.)
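
You can check which start method your interpreter defaults to with a quick snippet:

import multiprocessing as mp

if __name__ == "__main__":":
    # 'fork' on Linux; 'spawn' on Windows and, since Python 3.8, macOS
    print(mp.get_start_method())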

In general, I find very little reason to ever derive a class from Process, since it locks you too tightly into a particular implementation. Here is my approach:

from multiprocessing import Process, Queue

N_NODES = 4  # For testing we don't need too large a number

class Node:

    def __init__(self, node_id):
        self.node_id = node_id

        # Incoming messages (e.g., models) queue
        self.message_queue = Queue()

    def set_peers(self, peers):
        self.peers = peers

    def collect_messages(self):
        # We expect N_NODES - 1 messages:
        return [self.message_queue.get() for _ in range(N_NODES - 1)]

    def broadcast_message(self, msg):
        for peer in self.peers:
            peer.message_queue.put(msg)

class SpecializedNode(Node):

    def __init__(self, node_id, max_iter):
        super().__init__(node_id)

        self.max_iter = max_iter


    def run(self):

        for current_iter in range(self.max_iter):

            msg = f'Message {current_iter} from node {self.node_id}'

            self.broadcast_message(msg)

            incoming_messages = self.collect_messages()
            if self.node_id == 0:
                print(incoming_messages)


if __name__ == "__main__":
    n_iter = 3   # For testing we don't need 100

    # Create the nodes
    nodes = [SpecializedNode(i, n_iter) for i in range(N_NODES)]

    # Simulate a fully-connected network
    for node in nodes:
        node.set_peers(peers=[n for n in nodes if n.node_id != node.node_id])

    # Run each node in its own process
    processes = [
        Process(target=node.run)
        for node in nodes
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()

Prints:

['Message 0 from node 1', 'Message 0 from node 3', 'Message 0 from node 2']
['Message 1 from node 1', 'Message 1 from node 3', 'Message 1 from node 2']
['Message 2 from node 1', 'Message 2 from node 3', 'Message 2 from node 2']
