00001
00007 #include <string.h>
00008 #include <stdlib.h>
00009 #include "Communicate.h"
00010 #include "MStream.h"
00011 #include "charm++.h"
00012
// Message table declared through Charm++'s Ckpv macros: CsmHandler files
// each arriving message here and Communicate::getMessage() searches it.
00013 CkpvStaticDeclare(CmmTable, CsmMessages);
// Acknowledgement counter: bumped by CsmAckHandler, compared against
// nchildren and reset to zero by getMessage()/sendMessage().
00014 CkpvStaticDeclare(int, CsmAcks);
00015
00016 static void CsmHandler(void *msg)
00017 {
00018 if ( CmiMyRank() ) NAMD_bug("Communicate CsmHandler on non-rank-zero pe");
00019
00020 int *m = (int *) ((char *)msg+CmiMsgHeaderSizeBytes);
00021
00022 CmmPut(CkpvAccess(CsmMessages), 2, m, msg);
00023 }
00024
00025 static void CsmAckHandler(void *msg)
00026 {
00027 if ( CmiMyRank() ) NAMD_bug("Communicate CsmAckHandler on non-rank-zero pe");
00028 CmiFree(msg);
00029 CkpvAccess(CsmAcks) += 1;
00030 }
00031
00032 Communicate::Communicate(void)
00033 {
00034 CkpvInitialize(CmmTable, CsmMessages);
00035 CsmHandlerIndex = CmiRegisterHandler((CmiHandler) CsmHandler);
00036 CsmAckHandlerIndex = CmiRegisterHandler((CmiHandler) CsmAckHandler);
00037 CkpvAccess(CsmMessages) = CmmNew();
00038
00039 int parent_node = 0;
00040 int self = CkMyNode();
00041 int range_begin = 0;
00042 int range_end = CkNumNodes();
00043 while ( self != range_begin ) {
00044 parent_node = range_begin;
00045 ++range_begin;
00046 int split = range_begin + ( range_end - range_begin ) / 2;
00047 if ( self < split ) { range_end = split; }
00048 else { range_begin = split; }
00049 }
00050 int send_near = self + 1;
00051 int send_far = send_near + ( range_end - send_near ) / 2;
00052
00053 parent = CkNodeFirst(parent_node);
00054 nchildren = 0;
00055 if ( send_far < range_end ) children[nchildren++] = CkNodeFirst(send_far);
00056 if ( send_near < send_far ) children[nchildren++] = CkNodeFirst(send_near);
00057
00058 CkpvInitialize(int, CsmAcks);
00059 CkpvAccess(CsmAcks) = nchildren;
00060
00061 ackmsg = (char *) CmiAlloc(CmiMsgHeaderSizeBytes);
00062 CmiSetHandler(ackmsg, CsmAckHandlerIndex);
00063 }
00064
00065
00066 Communicate::~Communicate(void)
00067 {
00068 CmiFree(ackmsg);
00069 }
00070
00071 MIStream *Communicate::newInputStream(int PE, int tag)
00072 {
00073 MIStream *st = new MIStream(this, PE, tag);
00074 return st;
00075 }
00076
00077 MOStream *Communicate::newOutputStream(int PE, int tag, unsigned int bufSize)
00078 {
00079 MOStream *st = new MOStream(this, PE, tag, bufSize);
00080 return st;
00081 }
00082
00083 void *Communicate::getMessage(int PE, int tag)
00084 {
00085 if ( CmiMyRank() ) NAMD_bug("Communicate::getMessage called on non-rank-zero Pe\n");
00086
00087 int itag[2], rtag[2];
00088 void *msg;
00089
00090 itag[0] = (PE==(-1)) ? (CmmWildCard) : PE;
00091 itag[1] = (tag==(-1)) ? (CmmWildCard) : tag;
00092 while((msg=CmmGet(CkpvAccess(CsmMessages),2,itag,rtag))==0) {
00093 CmiDeliverMsgs(0);
00094
00095 }
00096
00097 CmiSyncSend(parent, CmiMsgHeaderSizeBytes, ackmsg);
00098
00099 while ( CkpvAccess(CsmAcks) < nchildren ) {
00100 CmiDeliverMsgs(0);
00101
00102 }
00103 CkpvAccess(CsmAcks) = 0;
00104
00105 int size = SIZEFIELD(msg);
00106 for ( int i = 0; i < nchildren; ++i ) {
00107 CmiSyncSend(children[i],size,(char*)msg);
00108 }
00109
00110 return msg;
00111 }
00112
00113 void Communicate::sendMessage(int PE, void *msg, int size)
00114 {
00115 if ( CmiMyPe() ) NAMD_bug("Communicate::sendMessage not from Pe 0");
00116
00117 while ( CkpvAccess(CsmAcks) < nchildren ) {
00118 CmiDeliverMsgs(0);
00119
00120 }
00121 CkpvAccess(CsmAcks) = 0;
00122
00123 CmiSetHandler(msg, CsmHandlerIndex);
00124 switch(PE) {
00125 case ALL:
00126 NAMD_bug("Unexpected Communicate::sendMessage(ALL,...)");
00127
00128 break;
00129 case ALLBUTME:
00130
00131 for ( int i = 0; i < nchildren; ++i ) {
00132 CmiSyncSend(children[i],size,(char*)msg);
00133 }
00134 break;
00135 default:
00136 NAMD_bug("Unexpected Communicate::sendMessage(PEL,...)");
00137
00138 break;
00139 }
00140 }