GRASS GIS 8 Programmer's Manual  8.2.2dev(2023)-3d2c704037
empq_impl.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 #ifndef __EMPQ_IMPL_H
38 #define __EMPQ_IMPL_H
39 
40 #include <ostream>
41 #include <vector>
42 
43 #include "empq.h"
44 
45 #if(0) // disabled alternative: debug logging switched on by GETOPT("debug") (needs option.H)
46 #include "option.H"
47 #define MY_LOG_DEBUG_ID(x) \
48  if(GETOPT("debug")) cerr << __FILE__ << ":" << __LINE__<< " " << x << endl;
49 #endif
50
51 #undef XXX
52 #define XXX if(0) // "XXX stmt;" keeps stmt compiled but never executes it (tracing switch)
53
54 #define MY_LOG_DEBUG_ID(x) // active definition: debug log calls expand to nothing
55 
56 /*****************************************************************/
57 /* encapsulation of the element=<key/prio, data> together with <buffer_id>
58  and <stream_id>; used during stream merging to remember where each
59  key comes from;
60  (buf_id indexes the external buffer buff[], str_id a stream inside it)
61  assumes that class T implements: Key getPriority()
62  and that T is copy-assignable and printable with operator<< (see set() and operator<<)
63  implements operators {<, <=, ...} such that a< b iff a.x.prio < b.x.prio
64 */
65 template<class T,class Key>
67
68 private:
69  T x;
70  unsigned short buf_id;
71  unsigned int str_id;
72
73 public:
75
76  ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid):
77  x(e), buf_id(bid), str_id(sid) {}
78
80
81  void set (T &e, unsigned short bid, unsigned int sid) {
82  x = e;
83  buf_id = bid;
84  str_id = sid;
85  }
86  T elt() const { // returns a copy of the wrapped element
87  return x;
88  }
89  unsigned short buffer_id() const {
90  return buf_id;
91  }
92  unsigned int stream_id() const {
93  return str_id;
94  }
95  Key getPriority() const {
96  return x.getPriority();
97  }
98  //print as "<buf_id=B,str_id=S> " followed by the element and a space
99  friend ostream& operator<<(ostream& s,
101  return s << "<buf_id=" << elt.buf_id
102  << ",str_id=" << elt.str_id << "> "
103  << elt.x << " ";
104  }
105  //comparisons: all compare getPriority() only (buffer/stream ids are ignored) and return int
106  friend int operator < (const ExtendedEltMergeType<T,Key> &e1,
107  const ExtendedEltMergeType<T,Key> &e2) {
108  return (e1.getPriority() < e2.getPriority());
109  }
110  friend int operator <= (const ExtendedEltMergeType<T,Key> &e1,
111  const ExtendedEltMergeType<T,Key> &e2) {
112  return (e1.getPriority() <= e2.getPriority());
113  }
115  const ExtendedEltMergeType<T,Key> &e2) {
116  return (e1.getPriority() > e2.getPriority());
117  }
119  const ExtendedEltMergeType<T,Key> &e2) {
120  return (e1.getPriority() >= e2.getPriority());
121  }
123  const ExtendedEltMergeType<T,Key> &e2) {
124  return (e1.getPriority() != e2.getPriority());
125  }
127  const ExtendedEltMergeType<T,Key> &e2) {
128  return (e1.getPriority() == e2.getPriority());
129  }
130
131 };
132 
133 
134 
135 //************************************************************/
136 //create an em_pqueue with caller-chosen sizes: pq_sz elements for the in-memory heap pq, buf_sz for buff_0, at most nb_buf external buffers of arity buf_ar; prints memory estimates and calls exit(1) if the estimated overhead does not fit in the available memory
137 template<class T, class Key>
138  em_pqueue<T,Key>::em_pqueue(long pq_sz, long buf_sz ,
139  unsigned short nb_buf,
140  unsigned int buf_ar):
141  pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf),
142  crt_buf(0), buf_arity(buf_ar) {
143
144  //____________________________________________________________
145  //ESTIMATE AVAILABLE MEMORY BEFORE ALLOCATION
146  AMI_err ae;
147  size_t mm_avail = getAvailableMemory();
148  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
149  mm_avail/(float)(1<<20));
150  printf("EM_PQUEUE:available memory before allocation: %ldB\n", // NOTE(review): mm_avail is size_t but %ld expects long; %zu (or a (long) cast) would match
151  mm_avail);
152
153
154  //____________________________________________________________
155  //ALLOCATE STRUCTURE
156  //some dummy checks..
157  assert(pqsize > 0 && bufsize > 0);
158
159  MEMORY_LOG("em_pqueue: allocating int pqueue\n");
160  //initialize in memory pqueue
161  pq = new MinMaxHeap<T>(pqsize);
162  assert(pq); // plain new throws std::bad_alloc on failure rather than returning NULL
163
164  MEMORY_LOG("em_pqueue: allocating buff_0\n");
165  //initialize in memory buffer
166  buff_0 = new im_buffer<T>(bufsize);
167  assert(buff_0);
168
169  char str[200];
170  sprintf(str, "em_pqueue: allocating array of %ld buff pointers\n",
171  (long)max_nbuf);
172  MEMORY_LOG(str);
173
174  //allocate ext memory buffers array
175  buff = new em_buffer<T,Key>* [max_nbuf];
176  assert(buff);
177  for (unsigned short i=0; i<max_nbuf; i++) {
178  buff[i] = NULL;
179  }
180
181
182  //____________________________________________________________
183  //some memory checks- make sure the empq fits in memory !!
184
185  //estimate available memory after allocation
186  mm_avail = getAvailableMemory();
187  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
188  mm_avail/(float)(1<<20));
189
190  //estimate AMI_STREAM memory usage
191  size_t sz_stream;
192  AMI_STREAM<T> dummy;
193  if ((ae = dummy.main_memory_usage(&sz_stream,
196  cout << "em_pqueue constructor: failing to get stream_usage\n";
197  exit(1);
198  }
199  cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
200  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
201
202  //estimate memory overhead
203  long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
204  max_nbuf * sizeof(em_buffer<T,Key>) +
205  2*sz_stream + max_nbuf*sz_stream;
206
207  mm_overhead *= 8; //overestimate
208  cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
209  if (mm_overhead > mm_avail) { // NOTE(review): long vs size_t comparison; mm_overhead is converted to unsigned
210  cout << "overhead bigger than available memory" // NOTE(review): no separator before the next literal, so this prints "...memoryincrease -m..."
211  << "increase -m and try again\n";
212  exit(1);
213  }
214  mm_avail -= mm_overhead;
215
216
217  //arity*sizeof(AMI_STREAM) < memory
218  cout << "pqsize=" << pqsize
219  << ", bufsize=" << bufsize
220  << ", maximum allowed arity=" << mm_avail/sz_stream << endl;
221  if (buf_arity * sz_stream > mm_avail) { // NOTE(review): only a warning is printed; construction continues
222  cout << "sorry - empq excedes memory limits\n";
223  cout << "try again decreasing arity or pqsize/bufsize\n";
224  cout.flush();
225  }
226 }
227 
228 
229 //************************************************************/
230 //create an em_pqueue whose sizes are derived from getAvailableMemory(): max_nbuf=2, pqsize and bufsize each get M/4 bytes' worth of elements (M/2 with SAVE_MEMORY) of what remains after the overhead estimate, and buf_arity is computed from the AMI_STREAM memory usage
231 template<class T, class Key>
233
234  MY_LOG_DEBUG_ID("em_pqueue constructor");
235
236
237  /************************************************************/
238  //available memory
239  AMI_err ae;
240  //available memory
241  size_t mm_avail = getAvailableMemory();
242  printf("EM_PQUEUE:available memory before allocation: %.2fMB\n",
243  mm_avail/(float)(1<<20));
244  cout.flush();
245
246  //AMI_STREAM memory usage
247  size_t sz_stream;
248  AMI_STREAM<T> dummy;
249  if ((ae = dummy.main_memory_usage(&sz_stream,
252  cout << "em_pqueue constructor: failing to get main_memory_usage\n";
253  exit(1);
254  }
255  cout << "EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
256  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
257  cout.flush();
258  //assume max_nbuf=2 suffices; check after arity is computed
259  max_nbuf = 2;
260
261  //account for temporary memory usage (set up a preliminary arity)
262  buf_arity = mm_avail/(2 * sz_stream); // preliminary: only feeds the overhead estimate; recomputed further down
263  long mm_overhead = buf_arity*sizeof(merge_key<Key>) +
264  max_nbuf * sizeof(em_buffer<T,Key>) +
265  2*sz_stream + max_nbuf*sz_stream;
266
267  mm_overhead *= 8; //overestimate
268  cout << "EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
269  if (mm_overhead > mm_avail) { // NOTE(review): long vs size_t comparison; mm_overhead is converted to unsigned
270  cout << "overhead bigger than available memory" // NOTE(review): no separator before the next literal, so this prints "...memoryincrease -m..."
271  << "increase -m and try again\n";
272  exit(1);
273  }
274  mm_avail -= mm_overhead;
275
276
277 #ifdef SAVE_MEMORY
278  //assign M/2 to pq
279  pqsize = mm_avail/(2*sizeof(T));
280  //assign M/2 to buff_0
281  bufsize = mm_avail/(2*sizeof(T));
282 #else
283  //assign M/4 to pq
284  pqsize = mm_avail/(4*sizeof(T));
285  //assign M/4 to buff_0
286  bufsize = mm_avail/(4*sizeof(T));
287 #endif
288
289  cout << "EM_PQUEUE: pqsize set to " << pqsize << endl;
290  cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
291  cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
292
293
294  //assign M/2 to AMI_STREAMS and compute arity
295  /* arity is mainly constrained by the size of an AMI_STREAM; the
296  rest of the memory must accommodate for arity * max_nbuf
297  *sizeof(AMI_STREAM); there are some temporary stuff like arity *
298  sizeof(long) (the deleted array), arity * sizeof(T) (the array of
299  keys for merging) and so on, but the main factor is the
300  AMI_STREAM size which is roughly B * LBS * 2 (each AMI_STREAM
301  allocates 2 logical blocks) */
302 #ifdef SAVE_MEMORY
303  buf_arity = mm_avail/(2 * sz_stream);
304 #else
305  buf_arity = mm_avail/(2 * max_nbuf * sz_stream);
306 #endif
307
308  //overestimate usage
309  if (buf_arity > 3) {
310  buf_arity -= 3;
311  } else {
312  buf_arity = 1;
313  }
314
315  cout << "EM_PQUEUE: arity set to " << buf_arity << endl;
316
317  crt_buf = 0;
318
319  //initialize in memory pqueue
320  MEMORY_LOG("em_pqueue: allocating int pqueue\n");
321  pq = new MinMaxHeap<T>(pqsize);
322  assert(pq);
323
324  //initialize in memory buffer
325  MEMORY_LOG("em_pqueue: allocating buff_0\n");
326  buff_0 = new im_buffer<T>(bufsize);
327  assert(buff_0);
328
329  //allocate ext memory buffers array
330  char str[200];
331  sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
332  (long)max_nbuf);
333  MEMORY_LOG(str);
334  //allocate ext memory buffers array
335  buff = new em_buffer<T,Key>* [max_nbuf];
336  assert(buff);
337  for (unsigned short i=0; i<max_nbuf; i++) {
338  buff[i] = NULL;
339  }
340
341  //max nb of items the structure can accommodate (constrained by max_nbuf)
342  cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
343  cout.flush();
344
345  //check that structure can accommodate N elements -- NOTE(review): both checks below are commented out, so no capacity check is performed here
346  // assert(N < buf_arity * (buf_arity + 1) * bufsize);
347  //assert(N < maxlen());
348  mm_avail = getAvailableMemory();
349  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
350  mm_avail/(float)(1<<20));
351 }
352 
353 
354 #ifdef SAVE_MEMORY
355 //************************************************************/
356 // create an empq, initialize its pq with im and insert amis in
357 // buff[0]; im is deleted inside this constructor, so the caller must not use or delete it afterwards;
358 //
359 // assumption: im was allocated such that maxsize = mm_avail/T;
360 // when this constructor is called im is only half full, so we must
361 // free half of its space and give it to buff_0
362 template<class T, class Key>
364  AMI_err ae;
365  int pqcapacity; /* amount of memory we can use for each of new
366  minmaxheap, and em-buffer */
367  unsigned int pqcurrentsize; /* number of elements currently in im */
368  assert(im && amis);
369
370  pqcapacity = im->get_maxsize()/2; // we think this memory is now available
371  pqsize = pqcapacity + 1; //truncate errors
372  pqcurrentsize = im->size();
373  //assert( pqcurrentsize <= pqsize);
374  if(!(pqcurrentsize <= pqsize)) {
375  cout << "EMPQ: pq maxsize=" << pqsize <<", pq crtsize=" << pqcurrentsize
376  << "\n";
377  assert(0);
378  exit(1);
379  }
380
381
382  LOG_avail_memo();
383
384  /* at this point im is allocated all memory, but it is only at most
385  half full; we need to relocate im to half space and to allocate
386  buff_0 the other half; since we use new, there is no realloc, so
387  we will copy to a file...*/
388
389  {
390  //copy im to a stream and free its memory
391  T x;
392  AMI_STREAM<T> tmpstr;
393  for (unsigned int i=0; i<pqcurrentsize; i++) {
394  im->extract_min(x);
395  ae = tmpstr.write_item(x);
396  assert(ae == AMI_ERROR_NO_ERROR);
397  }
398  delete im; im = NULL; // the caller's own pointer to im is dangling from here on
399  LOG_avail_memo();
400
401  //allocate pq and buff_0 half size
402  bufsize = pqcapacity;
403  cout << "EM_PQUEUE: allocating im_buffer size=" << bufsize
404  << " total " << (float)bufsize*sizeof(T)/(1<<20) << "MB\n";
405  cout.flush();
406  buff_0 = new im_buffer<T>(bufsize);
407  assert(buff_0);
408  cout << "EM_PQUEUE: allocating pq size=" << pqsize
409  << " total " << (float)pqcapacity*sizeof(T)/(1<<20) << "MB\n";
410  cout.flush();
411  pq = new MinMaxHeap<T>(pqsize);
412  assert(pq);
413
414  //fill pq from tmp stream
415  ae = tmpstr.seek(0);
416  assert(ae == AMI_ERROR_NO_ERROR);
417  T *elt;
418  for (unsigned int i=0; i<pqcurrentsize; i++) {
419  ae = tmpstr.read_item(&elt);
420  assert(ae == AMI_ERROR_NO_ERROR);
421  pq->insert(*elt);
422  }
423  assert(pq->size() == pqcurrentsize);
424  }
425
426  //estimate buf_arity
427  //AMI_STREAM memory usage
428  size_t sz_stream;
429  AMI_STREAM<T> dummy;
430  if ((ae = dummy.main_memory_usage(&sz_stream,
433  cout << "em_pqueue constructor: failing to get main_memory_usage\n";
434  exit(1);
435  }
436  cout << "EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
437  cout << "EM_PQUEUE: item size=" << sizeof(T) << endl;
438  //assume max_nbuf=2 suffices; check after arity is computed
439  max_nbuf = 2;
440  buf_arity = pqcapacity * sizeof(T) / sz_stream;
441  //should account for some overhead
442  if (buf_arity == 0) {
443  cout << "EM_PQUEUE: arity=0 (not enough memory..)\n";
444  exit(1);
445  }
446  if (buf_arity > 3) {
447  buf_arity -= 3;
448  } else {
449  buf_arity = 1;
450  }
451
452  //added on 05/16/2005 by Laura: cap the arity at MAX_STREAMS_OPEN (presumably the limit on simultaneously open streams -- confirm)
453  if (buf_arity > MAX_STREAMS_OPEN) {
454  buf_arity = MAX_STREAMS_OPEN;
455  }
456
457  //allocate ext memory buffer array
458  char str[200];
459  sprintf(str,"em_pqueue: allocating array of %ld buff pointers\n",
460  (long)max_nbuf);
461  MEMORY_LOG(str);
462  buff = new em_buffer<T,Key>* [max_nbuf];
463  assert(buff);
464  for (unsigned short i=0; i<max_nbuf; i++) {
465  buff[i] = NULL;
466  }
467  crt_buf = 0;
468
469  cout << "EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
470  cout << "EM_PQUEUE: bufsize set to " << bufsize << endl;
471  cout << "EM_PQUEUE: buf arity set to " << buf_arity << endl;
472  cout << "EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
473  cout << "EM_PQUEUE: maximum length is " << maxlen() << "\n";
474  cout.flush();
475
476  //estimate available remaining memory
477  size_t mm_avail = getAvailableMemory();
478  printf("EM_PQUEUE: available memory after allocation: %.2fMB\n",
479  mm_avail/(float)(1<<20));
480
481  //last thing: insert the input stream in external buffers
482  //allocate buffer if necessary
483  //assert(crt_buf==0 && !buff[0]);// given
484  if(amis->stream_len()) {
485  //create buff[0] as a level1 buffer
486  MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n"); // log text is the same as in empty_buff_0()
487  buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity);
488  buff[0]->insert(amis);
489  crt_buf = 1;
490  }
491 }
492
493 #endif
494 
495 
496 
497 //************************************************************/
498 //free space: delete pq, buff_0, the active external buffers buff[0..crt_buf-1] and the buff pointer array
499 template<class T, class Key>
501  //delete in memory pqueue
502  if (pq) {
503  delete pq; pq = NULL;
504  }
505  //delete in memory buffer
506  if (buff_0) {
507  delete buff_0; buff_0 = NULL;
508  }
509  //delete ext memory buffers
510  for (unsigned short i=0; i< crt_buf; i++) { // NOTE(review): cleanup() can lower crt_buf without deleting buff[i], and empty_buff_0() reuses a non-NULL buff[0]; buffers at index >= crt_buf are therefore not freed here -- looping up to max_nbuf would free them (buff[] starts all-NULL)
511  if (buff[i]) delete buff[i];
512  }
513  delete [] buff;
514 }
515 
516 
517 //************************************************************/
518 //return maximum capacity of i-th external buffer
519 template<class T, class Key>
520 long em_pqueue<T,Key>::maxlen(unsigned short i) {
521 
522  if (i >= max_nbuf) {
523  printf("em_pqueue::max_len: level=%d exceeds capacity=%d\n",
524  i, max_nbuf);
525  return 0;
526  }
527  if (i < crt_buf) {
528  return buff[i]->get_buf_maxlen();
529  }
530  //try allocating buffer
531  em_buffer<T,Key> * tmp = new em_buffer<T,Key>(i+1, bufsize, buf_arity);
532  if (!tmp) {
533  cout << "em_pqueue::max_len: cannot allocate\n";
534  return 0;
535  }
536  long len = tmp->get_buf_maxlen();
537  delete tmp;
538  return len;
539 }
540 
541 
542 //************************************************************/
543 //return maximum capacity of em_pqueue: sum of maxlen(i) over all max_nbuf external levels plus the capacity of buff_0 (the in-memory pq itself is not counted); maxlen(i) builds a temporary em_buffer for levels i >= crt_buf
544 template<class T, class Key>
546  long len = 0;
547  for (unsigned short i=0; i< max_nbuf; i++) {
548  len += maxlen(i);
549  }
550  return len + buff_0->get_buf_maxlen();
551 }
552 
553 
554 
555 //************************************************************/
556 //return the total nb of elements in the structure
557 template<class T, class Key>
558 unsigned long em_pqueue<T,Key>::size() {
559  //sum up the lengths(nb of elements) of the external buffers
560  unsigned long elen = 0;
561  for (unsigned short i=0; i < crt_buf; i++) {
562  elen += buff[i]->get_buf_len();
563  }
564  return elen + pq->size() + buff_0->get_buf_len();
565 }
566 
567 
568 //************************************************************/
569 //return true if empty, i.e. pq, buff_0 and all external buffers hold no element
570 template<class T, class Key>
572
573  //return (size() == 0);
574  //more efficient? the && short-circuits, so the full size() (which walks the external buffers) only runs when pq and buff_0 are both empty
575  return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) &&
576  (size() == 0));
577 }
578 
579 
580 //************************************************************/
581 //called when pq must be filled from external buffers: the pqsize smallest elements of each active external buffer are merged (together with buff_0, in merge_bufs2pq) into pq; always returns true
582 template<class T, class Key>
584
585 #ifndef NDEBUG
586  {
587  int k=0;
588  for (unsigned short i=0; i<crt_buf; i++) {
589  k |= buff[i]->get_buf_len(); // NOTE(review): if get_buf_len() returns a type wider than int, a length whose low bits are all zero would be missed; a bool flag would be exact
590  }
591  if(!k) {
592  cerr << "fillpq called with empty external buff!" << endl;
593  }
594  assert(k);
595  }
596 #endif
597
598 #ifdef EMPQ_PQ_FILL_PRINT
599  cout << "filling pq\n"; cout .flush();
600 #endif
601  XXX cerr << "filling pq" << endl;
602  MY_LOG_DEBUG_ID("fillpq");
603
604  AMI_err ae;
605  {
606  char str[200];
607  sprintf(str, "em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n", // NOTE(review): if crt_buf is unsigned short (as the loop indices suggest), %hu is the matching conversion
608  crt_buf);
609  MEMORY_LOG(str);
610  }
611  //merge pqsize smallest elements from each buffer into a new stream
612  ExtendedMergeStream** outstreams;
613  outstreams = new ExtendedMergeStream* [crt_buf];
614
615  /* gets stream of smallest pqsize elts from each level */
616  for (unsigned short i=0; i< crt_buf; i++) {
617  MY_LOG_DEBUG_ID(crt_buf);
618  outstreams[i] = new ExtendedMergeStream();
619  assert(buff[i]->get_buf_len());
620  ae = merge_buffer(buff[i], outstreams[i], pqsize);
621  assert(ae == AMI_ERROR_NO_ERROR);
622  assert(outstreams[i]->stream_len());
623  //print_stream(outstreams[i]); cout.flush();
624  }
625
626  /* merge above streams into pqsize elts in minstream */
627  if (crt_buf == 1) {
628  //just one level; make common case faster :)
629  merge_bufs2pq(outstreams[0]);
630  delete outstreams[0];
631  delete [] outstreams;
632  } else {
633  //merge the outstreams to get the global mins and delete them afterwards
634  ExtendedMergeStream *minstream = new ExtendedMergeStream();
635  //cout << "merging streams\n";
636  ae = merge_streams(outstreams, crt_buf, minstream, pqsize);
637  assert(ae == AMI_ERROR_NO_ERROR);
638  for (int i=0; i< crt_buf; i++) { // int index vs crt_buf: mixed-type comparison, harmless for realistic level counts
639  delete outstreams[i];
640  }
641  delete [] outstreams;
642
643  //copy the minstream in the internal pqueue while merging with buff_0;
644  //the smallest <pqsize> elements between minstream and buff_0 will be
645  //inserted in internal pqueue;
646  //also, the elements from minstream which are inserted in pqueue must be
647  //marked as deleted in the source streams;
648  merge_bufs2pq(minstream);
649  delete minstream; minstream = NULL;
650  //cout << "after merge_bufs2pq: \n" << *this << "\n";
651  }
652
653  XXX assert(pq->size());
654  XXX cerr << "fillpq done" << endl;
655  return true;
656 }
657 
658 
659 
660 //************************************************************/
661 //copy the element with minimum priority into elt without removing it; return false if the structure is empty (pq may first be refilled from buff_0 or from the external buffers)
662 template<class T, class Key>
663 bool
665
666  bool ok;
667
668  MY_LOG_DEBUG_ID("em_pqueue::min");
669
670  //try first the internal pqueue
671  if (!pq->empty()) {
672  ok = pq->min(elt);
673  assert(ok);
674  return ok;
675  }
676
677  MY_LOG_DEBUG_ID("extract_min: reset pq");
678  pq->reset();
679
680  //if external buffers are empty
681  if (crt_buf == 0) {
682  //if internal buffer is empty too, then nothing to extract
683  if (buff_0->is_empty()) {
684  //cerr << "min called on empty empq" << endl;
685  return false;
686  } else {
687 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
688  cout << "filling pq from B0\n"; cout.flush();
689 #endif
690  //ext. buffs empty; fill int pqueue from buff_0
691  long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
692  buff_0->reset(pqsize, n); // presumably drops the n elements just moved into pq -- confirm against im_buffer::reset
693  ok = pq->min(elt);
694  assert(ok);
695  return true;
696  }
697  } else {
698  //external buffers are not empty;
699  //called when pq must be filled from external buffers
700  XXX print_size();
701  fillpq();
702  XXX cerr << "fillpq done; about to take min" << endl;
703  ok = pq->min(elt);
704  XXX cerr << "after taking min" << endl;
705  assert(ok);
706  return ok;
707  } //end of else (if ext buffers are not empty)
708
709  assert(0); //not reached: every branch above returns
710 }
711 
712 
713 
714 //************************************************************/
715 template<class T,class Key>
716 static void print_ExtendedMergeStream(ExtendedMergeStream &str) { // rewinds str with seek(0) and prints every item to cout, separated by ", "
717
719  str.seek(0);
720  while (str.read_item(&x) == AMI_ERROR_NO_ERROR) {
721  cout << *x << ", ";
722  }
723  cout << "\n";
724 }
725 
726 
727 //************************************************************/
728 //remove the element with minimum priority from the structure and store it in elt;
729 //return false if the whole structure (pq, buff_0 and external buffers) is empty
730 template<class T, class Key>
732
733  bool ok;
734
735  MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_MIN");
736
737  //try first the internal pqueue
738  if (!pq->empty()) {
739  //cout << "extract from internal pq\n";
740  MY_LOG_DEBUG_ID("extract from internal pq");
741  ok = pq->extract_min(elt);
742  assert(ok);
743  return ok;
744  }
745
746  //if internal pqueue is empty, fill it up
747  MY_LOG_DEBUG_ID("internal pq empty: filling it up from external buffers");
748  MY_LOG_DEBUG_ID("extract_min: reset pq");
749  pq->reset();
750
751  //if external buffers are empty
752  if (crt_buf == 0) {
753  //if internal buffer is empty too, then nothing to extract
754  if (buff_0->is_empty()) {
755  return false;
756  } else {
757 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0
758  cout << "filling pq from B0\n"; cout.flush();
759 #endif
760  MY_LOG_DEBUG_ID("filling pq from buff_0");
761  //ext. buffs empty; fill int pqueue from buff_0
762  long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
763  buff_0->reset(pqsize, n);
764  ok = pq->extract_min(elt);
765  assert(ok);
766  return true;
767  }
768  } else {
769  //external buffers are not empty;
770  //called when pq must be filled from external buffers
771  MY_LOG_DEBUG_ID("filling pq from buffers");
772 #ifdef EMPQ_PRINT_SIZE
773  long x = size(), y;
774  y = x*sizeof(T) >> 20;
775  cout << "pqsize:[" << active_streams() << " streams: ";
777  cout << " total " << x << "(" << y << "MB)]" << endl;
778  cout.flush();
779 #endif
780  fillpq();
781  MY_LOG_DEBUG_ID("pq filled");
782  XXX cerr << "about to get the min" << endl;
783  assert(pq);
784  ok = pq->extract_min(elt);
785  if (!ok) {
786  cout << "failing assertion: pq->extract_min == true\n";
787  this->print();
788  assert(ok);
789  }
790
791  return ok;
792  } //end of else (if ext buffers are not empty)
793
794  assert(0); //not reached: every branch above returns
795 }
796 
797 
798 
799 //************************************************************/
800 //extract all elts with the minimum key, combine them with T's operator+ and return their sum in elt;
801 //every combined element is removed from the structure (min() peeks, extract_min() removes);
802 //return false if the structure is empty
803 template<class T, class Key>
805  //cout << "em_pqueue::extract_min_all(T): sorry not implemented\n";
806  //exit(1);
807
808  T next_elt;
809  bool done = false;
810
811  MY_LOG_DEBUG_ID("\n\nEM_PQUEUE::EXTRACT_ALL_MIN");
812
813  //extract first elt
814  if (!extract_min(elt)) {
815  return false;
816  } else {
817  while (!done) {
818  //peek at the next min elt to see if matches
819  if ((!min(next_elt)) ||
820  !(next_elt.getPriority() == elt.getPriority())) {
821  done = true;
822  } else {
823  extract_min(next_elt);
824  elt = elt + next_elt; // NOTE(review): the loop keeps comparing against elt.getPriority(), so this assumes operator+ preserves the priority
825
826  MY_LOG_DEBUG_ID("EXTRACT_ALL_MIN: adding " );
827  MY_LOG_DEBUG_ID(elt);
828  }
829  }
830  }
831
832 #ifdef EMPQ_PRINT_EXTRACTALL
833  cout << "EXTRACTED: " << elt << endl; cout.flush();
834 #endif
835 #ifdef EMPQ_PRINT_EMPQ
836  this->print();
837  cout << endl;
838 #endif
839  return true;
840
841 }
842 
843 
844 
845 
846 //************************************************************/
847 //copy the minstream in the internal pqueue while merging with buff_0;
848 //the smallest <pqsize> elements between minstream and buff_0 will be
849 //inserted in internal pqueue;
850 //also, the elements from minstream which are inserted in pqueue must be
851 //marked as deleted in the source streams; afterwards buff_0 is shifted and cleanup() is called
852 template<class T, class Key>
854
855  //cout << "bufs2pq: \nminstream: "; print_stream(minstream);
856  MY_LOG_DEBUG_ID("merge_bufs2pq: enter");
857
858  AMI_err ae;
859
860  //sort the internal buffer
861  buff_0->sort();
862  //cout << "bufs2pq: \nbuff0: " << *buff_0 << endl;
863
864  ae = minstream->seek(0); //rewind minstream
865  assert(ae == AMI_ERROR_NO_ERROR);
866
867  bool strEmpty= false, bufEmpty=false;
868
869  unsigned int bufPos = 0;
871  T bufElt, strElt;
872
873  ae = minstream->read_item(&strItem);
874  if (ae == AMI_ERROR_END_OF_STREAM) {
875  strEmpty = true;
876  } else {
877  assert(ae == AMI_ERROR_NO_ERROR);
878  }
879  if (bufPos < buff_0->get_buf_len()) {
880  bufElt = buff_0->get_item(bufPos);
881  } else {
882  //cout << "buff0 empty\n";
883  bufEmpty = true;
884  }
885
886  XXX cerr << "pqsize=" << pqsize << endl;
887  XXX if(strEmpty) cerr << "stream is empty!!" << endl;
888  for (unsigned int i=0; i< pqsize; i++) { // moves at most pqsize elements into pq
889
890  if (!bufEmpty) {
891  if ((!strEmpty) && (strElt = strItem->elt(), // comma operator: load strElt, then compare; on equal priorities the buff_0 element is taken first
892  bufElt.getPriority() > strElt.getPriority())) {
893  delete_str_elt(strItem->buffer_id(), strItem->stream_id());
894  pq->insert(strElt);
895  ae = minstream->read_item(&strItem);
896  if (ae == AMI_ERROR_END_OF_STREAM) {
897  strEmpty = true;
898  } else {
899  assert(ae == AMI_ERROR_NO_ERROR);
900  }
901
902  } else {
903  bufPos++;
904  pq->insert(bufElt);
905  if (bufPos < buff_0->get_buf_len()) {
906  bufElt = buff_0->get_item(bufPos);
907  } else {
908  bufEmpty = true;
909  }
910  }
911  } else {
912  if (!strEmpty) { //stream not empty
913  strElt = strItem->elt();
914  //cout << "i=" << i << "str & buff empty\n";
915  delete_str_elt(strItem->buffer_id(), strItem->stream_id());
916  pq->insert(strElt);
917  //cout << "insert " << strElt << "\n";
918  ae = minstream->read_item(&strItem);
919  if (ae == AMI_ERROR_END_OF_STREAM) {
920  strEmpty = true;
921  } else {
922  assert(ae == AMI_ERROR_NO_ERROR);
923  }
924  } else { //both empty: < pqsize items
925  break;
926  }
927  }
928  }
929
930  //shift left buff_0 in case elements were deleted from the beginning
931  buff_0->shift_left(bufPos);
932
933  MY_LOG_DEBUG_ID("pq filled");
934 #ifdef EMPQ_PQ_FILL_PRINT
935  cout << "merge_bufs2pq: pq filled; now cleaning\n"; cout .flush();
936 #endif
937  //this->print();
938  //clean buffers in case some streams have been emptied
939  cleanup();
940
941  MY_LOG_DEBUG_ID("merge_bufs2pq: done");
942 }
943 
944 
945 
946 //************************************************************/
947 //deletes one element from <buffer, stream>
948 template<class T, class Key>
949 void em_pqueue<T,Key>::delete_str_elt(unsigned short buf_id,
950  unsigned int stream_id) {
951 
952  //check them
953  assert(buf_id < crt_buf);
954  assert(stream_id < buff[buf_id]->get_nbstreams());
955  //update;
956  buff[buf_id]->incr_deleted(stream_id);
957 
958 }
959 
960 
961 //************************************************************/
962 //clean buffers in case some streams have been emptied, then drop trailing empty levels by decrementing crt_buf (the em_buffer objects themselves are not deleted here)
963 template<class T, class Key>
965
966  MY_LOG_DEBUG_ID("em_pqueue::cleanup()");
967 #ifdef EMPQ_PQ_FILL_PRINT
968  cout << "em_pqueue: cleanup enter\n"; cout .flush();
969 #endif
970  //adjust buffers in case whole streams got deleted
971  for (unsigned short i=0; i< crt_buf; i++) {
972  //cout << "clean buffer " << i << ": "; cout.flush();
973  buff[i]->cleanup();
974  }
975  if (crt_buf) {
976  short i = crt_buf-1; // signed index so the scan can stop at -1
977  while ((i>=0) && buff[i]->is_empty()) {
978  crt_buf--; // buff[i] stays allocated and may be reused later
979  i--;
980  }
981  }
982 #ifdef EMPQ_PQ_FILL_PRINT
983  cout << "em_pqueue: cleanup done\n"; cout .flush();
984 #endif
985  //if a stream becomes too short move it on previous level
986  //to be added..
987  //cout <<"done cleaning up\n";
988 }
989 
990 
991 //************************************************************/
992 //insert an element; every path returns true (a failing buff_0 insert only trips an assert)
993 template<class T, class Key>
994 bool em_pqueue<T,Key>::insert(const T& x) {
995  bool ok;
996 #ifdef EMPQ_ASSERT_EXPENSIVE
997  long init_size = size();
998 #endif
999  T val = x;
1000
1001  MY_LOG_DEBUG_ID("\nEM_PQUEUE::INSERT");
1002  //if structure is empty insert x in pq; not worth the trouble..
1003  if ((crt_buf == 0) && (buff_0->is_empty())) {
1004  if (!pq->full()) {
1005  MY_LOG_DEBUG_ID("insert in pq");
1006  pq->insert(x);
1007  return true;
1008  }
1009  }
1010  if (!pq->empty()) {
1011  T pqmax;
1012
1013  ok = pq->max(pqmax);
1014  assert(ok);
1015  // cout << "insert " << x << " max: " << pqmax << "\n";
1016  if (x <= pqmax) {
1017  //if x is smaller than pq_max and pq not full, insert x in pq
1018  if (!pq->full()) {
1019 #ifdef EMPQ_ASSERT_EXPENSIVE
1020  assert(size() == init_size);
1021 #endif
1022  pq->insert(x);
1023  return true;
1024  } else {
1025  //if x is smaller than pq_max and pq full, exchange x with pq_max
1026  pq->extract_max(val);
1027  pq->insert(x);
1028  //cout << "max is: " << val << endl;
1029  }
1030  }
1031  }
1032  /* at this point val must go to buff_0: either val==x (x > pqmax, or pq empty),
1033  or val==the old pq max that x displaced from a full pq.
1034  */
1035
1036  //if buff_0 full, empty it
1037 #ifdef EMPQ_ASSERT_EXPENSIVE
1038  assert(size() == init_size);
1039 #endif
1040  if (buff_0->is_full()) {
1041 #ifdef EMPQ_PRINT_SIZE
1042  long x = size(), y; // NOTE(review): shadows the parameter x inside this debug-only block
1043  y = x*sizeof(T) >> 20;
1044  cout << "pqsize:[" << active_streams() << " streams: ";
1046  cout << " total " << x << "(" << y << "MB)]" << endl;
1047  cout.flush();
1048 #endif
1049  empty_buff_0();
1050  }
1051 #ifdef EMPQ_ASSERT_EXPENSIVE
1052  assert(size() == init_size);
1053 #endif
1054  //insert val (x or the displaced pq max) in buff_0
1055  assert(!buff_0->is_full());
1056  MY_LOG_DEBUG_ID("insert in buff_0");
1057  ok = buff_0->insert(val);
1058  assert(ok);
1059
1060 #ifdef EMPQ_PRINT_INSERT
1061  cout << "INSERTED: " << x << endl; cout.flush();
1062 #endif
1063 #ifdef EMPQ_PRINT_EMPQ
1064  this->print();
1065  cout << endl;
1066 #endif
1067  MY_LOG_DEBUG_ID("EM_PQUEUE: INSERTED");
1068  return true;
1069 }
1070 
1071 
1072 //************************************************************/
1073 /* called when buff_0 is full: sorts it, saves it to a stream and inserts that stream into the level-1 external buffer buff[0];
1074  can produce cascading emptying (empty_buff(0) when buff[0] is full); always returns true
1075 */
1076 template<class T, class Key> bool
1078 #ifdef EMPQ_ASSERT_EXPENSIVE
1079  long init_size = size();
1080 #endif
1081
1082 #ifdef EMPQ_EMPTY_BUF_PRINT
1083  cout << "emptying buff_0\n"; cout.flush();
1084  print_size();
1085 #endif
1086  MY_LOG_DEBUG_ID("empty buff 0");
1087
1088  assert(buff_0->is_full());
1089
1090  //sort the buffer
1091  buff_0->sort();
1092  //cout << "sorted buff_0: \n" << *buff_0 << "\n";
1093 #ifdef EMPQ_ASSERT_EXPENSIVE
1094  assert(size() == init_size);
1095 #endif
1096  //allocate buffer if necessary
1097  if (!buff[0]) { // XXX should check crt_buf: after cleanup() lowers crt_buf to 0, a non-NULL but inactive buff[0] is reused here
1098  //create buff[0] as a level1 buffer
1099  MEMORY_LOG("em_pqueue::empty_buff_0: create new em_buffer\n");
1100  buff[0] = new em_buffer<T,Key>(1, bufsize, buf_arity);
1101  }
1102  //check that buff_0 fills exactly a stream of buff[0]
1103  assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
1104
1105  //save buff_0 to stream
1106  MY_LOG_DEBUG_ID("empty buff_0 to stream");
1107  AMI_STREAM<T>* buff_0_str = buff_0->save2str();
1108  assert(buff_0_str);
1109  //MY_LOG_DEBUG_ID("buff_0 emptied");
1110
1111  //reset buff_0
1112  buff_0->reset();
1113  MY_LOG_DEBUG_ID("buf_0 now reset");
1114
1115 #ifdef EMPQ_ASSERT_EXPENSIVE
1116  assert(size() + buff_0->maxlen() == init_size);
1117 #endif
1118
1119  //copy data from buff_0 to buff[0]
1120  if (buff[0]->is_full()) {
1121  //if buff[0] full, empty it recursively
1122  empty_buff(0);
1123  }
1124  buff[0]->insert(buff_0_str);
1125  MY_LOG_DEBUG_ID("stream inserted in buff[0]");
1126
1127  //update the crt_buf pointer if necessary
1128  if (crt_buf == 0) crt_buf = 1;
1129
1130 #ifdef EMPQ_ASSERT_EXPENSIVE
1131  assert(size() == init_size);
1132 #endif
1133
1134  return true;
1135 }
1136 
1137 
1138 //************************************************************/
1139 /* sort and empty buff[i] into buffer[i+1] recursively; called
1140  by empty_buff_0() to empty subsequent buffers; i must
1141  be a valid (i<crt_buf) full buffer;
1142 */
1143 template<class T, class Key>
1144 void
1145 em_pqueue<T,Key>::empty_buff(unsigned short i) {
1146 
1147 #ifdef EMPQ_ASSERT_EXPENSIVE
1148  long init_size = size();
1149 #endif
1150 #ifdef EMPQ_EMPTY_BUF_PRINT
1151  cout << "emptying buffer_" << i << "\n"; cout.flush();
1152  print_size();
1153 #endif
1154  MY_LOG_DEBUG_ID("empty buff ");
1155  MY_LOG_DEBUG_ID(i);
1156 
1157  //i must be a valid, full buffer
1158  assert(i<crt_buf);
1159  assert(buff[i]->is_full());
1160 
1161  //check there is space to empty to
1162  if (i == max_nbuf-1) {
1163  cerr << "empty_buff:: cannot empty further - structure is full..\n";
1164  print_size();
1165  cerr << "ext buff array should reallocate in a future version..\n";
1166  exit(1);
1167  }
1168 
1169  //create next buffer if necessary
1170  if (!buff[i+1]) {
1171  //create buff[i+1] as a level-(i+2) buffer
1172  char str[200];
1173  sprintf(str, "em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
1174  MEMORY_LOG(str);
1175  buff[i+1] = new em_buffer<T,Key>(i+2, bufsize, buf_arity);
1176  }
1177  assert(buff[i+1]);
1178  //check that buff[i] fills exactly a stream of buff[i+1];
1179  //extraneous (its checked in insert)
1180  //assert(buff[i]->len() == buff[i+1]->streamlen());
1181 
1182  //sort the buffer into a new stream
1183  MY_LOG_DEBUG_ID("sort buffer ");
1184  AMI_STREAM<T>* sorted_buf = buff[i]->sort();
1185 
1186  //assert(sorted_buf->stream_len() == buff[i]->len());
1187  //this is just for debugging
1188  if (sorted_buf->stream_len() != buff[i]->get_buf_len()) {
1189  cout << "sorted_stream_len: " << sorted_buf->stream_len()
1190  << " , bufflen: " << buff[i]->get_buf_len() << endl; cout.flush();
1191  AMI_err ae;
1192  ae = sorted_buf->seek(0);
1193  assert(ae == AMI_ERROR_NO_ERROR);
1194  T *x;
1195  while (sorted_buf->read_item(&x) == AMI_ERROR_NO_ERROR) {
1196  cout << *x << ", "; cout.flush();
1197  }
1198  cout << "\n";
1199 #ifdef EMPQ_ASSERT_EXPENSIVE
1200  assert(sorted_buf->stream_len() == buff[i]->len());
1201 #endif
1202  }
1203 #ifdef EMPQ_ASSERT_EXPENSIVE
1204  assert(size() == init_size);
1205 #endif
1206  //reset buff[i] (delete all its streams )
1207  buff[i]->reset();
1208 #ifdef EMPQ_ASSERT_EXPENSIVE
1209  assert(size() == init_size - sorted_buf->stream_len());
1210 #endif
1211 
1212 
1213  //link sorted buff[i] as a substream into buff[i+1];
1214  //sorted_buf is a new stream, so it starts out with 0 deleted elements;
1215  //of ocurse, its length might be smaller than nominal;
1216  if (buff[i+1]->is_full()) {
1217  empty_buff(i+1);
1218  }
1219  buff[i+1]->insert(sorted_buf, 0);
1220 
1221  //update the crt_buf pointer if necessary
1222  if (crt_buf < i+2) crt_buf = i+2;
1223 
1224 #ifdef EMPQ_ASSERT_EXPENSIVE
1225  assert(size() == init_size);
1226 #endif
1227 }
1228 
1229 
1230 //************************************************************/
1231 /* merge the first <K> elements of the streams of input buffer,
1232  starting at position <buf.deleted[i]> in each stream; there are
1233  <buf.arity> streams in total; write output in <outstream>; the
1234  items written in outstream are of type <merge_output_type> which
1235  extends T with the stream nb and buffer nb the item comes from;
1236  this information is needed later to distribute items back; do not
1237  delete the K merged elements from the input streams; <bufid> is the
1238  id of the buffer whose streams are being merged;
1239 
1240  the input streams are assumed sorted in increasing order of keys;
1241 */
1242 template<class T, class Key>
1243 AMI_err
1245  ExtendedMergeStream *outstream, long K) {
1246  long* bos = buf->get_bos();
1247  /* buff[0] is a level-1 buffer and so on */
1248  unsigned short bufid = buf->get_level() - 1;
1249  /* Pointers to current leading elements of streams */
1250  unsigned int arity = buf->get_nbstreams();
1251  AMI_STREAM<T>** instreams = buf->get_streams();
1252  std::vector<T*> in_objects(arity);
1253  AMI_err ami_err;
1254  unsigned int i, j;
1255 
1256  MY_LOG_DEBUG_ID("merge_buffer ");
1257  MY_LOG_DEBUG_ID(buf->get_level());
1258 
1259  assert(outstream);
1260  assert(instreams);
1261  assert(buf->get_buf_len());
1262  assert(K>0);
1263 
1264  //array initialized with first key from each stream (only non-null keys
1265  //must be included)
1266  MEMORY_LOG("em_pqueue::merge_buffer: allocate keys array\n");
1267  merge_key<Key>* keys = new merge_key<Key> [arity];
1268 
1269  /* count number of non-empty runs */
1270  j = 0;
1271  /* rewind and read the first item from every stream */
1272  for (i = 0; i < arity ; i++ ) {
1273  assert(instreams[i]);
1274  //rewind stream
1275  if ((ami_err = instreams[i]->seek(bos[i])) != AMI_ERROR_NO_ERROR) {
1276  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1277  return ami_err;
1278  }
1279  /* read first item */
1280  ami_err = instreams[i]->read_item(&(in_objects[i]));
1281  switch(ami_err) {
1283  in_objects[i] = NULL;
1284  break;
1285  case AMI_ERROR_NO_ERROR:
1286  //cout << "stream " << i << " read " << *in_objects[i] << "\n";
1287  //cout.flush();
1288  // include this key in the array of keys
1289  keys[j] = merge_key<Key>(in_objects[i]->getPriority(), i);
1290  // cout << "key " << j << "set to " << keys[j] << "\n";
1291  j++;
1292  break;
1293  default:
1294  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1295  return ami_err;
1296  }
1297  }
1298  unsigned int NonEmptyRuns = j;
1299  // cout << "nonempyruns = " << NonEmptyRuns << "\n";
1300 
1301  //build heap from the array of keys
1302  pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
1303 
1304  //cout << "heap is : " << mergeheap << "\n";
1305  //repeatedly extract_min from heap and insert next item from same stream
1306  long extracted = 0;
1307  //rewind output buffer
1308  ami_err = outstream->seek(0);
1309  assert(ami_err == AMI_ERROR_NO_ERROR);
1311  while (!mergeheap.empty() && (extracted < K)) {
1312  //find min key and id of stream it comes from
1313  i = mergeheap.min().stream_id();
1314  //write min item to output stream
1315  out = ExtendedEltMergeType<T,Key>(*in_objects[i], bufid, i);
1316  if ((ami_err = outstream->write_item(out)) != AMI_ERROR_NO_ERROR) {
1317  cerr << "WARNING!!! EARLY EXIT!!!" << endl;
1318  return ami_err;
1319  }
1320  //cout << "wrote " << out << "\n";
1321  extracted++; //update nb of extracted elements
1322  //read next item from same input stream
1323  ami_err = instreams[i]->read_item(&(in_objects[i]));
1324  switch(ami_err) {
1326  mergeheap.delete_min();
1327  break;
1328  case AMI_ERROR_NO_ERROR:
1329  //extract the min from the heap and insert next key from the
1330  //same stream
1331  {
1332  Key k = in_objects[i]->getPriority();
1333  mergeheap.delete_min_and_insert(merge_key<Key>(k, i));
1334  }
1335  break;
1336  default:
1337  cerr << "WARNING!!! early breakout!!!" << endl;
1338  return ami_err;
1339  }
1340  //cout << "PQ: " << mergeheap << "\n";
1341  } //while
1342 
1343  //delete [] keys;
1344  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1345  //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
1346  //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
1347  //DESTRUCTOR, AND EVERYTHING SCREWS UP..
1348 
1349  buf->put_streams();
1350  MY_LOG_DEBUG_ID("merge_buffer: done");
1351  //cout << "done merging buffer\n";
1352 
1353  assert(extracted == outstream->stream_len());
1354  assert(extracted); // something in, something out
1355  return AMI_ERROR_NO_ERROR;
1356 }
1357 
1358 
1359 
1360 //************************************************************/
1361 /* merge the first <K> elements of the input streams; there are <arity>
1362  streams in total; write output in <outstream>;
1363 
1364  the input streams are assumed sorted in increasing order of their
1365  keys;
1366 */
1367 template<class T, class Key>
1368 AMI_err
1370  unsigned short arity,
1371  ExtendedMergeStream *outstream, long K) {
1372 
1373  MY_LOG_DEBUG_ID("enter merge_streams");
1374  assert(arity> 1);
1375 
1376  //Pointers to current leading elements of streams
1377  std::vector<ExtendedEltMergeType<T,Key> > in_objects(arity);
1378 
1379  AMI_err ami_err;
1380  //unsigned int i;
1381  unsigned int nonEmptyRuns=0; //count number of non-empty runs
1382 
1383  //array initialized with first element from each stream (only non-null keys
1384  //must be included)
1385  MEMORY_LOG("em_pqueue::merge_streams: allocate keys array\n");
1386 
1387  merge_key<Key>* keys = new merge_key<Key> [arity];
1388  assert(keys);
1389 
1390  //rewind and read the first item from every stream
1391  for (int i = 0; i < arity ; i++ ) {
1392  //rewind stream
1393  if ((ami_err = instreams[i]->seek(0)) != AMI_ERROR_NO_ERROR) {
1394  return ami_err;
1395  }
1396  //read first item
1398  ami_err = instreams[i]->read_item(&objp);
1399  switch(ami_err) {
1400  case AMI_ERROR_NO_ERROR:
1401  in_objects[i] = *objp;
1402  keys[nonEmptyRuns] = merge_key<Key>(in_objects[i].getPriority(), i);
1403  nonEmptyRuns++;
1404  break;
1406  break;
1407  default:
1408  return ami_err;
1409  }
1410  }
1411  assert(nonEmptyRuns <= arity);
1412 
1413  //build heap from the array of keys
1414  pqheap_t1<merge_key<Key> > mergeheap(keys, nonEmptyRuns); /* takes ownership of keys */
1415 
1416  //repeatedly extract_min from heap and insert next item from same stream
1417  long extracted = 0;
1418  //rewind output buffer
1419  ami_err = outstream->seek(0);
1420  assert(ami_err == AMI_ERROR_NO_ERROR);
1421 
1422  while (!mergeheap.empty() && (extracted < K)) {
1423  //find min key and id of stream it comes from
1424  int id = mergeheap.min().stream_id();
1425  //write min item to output stream
1426  assert(id < nonEmptyRuns);
1427  assert(id >= 0);
1428  assert(mergeheap.size() == nonEmptyRuns);
1429  ExtendedEltMergeType<T,Key> obj = in_objects[id];
1430  if ((ami_err = outstream->write_item(obj)) != AMI_ERROR_NO_ERROR) {
1431  return ami_err;
1432  }
1433  //cout << "wrote " << *in_objects[i] << "\n";
1434 
1435  //extract the min from the heap and insert next key from same stream
1436  assert(id < nonEmptyRuns);
1437  assert(id >= 0);
1439  ami_err = instreams[id]->read_item(&objp);
1440  switch(ami_err) {
1441  case AMI_ERROR_NO_ERROR:
1442  {
1443  in_objects[id] = *objp;
1444  merge_key<Key> tmp = merge_key<Key>(in_objects[id].getPriority(), id);
1445  mergeheap.delete_min_and_insert(tmp);
1446  }
1447  extracted++; //update nb of extracted elements
1448  break;
1450  mergeheap.delete_min();
1451  break;
1452  default:
1453  return ami_err;
1454  }
1455  } //while
1456 
1457  //delete [] keys;
1458  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1459  //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT)
1460  //IF I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY
1461  //DESTRUCTOR, AND EVERYTHING SCREWS UP..
1462 
1463  MY_LOG_DEBUG_ID("merge_streams: done");
1464  return AMI_ERROR_NO_ERROR;
1465 }
1466 
1467 
1468 //************************************************************/
1469 template<class T, class Key>
1470 void
1472  pq->clear();
1473  buff_0->clear();
1474 
1475  for(int i=0; i<crt_buf; i++) {
1476  if(buff[i]) {
1477  delete buff[i]; buff[i] = NULL;
1478  }
1479  }
1480  crt_buf = 0;
1481 }
1482 
1483 
1484 //************************************************************/
1485 template<class T, class Key>
1486 void
1488  cout << "EM_PQ: [pq=" << pqsize
1489  << ", b=" << bufsize
1490  << ", bufs=" << max_nbuf
1491  << ", ar=" << buf_arity << "]\n";
1492 
1493  cout << "PQ: ";
1494  //pq->print_range();
1495  pq->print();
1496  cout << endl;
1497 
1498  cout << "B0: ";
1499  // buff_0->print_range();
1500  buff_0->print();
1501  cout << "\n";
1502 
1503  for (unsigned short i=0; i < crt_buf; i++) {
1504  cout << "B" << i+1 << ": ";
1505  buff[i]->print_range();
1506  cout << endl;
1507  }
1508  cout.flush();
1509 }
1510 
1511 
1512 
1513 //************************************************************/
1514 template<class T, class Key>
1515 void
1517  cout << "EM_PQ: [pq=" << pqsize
1518  << ", b=" << bufsize
1519  << ", bufs=" << max_nbuf
1520  << ", ar=" << buf_arity << "]\n";
1521 
1522  cout << "PQ: ";
1523  pq->print();
1524  cout << endl;
1525 
1526  cout << "B0: ";
1527  buff_0->print();
1528  cout << "\n";
1529 
1530  for (unsigned short i=0; i < crt_buf; i++) {
1531  cout << "B" << i+1 << ": " << endl;
1532  buff[i]->print();
1533  cout << endl;
1534  }
1535  cout.flush();
1536 }
1537 
1538 //************************************************************/
1539 template<class T, class Key>
1541  //sum up the lengths(nb of elements) of the external buffers
1542  long elen = 0;
1543  cout << "EMPQ: pq=" << pq->size() <<",B0=" << buff_0->get_buf_len() << endl;
1544  cout.flush();
1545  for (unsigned short i=0; i < crt_buf; i++) {
1546  assert(buff[i]);
1547  cout << "B_" << i+1 << ":"; cout.flush();
1548  buff[i]->print_stream_sizes();
1549  elen += buff[i]->get_buf_len();
1550  //cout << endl; cout.flush();
1551  }
1552  cout << "total: " << elen + pq->size() + buff_0->get_buf_len() << endl << endl;
1553  cout.flush();
1554 }
1555 
1556 
1557 
1558 /*****************************************************************/
1559 template<class T,class Key>
1561  for (unsigned short i=0; i< crt_buf; i++) {
1562  cout << "[";
1563  buff[i]->print_stream_sizes();
1564  cout << "]";
1565  }
1566  cout.flush();
1567 }
1568 
1569 #undef XXX
1570 
1571 #endif
unsigned long get_buf_maxlen()
Definition: embuffer.h:333
bool empty(void)
Definition: pqheap.h:315
unsigned long size()
Definition: empq_impl.h:558
AMI_err read_item(T **elt)
Definition: ami_stream.h:543
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:494
friend ostream & operator<<(ostream &s, const ExtendedEltMergeType< T, Key > &elt)
Definition: empq_impl.h:99
bool insert(const T &elt)
Definition: empq_impl.h:994
AMI_err merge_buffer(em_buffer< T, Key > *buf, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1244
bool is_full()
Definition: empq.h:200
size_t getAvailableMemory()
Definition: mm_utils.cpp:54
bool extract_min(T &elt)
Definition: empq_impl.h:731
AMI_err write_item(const T &elt)
Definition: ami_stream.h:606
void empty_buff(unsigned short i)
Definition: empq_impl.h:1145
void LOG_avail_memo()
Definition: mm_utils.cpp:47
unsigned short buffer_id() const
Definition: empq_impl.h:89
#define NULL
Definition: ccmath.h:32
#define x
#define MAX_STREAMS_OPEN
Definition: ami_stream.h:65
friend int operator>(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:114
unsigned int size(void) const
Definition: pqheap.h:146
void print_size()
Definition: empq_impl.h:1540
bool extract_all_min(T &elt)
Definition: empq_impl.h:804
bool extract_min(T &elt)
Definition: minmaxheap.h:540
void delete_min_and_insert(const T &x)
Definition: pqheap.h:526
friend int operator==(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:126
AMI_STREAM< T > ** get_streams()
Definition: embuffer.h:705
AMI_err seek(off_t offset)
Definition: ami_stream.h:460
void cleanup()
Definition: empq_impl.h:964
bool min(T &elt)
Definition: empq_impl.h:664
long maxlen()
Definition: empq_impl.h:545
#define ExtendedMergeStream
Definition: empq.h:58
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
Definition: empq_impl.h:949
#define assert(condition)
Definition: lz4.c:324
#define MY_LOG_DEBUG_ID(x)
Definition: empq_impl.h:54
#define XXX
Definition: empq_impl.h:52
void put_streams()
Definition: embuffer.h:732
void MEMORY_LOG(const std::string &str)
Definition: mm_utils.cpp:61
bool is_empty()
Definition: empq_impl.h:571
void merge_bufs2pq(ExtendedMergeStream *minstream)
Definition: empq_impl.h:853
unsigned long get_buf_len()
Definition: embuffer.h:324
Key getPriority() const
Definition: empq_impl.h:95
bool min(T &elt)
Definition: pqheap.h:329
void clear()
Definition: empq_impl.h:1471
bool delete_min()
Definition: pqheap.h:451
HeapIndex size() const
Definition: minmaxheap.h:117
unsigned int get_nbstreams() const
Definition: embuffer.h:288
unsigned int stream_id() const
Definition: empq_impl.h:92
friend int operator!=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:122
AMI_err
Definition: ami_stream.h:86
void print_stream_sizes()
Definition: empq_impl.h:1560
ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
Definition: empq_impl.h:76
~em_pqueue()
Definition: empq_impl.h:500
bool fillpq()
Definition: empq_impl.h:583
AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity, ExtendedMergeStream *outstr, long K)
Definition: empq_impl.h:1369
friend int operator>=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
Definition: empq_impl.h:118
unsigned short get_level() const
Definition: embuffer.h:260
void print_range()
Definition: empq_impl.h:1487
off_t stream_len(void)
Definition: ami_stream.h:388
long * get_bos() const
Definition: embuffer.h:276
HeapIndex get_maxsize() const
Definition: minmaxheap.h:747
bool empty_buff_0()
Definition: empq_impl.h:1077
void print()
Definition: empq_impl.h:1516
int active_streams()
Definition: empq.h:252