NAMD
DataExchanger.C
Go to the documentation of this file.
1 
6 #include <stdio.h>
7 #include "DataExchanger.h"
8 #include "ProcessorPrivate.h"
9 #include "common.h"
10 #include "Node.h"
11 #include "CollectionMaster.h"
12 #include "Output.h"
13 #include "ScriptTcl.h"
14 #include "qd.h"
15 
16 #if CMK_HAS_PARTITION
17 #ifdef CmiMyPartitionSize
18 extern "C" {
19  void setDefaultPartitionParams() {
20  // if(!CmiMyNodeGlobal()) printf("NAMD setDefaultPartitionParams called\n");
21  CmiSetPartitionScheme(3); // recursive bisection
22  }
23 }
24 #endif
25 #endif
26 
27 static int recvRedCalledEarly;
28 
29 CpvDeclare(int, breakScheduler);
30 CpvDeclare(int, inEval);
31 
32 //functions to receive and invoke chare's entry methods
33 extern "C" {
34  void packSend(int dst, int dstPart, const char *data, int size, int handler, int code) {
35  int msgsize = sizeof(DataMessage) + size;
36  DataMessage *dmsg = (DataMessage *)CmiAlloc(msgsize);
37  dmsg->setMessage(data,CkMyPe(),CmiMyPartition(),size,handler,code);
38 #if CMK_HAS_PARTITION
39  CmiInterSyncSendAndFree(dst,dstPart,msgsize,(char*)dmsg);
40 #else
41  CmiSyncSendAndFree(dst,msgsize,(char*)dmsg);
42 #endif
43  }
44 
45  void sendReplicaDcdInit(int dstPart, ReplicaDcdInitMsg *msg, int msgsize) {
46  CmiSetHandler(msg->core, CkpvAccess(recv_replica_dcd_init_idx));
47 #if CMK_HAS_PARTITION
48  CmiInterSyncSendAndFree(0,dstPart,msgsize,(char*)msg);
49 #else
50  CmiSyncSendAndFree(0,msgsize,(char*)msg);
51 #endif
52  CollectionMaster::Object()->blockPositions(); // ensure ordering
53  CkpvAccess(_qd)->create(); // ensure completion
54  }
55 
56  void sendReplicaDcdData(int dstPart, ReplicaDcdDataMsg *msg, int msgsize) {
57  CmiSetHandler(msg->core, CkpvAccess(recv_replica_dcd_data_idx));
58 #if CMK_HAS_PARTITION
59  CmiInterSyncSendAndFree(0,dstPart,msgsize,(char*)msg);
60 #else
61  CmiSyncSendAndFree(0,msgsize,(char*)msg);
62 #endif
63  CollectionMaster::Object()->blockPositions(); // ensure ordering
64  CkpvAccess(_qd)->create(); // ensure completion
65  }
66 
67  void sendReplicaDcdAck(int dstPart, ReplicaDcdAckMsg *msg) {
68  CmiSetHandler(msg->core, CkpvAccess(recv_replica_dcd_ack_idx));
69  int msgsize = sizeof(ReplicaDcdAckMsg);
70 #if CMK_HAS_PARTITION
71  CmiInterSyncSendAndFree(0,dstPart,msgsize,(char*)msg);
72 #else
73  CmiSyncSendAndFree(0,msgsize,(char*)msg);
74 #endif
75  }
76 
79  CmiFree(msg);
80  }
81 
84  CmiFree(msg);
85  }
86 
88  CmiFree(msg);
90  CkpvAccess(_qd)->process();
91  }
92 
93  void recvData(DataMessage *dmsg) {
94  Pointer msg(dmsg);
95  if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvData!");
96  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_data(msg);
97  }
98 
99  void recvAck(DataMessage *dmsg) {
100  if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvAck!");
101  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_ack();
102  CmiFree(dmsg);
103  }
104 
105  void recvBcast(DataMessage *dmsg) {
106  if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvBcast!");
107  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_bcast();
108  CmiFree(dmsg);
109  }
110 
111  void recvRed(DataMessage *dmsg) {
112  if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) {
114  CmiFree(dmsg);
115  return;
116  }
117  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_red();
118  CmiFree(dmsg);
119  }
120 
122  Pointer msg(dmsg);
123  if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvEvalCommand!");
124  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_eval_command(msg);
125  }
126 
128  Pointer msg(dmsg);
129  if ( CkpvAccess(BOCclass_group).dataExchanger.isZero() ) NAMD_bug("BOCgroup::dataExchanger is zero in recvEvalResult!");
130  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_eval_result(msg);
131  }
132 
133  void replica_send(const char *sndbuf, int sendcount, int destPart, int destPE) {
134  if ( CpvAccess(inEval) ) {
135  packSend(destPE,destPart,sndbuf,sendcount,CkpvAccess(recv_data_idx),1);
136  return;
137  }
138  ConstPointer sendPointer(sndbuf);
139  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].send(sendPointer,sendcount,destPart,destPE);
140  CpvAccess(breakScheduler) = 0;
141  while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
142  }
143 
144  void replica_recv(DataMessage **precvMsg, int srcPart, int srcPE) {
145  Pointer recvPointer((char *) precvMsg);
146  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv(recvPointer,srcPart,srcPE);
147  CpvAccess(breakScheduler) = 0;
148  while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
149  }
150 
151  void replica_sendRecv(const char *sndbuf, int sendcount, int destPart, int destPE, DataMessage **precvMsg, int srcPart, int srcPE) {
152  ConstPointer sendPointer(sndbuf);
153  Pointer recvPointer((char *) precvMsg);
154  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].sendRecv(sendPointer,sendcount,destPart,destPE,recvPointer,srcPart,srcPE);
155  CpvAccess(breakScheduler) = 0;
156  while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
157  }
158 
159  void replica_eval(const char *cmdbuf, int targPart, int targPE, DataMessage **precvMsg) {
160  ConstPointer sendPointer(cmdbuf);
161  Pointer recvPointer((char *) precvMsg);
162  int sendcount = strlen(cmdbuf) + 1;
163  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].eval(sendPointer,sendcount,targPart,targPE,recvPointer);
164  CpvAccess(breakScheduler) = 0;
165  while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
166  }
167 
169  for ( ; recvRedCalledEarly > 0; --recvRedCalledEarly ) CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].recv_red();
170  CPROXY_DE(CkpvAccess(BOCclass_group).dataExchanger)[CkMyPe()].barrier();
171  CpvAccess(breakScheduler) = 0;
172  while(!CpvAccess(breakScheduler)) CsdSchedulePoll();
173  }
174 
175  void replica_bcast(char *buf, int count, int root) {
176  if ( root != 0 ) NAMD_bug("replica_bcast root must be zero");
177  int rank = CmiMyPartition();
178  int size = CmiNumPartitions();
179  int i;
180  for ( i=1; i<size; i*=2 );
181  for ( i/=2; i>0; i/=2 ) {
182  if ( rank & (i - 1) ) continue;
183  if ( rank & i ) {
184  int src = rank - i;
185  //CkPrintf("rank %d recv from %d\n", rank, src);
186  DataMessage *recvMsg = NULL;
187  replica_recv(&recvMsg, src, CkMyPe());
188  if ( recvMsg == NULL ) NAMD_bug("recvMsg == NULL in replica_bcast");
189  if ( recvMsg->size != count ) NAMD_bug("size != count in replica_bcast");
190  memcpy(buf, recvMsg->data, count);
191  CmiFree(recvMsg);
192  } else {
193  int dst = rank + i;
194  if ( dst < size ) {
195  //CkPrintf("rank %d send to %d\n", rank, dst);
196  replica_send(buf, count, dst, CkMyPe());
197  }
198  }
199  }
200  }
201 
202  void replica_min_double(double *dat, int count) {
203  int rank = CmiMyPartition();
204  int size = CmiNumPartitions();
205  for ( int i=1; i<size; i*=2 ) {
206  if ( rank & i ) {
207  int dst = rank - i;
208  //CkPrintf("rank %d send to %d\n", rank, dst);
209  replica_send((char*)dat, count * sizeof(double), dst, CkMyPe());
210  } else {
211  int src = rank + i;
212  if ( src < size ) {
213  //CkPrintf("rank %d recv from %d\n", rank, src);
214  DataMessage *recvMsg = NULL;
215  replica_recv(&recvMsg, src, CkMyPe());
216  if ( recvMsg == NULL ) NAMD_bug("recvMsg == NULL in replica_bcast");
217  if ( recvMsg->size != count * sizeof(double) ) NAMD_bug("size != count in replica_min_double");
218  double *rdat = new double[count];
219  memcpy(rdat, recvMsg->data, count * sizeof(double));
220  CmiFree(recvMsg);
221  for ( int j=0; j<count; ++j ) {
222  if ( rdat[j] < dat[j] ) dat[j] = rdat[j];
223  }
224  delete [] rdat;
225  }
226  }
227  if ( rank & (2 * i - 1) ) break;
228  }
229  replica_bcast((char*)dat, count * sizeof(double), 0);
230  }
231 } //endof extern C
232 
233 #if CMK_IMMEDIATE_MSG && CMK_SMP && ! ( CMK_MULTICORE || CMK_SMP_NO_COMMTHD )
234 extern "C" void CmiPushImmediateMsg(void *msg);
235 
236 class SleepCommthdMsg {
237  public:
238  char core[CmiMsgHeaderSizeBytes];
239 };
240 
241 void recvSleepCommthdMsg(SleepCommthdMsg *msg) {
242  if ( CkMyRank() != CkMyNodeSize() ) NAMD_bug("recvSleepCommthdMsg called on PE instead of communication thread");
243 #ifdef WIN32
244  Sleep(1);
245 #else
246  usleep(1000);
247 #endif
248  CmiDelayImmediate(); // re-enqueue for next cycle
249 }
250 #endif
251 
253  CkpvInitialize(int, recv_data_idx);
254  CkpvInitialize(int, recv_ack_idx);
255  CkpvInitialize(int, recv_bcast_idx);
256  CkpvInitialize(int, recv_red_idx);
257  CkpvInitialize(int, recv_eval_command_idx);
258  CkpvInitialize(int, recv_eval_result_idx);
259  CkpvInitialize(int, recv_replica_dcd_init_idx);
260  CkpvInitialize(int, recv_replica_dcd_data_idx);
261  CkpvInitialize(int, recv_replica_dcd_ack_idx);
262 
263  CkpvAccess(recv_data_idx) = CmiRegisterHandler((CmiHandler)recvData);
264  CkpvAccess(recv_ack_idx) = CmiRegisterHandler((CmiHandler)recvAck);
265  CkpvAccess(recv_red_idx) = CmiRegisterHandler((CmiHandler)recvRed);
266  CkpvAccess(recv_bcast_idx) = CmiRegisterHandler((CmiHandler)recvBcast);
267  CkpvAccess(recv_eval_command_idx) = CmiRegisterHandler((CmiHandler)recvEvalCommand);
268  CkpvAccess(recv_eval_result_idx) = CmiRegisterHandler((CmiHandler)recvEvalResult);
269  CkpvAccess(recv_replica_dcd_init_idx) = CmiRegisterHandler((CmiHandler)recvReplicaDcdInit);
270  CkpvAccess(recv_replica_dcd_data_idx) = CmiRegisterHandler((CmiHandler)recvReplicaDcdData);
271  CkpvAccess(recv_replica_dcd_ack_idx) = CmiRegisterHandler((CmiHandler)recvReplicaDcdAck);
272 
273 #if CMK_IMMEDIATE_MSG && CMK_SMP && ! ( CMK_MULTICORE || CMK_SMP_NO_COMMTHD )
274  int sleep_commthd_idx = CmiRegisterHandler((CmiHandler)recvSleepCommthdMsg);
275  if ( CkMyPe() == 0 && CkNumNodes() == 1 ) {
276  CkPrintf("Charm++ communication thread will sleep due to single-process run.\n");
277  SleepCommthdMsg *msg = (SleepCommthdMsg *)malloc(sizeof(SleepCommthdMsg));
278  CmiSetHandler(msg, sleep_commthd_idx);
279  CmiBecomeImmediate(msg);
280  CmiPushImmediateMsg(msg);
281  }
282 #endif
283 }
284 
285 //======================================================================
286 // Public functions
287 //----------------------------------------------------------------------
289 {
290  CpvInitialize(int, breakScheduler);
291  CpvAccess(breakScheduler) = 1;
292  CpvInitialize(int, inEval);
293  CpvAccess(inEval) = 0;
294  if(CmiMyPartition() == 0)
295  parent = -1;
296  else
297  parent = (CmiMyPartition()+1)/TREE_WIDTH - 1;
298  firstChild = (CmiMyPartition()+1)*TREE_WIDTH - 1;
299  numChildren = CmiNumPartitions() - firstChild;
300  if(numChildren > TREE_WIDTH)
302 
303  recv_data_idx = CkpvAccess(recv_data_idx);
304  recv_ack_idx = CkpvAccess(recv_ack_idx);
305  recv_red_idx = CkpvAccess(recv_red_idx);
306  recv_bcast_idx = CkpvAccess(recv_bcast_idx);
309 }
310 
311 //----------------------------------------------------------------------
313 { }
314 
315 
316 #include "DataExchanger.def.h"
static Node * Object()
Definition: Node.h:86
void setMessage(const char *_data, int _src, int _srcPart, int _size, int _handler, int _code)
Definition: DataExchanger.h:25
void recvRed(DataMessage *dmsg)
void recvReplicaDcdData(ReplicaDcdDataMsg *msg)
Definition: DataExchanger.C:82
~DataExchanger(void)
void sendReplicaDcdInit(int dstPart, ReplicaDcdInitMsg *msg, int msgsize)
Definition: DataExchanger.C:45
void packSend(int dst, int dstPart, const char *data, int size, int handler, int code)
Definition: DataExchanger.C:34
Output * output
Definition: Node.h:182
static int recvRedCalledEarly
Definition: DataExchanger.C:27
char core[CmiMsgHeaderSizeBytes]
Definition: DataExchanger.h:55
void recvEvalResult(DataMessage *dmsg)
void replica_send(const char *sndbuf, int sendcount, int destPart, int destPE)
void recvBcast(DataMessage *dmsg)
void recvEvalCommand(DataMessage *dmsg)
void replica_eval(const char *cmdbuf, int targPart, int targPE, DataMessage **precvMsg)
void recvData(DataMessage *dmsg)
Definition: DataExchanger.C:93
int recv_eval_command_idx
Definition: DataExchanger.h:74
void recvReplicaDcdAck(ReplicaDcdAckMsg *msg)
Definition: DataExchanger.C:87
void replica_recv(DataMessage **precvMsg, int srcPart, int srcPE)
void replica_bcast(char *buf, int count, int root)
void recvReplicaDcdInit(ReplicaDcdInitMsg *msg)
Definition: Output.C:689
void initializeReplicaConverseHandlers()
void sendReplicaDcdData(int dstPart, ReplicaDcdDataMsg *msg, int msgsize)
Definition: DataExchanger.C:56
void NAMD_bug(const char *err_msg)
Definition: common.C:123
void recvReplicaDcdInit(ReplicaDcdInitMsg *msg)
Definition: DataExchanger.C:77
CpvDeclare(int, breakScheduler)
void recvReplicaDcdData(ReplicaDcdDataMsg *msg)
Definition: Output.C:700
int recv_eval_result_idx
Definition: DataExchanger.h:75
char core[CmiMsgHeaderSizeBytes]
Definition: DataExchanger.h:36
void replica_barrier()
static CollectionMaster * Object()
char data[1]
Definition: DataExchanger.h:23
void replica_min_double(double *dat, int count)
void sendReplicaDcdAck(int dstPart, ReplicaDcdAckMsg *msg)
Definition: DataExchanger.C:67
#define CPROXY_DE(x)
Definition: DataExchanger.h:14
Definition: Pointer.h:4
char core[CmiMsgHeaderSizeBytes]
Definition: DataExchanger.h:44
void replica_sendRecv(const char *sndbuf, int sendcount, int destPart, int destPE, DataMessage **precvMsg, int srcPart, int srcPE)
void recvAck(DataMessage *dmsg)
Definition: DataExchanger.C:99