gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r23101 - Extractor/src/main


From: gnunet
Subject: [GNUnet-SVN] r23101 - Extractor/src/main
Date: Sat, 4 Aug 2012 21:44:41 +0200

Author: grothoff
Date: 2012-08-04 21:44:40 +0200 (Sat, 04 Aug 2012)
New Revision: 23101

Modified:
   Extractor/src/main/TODO
   Extractor/src/main/extractor.c
   Extractor/src/main/extractor_datasource.c
Log:
implementing bz2 support

Modified: Extractor/src/main/TODO
===================================================================
--- Extractor/src/main/TODO     2012-08-04 19:26:28 UTC (rev 23100)
+++ Extractor/src/main/TODO     2012-08-04 19:44:40 UTC (rev 23101)
@@ -1,5 +1,2 @@
-* bz2 decompression (not implemented)
-* extract-from-bz2-file test fails!
-
 * MAX_META_DATA buffer of 32 MB is a bit big as a non-growing default size;
   also, valgrind reports it is leaked even though printf-debugging shows it is not (!?)

Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c      2012-08-04 19:26:28 UTC (rev 23100)
+++ Extractor/src/main/extractor.c      2012-08-04 19:44:40 UTC (rev 23101)
@@ -377,6 +377,7 @@
   ssize_t data_available;
   ssize_t ready;
   int done;
+  int have_in_memory;
 
   plugin_count = 0;
   for (pos = plugins; NULL != pos; pos = pos->next)
@@ -385,7 +386,7 @@
     ready = EXTRACTOR_IPC_shared_memory_set_ (shm, ds, 0, DEFAULT_SHM_SIZE);
   else
     ready = 0;
-
+  have_in_memory = 0;
   prp.file_finished = 0;
   prp.proc = proc;
   prp.proc_cls = proc_cls;
@@ -398,6 +399,8 @@
   start.file_size = EXTRACTOR_datasource_get_size_ (ds, 0);
   for (pos = plugins; NULL != pos; pos = pos->next)
     {
+      if (EXTRACTOR_OPTION_IN_PROCESS == pos->flags)
+       have_in_memory = 1;
       if ( (NULL != pos->channel) &&
           (-1 == EXTRACTOR_IPC_channel_send_ (pos->channel,
                                               &start,
@@ -537,6 +540,8 @@
        }
     }
 
+  if (0 == have_in_memory)
+    return;
   /* run in-process plugins */
   ctx.finished = 0;
   ctx.ds = ds;
@@ -548,7 +553,10 @@
   ec.get_size = &in_process_get_size;
   ec.proc = &in_process_proc;
   if (-1 == EXTRACTOR_datasource_seek_ (ds, 0, SEEK_SET))
-    return;
+    {
+      LOG ("Failed to seek to 0 for in-memory plugins\n");
+      return;
+    }
 
   for (pos = plugins; NULL != pos; pos = pos->next)
     {

Modified: Extractor/src/main/extractor_datasource.c
===================================================================
--- Extractor/src/main/extractor_datasource.c   2012-08-04 19:26:28 UTC (rev 23100)
+++ Extractor/src/main/extractor_datasource.c   2012-08-04 19:44:40 UTC (rev 23101)
@@ -449,97 +449,6 @@
 
 #if HAVE_ZLIB
 /**
- * Reset gz-compressed data stream to the beginning.
- *
- * @return 1 on success, 0 to terminate extraction,
- *        -1 on decompressor initialization failure
- */ 
-static int
-cfs_reset_stream_zlib (struct CompressedFileSource *cfs)
-{
-  if (cfs->gzip_header_length != 
-      bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET))
-    return -1;
-  memset (&cfs->strm, 0, sizeof (z_stream));
-  cfs->strm.avail_out = COM_CHUNK_SIZE;
-
-  /*
-   * note: maybe plain inflateInit(&strm) is adequate,
-   * it looks more backward-compatible also ;
-   *
-   * ZLIB_VERNUM isn't defined by zlib version 1.1.4 ;
-   * there might be a better check.
-   */
-  if (Z_OK != inflateInit2 (&cfs->strm,
-#ifdef ZLIB_VERNUM
-      15 + 32
-#else
-      - MAX_WBITS
-#endif
-      ))
-    {
-      LOG ("Failed to initialize zlib decompression\n");
-      return -1;
-    }
-  cfs->fpos = 0;
-  return 1;
-}
-#endif
-
-
-#if HAVE_LIBBZ2
-/**
- * Reset bz2-compressed data stream to the beginning.
- *
- * @return 1 on success, 0 to terminate extraction,
- *        -1 on decompressor initialization failure
- */ 
-static int
-cfs_reset_stream_bz2 (struct CompressedFileSource *cfs)
-{
-  BZ2_bzDecompressEnd (&cfs->bstrm);
-  if (BZ_OK !=
-      BZ2_bzDecompressInit (&cfs->bstrm, 0, 0))
-    {
-      LOG ("Failed to reinitialize BZ2 decompressor\n");
-      return -1;
-    }
-  return 1;
-}
-#endif
-
-
-/**
- * Resets the compression stream to begin uncompressing
- * from the beginning. Used at initialization time, and when
- * seeking backward.
- *
- * @param cfs cfs to reset
- * @return 1 on success, 0 to terminate extraction,
- *        -1 on error
- */
-static int
-cfs_reset_stream (struct CompressedFileSource *cfs)
-{
-  switch (cfs->compression_type)
-    {
-#if HAVE_ZLIB
-    case COMP_TYPE_ZLIB:
-      return cfs_reset_stream_zlib (cfs);
-#endif
-#if HAVE_LIBBZ2
-    case COMP_TYPE_BZ2:
-      return cfs_reset_stream_bz2 (cfs);
-#endif
-    default:
-      LOG ("invalid compression type selected\n");
-      return -1;
-    }
-}
-
-
-#if HAVE_ZLIB
-/**
  * Initializes gz-decompression object. Might report metadata about
  * compresse stream, if available. Resets the stream to the beginning.
  *
@@ -591,10 +500,11 @@
          return -1;
        }
       len = cptr - fname;
-      if (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_FILENAME,
-                    EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
-                    fname,
-                    len))
+      if ( (NULL != proc) &&
+          (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_FILENAME,
+                      EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
+                      fname,
+                      len)) )
        return 0; /* done */    
       gzip_header_length += len + 1;
     }
@@ -624,10 +534,11 @@
          return -1;
        }
       len = cptr - fcomment;
-      if (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_COMMENT,
-                    EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
-                    (const char *) fcomment,
-                    len))
+      if ( (NULL != proc) &&
+          (0 != proc (proc_cls, "<zlib>", EXTRACTOR_METATYPE_COMMENT,
+                      EXTRACTOR_METAFORMAT_C_STRING, "text/plain",
+                      (const char *) fcomment,
+                      len)) )
        return 0; /* done */
       gzip_header_length += len + 1;
     }
@@ -640,7 +551,33 @@
   gzip_header_length = 0;
 #endif
   cfs->gzip_header_length = gzip_header_length;
-  return cfs_reset_stream_zlib (cfs);
+
+  if (cfs->gzip_header_length != 
+      bfds_seek (cfs->bfds, cfs->gzip_header_length, SEEK_SET))
+    {
+      LOG ("Failed to seek to start to initialize gzip decompressor\n");
+      return -1;
+    }
+  cfs->strm.avail_out = COM_CHUNK_SIZE;
+  /*
+   * note: maybe plain inflateInit(&strm) is adequate,
+   * it looks more backward-compatible also ;
+   *
+   * ZLIB_VERNUM isn't defined by zlib version 1.1.4 ;
+   * there might be a better check.
+   */
+  if (Z_OK != inflateInit2 (&cfs->strm,
+#ifdef ZLIB_VERNUM
+      15 + 32
+#else
+      - MAX_WBITS
+#endif
+      ))
+    {
+      LOG ("Failed to initialize zlib decompression\n");
+      return -1;
+    }
+  return 1;
 }
 #endif
 
@@ -659,9 +596,20 @@
 cfs_init_decompressor_bz2 (struct CompressedFileSource *cfs, 
                           EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
 {
+  if (0 !=
+      bfds_seek (cfs->bfds, 0, SEEK_SET))
+    {
+      LOG ("Failed to seek to start to initialize BZ2 decompressor\n");
+      return -1;
+    }
+  memset (&cfs->bstrm, 0, sizeof (bz_stream));
   if (BZ_OK !=
       BZ2_bzDecompressInit (&cfs->bstrm, 0, 0))
-    return -1;
+    {
+      LOG ("Failed to initialize BZ2 decompressor\n");
+      return -1;
+    }
+  cfs->bstrm.avail_out = COM_CHUNK_SIZE;
   return 1;
 }
 #endif
@@ -680,6 +628,8 @@
 cfs_init_decompressor (struct CompressedFileSource *cfs, 
                       EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
 {
+  cfs->result_pos = 0;
+  cfs->fpos = 0;
   switch (cfs->compression_type)
     {
 #if HAVE_ZLIB
@@ -756,6 +706,24 @@
 
 
 /**
+ * Resets the compression stream to begin uncompressing
+ * from the beginning. Used at initialization time, and when
+ * seeking backward.
+ *
+ * @param cfs cfs to reset
+ * @return 1 on success, 0 to terminate extraction,
+ *        -1 on error
+ */
+static int
+cfs_reset_stream (struct CompressedFileSource *cfs)
+{
+  if (-1 == cfs_deinit_decompressor (cfs))
+    return -1;
+  return cfs_init_decompressor (cfs, NULL, NULL);
+}
+
+
+/**
  * Destroy compressed file source.
  *
  * @param cfs source to destroy
@@ -826,7 +794,7 @@
   int ret;
   size_t rc;
   ssize_t in;
-  char buf[COM_CHUNK_SIZE];
+  unsigned char buf[COM_CHUNK_SIZE];
 
   if (cfs->fpos == cfs->uncompressed_size)
     {
@@ -856,7 +824,12 @@
          LOG ("unexpected EOF\n");
          return -1; /* unexpected EOF */
        }
-      cfs->strm.next_in = (unsigned char *) buf;
+      if (0 == in)
+       {
+         cfs->uncompressed_size = cfs->fpos;
+         return rc; 
+       }
+      cfs->strm.next_in = buf;
       cfs->strm.avail_in = (uInt) in;
       cfs->strm.next_out = (unsigned char *) cfs->result;
       cfs->strm.avail_out = COM_CHUNK_SIZE;
@@ -905,8 +878,74 @@
              void *data,
              size_t size)
 {
-  LOG ("bz2 decompression not implemented\n");
-  return -1;
+  char *dst = data;
+  int ret;
+  size_t rc;
+  ssize_t in;
+  char buf[COM_CHUNK_SIZE];
+
+  if (cfs->fpos == cfs->uncompressed_size)
+    {
+      /* end of file */      
+      return 0;
+    }
+  rc = 0;
+  if (COM_CHUNK_SIZE > cfs->bstrm.avail_out + cfs->result_pos)
+    {
+      /* got left-over decompressed data from previous round! */
+      in = COM_CHUNK_SIZE - (cfs->bstrm.avail_out + cfs->result_pos);
+      if (in > size)
+       in = size;
+      memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
+      cfs->fpos += in;
+      cfs->result_pos += in;
+      rc += in;
+    }
+  ret = BZ_OK;
+  while ( (rc < size) && (BZ_STREAM_END != ret) )
+    {
+      /* read block from original data source */
+      in = bfds_read (cfs->bfds,
+                     buf, sizeof (buf));
+      if (in < 0)      
+       {
+         LOG ("unexpected EOF\n");
+         return -1; /* unexpected EOF */
+       }
+      if (0 == in)
+       {
+         cfs->uncompressed_size = cfs->fpos;
+         return rc;
+       }
+      cfs->bstrm.next_in = buf;
+      cfs->bstrm.avail_in = (uInt) in;
+      cfs->bstrm.next_out = cfs->result;
+      cfs->bstrm.avail_out = COM_CHUNK_SIZE;
+      cfs->result_pos = 0;
+      ret = BZ2_bzDecompress (&cfs->bstrm);
+      if ( (BZ_OK != ret) && (BZ_STREAM_END != ret) )
+       {
+         LOG ("unexpected bzip2 decompress error: %d\n", ret);
+         return -1; /* unexpected error */
+       }
+      /* go backwards by the number of bytes left in the buffer */
+      if (-1 == bfds_seek (cfs->bfds, - (int64_t) cfs->bstrm.avail_in, SEEK_CUR))
+       {
+         LOG ("seek failed\n");
+         return -1;
+       }
+      /* copy decompressed bytes to target buffer */
+      in = COM_CHUNK_SIZE - cfs->bstrm.avail_out;
+      if (in > size - rc)
+       in = size - rc;
+      memcpy (&dst[rc], &cfs->result[cfs->result_pos], in);
+      cfs->fpos += in;
+      cfs->result_pos += in;
+      rc += in;
+    }
+  if (BZ_STREAM_END == ret)
+    cfs->uncompressed_size = cfs->fpos;
+  return rc;
 }
 #endif
 
@@ -1041,6 +1080,12 @@
          LOG ("Failed to read decompressed stream for seek operation\n");
          return -1;
        }
+      if (0 == ret)
+       {
+         LOG ("Reached unexpected end of stream during seek operation\n");
+         return -1;
+       }
+      ASSERT (ret <= delta);
       delta -= ret;      
     }
   return cfs->fpos;




reply via email to

[Prev in Thread] Current Thread [Next in Thread]