NAMD
Rebalancer.C
Go to the documentation of this file.
1 
7 /*****************************************************************************
8  * $Source: /home/cvs/namd/cvsroot/namd2/src/Rebalancer.C,v $
9  * $Author: jim $
10  * $Date: 2013/08/30 21:43:01 $
11  * $Revision: 1.100 $
12  *****************************************************************************/
13 
14 #include "InfoStream.h"
15 #include "Node.h"
16 #include "Rebalancer.h"
17 #include "ProxyMgr.h"
18 #include "PatchMap.h"
19 #include "LdbCoordinator.h"
20 #include "memusage.h"
21 #include <iomanip>
22 
23 #define ST_NODE_LOAD 0.005
24 #define PROXY_LOAD 0.001
25 #define COMPUTE_LOAD 0.00005
26 
27 Rebalancer::Rebalancer(computeInfo *computeArray, patchInfo *patchArray,
28  processorInfo *processorArray, int nComps, int nPatches, int nPes)
29 {
30  bytesPerAtom = 32;
31  strategyName = "None";
32  computes = computeArray;
33  patches = patchArray;
34  processors = processorArray;
35  numComputes = nComps;
36  numPatches = nPatches;
37  P = nPes;
38  pes = NULL;
39  computePairHeap = NULL;
40  computeSelfHeap = NULL;
41  computeBgPairHeap = NULL;
42  computeBgSelfHeap = NULL;
43  overLoad = 0.;
44  numPesAvailable = 0;
46  collMsg = 0;
47 
48  const int beginGroup = processors[0].Id;
49  const int endGroup = beginGroup + P;
50 #define INGROUP(PROC) ((PROC) >= beginGroup && (PROC) < endGroup)
51 
52  int i;
53  int index;
54  for (i=0; i<P; i++)
55  {
56  // For testing only...
57  // processors[i].backgroundLoad = 0;
58  // End of test section
60  processors[i].computeLoad = 0;
61  if (processors[i].available) {
62  numPesAvailable += 1;
63  }
64  }
65 
66  for (i=0; i<nPatches; i++) {
67  // Only for those patches which are in my group (hierarchical case)
68  if INGROUP(patches[i].processor) {
69  index = patches[i].processor - beginGroup;
70  if (!patches[i].proxiesOn.find(&(processors[index]))) {
73  }
75  }
76  }
77 
79 
80  for (i=0; i<numComputes; i++)
81  computeArray[i].processor = -1;
82 
83  for (i=0; i < numComputes; i++) {
84  // Only for those computes which are in my group (hierarchical case)
85  if INGROUP(computes[i].oldProcessor) {
86  index = computes[i].oldProcessor - beginGroup;
87  processors[index].computeLoad += computes[i].load;
88  }
89  }
90 
91  // Added 4-29-98: Temporarily adds the compute load to the background
92  // load so that the correct value for the total load can be displayed.
93  float *temploads = new float[P];
94  for(i=0; i<P; i++)
95  {
96  temploads[i] = processors[i].load;
98  }
99 
101 
102  // iout << iINFO << "Initial load" << "\n";
103  printLoads(1);
104 
105  for(i=0;i<P; i++)
106  {
107  processors[i].load = temploads[i];
108  processors[i].computeLoad = 0;
109  }
110 
111  delete [] temploads;
112 
113  // int count1=0, count2=0;
114  // for (i=0; i<nPatches; i++)
115  // {
116  // if (patches[i].proxiesOn.numElements() <= 1)
117  // count1++;
118  // else count2++;
119  // }
120  // iout << iINFO << "Count1 = " << count1
121  // << "Count2 = " << count2
122  // << "\n" << std::endl;
123  //
124  // for (i=0; i <P; i++)
125  // {
126  // iout << iINFO << "\n proxies on proc. " << i << " are for patches:";
127  // processorArray[i].proxies->print();
128  // }
129  //
130  // iout << iINFO <<"\n" << endi;
131  // strategy();
132 
133  // for (i=0; i<nPatches; i++)
134  // {
135  // iout << "patch " << i << " on processor " << patches[i].processor << "\n" << endi;
136  // }
137 }
138 
140 {
141  if ( computeMax() > origMaxLoad ) {
142  if ( P == CkNumPes() ) {
143  iout << "LDB:";
144  if ( P != CkNumPes() ) {
145  int w = 1;
146  int maxinw = 10;
147  while ( maxinw < CkNumPes() ) {
148  ++w;
149  maxinw = 10*maxinw;
150  }
151  iout << " PES " <<
152  std::setw(w) << std::right << processors[0].Id << "-" <<
153  std::setw(w) << std::left << processors[P-1].Id <<
154  std::right;
155  }
156  iout << " Reverting to original mapping\n" << endi;
157  fflush(stdout);
158  } else { // P != CkNumPes()
159  if ( ! collMsg ) NAMD_bug("Rebalancer::~Rebalancer() collMsg null.");
165  collMsg->reverted = 1;
166  }
167  const int beginGroup = processors[0].Id;
168  const int endGroup = beginGroup + P;
169  for (int i=0; i < numComputes; i++) {
170  // Only for those computes which are in my group (hierarchical case)
171  if INGROUP(computes[i].oldProcessor) {
173  }
174  }
175  }
176 
177  if ( P != CkNumPes() ) {
178  if ( ! collMsg ) NAMD_bug("Rebalancer::~Rebalancer() collMsg null.");
180  collMsg = 0;
181  }
182 
183  //for(int i=0; i<P; i++)
184  // delete [] processors[i].proxyUsage;
185  delete pes;
186  delete computePairHeap;
187  delete computeSelfHeap;
188  delete computeBgPairHeap;
189  delete computeBgSelfHeap;
190 }
191 
192 // Added 4-29-98: array proxyUsage on each processor keeps track of
193 // how many computes are accessing each proxy on the processor. If
194 // no computes are accessing it, the proxy can be removed in DeAssign
196 {
197  int i;
198  numProxies = 0;
199 
200  for(i=0; i<P; i++) {
201  //processors[i].proxyUsage = new unsigned char [numPatches];
202  //for(int j=0; j<numPatches; j++)
203  //{
204  // processors[i].proxyUsage[j] = 0;
205  //}
206 
207  Iterator nextCompute;
208  nextCompute.id = 0;
209 
210  computeInfo *c = (computeInfo *)
211  processors[i].computeSet.iterator((Iterator *)&nextCompute);
212 
213  while(c)
214  {
215  /* int n1 = */ //processors[i].proxyUsage[c->patch1]++;
216  proxyUsage.increment (i, c->patch1);
217  /* int n2 = */ //processors[i].proxyUsage[c->patch2]++;
218  proxyUsage.increment (i, c->patch2);
219 
220  // iout << iINFO
221  // << "Assigning compute " << c->Id << " with work = " << c->load
222  // << " to processor " << processors[i].Id << "\n"
223  // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1+1 << "\n";
224  // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2+1 << "\n";
225  // << std::endl;
226 
227  nextCompute.id++;
228  c = (computeInfo *) processors[i].computeSet.next((Iterator *)&nextCompute);
229  }
230  }
231 
232  for (i=0; i<numPatches; i++)
233  {
234  numProxies += ( patches[i].proxiesOn.numElements() - 1 );
235  Iterator nextProc;
236  processorInfo *p = (processorInfo *)patches[i].proxiesOn.iterator((Iterator *)&nextProc);
237  while (p) {
238  //p->proxyUsage[i] += 1;
239  proxyUsage.increment (p->Id, i);
240  p = (processorInfo *)patches[i].proxiesOn.next((Iterator*)&nextProc);
241  }
242  }
243 
244 }
245 
246 
248 {
249  iout << iINFO << "Strategy not implemented for the base class.\n" << "\n";
250 }
251 
253 {
254  int i, j;
255 
256  delete pes;
257  pes = new minHeap(P+2);
258  for (i=0; i<P; i++)
259  pes->insert((InfoRecord *) &(processors[i]));
260 
261  delete computePairHeap;
262  delete computeSelfHeap;
263  delete computeBgPairHeap;
264  delete computeBgSelfHeap;
265 
266  double bgLoadLimit = 0.5 * averageLoad;
267  /*
268  iout << iINFO << "Background load limit = " << bgLoadLimit << "\n";
269  for (i=0; i<P; i++)
270  if ( processors[i].backgroundLoad > bgLoadLimit )
271  iout << iINFO << "Processor " << i << " background load = "
272  << processors[i].backgroundLoad << "\n";
273  iout << endi;
274  */
275 
276  int numSelfComputes, numPairComputes, numBgSelfComputes, numBgPairComputes;
277 
278  while ( 1 ) {
279  numSelfComputes = 0;
280  numPairComputes = 0;
281  numBgSelfComputes = 0;
282  numBgPairComputes = 0;
283  for (i=0; i<numComputes; i++) {
284  int pa1 = computes[i].patch1;
285  int pa2 = computes[i].patch2;
286  if ( pa1 == pa2 ) {
287  if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
288  ++numBgSelfComputes;
289  } else {
290  ++numSelfComputes;
291  }
292  } else {
293  if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
294  || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
295  ++numBgPairComputes;
296  } else {
297  ++numPairComputes;
298  }
299  }
300  }
301 
302  int numBgComputes = numBgPairComputes + numBgSelfComputes;
303 
304  /*if ( numBgComputes ) {
305  iout << iINFO << numBgComputes << " of " << numComputes
306  << " computes have background load > " << bgLoadLimit << "\n" << endi;
307  }*/
308 
309  if ( numBgComputes < 0.3 * numComputes ) break;
310  else bgLoadLimit += 0.1 * averageLoad;
311  }
312 
313  computePairHeap = new maxHeap(numPairComputes+2);
314  computeSelfHeap = new maxHeap(numSelfComputes+2);
315  computeBgPairHeap = new maxHeap(numBgPairComputes+2);
316  computeBgSelfHeap = new maxHeap(numBgSelfComputes+2);
317 
318  for (i=0; i<numComputes; i++) {
319  int pa1 = computes[i].patch1;
320  int pa2 = computes[i].patch2;
321  if ( pa1 == pa2 ) {
322  if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit) {
324  } else {
326  }
327  } else {
328  if ( processors[patches[pa1].processor].backgroundLoad > bgLoadLimit
329  || processors[patches[pa2].processor].backgroundLoad > bgLoadLimit) {
331  } else {
333  }
334  }
335  }
336 
337 /*
338  delete computePairHeap;
339  delete computeSelfHeap;
340 
341  int numSelfComputes = 0;
342  for (i=0; i<numComputes; i++)
343  if ( computes[i].patch1 == computes[i].patch2 ) ++numSelfComputes;
344 
345  computeSelfHeap = new maxHeap(numSelfComputes+2);
346  computePairHeap = new maxHeap(numComputes-numSelfComputes+2);
347 
348  for (i=0; i<numComputes; i++)
349  if ( computes[i].patch1 == computes[i].patch2 )
350  computeSelfHeap->insert( (InfoRecord *) &(computes[i]));
351  else
352  computePairHeap->insert( (InfoRecord *) &(computes[i]));
353 */
354 }
355 
357 {
358  int i, j;
359 
360  delete pes;
361  pes = new minHeap(P+2);
362  for (i=0; i<P; i++)
363  pes->insert((InfoRecord *) &(processors[i]));
364 
365  delete computePairHeap;
366  delete computeSelfHeap;
367  delete computeBgPairHeap;
368  delete computeBgSelfHeap;
369 
370  int numSelfComputes, numPairComputes;
371 
372  numSelfComputes = 0;
373  numPairComputes = 0;
374  for (i=0; i<numComputes; i++) {
375  int pa1 = computes[i].patch1;
376  int pa2 = computes[i].patch2;
377  if (pa1 == pa2)
378  ++numSelfComputes;
379  else
380  ++numPairComputes;
381  }
382 
383  computePairHeap = new maxHeap(numPairComputes+2);
384  computeSelfHeap = new maxHeap(numSelfComputes+2);
385 
386  for (i=0; i<numComputes; i++) {
387  int pa1 = computes[i].patch1;
388  int pa2 = computes[i].patch2;
389  if ( pa1 == pa2 )
391  else
393  }
394 }
395 
396 // not safe with hybrid balancer
397 //void Rebalancer::assign(computeInfo *c, int processor)
398 //{
399 // assign(c, &(processors[processor]));
400 //}
401 
403 {
404  c->processor = p->Id;
406 #if COMPUTE_CORRECTION
408  p->computeLoad += c->load + COMPUTE_LOAD;
409  else
410 #endif
411  p->computeLoad += c->load;
412 
413  p->load = p->computeLoad + p->backgroundLoad;
414  patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
415  patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
416 
417  if (!patch1->proxiesOn.find(p)) {
418  p->proxies.unchecked_insert(patch1);
419  patch1->proxiesOn.unchecked_insert(p);
420  numProxies++;
421 #if PROXY_CORRECTION
422  if(firstAssignInRefine) {
423  processors[p->Id].load += PROXY_LOAD;
425  }
426 #endif
427  }
428 
429  if (!patch2->proxiesOn.find(p)) {
430  p->proxies.unchecked_insert(patch2);
431  patch2->proxiesOn.unchecked_insert(p);
432  numProxies++;
433 #if PROXY_CORRECTION
434  if(firstAssignInRefine) {
435  processors[p->Id].load += PROXY_LOAD;
437  }
438 #endif
439  }
440 
441  // 4-29-98: Added the following code to keep track of how many proxies
442  // on each processor are being used by a compute on that processor
443  /* int n1 = */ //p->proxyUsage[c->patch1]++;
444  proxyUsage.increment (p->Id, c->patch1);
445  /* int n2 = */ //p->proxyUsage[c->patch2]++;
446  proxyUsage.increment (p->Id, c->patch2);
447 
448  // iout << iINFO
449  // << "Assigning compute " << c->Id << " with work = " << c->load
450  // << " to processor " << p->Id << "\n"
451  // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1+1 << "\n"
452  // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2+1 << "\n"
453  // << std::endl;
454 
455 #if 0
456  iout << "Assign " << c->Id << " patches " << c->patch1 << " " << c->patch2
457  << " load " << c->load << " to " << p->Id << " new load "
458  << p->load << " background " << p->backgroundLoad
459  << " nPatches " << nPatches << " nProxies " << nProxies;
460  if ( nPatches + nProxies < 2 ) iout << " addProxy";
461  if ( badForComm ) iout << " badForComm";
462  iout << "\n" << endi;
463 #endif
464 }
465 
467 {
468  if (!p->computeSet.remove(c)) {
469  iout << iINFO << "ERROR: Rebalancer tried to deAssign an object that is not on the processor.\n" << endi;
470  return;
471  }
472 
473  double temp_load = 0.0;
474 
475  c->processor = -1;
476  p->computeLoad -= c->load;
477  CmiAssert(p->computeLoad >= 0.0);
478  temp_load = p->load - c->load;
479  p->load = p->computeLoad + p->backgroundLoad;
480  CmiAssert( fabs(temp_load - p->load) < 0.001 );
481 
482  // 4-29-98: Added the following code to keep track of how many proxies
483  // on each processor are being used by a compute on that processor.
484  // If no computes are using the proxy, it should be removed if it is not
485  // on the processor that its patch is on.
486  /* int n1 = */ //p->proxyUsage[c->patch1]--;
487  proxyUsage.decrement (p->Id, c->patch1);
488  /* int n2 = */ //p->proxyUsage[c->patch2]--;
489  proxyUsage.decrement (p->Id, c->patch2);
490 
491  // iout << iINFO
492  // << "De-assigning compute " << c->Id << " from processor " << p->Id << "\n"
493  // << "\tproxyUsage[" << c->patch1 << "]: " << n1 << " --> " << n1-1 << "\n"
494  // << "\tproxyUsage[" << c->patch2 << "]: " << n2 << " --> " << n2-1 << "\n"
495  // << std::endl;
496 
497  //if(p->proxyUsage[c->patch1] <= 0 && p->Id != patches[c->patch1].processor)
498  if(proxyUsage.getVal(p->Id, c->patch1) <= 0 && p->Id != patches[c->patch1].processor)
499  {
500  // iout << iINFO
501  // << "REMOVING PROXY " << c->patch1 << " FROM PROCESSOR " << p->Id
502  // << std::endl << endl;
503 
504  patchInfo* patch1 = (patchInfo *) &(patches[c->patch1]);
505  p->proxies.remove(patch1);
506  patch1->proxiesOn.remove(p);
507  numProxies--;
508 #if PROXY_CORRECTION
509  if(firstAssignInRefine) {
510  processors[p->Id].load -= PROXY_LOAD;
512  if(processors[p->Id].backgroundLoad < 0.0) {
513  processors[p->Id].backgroundLoad = 0.0;
514  processors[p->Id].load += PROXY_LOAD;
515  }
516  }
517 #endif
518  }
519 
520  //if(p->proxyUsage[c->patch2] <= 0 && p->Id != patches[c->patch2].processor)
521  if(proxyUsage.getVal(p->Id, c->patch2) <= 0 && p->Id != patches[c->patch2].processor)
522  {
523  // iout << iINFO
524  // << "REMOVING PROXY " << c->patch1 << " FROM PROCESSOR " << p->Id
525  // << std::endl << endl;
526 
527  patchInfo* patch2 = (patchInfo *) &(patches[c->patch2]);
528  p->proxies.remove(patch2);
529  patch2->proxiesOn.remove(p);
530  numProxies--;
531 #if PROXY_CORRECTION
532  if(firstAssignInRefine) {
533  processors[p->Id].load -= PROXY_LOAD;
535  if(processors[p->Id].backgroundLoad < 0.0) {
536  processors[p->Id].backgroundLoad = 0.0;
537  processors[p->Id].load += PROXY_LOAD;
538  }
539  }
540 #endif
541  }
542 }
543 
544 void Rebalancer::refine_togrid(pcgrid &grid, double thresholdLoad,
545  processorInfo *p, computeInfo *c) {
546 
547  if(p->available == false) return;
548 
549  if ( c->load + p->load < thresholdLoad) {
550  int nPatches, nProxies, badForComm;
551  numAvailable(c,p,&nPatches,&nProxies,&badForComm);
552 
553  // if ( badForComm ) return;
554 
555  pcpair *pair = &grid[nPatches][nProxies][badForComm];
556 
557  if (! pair->c) {
558  pair->c = c;
559  pair->p = p;
560  } else {
561  double newval = p->load - c->load;
562  if ( c->load + p->load < averageLoad ) {
563  newval -= averageLoad;
564  }
565  double oldval = pair->p->load - pair->c->load;
566  if ( pair->c->load + pair->p->load < averageLoad ) {
567  oldval -= averageLoad;
568  }
569  if (newval < oldval) {
570  pair->c = c;
571  pair->p = p;
572  }
573  }
574  }
575 }
576 
578 {
579  int finish = 1;
580  int no_new_proxies = 0; // set to true if new proxies are futile
581  maxHeap *heavyProcessors = new maxHeap(P);
582 
583  IRSet *lightProcessors = new IRSet();
584  int i;
585  double thresholdLoad = overLoad * averageLoad;
586  for (i=0; i<P; i++)
587  {
588  // iout << iINFO << "\n Computes on processor " << i << " ";
589  // processors[i].computeSet->print();
590  // iout << iINFO << "\n" << endi;
591  if (processors[i].load > thresholdLoad)
592  heavyProcessors->insert((InfoRecord *) &(processors[i]));
593  else lightProcessors->insert((InfoRecord *) &(processors[i]));
594  }
595 
596 #if LDB_DEBUG
597  iout << "\nBefore Refinement Summary" << "\n";
598  printSummary();
599 #endif
600 
601  int done = 0;
602  while (!done)
603  {
604  // processorInfo *donor = (processorInfo *) heavyProcessors->deleteMax();
605  /* Keep selecting new donors, until we find one with some compute to
606  * migrate
607  */
608 /*
609  computeInfo* c=0;
610  while (donor && !c) {
611  Iterator nextCompute;
612  nextCompute.id = 0;
613  c = (computeInfo *) donor->
614  computeSet.iterator((Iterator *)&nextCompute);
615  if (!c) {
616  iout << iINFO << "Ignoring donor " << donor->Id
617  << " because no computes\n" << endi;
618  donor = (processorInfo*)heavyProcessors->deleteMax();
619  }
620  };
621 */
622 
623  processorInfo *donor;
624  while (donor = (processorInfo*)heavyProcessors->deleteMax()) {
625  if (donor->computeSet.hasElements()) break;
626  if ( ! no_new_proxies ) {
627  /*
628  iout << iINFO << "Most-loaded processor " << donor->Id
629  << " (" << donor->patchSet.numElements() << " patches, "
630  << donor->proxies.numElements() << " proxies)"
631  << " has no migratable work.\n" << endi;
632  */
633  no_new_proxies = 1; // New proxies would not improve load balance.
634  }
635  }
636 
637  if (!donor) break; // No donors found at all! Give up
638 
639  pcgrid grid;
640 #define REASSIGN(GRID) if (GRID.c) { \
641  deAssign(GRID.c, donor); \
642  assign(GRID.c, GRID.p); \
643  bestP = GRID.p; \
644  }
645 
646  // try for at least one proxy
647  {
648  Iterator nextCompute;
649  nextCompute.id = 0;
650  computeInfo *c = (computeInfo *)
651  donor->computeSet.iterator((Iterator *)&nextCompute);
652  while (c)
653  {
654  Iterator nextProc;
655  processorInfo *p;
656 
658  refine_togrid(grid, thresholdLoad, p, c);
659 
660  if (c->patch1 != c->patch2)
661  {
662  p = &processors[patches[c->patch2].processor];
663  refine_togrid(grid, thresholdLoad, p, c);
664  }
665 
666  p = (processorInfo *)patches[c->patch1].
667  proxiesOn.iterator((Iterator *)&nextProc);
668  while (p) {
669  refine_togrid(grid, thresholdLoad, p, c);
670  p = (processorInfo *)patches[c->patch1].
671  proxiesOn.next((Iterator*)&nextProc);
672  }
673 
674  if (c->patch1 != c->patch2)
675  {
676  p = (processorInfo *)patches[c->patch2].
677  proxiesOn.iterator((Iterator *)&nextProc);
678  while (p) {
679  refine_togrid(grid, thresholdLoad, p, c);
680  p = (processorInfo *)patches[c->patch2].
681  proxiesOn.next((Iterator*)&nextProc);
682  }
683  }
684 
685  nextCompute.id++;
686  c = (computeInfo *) donor->computeSet.
687  next((Iterator *)&nextCompute);
688  }
689  processorInfo* bestP = 0;
690  // prefer proxies to home patches
691  REASSIGN(grid[0][2][0])
692  else REASSIGN(grid[1][1][0])
693  else REASSIGN(grid[2][0][0])
694  else if ( no_new_proxies ) { finish = 0; break; }
695  else REASSIGN(grid[0][1][0])
696  else REASSIGN(grid[1][0][0])
697  else REASSIGN(grid[0][0][0])
698  // else REASSIGN(grid[0][1][1])
699  // else REASSIGN(grid[1][0][1])
700  // else REASSIGN(grid[0][0][1])
701  if (bestP) {
702  if (bestP->load > averageLoad) lightProcessors->remove(bestP);
703  if (donor->load > thresholdLoad)
704  heavyProcessors->insert((InfoRecord *) donor);
705  else lightProcessors->insert((InfoRecord *) donor);
706  continue;
707  }
708  }
709 
710  if ( no_new_proxies ) iout << iINFO
711  << "ERROR: Rebalancer::refine() algorithm is broken.\n" << endi;
712 
713  // no luck, do it the long way
714 
715  //find the best pair (c,receiver)
716  Iterator nextProcessor;
717  processorInfo *p = (processorInfo *)
718  lightProcessors->iterator((Iterator *) &nextProcessor);
719 
720  while (p)
721  {
722  Iterator nextCompute;
723  nextCompute.id = 0;
724  computeInfo *c = (computeInfo *)
725  donor->computeSet.iterator((Iterator *)&nextCompute);
726  while (c)
727  {
728 #if USE_TOPOMAP
729  int flag = tmgr.areNeighbors(p->Id, patches[c->patch1].processor,
730  patches[c->patch2].processor, 8);
731  if(flag)
732 #endif
733  {
734  refine_togrid(grid, thresholdLoad, p, c);
735  }
736  nextCompute.id++;
737  c = (computeInfo *) donor->computeSet.
738  next((Iterator *)&nextCompute);
739  }
740  p = (processorInfo *)
741  lightProcessors->next((Iterator *) &nextProcessor);
742  }
743 
744  //we have narrowed the choice to 6 candidates.
745  // prefer proxies to home patches
746  {
747  processorInfo* bestP = 0;
748  REASSIGN(grid[0][2][0])
749  else REASSIGN(grid[1][1][0])
750  else REASSIGN(grid[2][0][0])
751  else REASSIGN(grid[0][1][0])
752  else REASSIGN(grid[1][0][0])
753  else REASSIGN(grid[0][0][0])
754  // else REASSIGN(grid[0][1][1])
755  // else REASSIGN(grid[1][0][1])
756  // else REASSIGN(grid[0][0][1])
757  else { finish = 0; break; }
758  if (bestP->load > averageLoad) lightProcessors->remove(bestP);
759  if (donor->load > thresholdLoad)
760  heavyProcessors->insert((InfoRecord *) donor);
761  else lightProcessors->insert((InfoRecord *) donor);
762  }
763 
764  }
765 
766 #if LDB_DEBUG
767  iout << "After Refinement Summary" << "\n";
768  printSummary();
769 
770  if (!finish) {
771  iout << iINFO << "Refine: No solution found for overLoad = "
772  << overLoad << "\n" << endi;
773  }
774 #endif
775 
776  delete heavyProcessors;
777  delete lightProcessors;
778 
779  return finish;
780 }
781 
782 // This binary-search refinement procedure assumes that computes have already
783 // been assigned to their processors before it is called.
784 void Rebalancer::multirefine(double overload_start)
785 {
786  // The New refinement procedure. This is identical to the code in
787  // RefineOnly.C, and probably should be merged with that code to form
788  // a binary-search function
789 
790  double avg = computeAverage();
791  double max = computeMax();
792 
793 #if LDB_DEBUG
794  iout << "******** Processors with background load > average load ********" << "\n";
795 #endif
796 
797  int numOverloaded = 0;
798  for (int ip=0; ip<P; ip++) {
799  if ( processors[ip].backgroundLoad > averageLoad ) {
800  ++numOverloaded;
801 #if LDB_DEBUG
802  iout << iINFO << "Info about proc " << ip << ": Load: " << processors[ip].load << " Bg Load: " << processors[ip].backgroundLoad << " Compute Load: " << processors[ip].computeLoad << " No of computes: " << processors[ip].computeSet.numElements() << "\n";
803 #endif
804  }
805  }
806  if ( numOverloaded ) {
807  iout << iWARN << numOverloaded
808  << " processors are overloaded due to high background load.\n" << endi;
809  }
810 #if LDB_DEBUG
811  iout << "******** Processor List Ends ********" << "\n\n";
812 #endif
813 
814  const double overloadStep = 0.01;
815  const double overloadStart = overload_start; //1.05;
816  double dCurOverload = max / avg;
817 
818  int minOverload = 0; //Min overload should be 1.05 ?
819  int maxOverload = (int)((dCurOverload - overloadStart)/overloadStep + 1);
820  double dMinOverload = minOverload * overloadStep + overloadStart;
821  double dMaxOverload = maxOverload * overloadStep + overloadStart;
822 
823 #if LDB_DEBUG
824  iout << iINFO
825  << "Balancing from " << minOverload << " = " << dMinOverload
826  << " to " << maxOverload << "=" << dMaxOverload
827  << " dCurOverload=" << dCurOverload << " max=" << max << " avg=" << avg
828  << "\n" << endi;
829 #endif
830 
831  int curOverload;
832  int refineDone = 0;
833 
834  overLoad = dMinOverload;
835  if (refine())
836  refineDone = 1;
837  else {
838  overLoad = dMaxOverload;
839  if (!refine()) {
840  iout << iINFO << "ERROR: Could not refine at max overload\n" << endi;
841  refineDone = 1;
842  }
843  }
844 
845  // Scan up, until we find a refine that works
846  while (!refineDone) {
847  if (maxOverload - minOverload <= 1)
848  refineDone = 1;
849  else {
850  curOverload = (maxOverload + minOverload ) / 2;
851 
852  overLoad = curOverload * overloadStep + overloadStart;
853 #if LDB_DEBUG
854  iout << iINFO << "Testing curOverload " << curOverload
855  << "=" << overLoad << " [min,max]="
856  << minOverload << ", " << maxOverload
857  << "\n" << endi;
858 #endif
859  if (refine())
860  maxOverload = curOverload;
861  else
862  minOverload = curOverload;
863  }
864  }
865 
866 }
867 
869 {
870  iout << iINFO << "ready to print result \n" << "\n";
871 }
872 
873 
874 void Rebalancer::printLoads(int phase) // 0=nocollective, 1=initial, 2=proxies, 3=final
875 {
876 
877  int i, total = 0, numBytes = 0;
878  double max;
879  int maxproxies = 0;
880  int maxpatchproxies = 0;
881  double avgBgLoad =0.0;
882 
883  for (i=0; i<P; i++) {
884  int nproxies = processors[i].proxies.numElements() -
886  total += nproxies;
887  if ( nproxies > maxproxies ) maxproxies = nproxies;
888  avgBgLoad += processors[i].backgroundLoad;
889  Iterator p;
890  int count = 0;
891 
892  patchInfo *patch = (patchInfo *) processors[i].patchSet.iterator(&p);
893  while (patch)
894  {
895  int myProxies;
896  myProxies = patch->proxiesOn.numElements()-1;
897  if ( myProxies > maxpatchproxies ) maxpatchproxies = myProxies;
898  numBytes += myProxies *patch->numAtoms*bytesPerAtom;
899  count += myProxies;
900  patch = (patchInfo *)processors[i].patchSet.next(&p);
901  }
902  }
903 
904  avgBgLoad /= P;
905  computeAverage();
906  max = computeMax();
907 
908  if ( P == CkNumPes() ) {
909  iout << "LDB:";
910  if ( P != CkNumPes() ) {
911  int w = 1;
912  int maxinw = 10;
913  while ( maxinw < CkNumPes() ) {
914  ++w;
915  maxinw = 10*maxinw;
916  }
917  iout << " PES " <<
918  std::setw(w) << std::right << processors[0].Id << "-" <<
919  std::setw(w) << std::left << processors[P-1].Id <<
920  std::right;
921  }
922  iout << " TIME " << CmiWallTimer() << " LOAD: AVG " << averageLoad
923  << " MAX " << max << " PROXIES: TOTAL " << total << " MAXPE " <<
924  maxproxies << " MAXPATCH " << maxpatchproxies << " " << strategyName
925  << " MEM: " << memusage_MB() << " MB\n" << endi;
926  fflush(stdout);
927  }
928 
929  if ( P != CkNumPes() ) { // collect stats on pe 0
930  switch ( phase ) {
931  case 0: // no collective
932  NAMD_bug("Rebalancer::printLoads(0) called with hybrid balancer.");
933  break;
934  case 1: // initial
935  if ( collMsg ) NAMD_bug("Rebalancer::printLoads(1) collMsg not null.");
936  collMsg = new CollectLoadsMsg;
937  collMsg->reverted = 0;
939  collMsg->lastPe = processors[P-1].Id;
940  collMsg->initTime = CmiWallTimer();
943  collMsg->initMaxPeLoad = max;
944  collMsg->initTotalProxies = total;
945  collMsg->initMaxPeProxies = maxproxies;
946  collMsg->initMaxPatchProxies = maxpatchproxies;
947  break;
948  case 2: // proxies (optional)
949  if ( ! collMsg ) NAMD_bug("Rebalancer::printLoads(2) collMsg null.");
951  collMsg->initMaxPeLoad = max;
952  collMsg->initTotalProxies = total;
953  collMsg->initMaxPeProxies = maxproxies;
954  collMsg->initMaxPatchProxies = maxpatchproxies;
955  break;
956  case 3: // final
957  if ( ! collMsg ) NAMD_bug("Rebalancer::printLoads(3) collMsg null.");
958  collMsg->finalTime = CmiWallTimer();
961  collMsg->finalMaxPeLoad = max;
962  collMsg->finalTotalProxies = total;
963  collMsg->finalMaxPeProxies = maxproxies;
964  collMsg->finalMaxPatchProxies = maxpatchproxies;
965  strncpy(collMsg->strategyName,strategyName,15);
966  collMsg->strategyName[15] = 0;
967  break;
968  default:
969  NAMD_bug("Rebalancer::printLoads() called with unknown phase.");
970  }
971  }
972 
973 }
974 
976 {
977  int i;
978  // After refining, compute min, max and avg processor load
979  double total = processors[0].load;
980  double min = processors[0].load;
981  int min_proc = 0;
982  double max = processors[0].load;
983  int max_proc = 0;
984  for (i=1; i<P; i++) {
985  total += processors[i].load;
986  if (processors[i].load < min) {
987  min = processors[i].load;
988  min_proc = i;
989  }
990  if (processors[i].load > max) {
991  max = processors[i].load;
992  max_proc = i;
993  }
994  }
995  iout << iINFO << " min = " << min << " processor " << min_proc << "\n";
996  iout << iINFO << " max = " << max << " processor " << max_proc << "\n";
997  iout << iINFO << " total = " << total << " average = " << total/P << "\n";
998  iout << iINFO << "Info about most overloaded processor " << max_proc << ": Load: " << processors[max_proc].load << " Bg Load: " << processors[max_proc].backgroundLoad << " Compute Load: " << processors[max_proc].computeLoad << " No of computes: " << processors[max_proc].computeSet.numElements() << " No. of proxies: " << processors[max_proc].proxies.numElements() << "\n" << endi;
999 }
1000 
1002 {
1003  int i;
1004  double total = 0.;
1005  for (i=0; i<numComputes; i++)
1006  total += computes[i].load;
1007 
1008  for (i=0; i<P; i++) {
1009  if (processors[i].available) {
1010  total += processors[i].backgroundLoad;
1011  }
1012  }
1013 
1014  if (numPesAvailable == 0) {
1015  CmiPrintf("Warning: no processors available for load balancing!\n");
1016  averageLoad = 0.0;
1017  }
1018  else
1019  averageLoad = total/numPesAvailable;
1020  return averageLoad;
1021 }
1022 
1024 {
1025  // useful for AlgSeven when some loads start out as zero
1026 
1027  if (numPesAvailable == 0) {
1028  computeAverage(); // because otherwise someone will forget
1029  return;
1030  }
1031 
1032  int i;
1033  double bgtotal = 0.;
1034  for (i=0; i<P; i++) {
1035  if (processors[i].available) {
1036  bgtotal += processors[i].backgroundLoad;
1037  }
1038  }
1039  double bgavg = bgtotal / numPesAvailable;
1040 
1041  int nadjusted = 0;
1042  for (i=0; i<P; i++) {
1043  if (processors[i].available) {
1044  double bgload = processors[i].backgroundLoad;
1045  if ( bgload < bgavg ) {
1046  processors[i].backgroundLoad = bgavg;
1047  ++nadjusted;
1048  }
1049  }
1050  }
1051  // iout << iINFO << "Adjusted background load on " << nadjusted
1052  // << " nodes.\n" << endi;
1053 
1054  computeAverage(); // because otherwise someone will forget
1055 }
1056 
1058 {
1059  int i;
1060  double max = processors[0].load;
1061  for (i=1; i<P; i++)
1062  {
1063  if (processors[i].load > max)
1064  max = processors[i].load;
1065  }
1066  return max;
1067 }
1068 
1070 {
1071  return patch->proxiesOn.find(p);
1072 }
1073 
1075  int *nPatches, int *nProxies, int *isBadForCommunication)
1076 {
1077  // return the number of proxy/home patches available on p for c (0, 1, 2)
1078  int realPe, index;
1079  int patch_count = 0;
1080  int proxy_count = 0;
1081 
1082  const int beginGroup = processors[0].Id;
1083  const int endGroup = beginGroup + P;
1084 
1085  patchInfo &pa1 = patches[c->patch1];
1086  patchInfo &pa2 = patches[c->patch2];
1087  int pa1_avail = 1;
1088  int pa2_avail = 1;
1089 
1090  if (pa1.processor == p->Id) {
1091  patch_count++;
1092  } else if ( pa1.proxiesOn.find(p) ) {
1093  proxy_count++;
1094  } else {
1095  pa1_avail = 0;
1096  }
1097 
1098  // self computes get one patch for free here
1099  if (c->patch1 == c->patch2 || pa2.processor == p->Id) {
1100  patch_count++;
1101  } else if ( pa2.proxiesOn.find(p) ) {
1102  proxy_count++;
1103  } else {
1104  pa2_avail = 0;
1105  }
1106 
1107  *nPatches = patch_count;
1108  *nProxies = proxy_count;
1109 
1110  if ( isBadForCommunication ) { // skip work if pointer is null
1111  int bad = 0;
1112 
1113  if ( patch_count + proxy_count < 2 ) {
1114  double bgLoadLimit = 1.2 * averageLoad;
1115  if ( p->backgroundLoad > bgLoadLimit ) bad = 1;
1116  else {
1117  int proxiesPerPeLimit = numProxies / numPesAvailable + 3;
1118  if ( proxiesPerPeLimit < 6 ) proxiesPerPeLimit = 6;
1119 
1120  if ( p->proxies.numElements() > proxiesPerPeLimit ) bad = 1;
1121 
1122  int proxiesPerPatchLimit = numProxies / numPatches + 3;
1123  if ( proxiesPerPatchLimit < 6 ) proxiesPerPatchLimit = 6;
1124 
1125  if ( ! bad && ! pa1_avail ) {
1126  // HYBRID check for range in local group
1127  realPe = pa1.processor;
1128  if INGROUP(realPe) {
1129  index = realPe - beginGroup;
1130  //BACKUP if ( processors[pa1.processor].backgroundLoad > bgLoadLimit) bad = 1;
1131  if (processors[index].backgroundLoad > bgLoadLimit) bad = 1;
1132  else if ( pa1.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
1133  } else bad = 1; // patch has proxies we don't know about
1134  }
1135 
1136  if ( ! bad && ! pa2_avail ) {
1137  // HYBRID check for range in local group
1138  realPe = pa2.processor;
1139  if INGROUP(realPe) {
1140  index = realPe - beginGroup;
1141  // BACKUP if ( processors[pa2.processor].backgroundLoad > bgLoadLimit) bad = 1;
1142  if ( processors[index].backgroundLoad > bgLoadLimit) bad = 1;
1143  else if ( pa2.proxiesOn.numElements() > proxiesPerPatchLimit ) bad = 1;
1144  } else bad = 1; // patch has proxies we don't know about
1145  }
1146 
1147  }
1148  }
1149 
1150  *isBadForCommunication = bad;
1151  }
1152 }
1153 
1155  ProxyTree &pt = ProxyMgr::Object()->getPtree();
1156  Iterator nextP;
1157  processorInfo *p;
1158 #ifndef NODEAWARE_PROXY_SPANNINGTREE
1159  if(pt.sizes==NULL)
1160  pt.sizes = new int[numPatches];
1161 #endif
1162 
1163  if (pt.proxylist == NULL)
1164  pt.proxylist = new NodeIDList[numPatches];
1165  for(int i=0; i<numPatches; i++)
1166  {
1167  pt.proxylist[i].resize(patches[i].proxiesOn.numElements());
1168  nextP.id = 0;
1169  p = (processorInfo *)(patches[i].proxiesOn.iterator((Iterator *)&nextP));
1170  int j = 0;
1171  while(p) {
1172  //if (p->Id < 0)
1173  // printf ("Inserting proxy on -ve processor %d for patch %d\n", p->Id, i);
1174 
1175  if (p->Id == (PatchMap::Object()->node(i))) {
1176  p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
1177  continue;
1178  }
1179 
1180  pt.proxylist[i][j] = p->Id;
1181  nextP.id++;
1182  p = (processorInfo *)(patches[i].proxiesOn.next((Iterator *)&nextP));
1183  j++;
1184  }
1185  pt.proxylist[i].resize(j);
1186  }
1187  CkPrintf("Done intialising\n");
1188 #ifdef NODEAWARE_PROXY_SPANNINGTREE
1189  ProxyMgr::Object()->buildNodeAwareSpanningTree0();
1190 #else
1192 #endif
1193 }
1194 
1196  int pe;
1197  ProxyTree &pt = ProxyMgr::Object()->getPtree();
1198  for(int i=0; i<numPatches; i++)
1199  for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
1200  pe = pt.proxylist[i][j];
1201  processors[pe].load -= ST_NODE_LOAD;
1203  if(processors[pe].load < 0.0)
1204  processors[pe].load = 0.0;
1205  if(processors[pe].backgroundLoad < 0.0)
1206  processors[pe].backgroundLoad = 0.0;
1207  }
1208 }
1209 
1211  int pe;
1212  ProxyTree &pt = ProxyMgr::Object()->getPtree();
1213  for(int i=0; i<numPatches; i++)
1214  for(int j=1; j<pt.proxylist[i].size() && j<proxySpanDim; j++) {
1215  pe = pt.proxylist[i][j];
1216  processors[pe].load += ST_NODE_LOAD;
1218  }
1219 }
1220 
Definition: heap.h:20
BlockLoad::TempStorage load
void sendCollectLoads(CollectLoadsMsg *)
int numComputes
Definition: Rebalancer.h:138
int patch1
Definition: elements.h:23
std::ostream & iINFO(std::ostream &s)
Definition: InfoStream.C:107
int remove(InfoRecord *)
Definition: Set.C:75
ProxyTree & getPtree()
Definition: ProxyMgr.C:385
computeInfo * computes
Definition: Rebalancer.h:128
computeInfo * c
Definition: Rebalancer.h:113
int insert(InfoRecord *)
Definition: heap.C:30
static ProxyMgr * Object()
Definition: ProxyMgr.h:394
int numPatches
Definition: Rebalancer.h:137
static PatchMap * Object()
Definition: PatchMap.h:27
int numElements()
Definition: Set.C:144
LargeIRSet proxies
Definition: elements.h:46
CollectLoadsMsg * collMsg
Definition: Rebalancer.h:164
int Id
Definition: elements.h:16
void createSpanningTree()
Definition: Rebalancer.C:1154
void assign(computeInfo *c, processorInfo *pRec)
Definition: Rebalancer.C:402
std::ostream & iWARN(std::ostream &s)
Definition: InfoStream.C:108
minHeap * pes
Definition: Rebalancer.h:131
LargeIRSet patchSet
Definition: elements.h:45
double averageLoad
Definition: Rebalancer.h:141
int processor
Definition: elements.h:24
#define iout
Definition: InfoStream.h:87
int oldProcessor
Definition: elements.h:25
void insert(InfoRecord *)
Definition: Set.C:49
int numProxies
Definition: Rebalancer.h:139
void incrSTLoad()
Definition: Rebalancer.C:1210
void makeTwoHeaps()
Definition: Rebalancer.C:356
void refine_togrid(pcgrid &grid, double thresholdLoad, processorInfo *p, computeInfo *c)
Definition: Rebalancer.C:544
char strategyName[16]
processorInfo * processors
Definition: Rebalancer.h:130
int firstAssignInRefine
Definition: Rebalancer.h:143
double memusage_MB()
Definition: memusage.h:13
void printLoads(int phase=0)
Definition: Rebalancer.C:874
void numAvailable(computeInfo *c, processorInfo *p, int *nPatches, int *nProxies, int *isBadForCommunication)
Definition: Rebalancer.C:1074
static Units next(Units u)
Definition: ParseOptions.C:48
void deAssign(computeInfo *c, processorInfo *pRec)
Definition: Rebalancer.C:466
InfoRecord * deleteMax()
Definition: heap.C:152
ProxyUsage proxyUsage
Definition: Rebalancer.h:126
InfoRecord * next(Iterator *)
Definition: Set.C:131
void multirefine(double overload_start=1.02)
Definition: Rebalancer.C:784
int patch2
Definition: elements.h:23
void NAMD_bug(const char *err_msg)
Definition: common.C:123
Definition: heap.h:43
int * sizes
Definition: ProxyMgr.h:298
maxHeap * computeBgSelfHeap
Definition: Rebalancer.h:135
maxHeap * computeSelfHeap
Definition: Rebalancer.h:133
void printSummary()
Definition: Rebalancer.C:975
Definition: Set.h:25
maxHeap * computePairHeap
Definition: Rebalancer.h:132
void increment(int pe, int patch)
Definition: Rebalancer.h:81
void strategy()
Definition: Rebalancer.C:247
LargeIRSet computeSet
Definition: elements.h:47
void buildSpanningTree0()
Definition: ProxyMgr.C:1007
double computeAverage()
Definition: Rebalancer.C:1001
void adjustBackgroundLoadAndComputeAverage()
Definition: Rebalancer.C:1023
void InitProxyUsage()
Definition: Rebalancer.C:195
double overLoad
Definition: Rebalancer.h:168
#define COMPUTE_LOAD
Definition: Rebalancer.C:25
int numAtoms
Definition: elements.h:32
static LdbCoordinator * Object()
int numPesAvailable
Definition: Rebalancer.h:140
void decrement(int pe, int patch)
Definition: Rebalancer.h:88
int insert(InfoRecord *)
Definition: heap.C:126
int hasElements()
Definition: Set.C:149
double load
Definition: elements.h:15
maxHeap * computeBgPairHeap
Definition: Rebalancer.h:134
Definition: Set.h:19
void makeHeaps()
Definition: Rebalancer.C:252
#define PROXY_LOAD
Definition: Rebalancer.C:24
processorInfo * p
Definition: Rebalancer.h:112
pcpair pcgrid[3][3][2]
Definition: Rebalancer.h:125
void resize(int i)
Definition: ResizeArray.h:84
int node(int pid) const
Definition: PatchMap.h:114
NodeIDList * proxylist
Definition: ProxyMgr.h:291
IRSet proxiesOn
Definition: elements.h:33
void printResults()
Definition: Rebalancer.C:868
patchInfo * patches
Definition: Rebalancer.h:129
int bytesPerAtom
Definition: Rebalancer.h:124
#define REASSIGN(GRID)
double origMaxLoad
Definition: Rebalancer.h:142
int isAvailableOn(patchInfo *patch, processorInfo *p)
Definition: Rebalancer.C:1069
#define ST_NODE_LOAD
Definition: Rebalancer.C:23
double computeMax()
Definition: Rebalancer.C:1057
double computeLoad
Definition: elements.h:41
void unchecked_insert(InfoRecord *)
Definition: Set.C:32
int processor
Definition: elements.h:31
int getVal(int pe, int patch)
Definition: Rebalancer.h:101
int size(void) const
Definition: ResizeArray.h:127
infostream & endi(infostream &s)
Definition: InfoStream.C:38
#define INGROUP(PROC)
int refine()
Definition: Rebalancer.C:577
int proxySpanDim
Definition: ProxyMgr.C:48
InfoRecord * iterator(Iterator *)
Definition: Set.C:122
int find(InfoRecord *)
Definition: Set.C:112
double backgroundLoad
Definition: elements.h:39
Rebalancer(computeInfo *computeArray, patchInfo *patchArray, processorInfo *processorArray, int nComps, int nPatches, int nPes)
Definition: Rebalancer.C:27
const char * strategyName
Definition: Rebalancer.h:127
int id
Definition: Set.h:21
bool available
Definition: elements.h:44
void decrSTLoad()
Definition: Rebalancer.C:1195