GRASS GIS 8 Programmer's Manual  8.2.2dev(2023)-3d2c704037
replacementHeap.h
Go to the documentation of this file.
1 
2 /****************************************************************************
3  *
4  * MODULE: iostream
5  *
6 
7  * COPYRIGHT (C) 2007 Laura Toma
8  *
9  *
10 
11  * Iostream is a library that implements streams, external memory
12  * sorting on streams, and an external memory priority queue on
13  * streams. These are the fundamental components used in external
14  * memory algorithms.
15 
16  * Credits: The library was developed by Laura Toma. The kernel of
17  * class STREAM is based on the similar class existent in the GPL TPIE
18  * project developed at Duke University. The sorting and priority
19  * queue have been developed by Laura Toma based on communications
20  * with Rajiv Wickremesinghe. The library was developed as part of
21  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
22  * Rajiv Wickremesinghe as part of the Terracost project.
23 
24  *
25  * This program is free software; you can redistribute it and/or modify
26  * it under the terms of the GNU General Public License as published by
27  * the Free Software Foundation; either version 2 of the License, or
28  * (at your option) any later version.
29  *
30 
31  * This program is distributed in the hope that it will be useful,
32  * but WITHOUT ANY WARRANTY; without even the implied warranty of
33  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
34  * General Public License for more details. *
35  * **************************************************************************/
36 
37 #ifndef REPLACEMENT_QUEUE_H
38 #define REPLACEMENT_QUEUE_H
39 
40 #include <assert.h>
41 
42 #include "queue.h"
43 
44 #define RHEAP_DEBUG if(0)
45 
46 
47 
48 /*****************************************************************/
49 /* encapsulation of the element and the run it comes from;
50  */
51 template<class T>
52 class HeapElement {
53 public:
54  T value;
56 
57  HeapElement(): run(NULL) {};
58 
59  friend ostream& operator << (ostream& s, const HeapElement &p) {
60  return s << "[" << p.value << "]";
61  };
62 };
63 
64 
65 
66 
67 
68 
69 /*****************************************************************/
70 /*
71 This is a heap of HeapElements, i.e. elements which come from streams;
72 when an element is consumed, the heap knows how to replace it with the
73 next element from the same stream.
74 
75 Compare is a class that has a member function called "compare" which
76 is used to compare two elements of type T
77 */
78 template<class T,class Compare>
80 private:
81  HeapElement<T>* mergeHeap; //the heap;
82  size_t arity; //max size
83  size_t size; //represents actual size, i.e. the nb of (non-empty)
84  //runs; they are stored contigously in the first
85  //<size> positions of mergeHeap; once a run becomes
86  //empty, it is deleted and size is decremented. size
87  //stores the next position where a HeapElement can be
88  //added.
89 
90 
91 protected:
92  void heapify(size_t i);
93 
94  void buildheap();
95 
96  /* for each run in the heap, read an element from the run into the heap */
97  void init();
98 
99  /* add a run; make sure total nb of runs does not exceed heap arity */
100  void addRun(AMI_STREAM<T> *run);
101 
102  /* delete the i-th run (and the element); that is, swap the ith run
103  with the last one, and decrement size; just like in a heap, but
104  no heapify. Note: this function messes up the heap order. If the
105  user wants to maintain heap property should call heapify
106  specifically.
107  */
108  void deleteRun(size_t i);
109 
110 public:
111  //allocate array mergeHeap and the runs in runList
112  ReplacementHeap<T,Compare>(size_t arity, queue<char*>* runList);
113 
114  //delete array mergeHeap
116 
117  //is heap empty?
118  int empty() const {
119  return (size == 0);
120  }
121 
122  //delete mergeHeap[0].value, replace it with the next element from
123  //the same stream, and re-heapify
124  T extract_min();
125 
126 
127  ostream & print(ostream& s) const {
128  char* runname;
129  off_t runlen;
130  s << "Replacementheap " << this << ": " << size << " runs";
131  for(size_t i=0; i<size; i++) {
132  s << endl << " <- i=" << i<< ": " << mergeHeap[i].run;
133  assert(mergeHeap[i].run);
134  mergeHeap[i].run->name(&runname);
135  runlen = mergeHeap[i].run->stream_len();
136  s << ", " << runname << ", len=" << runlen;
137  delete runname; //this should be safe
138  }
139  s << endl;
140  return s;
141  }
142 
143 };
144 
145 
146 
147 
148 /*****************************************************************/
149 template<class T,class Compare>
151  queue<char*>* runList) {
152  char* name=NULL;
153 
154  assert(runList && g_arity > 0);
155 
156  RHEAP_DEBUG cerr << "ReplacementHeap arity=" << g_arity << "\n";
157 
158  arity = g_arity;
159  size = 0; //no run yet
160 
161  mergeHeap = new HeapElement<T>[arity];
162  for (unsigned int i=0; i< arity; i++) {
163  //pop a stream from the list and add it to heap
164  runList->dequeue(&name);
165  AMI_STREAM<T>* str = new AMI_STREAM<T>(name);
166  assert(str);
167  delete name; //str makes its own copy
168  addRun(str);
169  }
170  init();
171 }
172 
173 
174 /*****************************************************************/
175 template<class T,class Compare>
176 ReplacementHeap<T,Compare>::~ReplacementHeap<T,Compare>() {
177 
178  if (!empty()) {
179  cerr << "warning: ~ReplacementHeap: heap not empty!\n";
180  }
181  //delete the runs first
182  for(size_t i=0; i<size; i++) {
183  if (mergeHeap[i].run)
184  delete mergeHeap[i].run;
185  }
186  delete [] mergeHeap;
187 }
188 
189 
190 
191 /*****************************************************************/
192 /* add a run; make sure total nb of runs does not exceed heap arity
193  */
194 template<class T,class Compare>
195 void
197 
198  assert(r);
199 
200  if(size == arity) {
201  cerr << "ReplacementHeap::addRun size =" << size << ",arity=" << arity
202  << " full, cannot add another run.\n";
203  assert(0);
204  exit(1);
205  }
206  assert(size < arity);
207 
208  mergeHeap[size].run = r;
209  size++;
210 
211  RHEAP_DEBUG
212  {char* strname;
213  r->name(&strname);
214  cerr << "ReplacementHeap::addRun added run " << strname
215  << " (rheap size=" << size << ")" << endl;
216  delete strname;
217  cerr.flush();
218  }
219 }
220 
221 
222 
223 
224 
225 /*****************************************************************/
226 /* delete the i-th run (and the value); that is, swap ith element with
227  the last one, and decrement size; just like in a heap, but no
228  heapify. Note: this function messes up the heap order. If the user
229  wants to maintain heap property should call heapify specifically.
230  */
231 template<class T,class Compare>
232 void
234 
235  assert(i >= 0 && i < size && mergeHeap[i].run);
236 
237  RHEAP_DEBUG
238  {
239  cerr << "ReplacementHeap::deleteRun deleting run " << i << ", "
240  << mergeHeap[i].run << endl;
241  print(cerr);
242  }
243 
244 
245  //delete it
246  delete mergeHeap[i].run;
247  //and replace it with
248  if (size > 1) {
249  mergeHeap[i].value = mergeHeap[size-1].value;
250  mergeHeap[i].run = mergeHeap[size-1].run;
251  }
252  size--;
253 }
254 
255 
256 
257 
258 /*****************************************************************/
259 /* for each run in the heap, read an element from the run into the
260  heap; if ith run is empty, delete it
261 */
262 template<class T,class Compare>
263 void
265  AMI_err err;
266  T* elt;
267  size_t i;
268 
269  RHEAP_DEBUG cerr << "ReplacementHeap::init " ;
270 
271  i=0;
272  while (i<size) {
273 
274  assert(mergeHeap[i].run);
275 
276  // Rewind run i
277  err = mergeHeap[i].run->seek(0);
278  if (err != AMI_ERROR_NO_ERROR) {
279  cerr << "ReplacementHeap::Init(): cannot seek run " << i << "\n";
280  assert(0);
281  exit(1);
282  }
283  //read first item from run i
284  err = mergeHeap[i].run->read_item(&elt);
285  if (err != AMI_ERROR_NO_ERROR) {
286  if (err == AMI_ERROR_END_OF_STREAM) {
287  deleteRun(i);
288  //need to iterate one more time with same i;
289  } else {
290  cerr << "ReplacementHeap::Init(): cannot read run " << i << "\n";
291  assert(0);
292  exit(1);
293  }
294  } else {
295  //copy.... can this be avoided? xxx
296  mergeHeap[i].value = *elt;
297 
298  i++;
299  }
300  }
301  buildheap();
302 }
303 
304 // Helper functions for navigating through a binary heap.
305 /* for simplicity the heap structure is slightly modified as:
306  0
307  |
308  1
309  /\
310  2 3
311  /\ /\
312 4 5 6 7
313 
314 */
315 
316 // The children of an element of the heap.
// Position of the left child of <index>: twice the index.
static inline size_t rheap_lchild(size_t index) {
  const size_t left = index << 1;
  return left;
}
320 
// Position of the right child of <index>: twice the index, plus one.
static inline size_t rheap_rchild(size_t index) {
  const size_t right = (index << 1) | 1;
  return right;
}
324 
325 // The parent of an element.
// Position of the parent of <index>: half the index, rounded down.
static inline size_t rheap_parent(size_t index) {
  const size_t parent = index / 2;
  return parent;
}
329 // this functions are duplicated in pqheap.H...XXX
330 
331 
332 
333 /*****************************************************************/
334 template<class T,class Compare>
335 void
337  size_t min_index = i;
338  size_t lc = rheap_lchild(i);
339  size_t rc = rheap_rchild(i);
340 
341  Compare cmpobj;
342  assert(i >= 0 && i < size);
343  if ((lc < size) &&
344  (cmpobj.compare(mergeHeap[lc].value, mergeHeap[min_index].value) == -1)) {
345  min_index = lc;
346  }
347  if ((rc < size) &&
348  (cmpobj.compare(mergeHeap[rc].value, mergeHeap[min_index].value) == -1)) {
349  min_index = rc;
350  }
351 
352  if (min_index != i) {
353  HeapElement<T> tmp = mergeHeap[min_index];
354 
355  mergeHeap[min_index] = mergeHeap[i];
356  mergeHeap[i] = tmp;
357 
358 
359  heapify(min_index);
360  }
361 
362  return;
363 }
364 
365 /*****************************************************************/
366 template<class T,class Compare>
367 void
369 
370  if (size > 1) {
371  for (int i = rheap_parent(size-1); i>=0; i--) {
372  heapify(i);
373  }
374  }
375  RHEAP_DEBUG cerr << "Buildheap done\n";
376  return;
377 }
378 
379 
380 /*****************************************************************/
381 template<class T,class Compare>
382 T
384  T *elt, min;
385  AMI_err err;
386 
387  assert(!empty()); //user's job to check first if it's empty
388  min = mergeHeap[0].value;
389 
390 
391  //read a new element from the same run
392  assert(mergeHeap[0].run);
393  err = mergeHeap[0].run->read_item(&elt);
394  if (err != AMI_ERROR_NO_ERROR) {
395  //if run is empty, delete it
396  if (err == AMI_ERROR_END_OF_STREAM) {
397  RHEAP_DEBUG cerr << "rheap extract_min: run " << mergeHeap[0].run
398  << " empty. deleting\n ";
399  deleteRun(0);
400  } else {
401  cerr << "ReplacementHeap::extract_min: cannot read\n";
402  assert(0);
403  exit(1);
404  }
405  } else {
406  //copy...can this be avoided?
407  mergeHeap[0].value = *elt;
408  }
409 
410  //restore heap
411  if (size > 0) heapify(0);
412 
413  return min;
414 }
415 
416 
417 
418 #endif
ostream & print(ostream &s) const
#define min(x, y)
Definition: draw2.c:31
AMI_STREAM< T > * run
void deleteRun(size_t i)
int empty() const
#define NULL
Definition: ccmath.h:32
friend ostream & operator<<(ostream &s, const HeapElement &p)
SYMBOL * err(FILE *fp, SYMBOL *s, char *msg)
Definition: symbol/read.c:220
Definition: queue.h:43
#define assert(condition)
Definition: lz4.c:324
void addRun(AMI_STREAM< T > *run)
#define RHEAP_DEBUG
AMI_err name(char **stream_name)
Definition: ami_stream.h:440
ReplacementHeap(size_t arity, queue< char *> *runList)
AMI_err
Definition: ami_stream.h:86
const char * name
Definition: named_colr.c:7
double r
Definition: r_raster.c:39
void heapify(size_t i)
void init(double work[])
Definition: as177.c:65
bool dequeue(T *)
Definition: queue.h:94