GRASS GIS 8 Programmer's Manual  8.2.2dev(2023)-3d2c704037
embuffer.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 #ifndef __EMBUFFER_H
38 #define __EMBUFFER_H
39 
40 
41 #include <stdio.h>
42 #include <assert.h>
43 #include <stdlib.h>
44 #include <math.h>
45 
46 #include "ami_config.h" //for SAVE_MEMORY
47 #include "ami_stream.h"
48 #include "mm.h"
49 #include "mm_utils.h"
50 #include "pqheap.h"
51 
52 
53 
54 
55 #define MY_LOG_DEBUG_ID(x) //inhibit debug printing
56 //#define MY_LOG_DEBUG_ID(x) LOG_DEBUG_ID(x)
57 
58 
59 
60 /**********************************************************
61  DEBUGGING FLAGS
62 ***********************************************************/
63 
64 //setting this enables checking that the streams/arrays inserted in
65 //buffers are sorted in increasing order
66 //#define EMBUF_CHECK_INSERT
67 
68 //enable checking that stream name is the same as the one stored in
69 //the buffer name[]
70 //#define EMBUF_CHECK_NAME
71 
72 //enable printing names as they are checked
73 //#define EMBUF_CHECK_NAME_PRINT
74 
75 //enable printing when streams in a buffer are shifted left to
76 //check that names are shifted accordingly
77 //#define EMBUF_DELETE_STREAM_PRINT
78 
79 //enable printing the name of the stream which is inserted in buff
80 //#define EMBUF_PRINT_INSERT
81 
82 //enable printing the stream names/sizes in cleanup()
83 //#define EMBUF_CLEANUP_PRINT
84 
85 //enable printing when get/put_stream is called (for each stream)
86 //#define EMBUF_PRINT_GETPUT_STREAM
87 
88 //enable printing when get/put_streams is called
89 //#define EMBUF_PRINT_GETPUT_STREAMS
90 
91 /***********************************************************/
92 
93 
94 
95 
96 
97 /*****************************************************************/
98 /* encapsulation of the key together with stream_id; used during
99  stream merging to save space;
100 */
101 template<class KEY>
102 class merge_key {
103 public:
104  KEY k;
105  unsigned int str_id; //id of the stream where key comes from
106 
107 public:
108  merge_key(): str_id(0) {}
109 
110  merge_key(const KEY &x, const unsigned int sid):
111  k(x), str_id(sid) {}
112 
114 
115  void set(const KEY &x, const unsigned int sid) {
116  k = x;
117  str_id = sid;
118  }
119  KEY key() const {
120  return k;
121  }
122  unsigned int stream_id() const {
123  return str_id;
124  }
125  KEY getPriority() const {
126  return k;
127  }
128 
129  friend ostream& operator<<(ostream& s, const merge_key<KEY> &x) {
130  return s << "<str_id=" << x.str_id << "> " << x.k << " ";
131  }
132  friend int operator < (const merge_key &x,
133  const merge_key &y) {
134  return (x.k < y.k);
135  }
136  friend int operator <= (const merge_key &x,
137  const merge_key &y) {
138  return (x.k <= y.k);
139  }
140  friend int operator > (const merge_key &x,
141  const merge_key &y) {
142  return (x.k > y.k);
143  }
144  friend int operator >= (const merge_key &x,
145  const merge_key &y) {
146  return (x.k >= y.k);
147  }
148  friend int operator != (const merge_key &x,
149  const merge_key &y) {
150  return (x.k != y.k);
151  }
152  friend int operator == (const merge_key &x,
153  const merge_key &y) {
154  return (x.k == y.k);
155  }
156  friend merge_key operator + (const merge_key &x,
157  const merge_key &y) {
158  assert(0);
159  return x;
160  // Key sum = x.k + y.k;
161  // merge_key f(sum, x.str_id);
162  // return f;
163  }
164 
165 };
166 
167 
168 
169 
170 
171 
172 /*****************************************************************
173  *****************************************************************
174  *****************************************************************
175 
176  external_memory buffer
177 
178  Each level-i buffer can store up to <arity>^i * <basesize> items,
179  where typically <arity> is \theta(m) and <basesize> is \theta(M);
180  therefore log_m{n/m} buffers are needed to store N items, one
181  buffer for each level 1..log_m{n/m}. All buffers must have the same
182  values of <arity> and <basesize>.
183 
184  Functionality:
185 
186  A level-i on-disk buffer stores <arity>^i * <basesize> items of
187  data, organized in <arity> streams of <arity>^{i-1} * <basesize> items each;
188  <basesize> is same for all buffers and equal to the size of the
189  level 0 buffer (in memory buffer).
190 
191  Invariant: all the <arity> streams of a level-i buffer are in
192  sorted order; in this way sorting the buffer is done by merging the
193  <arity> streams in linear time.
194 
195  Items are inserted in level i-buffer only a whole stream at a time
196  (<arity>^{i-1}*<basesize> items). When all the <arity> streams of
197  the buffer are full, the buffer is sorted and emptied into a stream
198  of a level (i+1)-buffer.
199 
200  The <arity> streams of a buffer are allocated contiguously from left
201  to right. The unused streams are NULL; the buffer keeps the index of
202  the next available (NULL) stream. When a buffer becomes full and is
203  emptied, all its streams are set to NULL.
204 
205  *****************************************************************
206  *****************************************************************
207  ***************************************************************** */
208 
209 /* T is a type with priority of type K and method getPriority() */
210 template<class T, class Key>
211 class em_buffer {
212 private:
213 
214  //number of streams in a buffer;
215  unsigned int arity;
216 
217  //level of buffer: between 1 and log_arity{n/arity}; (level-0 buffer
218  //has a slightly different behaviour so it is implemented as a
219  //different class <im_buffer>)
220  unsigned short level;
221 
222  //level-i buffer contains m streams of data, each of size
223  //arity^{i-1}*basesize;
224  AMI_STREAM<T>** data;
225 
226  //the buffers can be depleted to fill the internal pq;
227  //keep an array which counts, for each stream, how many elements
228  //have been deleted (implicitely from the beginning of stream)
229  long* deleted;
230 
231  //nb of items in each substream; this can be found out by calling
232  //stream_len() on the stream, but it is more costly esp in the case
233  //when streams are on disk and must be moved in and out just to find
234  //stream length; streamsize is set only at stream creation, and the
235  //actual size must subtract the number of iteme deleted from the
236  //bos
237  unsigned long* streamsize;
238 
239  //index of the next available(empty) stream (out of the total m
240  //streams in the buffer);
241  unsigned int index;
242 
243  //nb of items in a stream of level_1 buffer
244  unsigned long basesize;
245 
246 
247 public:
248 
249  //create a level-i buffer of given basesize;
250  em_buffer(const unsigned short i, const unsigned long bs,
251  const unsigned int ar);
252 
253  //copy constructor;
254  em_buffer(const em_buffer &buf);
255 
256  //free the stream array and the streams pointers
257  ~em_buffer();
258 
259  //return the level of the buffer;
260  unsigned short get_level() const { return level;}
261 
262  //return the ith stream (load stream in memory)
263  AMI_STREAM<T>* get_stream(unsigned int i);
264 
265  //return a pointer to the streams of the buffer (loads streams in
266  //memory)
267  AMI_STREAM<T>** get_streams();
268 
269  //put the ith stream back to disk
270  void put_stream(unsigned int i);
271 
272  //called in pair with get_streams to put all streams back to disk
273  void put_streams();
274 
275  //return a pointer to the array of deletion count for each stream
276  long* get_bos() const { return deleted;}
277 
278  //return the index of the last stream in buffer which contains data;
279  unsigned int laststream() const { return index -1;}
280 
281  //return the index of the next available stream in the buffer
282  unsigned int nextstream() const { return index;}
283 
284  //increment the index of the next available stream in the buffer
285  void incr_nextstream() { ++index;}
286 
287  //return nb of (non-empty) streams in buffer
288  unsigned int get_nbstreams() const { return index;}
289 
290  //return arity
291  unsigned int get_arity() const { return arity;}
292 
293  //return total nb of deleted elements in all active streams of the buffer
294  long total_deleted() const {
295  long tot = 0;
296  for (unsigned int i=0; i< index; i++) {
297  tot += deleted[i];
298  }
299  return tot;
300  }
301 
302  //mark as deleted one more element from i'th stream
303  void incr_deleted(unsigned int i) {
304  assert(i<index);
305  deleted[i]++;
306  }
307 
308 
309  //return the nominal size of a stream (nb of items):
310  //arity^{level-1}*basesize;
311  unsigned long get_stream_maxlen() const {
312  return (unsigned long)pow((double)arity,(double)level-1)*basesize;
313  }
314 
315  //return the actual size of stream i; i must be the index of a valid
316  //stream
317  unsigned long get_stream_len(unsigned int i) {
318  //assert(i>= 0 && i<index);
319  return streamsize[i] - deleted[i];
320  }
321 
322  //return the total current size of the buffer; account for the
323  //deleted elements;
324  unsigned long get_buf_len() {
325  unsigned long tot = 0;
326  for (unsigned int i=0; i< index; i++) {
327  tot += get_stream_len(i);
328  }
329  return tot;
330  }
331 
332  //return the total maximal capacity of the buffer
333  unsigned long get_buf_maxlen() {
334  return arity * get_stream_maxlen();
335  }
336 
337  //return true if buffer is empty (all streams are empty)
338  bool is_empty() {
339  return ((nextstream() == 0) || (get_buf_len() == 0));
340  }
341 
342  //return true if buffer is full(all streams are full)
343  bool is_full() const {
344  return (nextstream() == arity);
345  }
346 
347  //reset
348  void reset();
349 
350  //clean buffer: in case some streams have been emptied by deletion
351  //delete them and shift streams left;
352  void cleanup();
353 
354 
355  //create and return a stream which contains all elements of all
356  //streams of the buffer in sorted ascending order of their
357  //keys(priorities);
358  AMI_STREAM<T>* sort();
359 
360 
361  // insert an array into the buffer; can only insert one
362  // level-i-full-stream-len nb of items at a time; assume the length
363  // of the array is precisely the streamlen of level-i buffer n =
364  // (pow(arity,level-1)*basesize); assume array is sorted; return the
365  // number of items actually inserted
366  long insert(T* a, long n);
367 
368 
369  // insert a stream into the buffer; assume the length of the stream
370  // is precisely the streamlen of level-i buffer n =
371  // (pow(arity,level-1)*basesize); the <nextstream> pointer of buffer
372  // is set to point to the argument stream; (in this way no stream
373  // copying is done, just one pointer copy). The user should be aware
374  // the the argument stream is 'lost' - that is a stream cannot be
375  // inserted repeatedly into many buffers because this would lead to
376  // several buffers pointing to the same stream.
377 
378  // stream is assumed sorted; bos = how many elements are deleted
379  // from the beginning of stream;
380 
381  // return the number of items actually inserted
382  long insert(AMI_STREAM<T>* str,
383  long bos=0);
384 
385  //print range of elements in buffer
386  void print_range();
387 
388  //print all elements in buffer
389  void print();
390 
391  //prints the sizes of the streams in the buffer
392  void print_stream_sizes();
393 
394  //print the elements in the buffer
395  friend ostream& operator<<(ostream& s, em_buffer &b) {
396  s << "BUFFER_" << b.level << ": ";
397  if (b.index ==0) {
398  s << "[]";
399  }
400  s << "\n";
401  b.get_streams();
402  for (unsigned int i=0; i < b.index; i++) {
403  b.print_stream(s, i);
404  }
405  b.put_streams();
406  return s;
407  }
408 
409 
410 private:
411 
412  // merge the input streams; there are <arity> streams in total;
413  // write output in <outstream>; the input streams are assumed sorted
414  // in increasing order of their keys;
415  AMI_err substream_merge(AMI_STREAM<T>** instreams,
416  unsigned int arity,
417  AMI_STREAM<T> *outstream);
418 
419 
420  //print to stream the elements in i'th stream
421  void print_stream(ostream& s, unsigned int i);
422 
423 
424 
425 #ifdef SAVE_MEMORY
426  //array of names of streams;
427  char** name;
428 
429  //return the designated name for stream i
430  char* get_stream_name(unsigned int i) const;
431 
432  //print all stream names in buffer
433  void print_stream_names();
434 
435 
436  //checks that name[i] is the same as stream name; stream i must be in
437  //memory (by a previous get_stream call, for instance) in order to
438  //find its length
439  void check_name(unsigned int i);
440 #endif
441 
442 };
443 
444 
445 /************************************************************/
446 //create a level-i buffer of given basesize;
447 template <class T, class Key>
448 em_buffer<T,Key>::em_buffer(const unsigned short i, const unsigned long bs,
449  const unsigned int ar) :
450  arity(ar), level(i), basesize(bs) {
451 
452  assert((level>=1) && (basesize >=0));
453 
454  char str[100];
455  sprintf(str, "em_buffer: allocate %d AMI_STREAM*, total %ld\n",
456  arity, (long)(arity*sizeof(AMI_STREAM<T>*)));
457  MEMORY_LOG(str);
458  //allocate STREAM* array
459  data = new AMI_STREAM<T>* [arity];
460 
461  //allocate deleted array
462  sprintf(str, "em_buffer: allocate deleted array: %ld\n",
463  (long)(arity*sizeof(long)));
464  MEMORY_LOG(str);
465  deleted = new long[arity];
466 
467  //allocate streamsize array
468  sprintf(str, "em_buffer: allocate streamsize array: %ld\n",
469  (long)(arity*sizeof(long)));
470  MEMORY_LOG(str);
471  streamsize = new unsigned long[arity];
472 
473 #ifdef SAVE_MEMORY
474  //allocate name array
475  sprintf(str, "em_buffer: allocate name array: %ld\n",
476  (long)(arity*sizeof(char*)));
477  MEMORY_LOG(str);
478  name = new char* [arity];
479  assert(name);
480 #endif
481 
482  //assert data
483  if ((!data) || (!deleted) || (!streamsize)) {
484  cerr << "em_buffer: cannot allocate\n";
485  exit(1);
486  }
487 
488  //initialize the <arity> streams to NULL, deleted[], streamsize[]
489  //and name[]
490  for (unsigned int ui=0; ui< arity; ui++) {
491  data[ui] = NULL;
492  deleted[ui] = 0;
493  streamsize[ui] = 0;
494 #ifdef SAVE_MEMORY
495  name[ui] = NULL;
496 #endif
497  }
498  //set index
499  index = 0;
500 
501 #ifdef SAVE_MEMORY
502  //streams_in_memory = false;
503 #endif
504 }
505 
506 
507 /************************************************************/
508 //copy constructor;
509 template<class T, class Key>
511  level(buf.level), basesize(buf.basesize),
512  index(buf.index), arity(buf.arity) {
513 
514  assert(0);//should not get called
515 
516  MEMORY_LOG("em_buffer: copy constr start\n");
517  get_streams();
518  for (unsigned int i=0; i< index; i++) {
519  assert(data[i]);
520  delete data[i]; //delete old stream if existing
521  data[i] = NULL;
522 
523  //call copy constructor; i'm not sure that it actually duplicates
524  //the stream and copies the data; should that in the BTE
525  //sometimes..
526  data[i] = new AMI_STREAM<T>(*buf.data[i]);
527  deleted[i] = buf.deleted[i];
528  streamsize[i] = buf.streamsize[i];
529 #ifdef SAVE_MEMORY
530  assert(name[i]);
531  delete name[i];
532  name[i] = NULL;
533  name[i] = buf.name[i];
534 #endif
535  }
536  put_streams();
537  MEMORY_LOG("em_buffer: copy constr end\n");
538 }
539 
540 
541 /************************************************************/
542 //free the stream array and the streams pointers
543 template<class T, class Key>
545 
546  assert(data);
547  //delete the m streams in the buffer
548  get_streams();
549  for (unsigned int i=0; i<index; i++) {
550  assert(data[i]);
551 #ifdef SAVE_MEMORY
552  check_name(i);
553  delete name[i];
554 #endif
555  delete data[i];
556  data[i] = NULL;
557  }
558 
559  delete [] data;
560  delete [] deleted;
561  delete [] streamsize;
562 #ifdef SAVE_MEMORY
563  delete [] name;
564 #endif
565 }
566 
567 
#ifdef SAVE_MEMORY
/************************************************************/
//checks that name[i] is the same as stream name; stream i must be in
//memory (by a previous get_stream call, for instance) in order to
//query its name; the check is compiled in only when EMBUF_CHECK_NAME
//is defined, otherwise this function does nothing
template<class T, class Key>
void em_buffer<T,Key>::check_name(unsigned int i) {

#ifdef EMBUF_CHECK_NAME
  //i is unsigned, so only the upper bound needs checking
  assert(i < index);
  assert(data[i]);

  char* fooname;
  data[i]->name(&fooname);//name() allocates the string
#ifdef EMBUF_CHECK_NAME_PRINT
  cout << "::check_name: checking stream [" << level << "," << i << "] name:"
       << fooname << endl;
  cout.flush();
#endif
  if (strcmp(name[i], fooname) != 0) {
    cerr << "name[" << i << "]=" << name[i]
         << ", streamname=" << fooname << endl;
  }
  assert(strcmp(fooname, name[i]) == 0);
  //the name is a variable-length char string, so it must be released
  //with the array form of delete (assumes AMI_STREAM::name() allocates
  //it with new[] -- confirm in ami_stream.h)
  delete [] fooname;
#endif
}
#endif
596 
597 
598 
599 
600 /************************************************************/
601 //if SAVE_MEMORY flag is set, load the stream in memory; return the
602 //ith stream
603 template<class T, class Key>
605 
606  assert(i>=0 && i < index);
607 
608 #ifdef SAVE_MEMORY
609  MY_LOG_DEBUG_ID("em_buffer::get_stream");
610  MY_LOG_DEBUG_ID(i);
611 
612  if (data[i] == NULL) {
613 
614  //stream is on disk, load it in memory
615  assert(name[i]);
616  MY_LOG_DEBUG_ID("load stream in memory");
617  MY_LOG_DEBUG_ID(name[i]);
618 
619 #ifdef EMBUF_PRINT_GETPUT_STREAM
620  cout << "get_stream:: name[" << i << "]=" << name[i] << " from disk\n";
621  cout.flush();
622 #endif
623 
624  //assert that file exists
625  FILE* fp;
626  if ((fp = fopen(name[i],"rb")) == NULL) {
627  cerr << "get_stream: checking that stream " << name[i] << "exists\n";
628  perror(name[i]);
629  assert(0);
630  exit(1);
631  }
632  fclose(fp);
633 
634  //create an AMI_STREAM from file
635  data[i] = new AMI_STREAM<T>(name[i]);
636  assert(data[i]);
637 
638  } else {
639 
640  //if data[i] not NULL, stream must be already in memory
641  MY_LOG_DEBUG_ID("stream not NULL");
642  MY_LOG_DEBUG_ID(data[i]->sprint());
643  }
644 #endif
645 
646 
647  //NOW STREAM IS IN MEMORY
648 
649  //some assertion checks
650  assert(data[i]);
651  assert(data[i]->stream_len() == streamsize[i]);
652 #ifdef SAVE_MEMORY
653  check_name(i);
654 #endif
655 
656  return data[i];
657 }
658 
659 
660 
661 
662 /************************************************************/
663 //if SAVE_MEMORY flag is set, put the i'th stream back on disk
664 template<class T, class Key>
665 void em_buffer<T,Key>::put_stream(unsigned int i) {
666 
667  assert(i>=0 && i < index);
668 
669 #ifdef SAVE_MEMORY
670  MY_LOG_DEBUG_ID("em_buffer::put_stream");
671  MY_LOG_DEBUG_ID(i);
672 
673  if (data[i] != NULL) {
674 
675  //stream is in memory, put it on disk
676  MY_LOG_DEBUG_ID("stream put to disk");
677  MY_LOG_DEBUG_ID(data[i]->sprint());
678 
679  check_name(i);
680 #ifdef EMBUF_PRINT_GETPUT_STREAM
681  cout << "put_stream:: name[" << i << "]=" << name[i] << " to disk\n";
682  cout.flush();
683 #endif
684 
685  //make stream persistent and delete it
686  data[i]->persist(PERSIST_PERSISTENT);
687  delete data[i];
688  data[i] = NULL;
689 
690  } else {
691 
692  //data[i] is NULL, so stream must be already put on disk
693  MY_LOG_DEBUG_ID("stream is NULL");
694  }
695 #endif
696 
697 }
698 
699 
700 
701 
702 /************************************************************/
703 //return a pointer to the streams of the buffer
704 template<class T, class Key>
706 
707 #ifdef SAVE_MEMORY
708  MY_LOG_DEBUG_ID("em_buffer::get_streams: reading streams from disk");
709 #ifdef EMBUF_PRINT_GETPUT_STREAMS
710  cout << "em_buffer::get_streams (buffer " << level <<")";
711  cout << ": index = " << index << "(arity=" << arity << ")\n";
712  cout.flush();
713 #endif
714 
715  for (unsigned int i=0; i<index; i++) {
716  get_stream(i);
717  assert(data[i]);
718  }
719 
720 #endif
721 
722  return data;
723 }
724 
725 
726 
727 
728 /************************************************************/
729 //called in pair with load_streams to store streams on disk
730 //and release the memory
731 template<class T, class Key>
733 
734 #ifdef SAVE_MEMORY
735  MY_LOG_DEBUG_ID("em_buffer::put_streams: writing streams on disk");
736 #ifdef EMBUF_PRINT_GETPUT_STREAMS
737  cout << "em_buffer::put_streams (buffer " << level <<")";
738  cout << ": index = " << index << "(arity=" << arity << ")\n";
739  cout.flush();
740 #endif
741 
742  for (unsigned int i=0; i<index; i++) {
743  put_stream(i);
744  assert(data[i] == NULL);
745  }
746 #endif
747 
748 }
749 
750 
751 
#ifdef SAVE_MEMORY
/************************************************************/
//return the stored (file) name of the ith stream; the returned string
//is still owned by the buffer, callers must not free it
template<class T, class Key>
char* em_buffer<T,Key>::get_stream_name(unsigned int i) const {
  assert(i>=0 && i<index);
  assert(name[i]);
  char* const stream_name = name[i];
  return stream_name;
}
#endif
763 
764 
765 
766 
767 #ifdef SAVE_MEMORY
768 /************************************************************/
769 template<class T, class Key>
771  unsigned int i;
772  for (i=0; i<index; i++) {
773  assert(name[i]);
774  cout << "stream " << i << ": " << name[i] << endl;
775  }
776  cout.flush();
777 }
778 #endif
779 
780 
781 
782 
783 /************************************************************/
784 //clean buffer in case some streams have been emptied by deletion
785 template<class T, class Key>
787 
788  MY_LOG_DEBUG_ID("em_buffer::cleanup()");
789 #ifdef EMBUF_CLEANUP_PRINT
790 #ifdef SAVE_MEMORY
791  if (index>0) {
792  cout << "before cleanup:\n";
793  print_stream_names();
795  cout.flush();
796  }
797 #endif
798 #endif
799 
800  //load all streams in memory
801  get_streams();
802 
803  //count streams of size=0
804  unsigned int i, empty=0;
805  for (i=0; i<index; i++) {
806 
807  if (get_stream_len(i) == 0) {
808  //printing..
809 #ifdef EMBUF_DELETE_STREAM_PRINT
810  cout<<"deleting stream [" << level << "," << i <<"]:" ;
811 #ifdef SAVE_MEMORY
812  cout << name[i];
813 #endif
814  cout << endl;
815  cout.flush();
816 #endif
817 
818 #ifdef SAVE_MEMORY
819  //stream is empty ==> delete its name
820  assert(name[i]);
821  delete name[i];
822  name[i] = NULL;
823 #endif
824 
825  //stream is empty ==> reset data
826  assert(data[i]);
827  //data[i]->persist(PERSIST_DELETE); //this is done automatically..
828  delete data[i];
829  data[i] = NULL;
830  deleted[i] = 0;
831  streamsize[i] = 0;
832  empty++;
833  }
834  }
835  //streams are in memory; all streams which are NULL must have been
836  //deleted
837 
838  //shift streams to the left in case holes were introduced
839  unsigned int j=0;
840  if (empty) {
841 #ifdef EMBUF_DELETE_STREAM_PRINT
842  cout << "em_buffer::cleanup: shifting streams\n"; cout.flush();
843 #endif
844  for (i=0; i<index; i++) {
845  //if i'th stream is not empty, shift it left if necessary
846  if (data[i]) {
847  if (i!=j) {
848  //set j'th stream to point to i'th stream
849  //cout << j << " set to " << i << endl; cout.flush();
850  data[j] = data[i];
851  deleted[j] = deleted[i];
852  streamsize[j] = streamsize[i];
853  //set i'th stream to point to NULL
854  data[i] = NULL;
855  deleted[i] = 0;
856  streamsize[i] = 0;
857 #ifdef SAVE_MEMORY
858  //fix the names
859 /* already done assert(name[j]); */
860 /* delete name[j]; */
861  name[j] = name[i];
862  name[i] = NULL;
863  check_name(j);
864 #endif
865  } else {
866  //cout << i << " left the same" << endl;
867  }
868  j++;
869  } //if data[i] != NULL
870  }//for i
871 
872  //set the index
873  assert(index == j + empty);
874  index = j;
875 
876 #ifdef EMBUF_DELETE_STREAM_PRINT
877  cout << "em_buffer::cleanup: index set to " << index << endl;
878  cout.flush();
879 #endif
880  } //if empty
881 
882  //put streams back to disk
883  put_streams();
884 
885 #ifdef EMBUF_CLEANUP_PRINT
886 #ifdef SAVE_MEMORY
887  if (index >0) {
888  cout << "after cleanup:\n";
889  print_stream_names();
891  cout.flush();
892  }
893 #endif
894 #endif
895 }
896 
897 
898 
899 
900 /************************************************************/
901 //delete all streams
902 template<class T, class Key>
904 
905  get_streams();
906 
907  //make streams not-persistent and delete them
908  for (unsigned int i=0; i<index; i++) {
909  assert(data[i]);
910  assert(streamsize[i] == data[i]->stream_len());
911 #ifdef SAVE_MEMORY
912  check_name(i);
913  assert(name[i]);
914  delete name[i];
915  name[i] = NULL;
916 #endif
917 
918  data[i]->persist(PERSIST_DELETE);
919 
920  delete data[i];
921  data[i] = NULL;
922  deleted[i] = 0;
923  streamsize[i] = 0;
924  }
925 
926  index = 0;
927 }
928 
929 
930 
931 /************************************************************/
932 //create and return a stream which contains all elements of
933 //all streams of the buffer in sorted ascending order of
934 //their keys (priorities);
935 template<class T, class Key>
938 
939  //create stream
940  MEMORY_LOG("em_buffer::sort: allocate new AMI_STREAM\n");
941 
942  AMI_STREAM<T>* sorted_stream = new AMI_STREAM<T>(); /* will be deleteed in insert() */
943  assert(sorted_stream);
944 
945  //merge the streams into sorted stream
946  AMI_err aerr;
947  //Key dummykey;
948  //must modify this to seek after deleted[i] elements!!!!!!!!!!!!!
949  // aerr = MIAMI_single_merge_Key(data, arity, sorted_stream,
950  // 0, dummykey);
951  //could not use AMI_merge so i had to write my own..
952 
953  get_streams();
954 
955  aerr = substream_merge(data, arity, sorted_stream);
956  assert(aerr == AMI_ERROR_NO_ERROR);
957 
958  put_streams();
959 
960  return sorted_stream;
961 }
962 
963 
964 
965 
966 /************************************************************/
967 /* merge the input streams; there are <arity> streams in total; write
968  output in <outstream>;
969 
970  the input streams are assumed sorted in increasing order of their
971  keys;
972 
973  assumes the instreams are in memory (no need for get_streams()) */
974 template<class T, class Key>
976  unsigned int arity,
977  AMI_STREAM<T> *outstream) {
978 
979  unsigned int i, j;
980 
981  //some assertion checks
982  assert(instreams);
983  assert(outstream);
984  for (i = 0; i < arity ; i++ ) {
985  assert(instreams[i]);
986 #ifdef SAVE_MEMORY
987  check_name(i);
988 #endif
989  }
990 
991  std::vector<T*> in_objects(arity); //pointers to current leading elements of streams
992  AMI_err ami_err;
993 
994 
995  char str[200];
996  sprintf(str, "em_buffer::substream_merge: allocate keys array, total %ldB\n",
997  (long)((long)arity * sizeof(merge_key<Key>)));
998  MEMORY_LOG(str);
999 
1000 
1001  //keys array is initialized with smallest key from each stream (only
1002  //non-null keys must be included)
1003  merge_key<Key>* keys;
1004  //merge_key<Key>* keys = new (merge_key<Key>)[arity];
1005  typedef merge_key<Key> footype;
1006  keys = new footype[arity];
1007  assert(keys);
1008 
1009  //count number of non-empty streams
1010  j = 0;
1011  //rewind and read the first item from every stream initializing
1012  //in_objects and keys
1013  for (i = 0; i < arity ; i++ ) {
1014  assert(instreams[i]);
1015  //rewind stream
1016  if ((ami_err = instreams[i]->seek(deleted[i])) != AMI_ERROR_NO_ERROR) {
1017  return ami_err;
1018  }
1019  //read first item from stream
1020  if ((ami_err = instreams[i]->read_item(&(in_objects[i]))) !=
1022  if (ami_err == AMI_ERROR_END_OF_STREAM) {
1023  in_objects[i] = NULL;
1024  } else {
1025  return ami_err;
1026  }
1027  } else {
1028  //include this key in the array of keys
1029  Key k = in_objects[i]->getPriority();
1030  keys[j].set(k, i);
1031  j++;
1032  }
1033  }
1034  unsigned int NonEmptyRuns = j;
1035 
1036  //build heap from the array of keys
1037  pqheap_t1<merge_key<Key> > mergeheap(keys, NonEmptyRuns);
1038 
1039  //repeatedly extract_min from heap, write it to output stream and
1040  //insert next element from same stream
1041  merge_key<Key> minelt;
1042  //rewind output buffer
1043  ami_err = outstream->seek(0);
1044  assert(ami_err == AMI_ERROR_NO_ERROR);
1045  while (!mergeheap.empty()) {
1046  //find min key and id of the stream from whereit comes
1047  mergeheap.min(minelt);
1048  i = minelt.stream_id();
1049  //write min item to output stream
1050  if ((ami_err = outstream->write_item(*in_objects[i]))
1051  != AMI_ERROR_NO_ERROR) {
1052  return ami_err;
1053  }
1054  //read next item from same input stream
1055  if ((ami_err = instreams[i]->read_item(&(in_objects[i])))
1056  != AMI_ERROR_NO_ERROR) {
1057  if (ami_err != AMI_ERROR_END_OF_STREAM) {
1058  return ami_err;
1059  }
1060  }
1061  //extract the min from the heap and insert next key from same stream
1062  if (ami_err == AMI_ERROR_END_OF_STREAM) {
1063  mergeheap.delete_min();
1064  } else {
1065  Key k = in_objects[i]->getPriority();
1066  merge_key<Key> nextit(k, i);
1067  mergeheap.delete_min_and_insert(nextit);
1068  }
1069  } //while
1070 
1071  //delete [] keys;
1072  //!!! KEYS BELONGS NOW TO MERGEHEAP, AND WILL BE DELETED BY THE
1073  //DESTRUCTOR OF MERGEHEAP (CALLED AUUTOMATICALLY ON FUNCTION EXIT) IF
1074  //I DELETE KEYS EXPLICITELY, THEY WILL BE DELETED AGAIN BY DESTRUCTOR,
1075  //AND EVERYTHING SCREWS UP..
1076 
1077  return AMI_ERROR_NO_ERROR;
1078 }
1079 
1080 
1081 
1082 
1083 
1084 /************************************************************/
1085 // insert an array into the buffer; assume array is sorted; return the
1086 // number of items actually inserted; if SAVE_MEMORY FLAG is on, put
1087 // stream on disk and release its memory
1088 template<class T, class Key>
1089 long em_buffer<T,Key>::insert(T* a, long n) {
1090 
1091  assert(a);
1092 
1093  if (is_full()) {
1094  cout << "em_buffer::insert: buffer full\n";
1095  return 0;
1096  }
1097 
1098  //can only insert one full stream at a time
1099  //relaxed..
1100  //assert(n == get_stream_maxlen());
1101 
1102  //create the stream
1103  MEMORY_LOG("em_buffer::insert(from array): allocate AMI_STREAM\n");
1104  AMI_STREAM<T>* str = new AMI_STREAM<T>();
1105  assert(str);
1106 
1107  //write the array to stream
1108  AMI_err ae;
1109  for (long i=0; i< n; i++) {
1110  ae = str->write_item(a[i]);
1111  assert(ae == AMI_ERROR_NO_ERROR);
1112  }
1113  assert(n == str->stream_len());
1114 
1115  //insert the stream in the buffer
1116  return insert(str);
1117 }
1118 
1119 
1120 
1121 
1122 /************************************************************/
1123 /* insert a stream into the buffer; the next free entry in the buffer
1124  is set to point to the stream; if SAVE_MEMORY flag is on, the
1125  stream is put to disk and its in-memory object is deleted;
1126 
1127  the <nextstream> entry of the buffer is set to point to the argument
1128  stream (no stream copying is done, just one pointer copy). The
1129  caller should be aware that the argument stream is 'lost' (it now
1130  belongs to the buffer): a stream must not be inserted into several
1131  buffers, because this would lead to several buffers pointing to the
1132  same stream.
1133 
1134  the stream is assumed to be sorted (only checked when
      EMBUF_CHECK_INSERT is defined); bos = how many elements must be
1135  skipped (were already deleted) from the beginning of the stream;
1136 
1137  return the number of items actually inserted: 0 if the buffer is
      full, otherwise the remaining length of the entry just filled */
1138 template<class T, class Key>
// NOTE(review): source line 1139 -- the function signature, presumably
// `long em_buffer<T,Key>::insert(AMI_STREAM<T>* str, long bos) {` -- is
// missing from this rendering. TODO restore from embuffer.h.
1140 
1141  assert(str);
1142 
1143  if (is_full()) {
1144  cout << "em_buffer::insert: buffer full\n";
1145  return 0;
1146  }
1147 
1148  //can only insert one level-i-full-stream at a time;
1149  //relaxed..can specify bos;
1150  //not only that, but the length of the stream can be smaller
1151  //than nominal length, because a stream is normally obtained by
1152  //merging streams which can be shorter;
1153  //assert(str->stream_len() == get_stream_len() - bos);
1154 
1155 
1156 #ifdef EMBUF_CHECK_INSERT
1157  //check that stream is sorted
// NOTE(review): this debug check looks broken; fix it before enabling
// EMBUF_CHECK_INSERT:
//  - ae is declared but never assigned, so the assert in the loop reads
//    an uninitialized value;
//  - the loop runs while read_item() returns a NON-zero AMI_err, so (if
//    AMI_ERROR_NO_ERROR is 0 -- confirm in ami_stream.h) it stops at the
//    first successful read instead of at the end of the stream;
//  - prev is never updated, so no two items are ever compared.
1158  cout << "CHECK_INSERT: checking stream is sorted\n";
1159  AMI_err ae;
1160  str->seek(0);
1161  T *crt=NULL, *prev=NULL;
1162  while (str->read_item(&crt)) {
1163  assert(ae == AMI_ERROR_NO_ERROR);
1164  if (prev) assert(*prev <= *crt);
1165  }
1166 #endif
1167 
1168  //nextstream must be empty
1169  assert(str); //redundant: str was already asserted on entry
1170  assert(data[nextstream()] == NULL);
1171  assert(deleted[nextstream()] == 0);
1172  assert(streamsize[nextstream()] == 0);
1173 #ifdef SAVE_MEMORY
1174  assert(name[nextstream()] == NULL);
1175 #endif
1176 
1177 
1178  //set next entry in the buffer to point to this stream; record how
     //many leading items are already deleted and the full stream length
1179  data[nextstream()] = str;
1180  deleted[nextstream()] = bos;
1181  streamsize[nextstream()] = str->stream_len();
1182 #ifdef SAVE_MEMORY
1183  //set next name entry in buffer to point to this stream's name
1184  char* s;
1185  str->name(&s); //name() allocates the string
1186  name[nextstream()] = s;
1187 
1188  //put stream on disk and release its memory
// NOTE(review): source line 1189 is missing from this rendering. Given the
// comment above and "stream should be persistent" below, it presumably
// marks str persistent (e.g. str->persist(PERSIST_PERSISTENT)) so that the
// delete keeps the file. TODO restore from embuffer.h.
1190  delete str; //stream should be persistent; just delete it
1191  data[nextstream()] = NULL;
1192 
1193 #ifdef EMBUF_PRINT_INSERT
1194  cout << "insert stream " << s << " at buf [" << level
1195  << "," << nextstream() << "]" << endl;
1196 #endif
1197 #endif
1198 
1199  //increment the index of next available stream in buffer
1200  incr_nextstream();
1201 
1202 #ifdef EMBUF_PRINT_INSERT
// NOTE(review): source line 1203 is missing from this rendering. TODO
// restore from embuffer.h.
1204  print_stream_names();
1205 #endif
1206 
1207 #ifdef SAVE_MEMORY
     //no-ops: MY_LOG_DEBUG_ID is defined empty at the top of this header
1208  MY_LOG_DEBUG_ID("em_buffer::insert(): inserted stream ");
1209  MY_LOG_DEBUG_ID(name[nextstream()-1]);
1210 #endif
1211 
1212  //return nb of items inserted: remaining length of the entry just
     //filled (nextstream() has already been incremented, hence the -1)
1213  return get_stream_len(nextstream()-1);
1214 }
1215 
1216 
1217 
1218 
1219 /************************************************************/
1220 //print the elements of the i'th stream of the buffer to a stream;
1221 //assumes stream is in memory;
1222 template<class T, class Key>
1223 void em_buffer<T,Key>::print_stream(ostream& s, unsigned int i) {
1224 
1225  assert(data[i]);
1226  assert((i>=0) && (i<index));
1227 
1228  AMI_err ae;
1229  T* x;
1230 
1231  s << "STREAM " << i << ": [";
1232 
1233  ae = data[i]->seek(deleted[i]);
1234  assert(ae == AMI_ERROR_NO_ERROR);
1235 
1236  for (long j = 0; j < get_stream_len(i); j++) {
1237  ae = data[i]->read_item(&x);
1238  assert(ae == AMI_ERROR_NO_ERROR);
1239  s << *x << ",";
1240  }
1241  s << "]\n";
1242 }
1243 
1244 
1245 
1246 /************************************************************/
1247 //print elements range in buffer (read first and last element in each
1248 //substream and find global min and max)
1249 template<class T, class Key>
1251 
1252  T *min, *max;
1253  AMI_err ae;
1254 
1255  get_streams();
1256 
1257  for (unsigned int i=0; i< index; i++) {
1258  cout << "[";
1259  //read min element in substream i
1260  ae = data[i]->seek(deleted[i]);
1261  assert(ae == AMI_ERROR_NO_ERROR);
1262  ae = data[i]->read_item(&min);
1263  assert(ae == AMI_ERROR_NO_ERROR);
1264  cout << min->getPriority() << "..";
1265  //read max element in substream i
1266  ae = data[i]->seek(streamsize[i] - 1);
1267  assert(ae == AMI_ERROR_NO_ERROR);
1268  ae = data[i]->read_item(&max);
1269  assert(ae == AMI_ERROR_NO_ERROR);
1270  cout << max->getPriority()
1271  << " (sz=" << get_stream_len(i) << ")] ";
1272  }
1273  for (unsigned int i=index; i< arity; i++) {
1274  cout << "[] ";
1275  }
1276 
1277  put_streams();
1278 }
1279 
1280 
1281 
1282 /************************************************************/
1283 //print all elements in buffer
1284 template<class T, class Key>
1286 
1287  T *x;
1288  AMI_err ae;
1289 
1290  get_streams();
1291 
1292  for (unsigned int i=0; i<index; i++) {
1293  cout << " [";
1294  ae = data[i]->seek(deleted[i]);
1295  assert(ae == AMI_ERROR_NO_ERROR);
1296  for (unsigned long j=0; j<get_stream_len(i); j++) {
1297  ae = data[i]->read_item(&x);
1298  assert(ae == AMI_ERROR_NO_ERROR);
1299  cout << x->getPriority() << ",";
1300  }
1301  cout << "]" << endl;
1302  }
1303  for (unsigned int i=index; i< arity; i++) {
1304  cout << "[] ";
1305  }
1306 
1307  put_streams();
1308 }
1309 
1310 
1311 
1312 /************************************************************/
1313 //print the sizes of the substreams in the buffer
1314 template<class T, class Key>
1316 
1317  cout << "(streams=" << index << ") sizes=[";
1318  for (unsigned int i=0; i< arity; i++) {
1319  cout << get_stream_len(i) << ",";
1320  }
1321  cout << "]" << endl;
1322  cout.flush();
1323 }
1324 
1325 
1326 
1327 #endif
unsigned long get_buf_maxlen()
Definition: embuffer.h:333
bool empty(void)
Definition: pqheap.h:315
void cleanup()
Definition: embuffer.h:786
AMI_err read_item(T **elt)
Definition: ami_stream.h:543
unsigned int nextstream() const
Definition: embuffer.h:282
#define min(x, y)
Definition: draw2.c:31
friend int operator<(const merge_key &x, const merge_key &y)
Definition: embuffer.h:132
void set(const KEY &x, const unsigned int sid)
Definition: embuffer.h:115
AMI_err write_item(const T &elt)
Definition: ami_stream.h:606
merge_key()
Definition: embuffer.h:108
void incr_deleted(unsigned int i)
Definition: embuffer.h:303
void reset()
Definition: embuffer.h:903
void print()
Definition: embuffer.h:1285
#define NULL
Definition: ccmath.h:32
AMI_STREAM< T > * sort()
Definition: embuffer.h:937
#define x
#define max(x, y)
Definition: draw2.c:32
~merge_key()
Definition: embuffer.h:113
void delete_min_and_insert(const T &x)
Definition: pqheap.h:526
AMI_STREAM< T > ** get_streams()
Definition: embuffer.h:705
AMI_err seek(off_t offset)
Definition: ami_stream.h:460
#define MY_LOG_DEBUG_ID(x)
Definition: embuffer.h:55
unsigned int stream_id() const
Definition: embuffer.h:122
KEY key() const
Definition: embuffer.h:119
~em_buffer()
Definition: embuffer.h:544
#define assert(condition)
Definition: lz4.c:324
merge_key(const KEY &x, const unsigned int sid)
Definition: embuffer.h:110
double b
Definition: r_raster.c:39
AMI_STREAM< T > * get_stream(unsigned int i)
Definition: embuffer.h:604
friend int operator==(const merge_key &x, const merge_key &y)
Definition: embuffer.h:152
friend merge_key operator+(const merge_key &x, const merge_key &y)
Definition: embuffer.h:156
void put_streams()
Definition: embuffer.h:732
em_buffer(const unsigned short i, const unsigned long bs, const unsigned int ar)
Definition: embuffer.h:448
void MEMORY_LOG(const std::string &str)
Definition: mm_utils.cpp:61
friend int operator>(const merge_key &x, const merge_key &y)
Definition: embuffer.h:140
long insert(T *a, long n)
Definition: embuffer.h:1089
unsigned long get_buf_len()
Definition: embuffer.h:324
void print_stream_sizes()
Definition: embuffer.h:1315
bool min(T &elt)
Definition: pqheap.h:329
long total_deleted() const
Definition: embuffer.h:294
bool delete_min()
Definition: pqheap.h:451
AMI_err name(char **stream_name)
Definition: ami_stream.h:440
unsigned int get_nbstreams() const
Definition: embuffer.h:288
unsigned long get_stream_maxlen() const
Definition: embuffer.h:311
unsigned int get_arity() const
Definition: embuffer.h:291
void persist(persistence p)
Definition: ami_stream.h:657
unsigned int laststream() const
Definition: embuffer.h:279
bool is_full() const
Definition: embuffer.h:343
AMI_err
Definition: ami_stream.h:86
bool is_empty()
Definition: embuffer.h:338
KEY getPriority() const
Definition: embuffer.h:125
void print_range()
Definition: embuffer.h:1250
friend int operator!=(const merge_key &x, const merge_key &y)
Definition: embuffer.h:148
const char * name
Definition: named_colr.c:7
unsigned short get_level() const
Definition: embuffer.h:260
off_t stream_len(void)
Definition: ami_stream.h:388
long * get_bos() const
Definition: embuffer.h:276
unsigned long get_stream_len(unsigned int i)
Definition: embuffer.h:317
void put_stream(unsigned int i)
Definition: embuffer.h:665
void incr_nextstream()
Definition: embuffer.h:285
friend int operator<=(const merge_key &x, const merge_key &y)
Definition: embuffer.h:136
friend ostream & operator<<(ostream &s, em_buffer &b)
Definition: embuffer.h:395
unsigned int str_id
Definition: embuffer.h:105
friend int operator>=(const merge_key &x, const merge_key &y)
Definition: embuffer.h:144