2011-09-27

Code review: What happens in Open-MPI's MPI_Iprobe ?

Code review: What happens in Open-MPI's MPI_Iprobe ?


Update 1

Subject: Re: [OMPI devel] Implementation of MPI_Iprobe
From: George Bosilca (bosilca_at_[hidden])
Date: 2011-09-27 15:34:05

Sebastien,

Your analysis is correct in case the checkpoint/restart approach maintained by ORNL is enabled. This is not the code path of the "normal" MPI processes, where the PML OB1 is used. In this generic case the function mca_pml_ob1_iprobe, defined in the file ompi/mca/pml/ob1/pml_ob1_iprobe.c is used.

george.


http://www.open-mpi.org/community/lists/devel/2011/09/9766.php

End of update 1


The message-passing interface (MPI) standard defines an interface for passing messages between processes.

These processes are not necessarily running on the same physical computer.


Open-MPI is an implementation of the MPI standard.

Here, I have utilised openmpi-1.4.3 to find out what is happening when a call to MPI_Iprobe occurs.


According to the MPI 2.2 standard


"The MPI_PROBE and MPI_IPROBE operations allow incoming messages to be checked for, without actually receiving them."


The prototype of MPI_Iprobe is

int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status);


In ompi/mpi/c/iprobe.c, MPI_Iprobe is embodied.


The code (some lines omitted):

ompi/mpi/c/iprobe.c

/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2008 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status)
{
    int rc;

    MEMCHECKER(
        memchecker_comm(comm);
    );

    if ( MPI_PARAM_CHECK ) {
        rc = MPI_SUCCESS;
        OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
        if (((tag < 0) && (tag != MPI_ANY_TAG)) || (tag > mca_pml.pml_max_tag)) {
            rc = MPI_ERR_TAG;
        } else if (ompi_comm_invalid(comm)) {
            rc = MPI_ERR_COMM;
        } else if ((source != MPI_ANY_SOURCE) &&
                   (MPI_PROC_NULL != source) &&
                   ompi_comm_peer_invalid(comm, source)) {
            rc = MPI_ERR_RANK;
        }
        OMPI_ERRHANDLER_CHECK(rc, comm, rc, FUNC_NAME);
    }

    if (MPI_PROC_NULL == source) {
        if (MPI_STATUS_IGNORE != status) {
            *status = ompi_request_empty.req_status;
            /*
             * Per MPI-1, the MPI_ERROR field is not defined for single-completion calls
             */
            MEMCHECKER(
                opal_memchecker_base_mem_undefined(&status->MPI_ERROR, sizeof(int));
            );
        }
        return MPI_SUCCESS;
    }

    OPAL_CR_ENTER_LIBRARY();

    rc = MCA_PML_CALL(iprobe(source, tag, comm, flag, status));

    /*
     * Per MPI-1, the MPI_ERROR field is not defined for single-completion calls
     */
    MEMCHECKER(
        opal_memchecker_base_mem_undefined(&status->MPI_ERROR, sizeof(int));
    );

    OMPI_ERRHANDLER_RETURN(rc, comm, rc, FUNC_NAME);
}



In this function, the following line is the one we want to follow.

rc = MCA_PML_CALL(iprobe(source, tag, comm, flag, status));


The rest is mostly just parameter validation, I guess.


MCA_PML_CALL is probably a macro. Let's search it.


In ompi/mca/pml/pml.h, MCA_PML_CALL is defined. Here is the code (some lines omitted):

ompi/mca/pml/pml.h

/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.  All rights
 *                         reserved. 
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

#if MCA_pml_DIRECT_CALL

#include MCA_pml_DIRECT_CALL_HEADER

#define MCA_PML_CALL_EXPANDER(a, b) MCA_PML_CALL_STAMP(a,b)
#define MCA_PML_CALL(a) MCA_PML_CALL_EXPANDER(MCA_pml_DIRECT_CALL_COMPONENT, a)

#else
#define MCA_PML_CALL(a) mca_pml.pml_ ## a
#endif


With MCA_PML_CALL, iprobe becomes mca_pml.pml_iprobe and some additional code is added depending on the value of MCA_pml_DIRECT_CALL.




In ompi/mca/pml/pml.h, mca_pml_base_module_t is defined.

ompi/mca/pml/pml.h

/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.  All rights
 *                         reserved. 
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

struct mca_pml_base_module_1_0_0_t {

    /* downcalls from MCA to PML */
    mca_pml_base_module_add_procs_fn_t    pml_add_procs;
    mca_pml_base_module_del_procs_fn_t    pml_del_procs;
    mca_pml_base_module_enable_fn_t       pml_enable;
    mca_pml_base_module_progress_fn_t     pml_progress;

    /* downcalls from MPI to PML */
    mca_pml_base_module_add_comm_fn_t     pml_add_comm;
    mca_pml_base_module_del_comm_fn_t     pml_del_comm;
    mca_pml_base_module_irecv_init_fn_t   pml_irecv_init;
    mca_pml_base_module_irecv_fn_t        pml_irecv;
    mca_pml_base_module_recv_fn_t         pml_recv;
    mca_pml_base_module_isend_init_fn_t   pml_isend_init;
    mca_pml_base_module_isend_fn_t        pml_isend;
    mca_pml_base_module_send_fn_t         pml_send;
    mca_pml_base_module_iprobe_fn_t       pml_iprobe;
    mca_pml_base_module_probe_fn_t        pml_probe;
    mca_pml_base_module_start_fn_t        pml_start;

    /* diagnostics */
    mca_pml_base_module_dump_fn_t         pml_dump;

    /* FT Event */
    mca_pml_base_module_ft_event_fn_t     pml_ft_event;

    /* maximum constant sizes */
    int                                   pml_max_contextid;
    int                                   pml_max_tag;
};
typedef struct mca_pml_base_module_1_0_0_t mca_pml_base_module_1_0_0_t;
typedef mca_pml_base_module_1_0_0_t mca_pml_base_module_t;




In mca_pml_base_module_t, there is the attribute pml_iprobe. This type of this attribute is mca_pml_base_module_iprobe_fn_t.


This type is defined also in ompi/mca/pml/pml.h and is basically a function pointer (right?).

ompi/mca/pml/pml.h

/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2005 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, 
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2007 Los Alamos National Security, LLC.  All rights
 *                         reserved. 
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

/**
 * Probe to poll for pending recv.
 *
 * @param src (IN)        Source rank w/in communicator.
 * @param tag (IN)        User defined tag.
 * @param comm (IN)       Communicator.
 * @param matched (OUT)   Flag indicating if matching recv exists.
 * @param status (OUT)    Completion statuses.
 * @return                OMPI_SUCCESS or failure status.
 *
 */
typedef int (*mca_pml_base_module_iprobe_fn_t)(
    int src,
    int tag,
    struct ompi_communicator_t* comm,
    int *matched,
    ompi_status_public_t *status
);




I believe that this scheme allows to have several implementations of pml_iprobe.



Let us look for these now.


In ompi/mca/pml/crcpw/pml_crcpw_module.c, there is this code (some lines omitted):

ompi/mca/pml/crcpw/pml_crcpw_module.c

/*
 * Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2006 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

int mca_pml_crcpw_iprobe(int dst, int tag, struct ompi_communicator_t* comm, int *matched, ompi_status_public_t* status )
{
    int ret;
    ompi_crcp_base_pml_state_t * pml_state;

    PML_CRCP_STATE_ALLOC(pml_state, ret);

    pml_state->wrapped_pml_component = &(mca_pml_crcpw_module.wrapped_pml_component);
    pml_state->wrapped_pml_module    = &(mca_pml_crcpw_module.wrapped_pml_module);

    pml_state->state = OMPI_CRCP_PML_PRE;
    pml_state = ompi_crcp.pml_iprobe(dst, tag, comm, matched, status, pml_state);
    if( OMPI_SUCCESS != pml_state->error_code) {
        ret =  pml_state->error_code;
        PML_CRCP_STATE_RETURN(pml_state);
        return ret;
    }

    if( OMPI_CRCP_PML_DONE == pml_state->state) {
        goto CLEANUP;
    }

    if( OMPI_CRCP_PML_SKIP != pml_state->state) {
        if( OMPI_SUCCESS != (ret = mca_pml_crcpw_module.wrapped_pml_module.pml_iprobe(dst, tag, comm, matched, status) ) ) {
            PML_CRCP_STATE_RETURN(pml_state);
            return ret;
        }
    }

    pml_state->state = OMPI_CRCP_PML_POST;
    pml_state = ompi_crcp.pml_iprobe(dst, tag, comm, matched, status, pml_state);
    if( OMPI_SUCCESS != pml_state->error_code) {
        ret =  pml_state->error_code;
        PML_CRCP_STATE_RETURN(pml_state);
        return ret;
    }

 CLEANUP:
    PML_CRCP_STATE_RETURN(pml_state);

    return OMPI_SUCCESS;
}



This code however seems to rely heavily on ompi_crcp.pml_iprobe.



In ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c (some lines omitted):

ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c

/*
 * Copyright (c) 2004-2008 The Trustees of Indiana University.
 *                         All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */
/**************** Probe *****************/
/* JJH - Code reuse: Combine iprobe and probe logic */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_iprobe(
                                  int dst, int tag,
                                  struct ompi_communicator_t* comm,
                                  int *matched,
                                  ompi_status_public_t* status,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_iprobe(%d, %d)", dst, tag));

    /*
     * Before PML Call
     * - Determine if this can be satisfied from the drained list
     * - Otherwise let the PML handle it
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state) {
        /*
         * Check to see if this message is in the drained message list
         */
        if( OMPI_SUCCESS != (ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
                                                          comm, PROBE_ANY_SIZE,
                                                          &drain_msg_ref,
                                                          &content_ref,
                                                          NULL) ) ) {
            ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_iprobe(): Failed trying to find a drained message.");
            exit_status = ret;
            goto DONE;
        }

        /*
         * If the message is a drained message
         *  - Copy of the status structure to pass back to the user
         *  - Mark the 'matched' flag as true
         */
        if( NULL != drain_msg_ref ) {
            OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
                                 "crcp:bkmrk: pml_iprobe(): Matched a drained message..."));

            /* Copy the status information */
            if( MPI_STATUS_IGNORE != status ) {
                memcpy(status, &content_ref->status, sizeof(ompi_status_public_t));
            }

            /* Mark as complete */
            *matched = 1;

            /* This will identify to the wrapper that this message is complete */
            pml_state->state = OMPI_CRCP_PML_DONE;
            pml_state->error_code = OMPI_SUCCESS;
            return pml_state;
        }
        /*
         * Otherwise the message is not drained (common case), so let the PML deal with it
         */
        else {
            /* Mark as not complete */
            *matched = 0;
        }
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}




How I understand it, ompi_crcp_bkmrk_pml_iprobe first check if any message in the drained list satisfies what the calling code wants with
a call to drain_message_find_any.

If this is not the case, the code "let the PML deal with it", which probably means that that this case would be dealt with somewhere in mca_pml_crcpw_iprobe although I am not sure.


drain_message_find_any is defined in ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c:

ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c

/*
 * Copyright (c) 2004-2008 The Trustees of Indiana University.
 *                         All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

static int drain_message_find_any(size_t count, int tag, int peer,
                                  struct ompi_communicator_t* comm, size_t ddt_size,
                                  ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
                                  ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
                                  ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
{
    ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
    opal_list_item_t* item = NULL;

    *found_msg_ref = NULL;

    for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
        item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
        item  = opal_list_get_next(item) ) {
        cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

        /*
         * If we ware not MPI_ANY_SOURCE, then extract the process name from the
         * communicator, and search only the peer that matches.
         */
        if( MPI_ANY_SOURCE != peer && peer >= 0) {
            /* Check to see if peer could possibly be in this communicator */
            if( comm->c_local_group->grp_proc_count <= peer ) {
                continue;
            }

            if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL,
                                                            &(cur_peer_ref->proc_name),
                                                            &(comm->c_local_group->grp_proc_pointers[peer]->proc_name)) ) {
                continue;
            }
        }

        drain_message_find(&(cur_peer_ref->drained_list),
                           count, tag, peer,
                           comm->c_contextid, ddt_size,
                           found_msg_ref,
                           content_ref);
        if( NULL != *found_msg_ref) {
            if( NULL != peer_ref ) {
                *peer_ref = cur_peer_ref;
            }
            return OMPI_SUCCESS;
        }
    }

    return OMPI_SUCCESS;
}



If peer is MPI_ANY_SOURCE, then drain_message_find_any will fetch a message from any source.
Otherwise, it will look only for messages from the desired peer.


ompi_crcp_bkmrk_pml_peer_refs is populated in ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c (some lines omitted):

ompi/mca/crcp/bkmrk/crcp_bkmrk_pml.c

/*
 * Copyright (c) 2004-2008 The Trustees of Indiana University.
 *                         All rights reserved.
 * $COPYRIGHT$
 * 
 * Additional copyrights may follow
 * 
 * $HEADER$
 */

/**************** Processes *****************/
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_procs(
                                   struct ompi_proc_t **procs,
                                   size_t nprocs,
                                   ompi_crcp_base_pml_state_t* pml_state )
{
    int ret;
    ompi_crcp_bkmrk_pml_peer_ref_t *new_peer_ref;
    size_t i;

    if( OMPI_CRCP_PML_PRE != pml_state->state ){
        goto DONE;
    }

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_add_procs()"));

    /*
     * Save pointers to the wrapped PML
     */
    wrapped_pml_component = pml_state->wrapped_pml_component;
    wrapped_pml_module    = pml_state->wrapped_pml_module;

    /*
     * Create a peer_ref for each peer added
     */
    for( i = 0; i < nprocs; ++i) {
        HOKE_PEER_REF_ALLOC(new_peer_ref, ret);

        new_peer_ref->proc_name.jobid  = procs[i]->proc_name.jobid;
        new_peer_ref->proc_name.vpid   = procs[i]->proc_name.vpid;

        opal_list_append(&ompi_crcp_bkmrk_pml_peer_refs, &(new_peer_ref->super));
    }

 DONE:
    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}


Presumably, nprocs is the number of MPI ranks in MPI_COMM_WORLD because ompi_crcp_bkmrk_pml_add_procs has
no argument struct ompi_communicator_t* comm.



Let's say that the peer is 255 and that the number of MPI ranks in MPI_COMM_WORLD is 256. In this case,
drain_message_find_any will need to loop over all the peers before calling drain_message_find for the peer 255 !

That is slow but it works, right ?

With MPI_ANY_SOURCE, I suspect message starvation can occur because there is no round-robin for the starting point of the loop -- the code always starts with the first peer.

Accordingly, if all peers send messages at the same rate, peers with lower ranks will see their messages received first.

This leads to starvation of peers with higher ranks.

No comments: