GRASS GIS 8 Programmer's Manual  8.2.2dev(2023)-3d2c704037
ami_stream.h
Go to the documentation of this file.
1 /****************************************************************************
2  *
3  * MODULE: iostream
4  *
5 
6  * COPYRIGHT (C) 2007 Laura Toma
7  *
8  *
9 
10  * Iostream is a library that implements streams, external memory
11  * sorting on streams, and an external memory priority queue on
12  * streams. These are the fundamental components used in external
13  * memory algorithms.
14 
15  * Credits: The library was developed by Laura Toma. The kernel of
16  * class STREAM is based on the similar class existent in the GPL TPIE
17  * project developed at Duke University. The sorting and priority
18  * queue have been developed by Laura Toma based on communications
19  * with Rajiv Wickremesinghe. The library was developed as part of
20  * porting Terraflow to GRASS in 2001. PEARL upgrades in 2003 by
21  * Rajiv Wickremesinghe as part of the Terracost project.
22 
23  *
24  * This program is free software; you can redistribute it and/or modify
25  * it under the terms of the GNU General Public License as published by
26  * the Free Software Foundation; either version 2 of the License, or
27  * (at your option) any later version.
28  *
29 
30  * This program is distributed in the hope that it will be useful,
31  * but WITHOUT ANY WARRANTY; without even the implied warranty of
32  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
33  * General Public License for more details. *
34  * **************************************************************************/
35 
36 
37 
38 #ifndef _AMI_STREAM_H
39 #define _AMI_STREAM_H
40 
41 #include <grass/config.h>
42 
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <assert.h>
48 #include <fcntl.h>
49 #include <errno.h>
50 #include <unistd.h>
51 
52 #include <cstring>
53 #include <iostream>
54 using std::cout;
55 using std::cerr;
56 using std::endl;
57 using std::ostream;
58 using std::ofstream;
59 using std::istream;
60 
61 extern "C" {
62 #include <grass/gis.h>
63 }
64 
65 #define MAX_STREAMS_OPEN 200
66 
67 #include "mm.h" // Get the memory manager.
68 
69 #define DEBUG_DELETE if(0)
70 #define DEBUG_STREAM_LEN if(0)
71 #define DEBUG_ASSERT if(0)
72 
73 // The name of the environment variable which keeps the name of the
74 // directory where streams are stored
75 #define STREAM_TMPDIR "STREAM_DIR"
76 
77 // All streams will be named STREAM_*****
78 #define BASE_NAME "STREAM"
79 
80 #define STREAM_BUFFER_SIZE (1<<18)
81 
82 
83 //
84 // AMI error codes are returned using the AMI_err type.
85 //
86 enum AMI_err {
100 };
101 
102 extern const char *ami_str_error[];
103 
104 //
105 // AMI stream types passed to constructors
106 //
108  AMI_READ_STREAM = 1, // Open existing stream for reading
109  AMI_WRITE_STREAM, // Open for writing. Create if non-existent
110  AMI_APPEND_STREAM, // Open for writing at end. Create if needed.
111  AMI_READ_WRITE_STREAM, // Open to read and write.
112  AMI_APPEND_WRITE_STREAM // Open for writing at end (write only mode).
113 };
114 
115 
116 
117 
119  // Delete the stream from the disk when it is destructed.
121  // Do not delete the stream from the disk when it is destructed.
123  // Delete each block of data from the disk as it is read.
125 };
126 
127 /* an un-templated version makes for easier debugging */
129 protected:
130  FILE * fp;
131  int fildes; //descriptor of file
133  char path[BUFSIZ];
135 
136  //0 for streams, positive for substreams
137  unsigned int substream_level;
138 
139  // If this stream is actually a substream, these will be set to
140  // indicate the portion of the file that is part of this stream. If
141  // the stream is the whole file, they will be set to -1. Both are in
142  // T units.
143  off_t logical_bos;
144  off_t logical_eos;
145 
146  //stream buffer passed in the call to setvbuf when file is opened
147  char* buf;
149 
150  public:
151  static unsigned int get_block_length() {
152  return STREAM_BUFFER_SIZE;
153  //return getpagesize();
154  };
155 
156 };
157 
158 template<class T>
159 class AMI_STREAM : public UntypedStream {
160 
161 protected:
162 
163  T read_tmp; /* this is ugly... RW */
164 
165 public:
166  // An AMI_stream with default name
167  AMI_STREAM();
168 
169  // An AMI stream based on a specific path name.
170  AMI_STREAM(const char *path_name, AMI_stream_type st = AMI_READ_WRITE_STREAM);
171 
172  // convenience function with split path_name
173  //AMI_STREAM(const char *dir_name, const char *file_name, AMI_stream_type st);
174 
175 
176  // A psuedo-constructor for substreams.
177  AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end,
178  AMI_STREAM<T> **sub_stream);
179 
180  // Destructor
181  ~AMI_STREAM(void);
182 
183  // Read and write elements.
184  AMI_err read_item(T **elt);
185  AMI_err write_item(const T &elt);
186  AMI_err read_array(T *data, off_t len, off_t *lenp=NULL);
187  AMI_err write_array(const T *data, off_t len);
188 
189  // Return the number of items in the stream.
190  off_t stream_len(void);
191 
192  // Return the path name of this stream.
193  AMI_err name(char **stream_name);
194  const char* name() const;
195 
196  // Move to a specific item in the stream.
197  AMI_err seek(off_t offset);
198 
199  // Query memory usage
200  static AMI_err main_memory_usage(size_t *usage,
202 
203  void persist(persistence p);
204 
205  char *sprint();
206 
207  // have we hit the end of the stream
208  int eof();
209 };
210 
211 
212 /**********************************************************************/
213 
214 
215 /**********************************************************************/
216 /* creates a random file name, opens the file for reading and writing
217  and returns a file descriptor */
218 /* int ami_single_temp_name(char *base, char* tmp_path); */
219 /* fix from Andy Danner */
220 int ami_single_temp_name(const std::string& base, char* tmp_path);
221 
222 
223 /**********************************************************************/
224 /* given fd=file descriptor, associates with it a stream opened in
225  access_mode and returns it */
226 FILE* open_stream(int fd, AMI_stream_type st);
227 
228 
229 /**********************************************************************/
230 /* open the file whose name is pathname in access mode */
231 FILE* open_stream(char* pathname, AMI_stream_type st);
232 
233 
234 
235 
236 /********************************************************************/
237 // An AMI stream with default name.
238 template<class T>
240 
243  fildes = fd;
244  fp = open_stream(fd, access_mode);
245 
246  /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
247  buf = new char[STREAM_BUFFER_SIZE];
248  if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
249  cerr << "ERROR: setvbuf failed (stream " << path << ") with: "
250  << strerror(errno) << endl;
251  exit(1);
252  }
253 
254  // By default, all streams are deleted at destruction time.
256 
257  // Not a substream.
258  substream_level = 0;
259  logical_bos = logical_eos = -1;
260 
261  // why is this here in the first place?? -RW
262  seek(0);
263 
264  eof_reached = 0;
265 
266  // Register memory usage before returning.
267  //size_t usage;
268  //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
269  //MM_manager.register_allocation(usage);
270 }
271 
272 
273 
274 /**********************************************************************/
275 // An AMI stream based on a specific path name.
276 template<class T>
278 
279  access_mode = st;
280 
281  if(path_name == NULL) {
283  fildes = fd;
284  fp = open_stream(fd, access_mode);
285  } else {
286  strcpy(path, path_name);
287  fp = open_stream(path, st);
288  fildes = -1;
289  }
290 
291  /* a stream is by default buffered with a buffer of size BUFSIZ=1K */
292  buf = new char[STREAM_BUFFER_SIZE];
293  if (setvbuf(fp, buf, _IOFBF, STREAM_BUFFER_SIZE) != 0) {
294  cerr << "ERROR: setvbuf failed (stream " << path << ") with: "
295  << strerror(errno) << endl;
296  exit(1);
297  }
298 
299  eof_reached = 0;
300 
301  // By default, all streams are deleted at destruction time.
302  if(st == AMI_READ_STREAM) {
304  } else {
306  }
307 
308  // Not a substream.
309  substream_level = 0;
310  logical_bos = logical_eos = -1;
311 
312  seek(0);
313 
314  // Register memory usage before returning.
315  //size_t usage;
316  //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
317  //MM_manager.register_allocation(usage);
318 }
319 
320 
321 
322 /**********************************************************************/
323  // A psuedo-constructor for substreams.
324 template<class T>
326  off_t sub_begin,
327  off_t sub_end,
328  AMI_STREAM<T> **sub_stream) {
329 
330  //assume this for now
331  assert(st == AMI_READ_STREAM);
332 
333 #ifdef __MINGW32__
334  /* MINGW32: reopen file here for stream_len() below */
335  //reopen the file
336  AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
337 #endif
338 
339  //check range
340  if (substream_level) {
341  if( (sub_begin >= (logical_eos - logical_bos)) ||
342  (sub_end >= (logical_eos - logical_bos)) ) {
343 
344  return AMI_ERROR_OUT_OF_RANGE;
345  }
346  } else {
347  off_t len = stream_len();
348  if (sub_begin > len || sub_end > len) {
349 
350  return AMI_ERROR_OUT_OF_RANGE;
351  }
352  }
353 
354 #ifndef __MINGW32__
355  //reopen the file
356  AMI_STREAM<T> *substr = new AMI_STREAM<T>(path, st);
357 #endif
358 
359  // Set up the beginning and end positions.
360  if (substream_level) {
361  substr->logical_bos = logical_bos + sub_begin;
362  substr->logical_eos = logical_bos + sub_end + 1;
363  } else {
364  substr->logical_bos = sub_begin;
365  substr->logical_eos = sub_end + 1;
366  }
367 
368  // Set the current position.
369  substr->seek(0);
370 
371  substr->eof_reached = 0;
372 
373  //set substream level
374  substr->substream_level = substream_level + 1;
375 
376  substr->per = per; //set persistence
377 
378  //*sub_stream = (AMI_base_stream < T > *)substr;
379  *sub_stream = substr;
380  return AMI_ERROR_NO_ERROR;
381 }
382 
383 
384 
385 /**********************************************************************/
386 // Return the number of items in the stream.
387 template<class T>
389 
390  fflush(fp);
391 
392 #ifdef __MINGW32__
393  //stat() fails on MS Windows if the file is open, so try G_ftell() instead.
394  //FIXME: not 64bit safe, but WinGrass isn't either right now.
395  off_t posn_save, st_size;
396 
397  posn_save = G_ftell(fp);
398  if(posn_save == -1) {
399  perror("ERROR: AMI_STREAM::stream_len(): ftell(fp) failed ");
400  perror(path);
401  exit(1);
402  }
403 
404  G_fseek(fp, 0, SEEK_END);
405  st_size = G_ftell(fp);
406  if(st_size == -1) {
407  perror("ERROR: AMI_STREAM::stream_len(): ftell[SEEK_END] failed ");
408  perror(path);
409  exit(1);
410  }
411 
412  G_fseek(fp, posn_save, SEEK_SET);
413 
414  //debug stream_len:
415  DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%d\n",
416  path, st_size, sizeof(T));
417 
418  return (st_size / sizeof(T));
419 #else
420  struct stat statbuf;
421  if (stat(path, &statbuf) == -1) {
422  perror("AMI_STREAM::stream_len(): fstat failed ");
423  DEBUG_ASSERT assert(0);
424  exit(1);
425  }
426 
427  //debug stream_len:
428  DEBUG_STREAM_LEN fprintf(stderr, "%s: length = %lld sizeof(T)=%lud\n",
429  path, (long long int)statbuf.st_size, sizeof(T));
430 
431  return (statbuf.st_size / sizeof(T));
432 #endif
433 }
434 
435 
436 
437 /**********************************************************************/
438 // Return the path name of this stream.
439 template<class T>
440 AMI_err AMI_STREAM<T>::name(char **stream_name) {
441 
442  *stream_name = new char [strlen(path) + 1];
443  strcpy(*stream_name, path);
444 
445  return AMI_ERROR_NO_ERROR;
446 }
447 
448 // Return the path name of this stream.
449 template<class T>
450 const char *
452  return path;
453 }
454 
455 
456 
457 /**********************************************************************/
458 // Move to a specific offset within the (sub)stream.
459 template<class T>
461 
462  off_t seek_offset;
463 
464  if (substream_level) { //substream
465  if (offset > (unsigned) (logical_eos - logical_bos)) {
466  //offset out of range
467  cerr << "ERROR: AMI_STREAM::seek bos=" << logical_bos << ", eos="
468  << logical_eos << ", offset " << offset << " out of range.\n";
469  DEBUG_ASSERT assert(0);
470  exit(1);
471  } else {
472  //offset in range
473  seek_offset = (logical_bos + offset) * sizeof(T);
474  }
475 
476 
477  } else {
478  //not a substream
479  seek_offset = offset * sizeof(T);
480  }
481 
482  G_fseek(fp, seek_offset, SEEK_SET);
483 
484  return AMI_ERROR_NO_ERROR;
485 }
486 
487 
488 
489 
490 /**********************************************************************/
491 // Query memory usage
492 template<class T>
493 AMI_err
495 
496  switch (usage_type) {
498  *usage = sizeof (AMI_STREAM<T>);
499  break;
501  // *usage = get_block_length();
502  *usage = STREAM_BUFFER_SIZE*sizeof(char);
503  break;
506  // *usage = sizeof (*this) + get_block_length();
507  *usage = sizeof (AMI_STREAM<T>) + STREAM_BUFFER_SIZE*sizeof(char);
508  break;
509  }
510  return AMI_ERROR_NO_ERROR;
511 }
512 
513 
514 
515 /**********************************************************************/
516 template<class T>
518 
519  DEBUG_DELETE cerr << "~AMI_STREAM: " << path << "(" << this << ")\n";
520  assert(fp);
521  fclose(fp);
522  delete buf;
523 
524  // Get rid of the file if not persistent and if not substream.
525  if ((per != PERSIST_PERSISTENT) && (substream_level == 0)) {
526  if (unlink(path) == -1) {
527  cerr << "ERROR: AMI_STREAM: failed to unlink " << path << endl;
528  perror("cannot unlink: ");
529  DEBUG_ASSERT assert(0);
530  exit(1);
531  }
532  }
533  // Register memory deallocation before returning.
534  //size_t usage;
535  //main_memory_usage(&usage, MM_STREAM_USAGE_CURRENT);
536  //MM_manager.register_deallocation(usage);
537  }
538 
539 
540 
541 /**********************************************************************/
542 template<class T>
544 
545  assert(fp);
546 
547  //if we go past substream range
548  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
550 
551  } else {
552  if (fread((char *) (&read_tmp), sizeof(T), 1, fp) < 1) {
553  if(feof(fp)) {
554  eof_reached = 1;
556  } else {
557  cerr << "ERROR: file=" << path << ":";
558  perror("cannot read!");
559  return AMI_ERROR_IO_ERROR;
560  }
561  }
562 
563  *elt = &read_tmp;
564  return AMI_ERROR_NO_ERROR;
565  }
566 }
567 
568 
569 
570 
571 /**********************************************************************/
572 template<class T>
573 AMI_err AMI_STREAM<T>::read_array(T *data, off_t len, off_t *lenp) {
574  size_t nobj;
575  assert(fp);
576 
577  //if we go past substream range
578  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
579  eof_reached = 1;
581 
582  } else {
583  nobj = fread((void*)data, sizeof(T), len, fp);
584 
585  if (nobj < len) { /* some kind of error */
586  if(feof(fp)) {
587  if(lenp) *lenp = nobj;
588  eof_reached = 1;
590  } else {
591  cerr << "ERROR: file=" << path << ":";
592  perror("cannot read!");
593  return AMI_ERROR_IO_ERROR;
594  }
595  }
596  if(lenp) *lenp = nobj;
597  return AMI_ERROR_NO_ERROR;
598  }
599 }
600 
601 
602 
603 
604 /**********************************************************************/
605 template<class T>
607 
608  assert(fp);
609  //if we go past substream range
610  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
612 
613  } else {
614  if (fwrite((char*)(&elt), sizeof(T), 1,fp) < 1) {
615  cerr << "ERROR: AMI_STREAM::write_item failed.\n";
616  if (*path)
617  perror(path);
618  else
619  perror("AMI_STREAM::write_item: ");
620  DEBUG_ASSERT assert(0);
621  exit(1);
622  }
623 
624  return AMI_ERROR_NO_ERROR;
625  }
626 }
627 
628 
629 /**********************************************************************/
630 template<class T>
631 AMI_err AMI_STREAM<T>::write_array(const T *data, off_t len) {
632  size_t nobj;
633 
634  assert(fp);
635  //if we go past substream range
636  if ((logical_eos >= 0) && G_ftell(fp) >= sizeof(T) * logical_eos) {
638 
639  } else {
640  nobj = fwrite(data, sizeof(T), len, fp);
641  if (nobj < len) {
642  cerr << "ERROR: AMI_STREAM::write_array failed.\n";
643  if (*path)
644  perror(path);
645  else
646  perror("AMI_STREAM::write_array: ");
647  DEBUG_ASSERT assert(0);
648  exit(1);
649  }
650  return AMI_ERROR_NO_ERROR;
651  }
652 }
653 
654 
655 /**********************************************************************/
656 template<class T>
658  per = p;
659 }
660 
661 
662 
663 /**********************************************************************/
664 // sprint()
665 // Return a string describing the stream
666 //
667 // This function gives easy access to the file name, length.
668 // It is not reentrant, but this should not be too much of a problem
669 // if you are careful.
670 template<class T>
672  static char desc[BUFSIZ + 256];
673  sprintf(desc, "[AMI_STREAM %s %ld]", path, (long)stream_len());
674  return desc;
675 }
676 
677 /**********************************************************************/
678 template<class T>
680  return eof_reached;
681 }
682 
683 
684 #endif // _AMI_STREAM_H
#define DEBUG_DELETE
Definition: ami_stream.h:69
AMI_stream_type access_mode
Definition: ami_stream.h:132
~AMI_STREAM(void)
Definition: ami_stream.h:517
AMI_err read_item(T **elt)
Definition: ami_stream.h:543
static AMI_err main_memory_usage(size_t *usage, MM_stream_usage usage_type=MM_STREAM_USAGE_OVERHEAD)
Definition: ami_stream.h:494
off_t logical_eos
Definition: ami_stream.h:144
#define STREAM_BUFFER_SIZE
Definition: ami_stream.h:80
static unsigned int get_block_length()
Definition: ami_stream.h:151
AMI_err read_array(T *data, off_t len, off_t *lenp=NULL)
Definition: ami_stream.h:573
AMI_err write_item(const T &elt)
Definition: ami_stream.h:606
#define DEBUG_STREAM_LEN
Definition: ami_stream.h:70
#define NULL
Definition: ccmath.h:32
persistence per
Definition: ami_stream.h:134
AMI_stream_type
Definition: ami_stream.h:107
off_t logical_bos
Definition: ami_stream.h:143
AMI_err new_substream(AMI_stream_type st, off_t sub_begin, off_t sub_end, AMI_STREAM< T > **sub_stream)
Definition: ami_stream.h:325
char path[BUFSIZ]
Definition: ami_stream.h:133
unsigned int substream_level
Definition: ami_stream.h:137
AMI_err seek(off_t offset)
Definition: ami_stream.h:460
struct state * st
Definition: parser.c:104
#define assert(condition)
Definition: lz4.c:324
AMI_err write_array(const T *data, off_t len)
Definition: ami_stream.h:631
#define BASE_NAME
Definition: ami_stream.h:78
FILE * open_stream(int fd, AMI_stream_type st)
Definition: ami_stream.cpp:104
MM_stream_usage
Definition: mm.h:72
void G_fseek(FILE *, off_t, int)
Change the file position of the stream.
Definition: gis/seek.c:50
#define DEBUG_ASSERT
Definition: ami_stream.h:71
char * sprint()
Definition: ami_stream.h:671
persistence
Definition: ami_stream.h:118
off_t G_ftell(FILE *)
Get the current file position of the stream.
Definition: gis/seek.c:29
Definition: path.h:16
void persist(persistence p)
Definition: ami_stream.h:657
int ami_single_temp_name(const std::string &base, char *tmp_path)
Definition: ami_stream.cpp:74
AMI_err
Definition: ami_stream.h:86
const char * name() const
Definition: ami_stream.h:451
const char * name
Definition: named_colr.c:7
off_t stream_len(void)
Definition: ami_stream.h:388
int eof()
Definition: ami_stream.h:679
const char * ami_str_error[]
Definition: ami_stream.cpp:54