More on virtual communication with the message-passing interface.

The message-passing interface (MPI) is a standard that allows numerous computers to communicate with one another, achieving large-scale peer-to-peer communication in a high-performance manner.

OK, let's face it, writing computer programs with MPI can be hard and tedious. However, one can devise a set of techniques that collectively enhance one's software craftsmanship.

I write C++ (1998) using MPI -- you already know that if you read my blog. In Ray, I implemented a few tricks to make the message-passing thing a lot easier.

Slave modes and master modes

First, each MPI rank (a rank is usually mapped to a process, if that matters) has a slave mode and a master mode. A slave mode can be, say, SLAVE_MODE_COUNT_ENTRIES, which would obviously count entries. Master modes follow the same principle. If an MPI rank is in a state such that it must wait for others, then there is this very special slave mode called SLAVE_MODE_DO_NOTHING.




Each of these modes has an associated callback method. SLAVE_MODE_DO_NOTHING's callback method is called

void Machine::call_SLAVE_MODE_DO_NOTHING();



Its implementation is quite simple, actually.

void Machine::call_SLAVE_MODE_DO_NOTHING(){
    /* do nothing */
}
OK, that was funny.

Given a slave mode, I want to call its associated callback method. To do so, I use an array of method pointers. Simple enough.
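Here is a minimal self-contained sketch of the idea -- placeholder class and mode names, not the actual code of Ray:

class Machine{
public:
    enum{SLAVE_MODE_COUNT_ENTRIES,SLAVE_MODE_DO_NOTHING,NUMBER_OF_SLAVE_MODES};

    Machine(){
        /* populate the dispatch table once */
        m_slave_methods[SLAVE_MODE_COUNT_ENTRIES]=&Machine::call_SLAVE_MODE_COUNT_ENTRIES;
        m_slave_methods[SLAVE_MODE_DO_NOTHING]=&Machine::call_SLAVE_MODE_DO_NOTHING;
    }
    void call_SLAVE_MODE_COUNT_ENTRIES(){ /* count entries */ }
    void call_SLAVE_MODE_DO_NOTHING(){ /* do nothing */ }
    void runSlaveMode(int mode){
        (this->*m_slave_methods[mode])(); /* dispatch through the array */
    }
private:
    /* the array of method pointers, indexed by slave mode */
    void (Machine::*m_slave_methods[NUMBER_OF_SLAVE_MODES])();
};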

Keep it simple, stupid

But wait, for each slave mode (the same is true for master modes), I have to

1. Add it to an enumeration so it can be used later;
2. Define its string (read char*) representation for debugging purposes;
3. Declare its callback method prototype (in .h);
4. Write its callback method definition (in .cpp);
5. Add its callback method pointer to the array of method pointers.


Accordingly, to add a new slave mode, I would need to edit at least 5 places. To avoid that, I use macros.

An example follows.

core/slave_mode_macros.h

MACRO_LIST_ITEM( RAY_SLAVE_MODE_LOAD_SEQUENCES )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_START_SEEDING )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_DO_NOTHING )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_SEND_EXTENSION_DATA )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_ASSEMBLE_WAVES )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_FUSION )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_FINISH_FUSIONS )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_DISTRIBUTE_FUSIONS )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_AMOS )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_AUTOMATIC_DISTANCE_DETECTION )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_SEND_LIBRARY_DISTANCES )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_EXTRACT_VERTICES )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_SEND_DISTRIBUTION )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_EXTENSION )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_INDEX_SEQUENCES )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_REDUCE_MEMORY_CONSUMPTION )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_DELETE_VERTICES )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_SEND_SEED_LENGTHS )
MACRO_LIST_ITEM( RAY_SLAVE_MODE_SCAFFOLDER )


1. Add it to an enumeration so it can be used later

core/slave_modes.h

#define MACRO_LIST_ITEM(element) element,

enum{
#include "core/slave_mode_macros.h"
SLAVE_MODES_H_DUMMY
};

#undef MACRO_LIST_ITEM

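After preprocessing, the compiler sees the equivalent of the following (abbreviated):

enum{
RAY_SLAVE_MODE_LOAD_SEQUENCES,
RAY_SLAVE_MODE_START_SEEDING,
RAY_SLAVE_MODE_DO_NOTHING,
/* ...and so on for the other modes... */
RAY_SLAVE_MODE_SCAFFOLDER,
SLAVE_MODES_H_DUMMY
};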

2. Define its string (read char*) representation for debugging purposes.

core/slave_modes.cpp

#define MACRO_LIST_ITEM(x) #x,

const char* SLAVE_MODES[]={
#include "core/slave_mode_macros.h"
};

#undef MACRO_LIST_ITEM
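Likewise, the stringizing operator (#x) turns the list into the mode names themselves. Printing the current mode then becomes trivial; the line below is only an illustration, with assumed member names:

const char* SLAVE_MODES[]={
"RAY_SLAVE_MODE_LOAD_SEQUENCES",
"RAY_SLAVE_MODE_START_SEEDING",
/* ...and so on... */
"RAY_SLAVE_MODE_SCAFFOLDER"
};

printf("Rank %i is in slave mode %s\n",m_rank,SLAVE_MODES[m_slave_mode]);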


3. Declare its callback method prototype (in .h)

core/Machine.h

#define MACRO_LIST_ITEM(x) void call_ ## x();
#include "core/slave_mode_macros.h"
#undef MACRO_LIST_ITEM


5. Add its callback method pointer to the array of method pointers

core/Machine.cpp

#define MACRO_LIST_ITEM(x) m_slave_methods[x]=&Machine::call_ ## x ;
#include "core/slave_mode_macros.h"
#undef MACRO_LIST_ITEM

Note that step 4, the definition of a callback, is not done with these macros because it cannot be automated.

The same principle is applied to master modes and to MPI message types. A callback method for an MPI message type receives a message and processes it.
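Putting it together, a main loop can look like the following sketch -- not Ray's actual loop; m_alive and the helpers are assumed names:

void Machine::run(){
    while(m_alive){
        receiveMessages();                          /* fetch incoming MPI messages */
        processMessages();                          /* (this->*m_message_methods[tag])(message); */
        (this->*m_master_methods[m_master_mode])(); /* one slice of master work */
        (this->*m_slave_methods[m_slave_mode])();   /* one slice of slave work */
        sendMessages();                             /* flush outgoing messages */
    }
}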


Wary workers

Let us say that I want to compute the following function.

int sum(){
     int sum=0;
     int i=1000;
     while(i>0){
          sum+=i;
          i--;
     }
     return sum;
}


Now, let's do it the worker's way.

The methods of a worker usually are:


void Worker::work();
bool Worker::isDone();
Result Worker::getWorkResult();

In my case:

class Worker{
    int m_i;
    int m_sum;
    bool m_done;
public:
    void constructor(){
        m_i=1000; /* same bound as in sum() above */
        m_sum=0;
        m_done=false;
    }
    void work(){
        if(m_done){
            return;
        }
        m_sum+=m_i;
        m_i--;
        if(m_i==0){
            m_done=true;
        }
    }
    bool isDone(){
        return m_done;
    }
    int getWorkResult(){
        return m_sum;
    }
};



I can now use this worker to compute the function sum:

int sumWithWorker(){
    Worker worker;
    worker.constructor();
    while(!worker.isDone()){
        worker.work();
    }
    return worker.getWorkResult();
}

This allows the computation to be sliced into parts. This is handy for programming an MPI application because it naturally produces fine granularity. And fine granularity is necessary to obtain low-latency message passing. Otherwise, an MPI rank may have to wait several minutes before receiving its response -- which is perhaps not very acceptable.
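Concretely, the main loop of an MPI rank can interleave work slices with message processing. A sketch, where processMessages() is an assumed helper that receives and dispatches incoming messages:

int sumWithWorkerAndMessages(){
    Worker worker;
    worker.constructor();
    while(!worker.isDone()){
        worker.work();     /* one small slice of computation */
        processMessages(); /* stay responsive between slices */
    }
    return worker.getWorkResult();
}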


Proper message passing and virtual communication


Now, the most time-consuming part of programming with MPI: moving messages around. On one end, we send a message to a destination rank; on the other end, we receive the message. Following this message-passing recipe over and over can be tedious, and obviously you don't want to busy-wait on a spinlock until a message's response arrives. Instead, you want to remain able to receive messages from other MPI ranks in the meantime.
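For reference, here is what one round of the raw recipe can look like with plain MPI calls -- a sketch, with a made-up tag and buffer:

#include <mpi.h>

void sendRequest(int destination,int tag){
    int buffer[1];
    buffer[0]=42;
    MPI_Send(buffer,1,MPI_INT,destination,tag,MPI_COMM_WORLD);
}

void pollForMessages(){
    int flag=0;
    MPI_Status status;
    /* probe instead of blocking so the rank can keep serving others */
    MPI_Iprobe(MPI_ANY_SOURCE,MPI_ANY_TAG,MPI_COMM_WORLD,&flag,&status);
    if(flag){
        int buffer[1];
        MPI_Recv(buffer,1,MPI_INT,status.MPI_SOURCE,status.MPI_TAG,
            MPI_COMM_WORLD,&status);
        /* process the message according to its tag */
    }
}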

In a previous post, I described the virtual communicator. Here, I use the concept again.

Given a piece of MPI code, the 3 basic operations needed from a worker's point of view are

- void VirtualCommunicator::pushMessage(uint64_t workerId,Message*message);
- bool VirtualCommunicator::isMessageProcessed(uint64_t workerId);
- vector<uint64_t> VirtualCommunicator::getMessageResponseElements(uint64_t workerId);


Typical workflow:

1. The worker identified by workerId pushes a message to the virtual communicator.
2. The worker asks the virtual communicator if its message has been processed.
3. If so, the worker asks the virtual communicator for its message response.
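In code, a worker built on these three calls typically looks like the following sketch -- the Message setup is elided and the member names are assumptions:

void MyWorker::work(){
    if(!m_requestSent){
        Message message; /* set the tag, the destination and the buffer here */
        m_virtualCommunicator->pushMessage(m_workerId,&message);
        m_requestSent=true;
    }else if(m_virtualCommunicator->isMessageProcessed(m_workerId)){
        vector<uint64_t> response=
            m_virtualCommunicator->getMessageResponseElements(m_workerId);
        /* use the response here */
        m_done=true;
    }
    /* otherwise, the response is not ready yet; try again on the next call */
}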

Now, we don't want to send numerous skinny messages (one for each call to pushMessage). Instead, automatic message aggregation is highly desirable.
In Ray, the VirtualCommunicator does just that for you, out of the box.

To achieve automatic message aggregation, a pool of workers is needed. Each of these workers is independent, but their messages are grouped without their knowing it.

This method alone yields a significant performance gain.

An example of the gain follows.

Rank 14 VirtualCommunicator Statistics
Rank 14: 60466 virtual messages for 20854019 pushed messages
Rank 14: percentage= 0.289949%

Out of the 20854019 pushed messages, only 60466 virtual messages were actually sent -- on average, about 345 pushed messages per virtual message.


Berserk producers overflow ring buffers

Note that a worker never goes berserk -- it never pushes numerous messages without first getting the response for the message it already pushed. Thus, no producer-consumer problem can occur.


gg

sebhtml
