ScaLAPACK  2.0.2
ScaLAPACK: Scalable Linear Algebra PACKage
psswap_.c
Go to the documentation of this file.
00001 /* ---------------------------------------------------------------------
00002 *
00003 *  -- PBLAS routine (version 2.0) --
00004 *     University of Tennessee, Knoxville, Oak Ridge National Laboratory,
00005 *     and University of California, Berkeley.
00006 *     April 1, 1998
00007 *
00008 *  ---------------------------------------------------------------------
00009 */
00010 /*
00011 *  Include files
00012 */
00013 #include "pblas.h"
00014 #include "PBpblas.h"
00015 #include "PBtools.h"
00016 #include "PBblacs.h"
00017 #include "PBblas.h"
00018 
00019 #ifdef __STDC__
00020 void psswap_( int * N,
00021               float * X, int * IX, int * JX, int * DESCX, int * INCX,
00022               float * Y, int * IY, int * JY, int * DESCY, int * INCY )
00023 #else
00024 void psswap_( N, X, IX, JX, DESCX, INCX, Y, IY, JY, DESCY, INCY )
00025 /*
00026 *  .. Scalar Arguments ..
00027 */
00028    int            * INCX, * INCY, * IX, * IY, * JX, * JY, * N;
00029 /*
00030 *  .. Array Arguments ..
00031 */
00032    int            * DESCX, * DESCY;
00033    float          * X, * Y;
00034 #endif
00035 {
00036 /*
00037 *  Purpose
00038 *  =======
00039 *
00040 *  PSSWAP  swaps two subvectors,
00041 *
00042 *     sub( Y ) := sub( X ) and sub( X ) := sub( Y )
00043 *
00044 *  where sub( X ) denotes X(IX,JX:JX+N-1) if INCX = M_X,
00045 *                         X(IX:IX+N-1,JX) if INCX = 1 and INCX <> M_X,
00046 *
00047 *        sub( Y ) denotes Y(IY,JY:JY+N-1) if INCY = M_Y,
00048 *                         Y(IY:IY+N-1,JY) if INCY = 1 and INCY <> M_Y.
00049 *
00050 *  Notes
00051 *  =====
00052 *
00053 *  A description  vector  is associated with each 2D block-cyclicly dis-
00054 *  tributed matrix.  This  vector  stores  the  information  required to
00055 *  establish the  mapping  between a  matrix entry and its corresponding
00056 *  process and memory location.
00057 *
00058 *  In  the  following  comments,   the character _  should  be  read  as
00059 *  "of  the  distributed  matrix".  Let  A  be a generic term for any 2D
00060 *  block cyclicly distributed matrix.  Its description vector is DESC_A:
00061 *
00062 *  NOTATION         STORED IN       EXPLANATION
00063 *  ---------------- --------------- ------------------------------------
00064 *  DTYPE_A (global) DESCA[ DTYPE_ ] The descriptor type.
00065 *  CTXT_A  (global) DESCA[ CTXT_  ] The BLACS context handle, indicating
00066 *                                   the NPROW x NPCOL BLACS process grid
00067 *                                   A  is  distributed over. The context
00068 *                                   itself  is  global,  but  the handle
00069 *                                   (the integer value) may vary.
00070 *  M_A     (global) DESCA[ M_     ] The  number of rows in the distribu-
00071 *                                   ted matrix A, M_A >= 0.
00072 *  N_A     (global) DESCA[ N_     ] The number of columns in the distri-
00073 *                                   buted matrix A, N_A >= 0.
00074 *  IMB_A   (global) DESCA[ IMB_   ] The number of rows of the upper left
00075 *                                   block of the matrix A, IMB_A > 0.
00076 *  INB_A   (global) DESCA[ INB_   ] The  number  of columns of the upper
00077 *                                   left   block   of   the  matrix   A,
00078 *                                   INB_A > 0.
00079 *  MB_A    (global) DESCA[ MB_    ] The blocking factor used to  distri-
00080 *                                   bute the last  M_A-IMB_A  rows of A,
00081 *                                   MB_A > 0.
00082 *  NB_A    (global) DESCA[ NB_    ] The blocking factor used to  distri-
00083 *                                   bute the last  N_A-INB_A  columns of
00084 *                                   A, NB_A > 0.
00085 *  RSRC_A  (global) DESCA[ RSRC_  ] The process row over which the first
00086 *                                   row of the matrix  A is distributed,
00087 *                                   NPROW > RSRC_A >= 0.
00088 *  CSRC_A  (global) DESCA[ CSRC_  ] The  process column  over  which the
00089 *                                   first column of  A  is  distributed.
00090 *                                   NPCOL > CSRC_A >= 0.
00091 *  LLD_A   (local)  DESCA[ LLD_   ] The  leading dimension  of the local
00092 *                                   array  storing  the  local blocks of
00093 *                                   the distributed matrix A,
00094 *                                   IF( Lc( 1, N_A ) > 0 )
00095 *                                      LLD_A >= MAX( 1, Lr( 1, M_A ) )
00096 *                                   ELSE
00097 *                                      LLD_A >= 1.
00098 *
00099 *  Let K be the number of  rows of a matrix A starting at the global in-
00100 *  dex IA,i.e, A( IA:IA+K-1, : ). Lr( IA, K ) denotes the number of rows
00101 *  that the process of row coordinate MYROW ( 0 <= MYROW < NPROW ) would
00102 *  receive if these K rows were distributed over NPROW processes.  If  K
00103 *  is the number of columns of a matrix  A  starting at the global index
00104 *  JA, i.e, A( :, JA:JA+K-1, : ), Lc( JA, K ) denotes the number  of co-
00105 *  lumns that the process MYCOL ( 0 <= MYCOL < NPCOL ) would  receive if
00106 *  these K columns were distributed over NPCOL processes.
00107 *
00108 *  The values of Lr() and Lc() may be determined via a call to the func-
00109 *  tion PB_Cnumroc:
00110 *  Lr( IA, K ) = PB_Cnumroc( K, IA, IMB_A, MB_A, MYROW, RSRC_A, NPROW )
00111 *  Lc( JA, K ) = PB_Cnumroc( K, JA, INB_A, NB_A, MYCOL, CSRC_A, NPCOL )
00112 *
00113 *  Arguments
00114 *  =========
00115 *
00116 *  N       (global input) INTEGER
00117 *          On entry,  N  specifies the  length of the  subvectors to  be
00118 *          swapped. N must be at least zero.
00119 *
00120 *  X       (local input/local output) REAL array
00121 *          On entry, X is an array of dimension (LLD_X, Kx), where LLD_X
00122 *          is   at  least  MAX( 1, Lr( 1, IX ) )  when  INCX = M_X   and
00123 *          MAX( 1, Lr( 1, IX+N-1 ) )  otherwise,  and,  Kx  is  at least
00124 *          Lc( 1, JX+N-1 )  when  INCX = M_X  and Lc( 1, JX ) otherwise.
00125 *          Before  entry,  this array  contains the local entries of the
00126 *          matrix X. On exit, sub( X ) is overwritten with sub( Y ).
00127 *
00128 *  IX      (global input) INTEGER
00129 *          On entry, IX  specifies X's global row index, which points to
00130 *          the beginning of the submatrix sub( X ).
00131 *
00132 *  JX      (global input) INTEGER
00133 *          On entry, JX  specifies X's global column index, which points
00134 *          to the beginning of the submatrix sub( X ).
00135 *
00136 *  DESCX   (global and local input) INTEGER array
00137 *          On entry, DESCX  is an integer array of dimension DLEN_. This
00138 *          is the array descriptor for the matrix X.
00139 *
00140 *  INCX    (global input) INTEGER
00141 *          On entry,  INCX   specifies  the  global  increment  for  the
00142 *          elements of  X.  Only two values of  INCX   are  supported in
00143 *          this version, namely 1 and M_X. INCX  must not be zero.
00144 *
00145 *  Y       (local input/local output) REAL array
00146 *          On entry, Y is an array of dimension (LLD_Y, Ky), where LLD_Y
00147 *          is   at  least  MAX( 1, Lr( 1, IY ) )  when  INCY = M_Y   and
00148 *          MAX( 1, Lr( 1, IY+N-1 ) )  otherwise,  and,  Ky  is  at least
00149 *          Lc( 1, JY+N-1 )  when  INCY = M_Y  and Lc( 1, JY ) otherwise.
00150 *          Before  entry,  this array  contains the local entries of the
00151 *          matrix Y. On exit, sub( Y ) is overwritten with sub( X ).
00152 *
00153 *  IY      (global input) INTEGER
00154 *          On entry, IY  specifies Y's global row index, which points to
00155 *          the beginning of the submatrix sub( Y ).
00156 *
00157 *  JY      (global input) INTEGER
00158 *          On entry, JY  specifies Y's global column index, which points
00159 *          to the beginning of the submatrix sub( Y ).
00160 *
00161 *  DESCY   (global and local input) INTEGER array
00162 *          On entry, DESCY  is an integer array of dimension DLEN_. This
00163 *          is the array descriptor for the matrix Y.
00164 *
00165 *  INCY    (global input) INTEGER
00166 *          On entry,  INCY   specifies  the  global  increment  for  the
00167 *          elements of  Y.  Only two values of  INCY   are  supported in
00168 *          this version, namely 1 and M_Y. INCY  must not be zero.
00169 *
00170 *  -- Written on April 1, 1998 by
00171 *     Antoine Petitet, University of Tennessee, Knoxville 37996, USA.
00172 *
00173 *  ---------------------------------------------------------------------
00174 */
00175 /*
00176 *  .. Local Scalars ..
00177 */
00178    char           Xscope, Yscope, * one, * top, tran, * zero;
00179    int            OneBlock, OneDgrid, RRorCC, Square, Xcol, Xi, XisD, XisR,
00180                   Xinb1D, XinbD, XisRow, Xii, Xj, Xjj, Xld, Xlinc, Xm, XmyprocD,
00181                   XmyprocR, Xn, XnbD, XnpD, XnprocsD, XnprocsR, XprocD, XprocR,
00182                   Xroc, Xrow, Ycol, Yi, Yii, Yinb1D, YinbD, YisD, YisR, YisRow,
00183                   Yj, Yjj, Yld, Ylinc, Ym, YmyprocD, YmyprocR, Yn, YnbD, YnpD,
00184                   YnprocsD, YnprocsR, YprocD, YprocR, Yroc, Yrow, cdst, csrc,
00185                   ctxt, dst, gcdPQ, info, ione=1, k, l, lcmPQ, lcmb, mycol,
00186                   myrow, npcol, npq, nprow, p, q, rdst, rsrc, src, size;
00187    PBTYP_T        * type;
00188    PB_VM_T        VM;
00189 /*
00190 *  .. Local Arrays ..
00191 */
00192    int            Xd[DLEN_], Yd[DLEN_];
00193    char           * buf = NULL;
00194 /* ..
00195 *  .. Executable Statements ..
00196 *
00197 */
00198    PB_CargFtoC( *IX, *JX, DESCX, &Xi, &Xj, Xd );
00199    PB_CargFtoC( *IY, *JY, DESCY, &Yi, &Yj, Yd );
00200 #ifndef NO_ARGCHK
00201 /*
00202 *  Test the input parameters
00203 */
00204    Cblacs_gridinfo( ( ctxt = Xd[CTXT_] ), &nprow, &npcol, &myrow, &mycol );
00205    if( !( info = ( ( nprow == -1 ) ? -( 501 + CTXT_ ) : 0 ) ) )
00206    {
00207       PB_Cchkvec( ctxt, "PSSWAP", "X", *N, 1, Xi, Xj, Xd, *INCX,  5, &info );
00208       PB_Cchkvec( ctxt, "PSSWAP", "Y", *N, 1, Yi, Yj, Yd, *INCY, 10, &info );
00209    }
00210    if( info ) { PB_Cabort( ctxt, "PSSWAP", info ); return; }
00211 #endif
00212 /*
00213 *  Quick return if possible
00214 */
00215    if( *N == 0 ) return;
00216 /*
00217 *  Retrieve process grid information
00218 */
00219 #ifdef NO_ARGCHK
00220    Cblacs_gridinfo( ( ctxt = Xd[CTXT_] ), &nprow, &npcol, &myrow, &mycol );
00221 #endif
00222 /*
00223 *  Determine if sub( X ) is distributed or not
00224 */
00225    if( ( XisRow = ( *INCX == Xd[M_] ) ) != 0 )
00226       XisD = ( ( Xd[CSRC_] >= 0 ) && ( ( XnprocsD = npcol ) > 1 ) );
00227    else
00228       XisD = ( ( Xd[RSRC_] >= 0 ) && ( ( XnprocsD = nprow ) > 1 ) );
00229 /*
00230 *  Determine if sub( Y ) is distributed or not
00231 */
00232    if( ( YisRow = ( *INCY == Yd[M_] ) ) != 0 )
00233       YisD = ( ( Yd[CSRC_] >= 0 ) && ( ( YnprocsD = npcol ) > 1 ) );
00234    else
00235       YisD = ( ( Yd[RSRC_] >= 0 ) && ( ( YnprocsD = nprow ) > 1 ) );
00236 /*
00237 *  Are sub( X ) and sub( Y ) both row or column vectors ?
00238 */
00239    RRorCC = ( ( XisRow && YisRow ) || ( !( XisRow ) && !( YisRow ) ) );
00240 /*
00241 *  XisD && YisD <=> both vector operands are indeed distributed
00242 */
00243    if( XisD && YisD )
00244    {
00245 /*
00246 *  Retrieve sub( X )'s local information: Xii, Xjj, Xrow, Xcol ...
00247 */
00248       PB_Cinfog2l( Xi, Xj, Xd, nprow, npcol, myrow, mycol, &Xii, &Xjj,
00249                    &Xrow, &Xcol );
00250       if( XisRow )
00251       {
00252          XinbD  = Xd[INB_ ]; XnbD     = Xd[NB_ ]; Xld      = Xd[LLD_];
00253          Xlinc  = Xld;
00254          XprocD = Xcol;      XmyprocD = mycol;
00255          XprocR = Xrow;      XmyprocR = myrow;    XnprocsR = nprow;
00256          XisR   = ( ( Xrow == -1 ) || ( XnprocsR == 1 ) );
00257          Mfirstnb( Xinb1D, *N, Xj, XinbD, XnbD );
00258       }
00259       else
00260       {
00261          XinbD  = Xd[IMB_ ]; XnbD     = Xd[MB_ ]; Xld      = Xd[LLD_];
00262          Xlinc  = 1;
00263          XprocD = Xrow;      XmyprocD = myrow;
00264          XprocR = Xcol;      XmyprocR = mycol;    XnprocsR = npcol;
00265          XisR   = ( ( Xcol == -1 ) || ( XnprocsR == 1 ) );
00266          Mfirstnb( Xinb1D, *N, Xi, XinbD, XnbD );
00267       }
00268 /*
00269 *  Retrieve sub( Y )'s local information: Yii, Yjj, Yrow, Ycol ...
00270 */
00271       PB_Cinfog2l( Yi, Yj, Yd, nprow, npcol, myrow, mycol, &Yii, &Yjj,
00272                    &Yrow, &Ycol );
00273       if( YisRow )
00274       {
00275          YinbD  = Yd[INB_ ]; YnbD     = Yd[NB_ ]; Yld      = Yd[LLD_];
00276          Ylinc  = Yld;
00277          YprocD = Ycol;      YmyprocD = mycol;
00278          YprocR = Yrow;      YmyprocR = myrow;    YnprocsR = nprow;
00279          YisR   = ( ( Yrow == -1 ) || ( YnprocsR == 1 ) );
00280          Mfirstnb( Yinb1D, *N, Yj, YinbD, YnbD );
00281       }
00282       else
00283       {
00284          YinbD  = Yd[IMB_ ]; YnbD     = Yd[MB_ ]; Yld      = Yd[LLD_];
00285          Ylinc  = 1;
00286          YprocD = Yrow;      YmyprocD = myrow;
00287          YprocR = Ycol;      YmyprocR = mycol;    YnprocsR = npcol;
00288          YisR   = ( ( Ycol == -1 ) || ( YnprocsR == 1 ) );
00289          Mfirstnb( Yinb1D, *N, Yi, YinbD, YnbD );
00290       }
00291 /*
00292 *  Do sub( X ) and sub( Y ) span more than one process ?
00293 */
00294       OneDgrid = ( ( XnprocsD ==  1 ) && ( YnprocsD ==  1 ) );
00295       OneBlock = ( ( Xinb1D   >= *N ) && ( Yinb1D   >= *N ) );
00296 /*
00297 *  Are sub( X ) and sub( Y ) distributed in the same manner ?
00298 */
00299       Square   = ( ( Xinb1D   ==   Yinb1D ) && ( XnbD == YnbD ) &&
00300                    ( XnprocsD == YnprocsD ) );
00301 
00302       if( !( XisR ) )
00303       {
00304 /*
00305 *  sub( X ) is distributed but not replicated
00306 */
00307          if( YisR )
00308          {
00309 /*
00310 *  If sub( X ) is not replicated, but sub( Y ) is, a process row or column
00311 *  YprocR need to be selected. It will contain the non-replicated vector to
00312 *  swap sub( X ) with.
00313 */
00314             if( RRorCC )
00315             {
00316 /*
00317 *  sub( X ) and sub( Y ) are both row or column vectors
00318 */
00319                if( ( OneDgrid || OneBlock || Square ) && ( XprocD == YprocD ) )
00320                {
00321 /*
00322 *  sub( X ) and sub( Y ) start in the same process row or column XprocD=YprocD.
00323 *  Enforce a purely local operation by choosing YprocR to be equal to XprocR.
00324 */
00325                   YprocR = XprocR;
00326                }
00327                else
00328                {
00329 /*
00330 *  Otherwise, communication has to occur, so choose the next process row or
00331 *  column for YprocR to maximize the number of links, i.e reduce contention.
00332 */
00333                   YprocR = MModAdd1( XprocR, XnprocsR );
00334                }
00335             }
00336             else
00337             {
00338 /*
00339 *  sub( X ) and sub( Y ) are distributed in orthogonal directions, what is
00340 *  chosen for YprocR does not really matter. Select the process origin.
00341 */
00342                YprocR = XprocD;
00343             }
00344          }
00345          else
00346          {
00347 /*
00348 *  Neither sub( X ) nor sub( Y ) are replicated. If I am not in process row or
00349 *  column XprocR and not in process row or column YprocR, then quick return.
00350 */
00351             if( ( XmyprocR != XprocR ) && ( YmyprocR != YprocR ) )
00352                return;
00353          }
00354       }
00355       else
00356       {
00357 /*
00358 *  sub( X ) is distributed and replicated (so no quick return possible)
00359 */
00360          if( YisR )
00361          {
00362 /*
00363 *  sub( Y ) is distributed and replicated as well
00364 */
00365             if( RRorCC )
00366             {
00367 /*
00368 *  sub( X ) and sub( Y ) are both row or column vectors
00369 */
00370                if( ( OneDgrid || OneBlock || Square ) && ( XprocD == YprocD ) )
00371                {
00372 /*
00373 *  sub( X ) and sub( Y ) start in the same process row or column XprocD=YprocD.
00374 *  Enforce a purely local operation by choosing XprocR and YprocR to be equal
00375 *  to zero.
00376 */
00377                   XprocR = YprocR = 0;
00378                }
00379                else
00380                {
00381 /*
00382 *  Otherwise, communication has to occur, so select YprocR to be zero and the
00383 *  next process row or column for XprocR in order to maximize the number of
00384 *  used links, i.e reduce contention.
00385 */
00386                   YprocR = 0;
00387                   XprocR = MModAdd1( YprocR, YnprocsR );
00388                }
00389             }
00390             else
00391             {
00392 /*
00393 *  sub( X ) and sub( Y ) are distributed in orthogonal directions, select the
00394 *  origin processes.
00395 */
00396                XprocR = YprocD;
00397                YprocR = XprocD;
00398             }
00399          }
00400          else
00401          {
00402 /*
00403 *  sub( Y ) is distributed, but not replicated
00404 */
00405             if( RRorCC )
00406             {
00407 /*
00408 *  sub( X ) and sub( Y ) are both row or column vectors
00409 */
00410                if( ( OneDgrid || OneBlock || Square ) && ( XprocD == YprocD ) )
00411                {
00412 /*
00413 *  sub( X ) and sub( Y ) start in the same process row or column XprocD=YprocD.
00414 *  Enforce a purely local operation by choosing XprocR to be equal to YprocR.
00415 */
00416                   XprocR = YprocR;
00417                }
00418                else
00419                {
00420 /*
00421 *  Otherwise, communication has to occur, so choose the next process row or
00422 *  column for XprocR to maximize the number of links, i.e reduce contention.
00423 */
00424                   XprocR = MModAdd1( YprocR, YnprocsR );
00425                }
00426             }
00427             else
00428             {
00429 /*
00430 *  sub( X ) and sub( Y ) are distributed in orthogonal directions, what is
00431 *  chosen for XprocR does not really matter. Select the origin process.
00432 */
00433                XprocR = YprocD;
00434             }
00435          }
00436       }
00437 /*
00438 *  Even if sub( X ) and/or sub( Y ) are replicated, only two process row or
00439 *  column are active, namely XprocR and YprocR. If any of those operands is
00440 *  replicated, broadcast will occur (unless there is an easy way out).
00441 */
00442       type = PB_Cstypeset(); size = type->size;
00443 /*
00444 *  A purely local operation occurs iff the operands start in the same process
00445 *  and, if either the grid is mono-dimensional or there is a single local block
00446 *  to be swapped or if both operands are aligned.
00447 */
00448       if( ( (    RRorCC   && ( XprocD == YprocD ) && ( XprocR == YprocR ) ) ||
00449             ( !( RRorCC ) && ( XprocD == YprocR ) && ( XprocR == YprocD ) ) ) &&
00450           ( OneDgrid || OneBlock || ( RRorCC && Square ) ) )
00451       {
00452          if( ( !XisR &&         ( XmyprocR == XprocR ) &&
00453                !YisR &&         ( YmyprocR == YprocR ) ) ||
00454              ( !XisR && YisR && ( YmyprocR == YprocR ) ) ||
00455              ( !YisR && XisR && ( XmyprocR == XprocR ) ) ||
00456              (  XisR && YisR                           ) )
00457          {
00458             XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD,
00459                                XnprocsD );
00460             YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD,
00461                                YnprocsD );
00462             if( ( XnpD > 0 ) && ( YnpD > 0 ) )
00463             {
00464                sswap_( &XnpD,
00465                        Mptr( ((char *) X), Xii, Xjj, Xld, size ), &Xlinc,
00466                        Mptr( ((char *) Y), Yii, Yjj, Yld, size ), &Ylinc );
00467             }
00468             if( RRorCC && XisR && YisR ) return;
00469          }
00470       }
00471       else if( ( RRorCC && OneDgrid ) || OneBlock || Square )
00472       {
00473 /*
00474 *  Otherwise, it may be possible to swap the distributed vectors in a single
00475 *  message exchange iff the grid is mono-dimensional and the operands are
00476 *  distributed in the same direction, or there is just one block to be exchanged
00477 *  or if both operands are similarly distributed in their respective direction.
00478 */
00479          if( RRorCC && ( XprocR != YprocR ) )
00480          {
00481 /*
00482 *  Both operands are distributed in the same direction, but reside in different
00483 *  process rows or columns.
00484 */
00485             if( XmyprocR == XprocR )
00486             {
00487                XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD,
00488                                   XnprocsD );
00489                if( XnpD > 0 )
00490                {
00491                   dst = YprocD + MModSub( XmyprocD, XprocD, XnprocsD );
00492                   dst = MPosMod( dst, YnprocsD );
00493                   if( XisRow )
00494                   {
00495                      Csgesd2d( ctxt, 1, XnpD, Mptr( ((char*) X), Xii, Xjj,
00496                                Xld, size ), Xld, YprocR, dst );
00497                      Csgerv2d( ctxt, 1, XnpD, Mptr( ((char*) X), Xii, Xjj,
00498                                Xld, size ), Xld, YprocR, dst );
00499                   }
00500                   else
00501                   {
00502 
00503                      Csgesd2d( ctxt, XnpD, 1, Mptr( ((char*) X), Xii, Xjj,
00504                                Xld, size ), Xld, dst, YprocR );
00505                      Csgerv2d( ctxt, XnpD, 1, Mptr( ((char*) X), Xii, Xjj,
00506                                Xld, size ), Xld, dst, YprocR );
00507                   }
00508                }
00509             }
00510             if( YmyprocR == YprocR )
00511             {
00512                YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD,
00513                                   YnprocsD );
00514                if( YnpD > 0 )
00515                {
00516                   dst = XprocD + MModSub( YmyprocD, YprocD, YnprocsD );
00517                   dst = MPosMod( dst, XnprocsD );
00518                   if( YisRow )
00519                   {
00520                      Csgesd2d( ctxt, 1, YnpD, Mptr( ((char*) Y), Yii, Yjj,
00521                                Yld, size ), Yld, XprocR, dst );
00522                      Csgerv2d( ctxt, 1, YnpD, Mptr( ((char*) Y), Yii, Yjj,
00523                                Yld, size ), Yld, XprocR, dst );
00524                   }
00525                   else
00526                   {
00527                      Csgesd2d( ctxt, YnpD, 1, Mptr( ((char*) Y), Yii, Yjj,
00528                                Yld, size ), Yld, dst, XprocR );
00529                      Csgerv2d( ctxt, YnpD, 1, Mptr( ((char*) Y), Yii, Yjj,
00530                                Yld, size ), Yld, dst, XprocR );
00531                   }
00532                }
00533             }
00534          }
00535          else
00536          {
00537 /*
00538 *  General case when just one message needs to be exchanged
00539 */
00540             if( XmyprocR == XprocR )
00541             {
00542 /*
00543 *  The processes owning a piece of sub( X ) send it to the corresponding
00544 *  process owning s piece of sub ( Y ).
00545 */
00546                XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD,
00547                                   XnprocsD );
00548                if( XnpD > 0 )
00549                {
00550                   dst = YprocD + MModSub( XmyprocD, XprocD, XnprocsD );
00551                   dst = MPosMod( dst, YnprocsD );
00552                   if( YisRow ) { rdst = YprocR; cdst = dst; }
00553                   else         { rdst = dst; cdst = YprocR; }
00554 
00555                   if( ( myrow == rdst ) && ( mycol == cdst ) )
00556                   {
00557                      sswap_( &XnpD,  Mptr( ((char *) X), Xii, Xjj, Xld,
00558                              size ), &Xlinc, Mptr( ((char *) Y), Yii, Yjj, Yld,
00559                              size ), &Ylinc );
00560                   }
00561                   else
00562                   {
00563                      if( XisRow )
00564                         Csgesd2d( ctxt, 1, XnpD, Mptr( ((char *) X), Xii,
00565                                   Xjj, Xld, size ), Xld, rdst, cdst );
00566                      else
00567                         Csgesd2d( ctxt, XnpD, 1, Mptr( ((char *) X), Xii,
00568                                   Xjj, Xld, size ), Xld, rdst, cdst );
00569                   }
00570                }
00571             }
00572             if( YmyprocR == YprocR )
00573             {
00574 /*
00575 *  The processes owning a piece of sub( Y ) receive the corresponding piece
00576 *  of sub( X ) and send the piece of sub( Y ) they own to the same process.
00577 */
00578                YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD,
00579                                   YnprocsD );
00580                if( YnpD > 0 )
00581                {
00582                   src = XprocD + MModSub( YmyprocD, YprocD, YnprocsD );
00583                   src = MPosMod( src, XnprocsD );
00584                   if( XisRow ) { rsrc = XprocR; csrc = src; }
00585                   else         { rsrc = src; csrc = XprocR; }
00586 
00587                   if( ( myrow != rsrc ) || ( mycol != csrc ) )
00588                   {
00589                      buf = PB_Cmalloc( YnpD * size );
00590                      if( XisRow )
00591                         Csgerv2d( ctxt, 1, YnpD, buf,    1, rsrc, csrc );
00592                      else
00593                         Csgerv2d( ctxt, YnpD, 1, buf, YnpD, rsrc, csrc );
00594                      if( YisRow )
00595                         Csgesd2d( ctxt, 1, YnpD, Mptr( ((char *) Y), Yii,
00596                                   Yjj, Yld, size ), Yld, rsrc, csrc );
00597                      else
00598                         Csgesd2d( ctxt, YnpD, 1, Mptr( ((char *) Y), Yii,
00599                                   Yjj, Yld, size ), Yld, rsrc, csrc );
00600                      scopy_( &YnpD, buf, &ione, Mptr( ((char *) Y), Yii,
00601                              Yjj, Yld, size ), &Ylinc );
00602                      if( buf ) free( buf );
00603                   }
00604                }
00605             }
00606             if( XmyprocR == XprocR )
00607             {
00608 /*
00609 *  The processes owning a piece of sub( X ) receive the corresponding piece
00610 *  of sub( Y ).
00611 */
00612                if( XnpD > 0 )
00613                {
00614                   if( ( myrow != rdst ) || ( mycol != cdst ) )
00615                   {
00616                      buf = PB_Cmalloc( XnpD * size );
00617                      if( YisRow )
00618                         Csgerv2d( ctxt, 1, XnpD, buf,    1, rdst, cdst );
00619                      else
00620                         Csgerv2d( ctxt, XnpD, 1, buf, XnpD, rdst, cdst );
00621                      scopy_( &XnpD, buf, &ione, Mptr( ((char *) X), Xii,
00622                              Xjj, Xld, size ), &Xlinc );
00623                      if( buf ) free( buf );
00624                   }
00625                }
00626             }
00627          }
00628       }
00629       else if( ( XmyprocR == XprocR ) || ( YmyprocR == YprocR ) )
00630       {
00631 /*
00632 *  General case
00633 */
00634          tran   = ( RRorCC ? CNOTRAN : CTRAN );
00635          if( XisRow ) { Xscope = CCOLUMN; Xm = 1; rsrc = XprocR; }
00636          else         { Xscope = CROW;    Xn = 1; csrc = XprocR; }
00637          if( YisRow ) { Yscope = CCOLUMN; Ym = 1; rdst = YprocR; }
00638          else         { Yscope = CROW;    Yn = 1; cdst = YprocR; }
00639          lcmb   = PB_Clcm( XnprocsD * XnbD, YnprocsD * YnbD );
00640          one    = type->one; zero = type->zero;
00641          gcdPQ  = PB_Cgcd( XnprocsD, YnprocsD );
00642          lcmPQ  = ( XnprocsD / gcdPQ ) * YnprocsD;
00643 
00644          for( k = 0; k < gcdPQ; k++ )
00645          {
00646             p = 0; q = k;
00647 
00648             for( l = 0; l < lcmPQ; l++ )
00649             {
00650                Xroc = MModAdd( XprocD, p, XnprocsD );
00651                Yroc = MModAdd( YprocD, q, YnprocsD );
00652 
00653                if( ( XmyprocD == Xroc ) || ( YmyprocD == Yroc ) )
00654                {
00655                   XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, Xroc, XprocD,
00656                                      XnprocsD );
00657                   YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, Yroc, YprocD,
00658                                      YnprocsD );
00659                   PB_CVMinit( &VM, 0, XnpD, YnpD, Xinb1D, Yinb1D, XnbD, YnbD,
00660                               p, q, XnprocsD, YnprocsD, lcmb );
00661                   if( npq = PB_CVMnpq( &VM ) )
00662                   {
00663                      if( (    RRorCC   && ( Xroc ==   Yroc ) &&
00664                            ( XprocR == YprocR ) ) ||
00665                          ( !( RRorCC ) && ( Xroc == YprocR ) &&
00666                            ( XprocR == Yroc   ) ) )
00667                      {
00668 /*
00669 *  If I am at the intersection of the process cross, or simply common to the
00670 *  process rows or columns owning sub( X ) and sub( Y )
00671 */
00672                         if( ( YmyprocD == Yroc ) && ( YmyprocR == YprocR ) )
00673                         {
00674                            PB_CVMswp( type, &VM, ROW, &Xscope, &tran, npq,
00675                                       Mptr( ((char *) X), Xii, Xjj, Xld, size ),
00676                                       Xlinc, Mptr( ((char *) Y), Yii, Yjj, Yld,
00677                                       size ), Ylinc );
00678                         }
00679                      }
00680                      else
00681                      {
00682 /*
00683 *  Perform the message exchange: pack the data I own, send it, receive the
00684 *  remote data, and unpack it.
00685 */
00686                         if( ( XmyprocR == XprocR ) && ( XmyprocD == Xroc ) )
00687                         {
00688                            if( XisRow ) { Xn = npq; }
00689                            else         { Xm = npq; }
00690                            if( YisRow ) { Yn = npq; cdst = Yroc; }
00691                            else         { Ym = npq; rdst = Yroc; }
00692                            buf = PB_Cmalloc( npq * size );
00693                            PB_CVMpack( type, &VM, ROW, &Xscope, PACKING, NOTRAN,
00694                                        npq, 1, one, Mptr( ((char *) X), Xii,
00695                                        Xjj, Xld, size ), Xld, zero, buf, Xm );
00696                            Csgesd2d( ctxt, Xm, Xn, buf, Xm, rdst, cdst );
00697                            Csgerv2d( ctxt, Ym, Yn, buf, Ym, rdst, cdst );
00698                            PB_CVMpack( type, &VM, ROW, &Xscope, UNPACKING,
00699                                        &tran, npq, 1, zero, Mptr( ((char *) X),
00700                                        Xii, Xjj, Xld, size ), Xld, one, buf,
00701                                        Ym );
00702                            if( buf ) free ( buf );
00703                         }
00704                         if( ( YmyprocR == YprocR ) && ( YmyprocD == Yroc  ) )
00705                         {
00706                            if( XisRow ) { Xn = npq; csrc = Xroc; }
00707                            else         { Xm = npq; rsrc = Xroc; }
00708                            if( YisRow ) { Yn = npq; }
00709                            else         { Ym = npq; }
00710                            buf = PB_Cmalloc( npq * size );
00711                            PB_CVMpack( type, &VM, COLUMN, &Yscope, PACKING,
00712                                        NOTRAN, npq, 1, one, Mptr( ((char *) Y),
00713                                        Yii, Yjj, Yld, size ), Yld, zero, buf,
00714                                        Ym );
00715                            Csgesd2d( ctxt, Ym, Yn, buf, Ym, rsrc, csrc );
00716                            Csgerv2d( ctxt, Xm, Xn, buf, Xm, rsrc, csrc );
00717                            PB_CVMpack( type, &VM, COLUMN, &Yscope, UNPACKING,
00718                                        &tran, npq, 1, zero, Mptr( ((char *) Y),
00719                                        Yii, Yjj, Yld, size ), Yld, one, buf,
00720                                        Xm );
00721                            if( buf ) free ( buf );
00722                         }
00723                      }
00724                   }
00725                }
00726                p = MModAdd1( p, XnprocsD );
00727                q = MModAdd1( q, YnprocsD );
00728             }
00729          }
00730       }
00731 
00732       if( XisR )
00733       {
00734 /*
00735 *  Replicate sub( X ) when necessary
00736 */
00737          XnpD = PB_Cnumroc( *N, 0, Xinb1D, XnbD, XmyprocD, XprocD, XnprocsD );
00738          if( XnpD > 0 )
00739          {
00740             if( XisRow )
00741             {
00742                top = PB_Ctop( &ctxt, BCAST, COLUMN, TOP_GET );
00743                if( XmyprocR == XprocR )
00744                   Csgebs2d( ctxt, COLUMN, top, 1, XnpD, Mptr( ((char *) X),
00745                             Xii, Xjj, Xld, size ), Xld );
00746                else
00747                   Csgebr2d( ctxt, COLUMN, top, 1, XnpD, Mptr( ((char *) X),
00748                             Xii, Xjj, Xld, size ), Xld, XprocR, XmyprocD );
00749             }
00750             else
00751             {
00752                top = PB_Ctop( &ctxt, BCAST, ROW,    TOP_GET );
00753                if( XmyprocR == XprocR )
00754                   Csgebs2d( ctxt, ROW,    top, XnpD, 1, Mptr( ((char *) X),
00755                             Xii, Xjj, Xld, size ), Xld );
00756                else
00757                   Csgebr2d( ctxt, ROW,    top, XnpD, 1, Mptr( ((char *) X),
00758                             Xii, Xjj, Xld, size ), Xld, XmyprocD, XprocR );
00759             }
00760          }
00761       }
00762 
00763       if( YisR )
00764       {
00765 /*
00766 *  Replicate sub( Y ) when necessary
00767 */
00768          YnpD = PB_Cnumroc( *N, 0, Yinb1D, YnbD, YmyprocD, YprocD, YnprocsD );
00769          if( YnpD > 0 )
00770          {
00771             if( YisRow )
00772             {
00773                top = PB_Ctop( &ctxt, BCAST, COLUMN, TOP_GET );
00774                if( YmyprocR == YprocR )
00775                   Csgebs2d( ctxt, COLUMN, top, 1, YnpD, Mptr( ((char *) Y),
00776                             Yii, Yjj, Yld, size ), Yld );
00777                else
00778                   Csgebr2d( ctxt, COLUMN, top, 1, YnpD, Mptr( ((char *) Y),
00779                             Yii, Yjj, Yld, size ), Yld, YprocR, YmyprocD );
00780             }
00781             else
00782             {
00783                top = PB_Ctop( &ctxt, BCAST, ROW,    TOP_GET );
00784                if( YmyprocR == YprocR )
00785                   Csgebs2d( ctxt, ROW,    top, YnpD, 1, Mptr( ((char *) Y),
00786                             Yii, Yjj, Yld, size ), Yld );
00787                else
00788                   Csgebr2d( ctxt, ROW,    top, YnpD, 1, Mptr( ((char *) Y),
00789                             Yii, Yjj, Yld, size ), Yld, YmyprocD, YprocR );
00790             }
00791          }
00792       }
00793    }
00794    else if( !( XisD ) && YisD )
00795    {
00796 /*
00797 *  sub( X ) is not distributed and sub( Y ) is distributed.
00798 */
00799       PB_CpswapND( PB_Cstypeset(), *N, ((char *) X), Xi, Xj, Xd, *INCX,
00800                    ((char *) Y), Yi, Yj, Yd, *INCY );
00801    }
00802    else if( XisD && !( YisD ) )
00803    {
00804 /*
00805 *  sub( X ) is distributed and sub( Y ) is not distributed.
00806 */
00807       PB_CpswapND( PB_Cstypeset(), *N, ((char *) Y), Yi, Yj, Yd, *INCY,
00808                    ((char *) X), Xi, Xj, Xd, *INCX );
00809    }
00810    else
00811    {
00812 /*
00813 *  Neither sub( X ) nor sub( Y ) are distributed.
00814 */
00815       PB_CpswapNN( PB_Cstypeset(), *N, ((char *) X), Xi, Xj, Xd, *INCX,
00816                    ((char *) Y), Yi, Yj, Yd, *INCY );
00817    }
00818 /*
00819 *  End of PSSWAP
00820 */
00821 }