47 #define MY_LOG_DEBUG_ID(x) \ 48 if(GETOPT("debug")) cerr << __FILE__ << ":" << __LINE__<< " " << x << endl; 54 #define MY_LOG_DEBUG_ID(x) 65 template<
class T,
class Key>
70 unsigned short buf_id;
77 x(e), buf_id(bid), str_id(sid) {}
81 void set (T &e,
unsigned short bid,
unsigned int sid) {
96 return x.getPriority();
101 return s <<
"<buf_id=" << elt.buf_id
102 <<
",str_id=" << elt.str_id <<
"> " 106 friend int operator < (const ExtendedEltMergeType<T,Key> &e1,
108 return (e1.getPriority() < e2.getPriority());
110 friend int operator <= (const ExtendedEltMergeType<T,Key> &e1,
112 return (e1.getPriority() <= e2.getPriority());
137 template<
class T,
class Key>
139 unsigned short nb_buf,
140 unsigned int buf_ar):
141 pqsize(pq_sz), bufsize(buf_sz), max_nbuf(nb_buf),
142 crt_buf(0), buf_arity(buf_ar) {
148 printf(
"EM_PQUEUE:available memory before allocation: %.2fMB\n",
149 mm_avail/(
float)(1<<20));
150 printf(
"EM_PQUEUE:available memory before allocation: %ldB\n",
157 assert(pqsize > 0 && bufsize > 0);
159 MEMORY_LOG(
"em_pqueue: allocating int pqueue\n");
170 sprintf(str,
"em_pqueue: allocating array of %ld buff pointers\n",
177 for (
unsigned short i=0; i<max_nbuf; i++) {
187 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
188 mm_avail/(
float)(1<<20));
196 cout <<
"em_pqueue constructor: failing to get stream_usage\n";
199 cout <<
"EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
200 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
205 2*sz_stream + max_nbuf*sz_stream;
208 cout <<
"EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
209 if (mm_overhead > mm_avail) {
210 cout <<
"overhead bigger than available memory" 211 <<
"increase -m and try again\n";
214 mm_avail -= mm_overhead;
218 cout <<
"pqsize=" << pqsize
219 <<
", bufsize=" << bufsize
220 <<
", maximum allowed arity=" << mm_avail/sz_stream << endl;
221 if (buf_arity * sz_stream > mm_avail) {
222 cout <<
"sorry - empq excedes memory limits\n";
223 cout <<
"try again decreasing arity or pqsize/bufsize\n";
231 template<
class T,
class Key>
242 printf(
"EM_PQUEUE:available memory before allocation: %.2fMB\n",
243 mm_avail/(
float)(1<<20));
252 cout <<
"em_pqueue constructor: failing to get main_memory_usage\n";
255 cout <<
"EM_PQUEUE:AMI_stream memory usage: " << sz_stream << endl;
256 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
262 buf_arity = mm_avail/(2 * sz_stream);
265 2*sz_stream + max_nbuf*sz_stream;
268 cout <<
"EM_PQUEUE: mm_overhead estimated as " << mm_overhead << endl;
269 if (mm_overhead > mm_avail) {
270 cout <<
"overhead bigger than available memory" 271 <<
"increase -m and try again\n";
274 mm_avail -= mm_overhead;
279 pqsize = mm_avail/(2*
sizeof(T));
281 bufsize = mm_avail/(2*
sizeof(T));
284 pqsize = mm_avail/(4*
sizeof(T));
286 bufsize = mm_avail/(4*
sizeof(T));
289 cout <<
"EM_PQUEUE: pqsize set to " << pqsize << endl;
290 cout <<
"EM_PQUEUE: bufsize set to " << bufsize << endl;
291 cout <<
"EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
303 buf_arity = mm_avail/(2 * sz_stream);
305 buf_arity = mm_avail/(2 * max_nbuf * sz_stream);
315 cout <<
"EM_PQUEUE: arity set to " << buf_arity << endl;
320 MEMORY_LOG(
"em_pqueue: allocating int pqueue\n");
331 sprintf(str,
"em_pqueue: allocating array of %ld buff pointers\n",
337 for (
unsigned short i=0; i<max_nbuf; i++) {
342 cout <<
"EM_PQUEUE: maximum length is " <<
maxlen() <<
"\n";
349 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
350 mm_avail/(
float)(1<<20));
362 template<
class T,
class Key>
367 unsigned int pqcurrentsize;
371 pqsize = pqcapacity + 1;
372 pqcurrentsize = im->
size();
374 if(!(pqcurrentsize <= pqsize)) {
375 cout <<
"EMPQ: pq maxsize=" << pqsize <<
", pq crtsize=" << pqcurrentsize
393 for (
unsigned int i=0; i<pqcurrentsize; i++) {
398 delete im; im =
NULL;
402 bufsize = pqcapacity;
403 cout <<
"EM_PQUEUE: allocating im_buffer size=" << bufsize
404 <<
" total " << (float)bufsize*
sizeof(T)/(1<<20) <<
"MB\n";
408 cout <<
"EM_PQUEUE: allocating pq size=" << pqsize
409 <<
" total " << (float)pqcapacity*
sizeof(T)/(1<<20) <<
"MB\n";
418 for (
unsigned int i=0; i<pqcurrentsize; i++) {
423 assert(pq->size() == pqcurrentsize);
433 cout <<
"em_pqueue constructor: failing to get main_memory_usage\n";
436 cout <<
"EM_PQUEUE: AMI_stream memory usage: " << sz_stream << endl;
437 cout <<
"EM_PQUEUE: item size=" <<
sizeof(T) << endl;
440 buf_arity = pqcapacity *
sizeof(T) / sz_stream;
442 if (buf_arity == 0) {
443 cout <<
"EM_PQUEUE: arity=0 (not enough memory..)\n";
459 sprintf(str,
"em_pqueue: allocating array of %ld buff pointers\n",
464 for (
unsigned short i=0; i<max_nbuf; i++) {
469 cout <<
"EM_PQUEUE: new pqsize set to " << pqcapacity << endl;
470 cout <<
"EM_PQUEUE: bufsize set to " << bufsize << endl;
471 cout <<
"EM_PQUEUE: buf arity set to " << buf_arity << endl;
472 cout <<
"EM_PQUEUE: nb buffers set to " << max_nbuf << endl;
473 cout <<
"EM_PQUEUE: maximum length is " <<
maxlen() <<
"\n";
478 printf(
"EM_PQUEUE: available memory after allocation: %.2fMB\n",
479 mm_avail/(
float)(1<<20));
486 MEMORY_LOG(
"em_pqueue::empty_buff_0: create new em_buffer\n");
488 buff[0]->insert(amis);
499 template<
class T,
class Key>
503 delete pq; pq =
NULL;
507 delete buff_0; buff_0 =
NULL;
510 for (
unsigned short i=0; i< crt_buf; i++) {
511 if (buff[i])
delete buff[i];
519 template<
class T,
class Key>
523 printf(
"em_pqueue::max_len: level=%d exceeds capacity=%d\n",
528 return buff[i]->get_buf_maxlen();
533 cout <<
"em_pqueue::max_len: cannot allocate\n";
544 template<
class T,
class Key>
547 for (
unsigned short i=0; i< max_nbuf; i++) {
550 return len + buff_0->get_buf_maxlen();
557 template<
class T,
class Key>
560 unsigned long elen = 0;
561 for (
unsigned short i=0; i < crt_buf; i++) {
562 elen += buff[i]->get_buf_len();
564 return elen + pq->size() + buff_0->get_buf_len();
570 template<
class T,
class Key>
575 return ((pq->size() == 0) && (buff_0->get_buf_len() == 0) &&
582 template<
class T,
class Key>
588 for (
unsigned short i=0; i<crt_buf; i++) {
589 k |= buff[i]->get_buf_len();
592 cerr <<
"fillpq called with empty external buff!" << endl;
598 #ifdef EMPQ_PQ_FILL_PRINT 599 cout <<
"filling pq\n"; cout .flush();
601 XXX cerr <<
"filling pq" << endl;
607 sprintf(str,
"em_pqueue::fillpq: allocate array of %hd AMI_STREAMs\n",
616 for (
unsigned short i=0; i< crt_buf; i++) {
619 assert(buff[i]->get_buf_len());
622 assert(outstreams[i]->stream_len());
630 delete outstreams[0];
631 delete [] outstreams;
638 for (
int i=0; i< crt_buf; i++) {
639 delete outstreams[i];
641 delete [] outstreams;
649 delete minstream; minstream =
NULL;
654 XXX cerr <<
"fillpq done" << endl;
662 template<
class T,
class Key>
683 if (buff_0->is_empty()) {
687 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0 688 cout <<
"filling pq from B0\n"; cout.flush();
691 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
692 buff_0->reset(pqsize, n);
702 XXX cerr <<
"fillpq done; about to take min" << endl;
704 XXX cerr <<
"after taking min" << endl;
715 template<
class T,
class Key>
730 template<
class T,
class Key>
741 ok = pq->extract_min(elt);
747 MY_LOG_DEBUG_ID(
"internal pq empty: filling it up from external buffers");
754 if (buff_0->is_empty()) {
757 #ifdef EMPQ_PRINT_FILLPQ_FROM_BUFF0 758 cout <<
"filling pq from B0\n"; cout.flush();
762 long n = pq->fill(buff_0->get_array(), buff_0->get_buf_len());
763 buff_0->reset(pqsize, n);
764 ok = pq->extract_min(elt);
772 #ifdef EMPQ_PRINT_SIZE 774 y = x*
sizeof(T) >> 20;
777 cout <<
" total " << x <<
"(" << y <<
"MB)]" << endl;
782 XXX cerr <<
"about to get the min" << endl;
784 ok = pq->extract_min(elt);
786 cout <<
"failing assertion: pq->extract_min == true\n";
803 template<
class T,
class Key>
819 if ((!
min(next_elt)) ||
820 !(next_elt.getPriority() == elt.getPriority())) {
824 elt = elt + next_elt;
832 #ifdef EMPQ_PRINT_EXTRACTALL 833 cout <<
"EXTRACTED: " << elt << endl; cout.flush();
835 #ifdef EMPQ_PRINT_EMPQ 852 template<
class T,
class Key>
864 ae = minstream->seek(0);
867 bool strEmpty=
false, bufEmpty=
false;
869 unsigned int bufPos = 0;
873 ae = minstream->read_item(&strItem);
879 if (bufPos < buff_0->get_buf_len()) {
880 bufElt = buff_0->get_item(bufPos);
886 XXX cerr <<
"pqsize=" << pqsize << endl;
887 XXX if(strEmpty) cerr <<
"stream is empty!!" << endl;
888 for (
unsigned int i=0; i< pqsize; i++) {
891 if ((!strEmpty) && (strElt = strItem->
elt(),
892 bufElt.getPriority() > strElt.getPriority())) {
895 ae = minstream->read_item(&strItem);
905 if (bufPos < buff_0->get_buf_len()) {
906 bufElt = buff_0->get_item(bufPos);
913 strElt = strItem->
elt();
918 ae = minstream->read_item(&strItem);
931 buff_0->shift_left(bufPos);
934 #ifdef EMPQ_PQ_FILL_PRINT 935 cout <<
"merge_bufs2pq: pq filled; now cleaning\n"; cout .flush();
948 template<
class T,
class Key>
950 unsigned int stream_id) {
954 assert(stream_id < buff[buf_id]->get_nbstreams());
956 buff[buf_id]->incr_deleted(stream_id);
963 template<
class T,
class Key>
967 #ifdef EMPQ_PQ_FILL_PRINT 968 cout <<
"em_pqueue: cleanup enter\n"; cout .flush();
971 for (
unsigned short i=0; i< crt_buf; i++) {
977 while ((i>=0) && buff[i]->
is_empty()) {
982 #ifdef EMPQ_PQ_FILL_PRINT 983 cout <<
"em_pqueue: cleanup done\n"; cout .flush();
993 template<
class T,
class Key>
996 #ifdef EMPQ_ASSERT_EXPENSIVE 997 long init_size =
size();
1003 if ((crt_buf == 0) && (buff_0->is_empty())) {
1013 ok = pq->max(pqmax);
1019 #ifdef EMPQ_ASSERT_EXPENSIVE 1026 pq->extract_max(val);
1037 #ifdef EMPQ_ASSERT_EXPENSIVE 1040 if (buff_0->is_full()) {
1041 #ifdef EMPQ_PRINT_SIZE 1043 y = x*
sizeof(T) >> 20;
1046 cout <<
" total " << x <<
"(" << y <<
"MB)]" << endl;
1051 #ifdef EMPQ_ASSERT_EXPENSIVE 1055 assert(!buff_0->is_full());
1057 ok = buff_0->insert(val);
1060 #ifdef EMPQ_PRINT_INSERT 1061 cout <<
"INSERTED: " << x << endl; cout.flush();
1063 #ifdef EMPQ_PRINT_EMPQ 1076 template<
class T,
class Key>
bool 1078 #ifdef EMPQ_ASSERT_EXPENSIVE 1079 long init_size =
size();
1082 #ifdef EMPQ_EMPTY_BUF_PRINT 1083 cout <<
"emptying buff_0\n"; cout.flush();
1088 assert(buff_0->is_full());
1093 #ifdef EMPQ_ASSERT_EXPENSIVE 1099 MEMORY_LOG(
"em_pqueue::empty_buff_0: create new em_buffer\n");
1103 assert(buff_0->get_buf_len() == buff[0]->get_stream_maxlen());
1115 #ifdef EMPQ_ASSERT_EXPENSIVE 1116 assert(
size() + buff_0->maxlen() == init_size);
1124 buff[0]->insert(buff_0_str);
1128 if (crt_buf == 0) crt_buf = 1;
1130 #ifdef EMPQ_ASSERT_EXPENSIVE 1143 template<
class T,
class Key>
1147 #ifdef EMPQ_ASSERT_EXPENSIVE 1148 long init_size =
size();
1150 #ifdef EMPQ_EMPTY_BUF_PRINT 1151 cout <<
"emptying buffer_" << i <<
"\n"; cout.flush();
1162 if (i == max_nbuf-1) {
1163 cerr <<
"empty_buff:: cannot empty further - structure is full..\n";
1165 cerr <<
"ext buff array should reallocate in a future version..\n";
1173 sprintf(str,
"em_pqueue::empty_buff( %hd ) allocate new em_buffer\n", i);
1188 if (sorted_buf->
stream_len() != buff[i]->get_buf_len()) {
1189 cout <<
"sorted_stream_len: " << sorted_buf->
stream_len()
1190 <<
" , bufflen: " << buff[i]->get_buf_len() << endl; cout.flush();
1192 ae = sorted_buf->
seek(0);
1196 cout << *x <<
", "; cout.flush();
1199 #ifdef EMPQ_ASSERT_EXPENSIVE 1203 #ifdef EMPQ_ASSERT_EXPENSIVE 1208 #ifdef EMPQ_ASSERT_EXPENSIVE 1219 buff[i+1]->insert(sorted_buf, 0);
1222 if (crt_buf < i+2) crt_buf = i+2;
1224 #ifdef EMPQ_ASSERT_EXPENSIVE 1242 template<
class T,
class Key>
1248 unsigned short bufid = buf->
get_level() - 1;
1252 std::vector<T*> in_objects(arity);
1266 MEMORY_LOG(
"em_pqueue::merge_buffer: allocate keys array\n");
1272 for (i = 0; i < arity ; i++ ) {
1276 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1280 ami_err = instreams[i]->
read_item(&(in_objects[i]));
1283 in_objects[i] =
NULL;
1294 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1298 unsigned int NonEmptyRuns = j;
1308 ami_err = outstream->seek(0);
1311 while (!mergeheap.
empty() && (extracted < K)) {
1313 i = mergeheap.
min().stream_id();
1317 cerr <<
"WARNING!!! EARLY EXIT!!!" << endl;
1323 ami_err = instreams[i]->
read_item(&(in_objects[i]));
1337 cerr <<
"WARNING!!! early breakout!!!" << endl;
1353 assert(extracted == outstream->stream_len());
1367 template<
class T,
class Key>
1370 unsigned short arity,
1377 std::vector<ExtendedEltMergeType<T,Key> > in_objects(arity);
1381 unsigned int nonEmptyRuns=0;
1385 MEMORY_LOG(
"em_pqueue::merge_streams: allocate keys array\n");
1391 for (
int i = 0; i < arity ; i++ ) {
1398 ami_err = instreams[i]->read_item(&objp);
1401 in_objects[i] = *objp;
1402 keys[nonEmptyRuns] =
merge_key<Key>(in_objects[i].getPriority(), i);
1411 assert(nonEmptyRuns <= arity);
1419 ami_err = outstream->seek(0);
1422 while (!mergeheap.
empty() && (extracted < K)) {
1424 int id = mergeheap.
min().stream_id();
1426 assert(
id < nonEmptyRuns);
1436 assert(
id < nonEmptyRuns);
1439 ami_err = instreams[id]->read_item(&objp);
1443 in_objects[id] = *objp;
1469 template<
class T,
class Key>
1475 for(
int i=0; i<crt_buf; i++) {
1477 delete buff[i]; buff[i] =
NULL;
1485 template<
class T,
class Key>
1488 cout <<
"EM_PQ: [pq=" << pqsize
1489 <<
", b=" << bufsize
1490 <<
", bufs=" << max_nbuf
1491 <<
", ar=" << buf_arity <<
"]\n";
1503 for (
unsigned short i=0; i < crt_buf; i++) {
1504 cout <<
"B" << i+1 <<
": ";
1505 buff[i]->print_range();
1514 template<
class T,
class Key>
1517 cout <<
"EM_PQ: [pq=" << pqsize
1518 <<
", b=" << bufsize
1519 <<
", bufs=" << max_nbuf
1520 <<
", ar=" << buf_arity <<
"]\n";
1530 for (
unsigned short i=0; i < crt_buf; i++) {
1531 cout <<
"B" << i+1 <<
": " << endl;
1539 template<
class T,
class Key>
1543 cout <<
"EMPQ: pq=" << pq->size() <<
",B0=" << buff_0->get_buf_len() << endl;
1545 for (
unsigned short i=0; i < crt_buf; i++) {
1547 cout <<
"B_" << i+1 <<
":"; cout.flush();
1548 buff[i]->print_stream_sizes();
1549 elen += buff[i]->get_buf_len();
1552 cout <<
"total: " << elen + pq->size() + buff_0->get_buf_len() << endl << endl;
1559 template<
class T,
class Key>
1561 for (
unsigned short i=0; i< crt_buf; i++) {
1563 buff[i]->print_stream_sizes();
unsigned long get_buf_maxlen()
AMI_err read_item(T **elt)
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
friend ostream & operator<<(ostream &s, const ExtendedEltMergeType< T, Key > &elt)
bool insert(const T &elt)
AMI_err merge_buffer(em_buffer< T, Key > *buf, ExtendedMergeStream *outstr, long K)
size_t getAvailableMemory()
AMI_err write_item(const T &elt)
void empty_buff(unsigned short i)
unsigned short buffer_id() const
friend int operator>(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
unsigned int size(void) const
bool extract_all_min(T &elt)
void delete_min_and_insert(const T &x)
friend int operator==(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
AMI_STREAM< T > ** get_streams()
AMI_err seek(off_t offset)
#define ExtendedMergeStream
void delete_str_elt(unsigned short buf_id, unsigned int stream_id)
#define assert(condition)
#define MY_LOG_DEBUG_ID(x)
void MEMORY_LOG(const std::string &str)
void merge_bufs2pq(ExtendedMergeStream *minstream)
unsigned long get_buf_len()
unsigned int get_nbstreams() const
unsigned int stream_id() const
friend int operator!=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
void print_stream_sizes()
ExtendedEltMergeType(T &e, unsigned short bid, unsigned int sid)
AMI_err merge_streams(ExtendedMergeStream **instr, unsigned short arity, ExtendedMergeStream *outstr, long K)
friend int operator>=(const ExtendedEltMergeType< T, Key > &e1, const ExtendedEltMergeType< T, Key > &e2)
unsigned short get_level() const
HeapIndex get_maxsize() const