[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[GNUnet-SVN] r20971 - Extractor/src/main
From: |
gnunet |
Subject: |
[GNUnet-SVN] r20971 - Extractor/src/main |
Date: |
Thu, 12 Apr 2012 18:44:30 +0200 |
Author: grothoff
Date: 2012-04-12 18:44:30 +0200 (Thu, 12 Apr 2012)
New Revision: 20971
Modified:
Extractor/src/main/extractor.c
Extractor/src/main/extractor_plugins.h
Log:
-LRN: minor cleanup, documentation
Modified: Extractor/src/main/extractor.c
===================================================================
--- Extractor/src/main/extractor.c 2012-04-12 16:44:17 UTC (rev 20970)
+++ Extractor/src/main/extractor.c 2012-04-12 16:44:30 UTC (rev 20971)
@@ -49,16 +49,13 @@
#define MAX_READ 32 * 1024 * 1024
/**
- * How many bytes do we actually try to decompress? (from the beginning
- * of the file). Limit to 16 MB.
+ * Maximum length of a Mime-Type string.
*/
-#define MAX_DECOMPRESS 16 * 1024 * 1024
+#define MAX_MIME_LEN 256
/**
- * Maximum length of a Mime-Type string.
+ * Maximum length of a shared memory object name
*/
-#define MAX_MIME_LEN 256
-
#define MAX_SHM_NAME 255
/**
@@ -67,15 +64,62 @@
*/
#define DEBUG 1
+/**
+ * Sent from LE to a plugin to initialize it (open shm,
+ * reset position counters etc).
+ */
#define MESSAGE_INIT_STATE 0x01
+
+/**
+ * Sent from LE to a plugin to tell it that shm contents
+ * were updated. Only used for OPMODE_COMPRESS.
+ */
#define MESSAGE_UPDATED_SHM 0x02
+
+/**
+ * Sent from plugin to LE to tell LE that plugin is done
+ * analyzing current file and will send no more data.
+ */
#define MESSAGE_DONE 0x03
+
+/**
+ * Sent from plugin to LE to tell LE that plugin needs
+ * to read a different part of the source file.
+ */
#define MESSAGE_SEEK 0x04
+
+/**
+ * Sent from plugin to LE to tell LE about metadata discovered.
+ */
#define MESSAGE_META 0x05
+
+/**
+ * Sent from LE to plugin to make plugin discard its state (unmap
+ * and close shm).
+ */
#define MESSAGE_DISCARD_STATE 0x06
+/**
+ * Client provided a memory buffer, analyze it. Creates a shm, copies
+ * buffer contents into it. Does not support seeking (all data comes
+ * in one [big] chunk).
+ */
#define OPMODE_MEMORY 1
+
+/**
+ * Client provided a memory buffer or a file, which contains compressed data.
+ * Creates a shm of limited size and repeatedly fills it with uncompressed
+ * data. Never skips data (has to uncompress every byte, discards unwanted bytes),
+ * can't efficiently seek backwards. Uses MESSAGE_UPDATED_SHM and MESSAGE_SEEK.
+ */
#define OPMODE_DECOMPRESS 2
+
+/**
+ * Client provided a filename. Creates a file-backed shm (on W32) or just
+ * communicates the file name to each plugin, and plugin opens its own file
+ * descriptor of the file (POSIX). Each plugin maps different parts of the
+ * file into its memory independently.
+ */
#define OPMODE_FILE 3
/**
@@ -92,7 +136,16 @@
};
#if !WINDOWS
-int
+/**
+ * Opens a shared memory object (for later mmapping).
+ * This is the POSIX variant of the plugin_open_* function. Shm is always memory-backed.
+ * If a shm is already opened, closes it before opening a new one.
+ *
+ * @param plugin plugin context
+ * @param shm_name name of the shm.
+ * @return shm id (-1 on error). That is, the result of shm_open() syscall.
+ */
+static int
plugin_open_shm (struct EXTRACTOR_PluginList *plugin, const char *shm_name)
{
if (plugin->shm_id != -1)
@@ -100,7 +153,17 @@
plugin->shm_id = shm_open (shm_name, O_RDONLY, 0);
return plugin->shm_id;
}
-int
+
+/**
+ * Opens a file (for later mmapping).
+ * This is the POSIX variant of the plugin_open_* function.
+ * If a file is already opened, closes it before opening a new one.
+ *
+ * @param plugin plugin context
+ * @param shm_name name of the file to open.
+ * @return file id (-1 on error). That is, the result of open() syscall.
+ */
+static int
plugin_open_file (struct EXTRACTOR_PluginList *plugin, const char *shm_name)
{
if (plugin->shm_id != -1)
@@ -109,6 +172,17 @@
return plugin->shm_id;
}
#else
+/**
+ * Opens a shared memory object (for later mmapping).
+ * This is the W32 variant of the plugin_open_* function.
+ * Opened shm might be memory-backed or file-backed (depending on how
+ * it was created). shm_name is never a file name, unlike on POSIX.
+ * If a shm is already opened, closes it before opening a new one.
+ *
+ * @param plugin plugin context
+ * @param shm_name name of the shared memory object.
+ * @return file mapping handle (NULL on error). That is, the result of the OpenFileMapping() call.
+ */
HANDLE
plugin_open_shm (struct EXTRACTOR_PluginList *plugin, const char *shm_name)
{
@@ -117,6 +191,9 @@
plugin->map_handle = OpenFileMapping (FILE_MAP_READ, FALSE, shm_name);
return plugin->map_handle;
}
+/**
+ * Another name for plugin_open_shm().
+ */
HANDLE
plugin_open_file (struct EXTRACTOR_PluginList *plugin, const char *shm_name)
{
@@ -124,6 +201,16 @@
}
#endif
+/**
+ * Writes @size bytes from @buf into @fd, returns only when
+ * writing is not possible, or when all @size bytes were written
+ * (never does partial writes).
+ *
+ * @param fd fd to write into
+ * @param buf buffer to read from
+ * @param size number of bytes to write
+ * @return number of bytes written (that is, @size), or -1 on error
+ */
static int
write_all (int fd,
const void *buf,
@@ -194,7 +281,20 @@
return 0;
}
-/* init the read/seek wrappers */
+/**
+ * Initializes an extracting session for a plugin.
+ * opens the file/shm (only in OPMODE_FILE)
+ * sets shm_ptr to NULL (unmaps it, if it was mapped)
+ * sets position to 0
+ * initializes file size to @fsize (may be -1)
+ * sets seek request to 0
+ *
+ * @param plugin plugin context
+ * @param operation_mode the mode of operation (OPMODE_*)
+ * @param fsize size of the source file (may be -1)
+ * @param shm_name name of the shm or file to open
+ * @return 0 on success, non-0 on error.
+ */
static int
init_state_method (struct EXTRACTOR_PluginList *plugin, uint8_t
operation_mode, int64_t fsize, const char *shm_name)
{
@@ -223,6 +323,14 @@
return 0;
}
+/**
+ * Deinitializes an extracting session for a plugin.
+ * unmaps shm_ptr (if was mapped)
+ * closes file/shm (if it was opened)
+ * sets map size and shm_ptr to NULL.
+ *
+ * @param plugin plugin context
+ */
static void
discard_state_method (struct EXTRACTOR_PluginList *plugin)
{
@@ -243,6 +351,15 @@
plugin->shm_ptr = NULL;
}
+/**
+ * Main loop function for plugins.
+ * Reads a message from the plugin input pipe and acts on it.
+ * Can be called recursively (once) in OPMODE_DECOMPRESS.
+ * plugin->waiting_for_update == 1 indicates the recursive call.
+ *
+ * @param plugin plugin context
+ * @return 0, always
+ */
static int
process_requests (struct EXTRACTOR_PluginList *plugin)
{
@@ -265,6 +382,10 @@
in = plugin->pipe_in;
out = plugin->cpipe_out;
+ /* The point of recursing into this function is to request
+ * a seek from the LE server and wait for a reply. This snippet
+ * requests a seek.
+ */
if (plugin->waiting_for_update == 1)
{
unsigned char seek_byte = MESSAGE_SEEK;
@@ -308,6 +429,7 @@
do_break = 1;
break;
}
+ /* Fsize may be -1 only in decompression mode */
if (plugin->operation_mode != OPMODE_DECOMPRESS && plugin->fsize <= 0)
{
do_break = 1;
@@ -329,6 +451,9 @@
}
shm_name[shm_name_len - 1] = '\0';
do_break = init_state_method (plugin, plugin->operation_mode,
plugin->fsize, shm_name);
+ /* In OPMODE_MEMORY and OPMODE_FILE we can start extracting right away:
+ * there won't be an UPDATED_SHM message, and we don't need one.
+ */
if (!do_break && (plugin->operation_mode == OPMODE_MEMORY ||
plugin->operation_mode == OPMODE_FILE))
{
@@ -369,6 +494,7 @@
break;
}
/* FIXME: also check mapped region size (lseek for *nix, VirtualQuery
for W32) */
+ /* Re-map the shm */
#if !WINDOWS
if ((-1 == plugin->shm_id) ||
(NULL == (plugin->shm_ptr = mmap (NULL, plugin->map_size,
PROT_READ, MAP_SHARED, plugin->shm_id, 0))) ||
@@ -387,11 +513,16 @@
#endif
if (plugin->waiting_for_update == 1)
{
+ /* We were only waiting for this one message */
do_break = 1;
plugin->waiting_for_update = 2;
break;
}
+ /* Run extractor on mapped region (recursive call doesn't reach this
+ * point and breaks out earlier).
+ */
extract_reply = plugin->extract_method (plugin, transmit_reply, &out);
+ /* Unmap the shm */
#if !WINDOWS
if ((plugin->shm_ptr != NULL) &&
(plugin->shm_ptr != (void*) -1) )
@@ -403,6 +534,7 @@
plugin->shm_ptr = NULL;
if (extract_reply == 1)
{
+ /* Tell LE that we're done */
unsigned char done_byte = MESSAGE_DONE;
if (write (out, &done_byte, 1) != 1)
{
@@ -424,6 +556,7 @@
}
else
{
+ /* Tell LE that we're not done, and we need to seek */
unsigned char seek_byte = MESSAGE_SEEK;
if (write (out, &seek_byte, 1) != 1)
{
@@ -439,6 +572,7 @@
}
else
{
+ /* This is mostly to safely skip unrelated messages */
int64_t t;
size_t t2;
read_result2 = read (in, &t, sizeof (int64_t));
@@ -452,9 +586,8 @@
}
/**
- * 'main' function of the child process. Reads shm-filenames from
- * 'in' (line-by-line) and writes meta data blocks to 'out'. The meta
- * data stream is terminated by an empty entry.
+ * 'main' function of the child process. Loads the plugin,
+ * sets up its in and out pipes, then runs the request serving function.
*
* @param plugin extractor plugin to use
* @param in stream to read from
@@ -486,6 +619,7 @@
close (1);
plugin->pipe_in = in;
+ /* Compiler will complain, and it's right. This is a kind of hack...*/
plugin->cpipe_out = out;
process_requests (plugin);
@@ -606,11 +740,11 @@
static int
write_plugin_data (const struct EXTRACTOR_PluginList *plugin)
{
- /* only does anything on Windows */
+ /* This function is only necessary on W32. On POSIX
+ * systems the plugin inherits its data from the parent */
return 0;
}
-#define plugin_print(plug, fmt, ...) fprintf (plug->cpipe_in, fmt, ...)
#define plugin_write(plug, buf, size) write_all (fileno (plug->cpipe_in), buf,
size)
#else /* WINDOWS */
@@ -721,6 +855,28 @@
return 0;
}
+/**
+ * Writes @size bytes from @buf to @h, using @ov for
+ * overlapped i/o. Deallocates @old_buf and sets it to NULL,
+ * if necessary.
+ * Writes asynchronously, but sequentially (only one writing
+ * operation may be active at any given moment, but it will
+ * be done in background). Thus it is intended to be used
+ * for writing a few big chunks rather than a lot of small pieces.
+ *
+ * The extravagant interface is mainly because this function
+ * does not use a separate struct to group together overlapped
+ * structure, buffer pointer and the handle.
+ *
+ * @param h pipe handle
+ * @param ov overlapped structure pointer
+ * @param buf buffer to read from. Will be copied internally
+ * @param size number of bytes to write
+ * @param old_buf pointer where a copy of previous buffer is stored,
+ * and where a copy of @buf will be stored.
+ *
+ * @return number of bytes written, -1 on error
+ */
static int
write_to_pipe (HANDLE h, OVERLAPPED *ov, unsigned char *buf, size_t size,
unsigned char **old_buf)
{
@@ -765,39 +921,17 @@
return -1;
}
-static int
-print_to_pipe (HANDLE h, OVERLAPPED *ov, unsigned char **buf, const char *fmt,
...)
-{
- va_list va;
- va_list vacp;
- size_t size;
- char *print_buf;
- int result;
-
- va_start (va, fmt);
- va_copy (vacp, va);
- size = VSNPRINTF (NULL, 0, fmt, vacp) + 1;
- va_end (vacp);
- if (size <= 0)
- {
- va_end (va);
- return size;
- }
-
- print_buf = malloc (size);
- if (print_buf == NULL)
- return -1;
- VSNPRINTF (print_buf, size, fmt, va);
- va_end (va);
-
- result = write_to_pipe (h, ov, print_buf, size, buf);
- free (buf);
- return result;
-}
-
-#define plugin_print(plug, fmt, ...) print_to_pipe (plug->cpipe_in,
&plug->ov_write, &plug->ov_write_buffer, fmt, ...)
#define plugin_write(plug, buf, size) write_to_pipe (plug->cpipe_in,
&plug->ov_write, buf, size, &plug->ov_write_buffer)
+/**
+ * Communicates plugin data (library name, options) to the plugin
+ * process. This is only necessary on W32, where this information
+ * is not inherited by the plugin, because it is not forked.
+ *
+ * @param plugin plugin context
+ *
+ * @return 0 on success, -1 on failure
+ */
static int
write_plugin_data (struct EXTRACTOR_PluginList *plugin)
{
@@ -864,6 +998,14 @@
return 0;
}
+/**
+ * Reads plugin data from the LE server process.
+ * Also initializes the allocation granularity.
+ *
+ * @param fd the pipe to read from
+ *
+ * @return newly allocated plugin context
+ */
static struct EXTRACTOR_PluginList *
read_plugin_data (int fd)
{
@@ -995,6 +1137,10 @@
return;
}
+ /* TODO: write our own plugin-hosting executable? rundll32, for one, has a smaller than usual stack size.
+ * Also, users might freak out seeing a huge number of rundll32 processes (seeing as many processes named
+ * "libextractor_plugin_helper" is probably less confusing).
+ */
snprintf(cmd, MAX_PATH + 1, "rundll32.exe libextractor-3.dll,address@hidden
%lu %lu", p10_os_inh, p21_os_inh);
cmd[MAX_PATH] = '\0';
if (CreateProcessA (NULL, cmd, NULL, NULL, TRUE, 0, NULL, NULL,
@@ -1189,10 +1335,10 @@
}
/**
- * Setup a shared memory segment.
+ * Setup a file-backed shared memory segment.
*
- * @param ptr set to the location of the map segment
* @param map where to store the map handle
+ * @param file handle of the file to back the shm
* @param fn name of the mapping
* @param fn_size size available in fn
* @param size number of bytes to allocated for the mapping
@@ -1283,32 +1429,109 @@
#define O_LARGEFILE 0
#endif
+/**
+ * A poor attempt to abstract the data source (file or a memory buffer)
+ * for the decompressor.
+ */
struct BufferedFileDataSource
{
+ /**
+ * Descriptor of the file to read data from (may be -1)
+ */
int fd;
+
+ /**
+ * Pointer to the buffer to read from (may be NULL)
+ */
const unsigned char *data;
+ /**
+ * Size of the file (or the data buffer)
+ */
int64_t fsize;
+
+ /**
+ * Position within the file or the data buffer
+ */
int64_t fpos;
+ /**
+ * A buffer to read into. Only used when fd != -1; when data != NULL,
+ * data is used directly instead.
+ */
unsigned char *buffer;
+
+ /**
+ * Position within the buffer.
+ */
int64_t buffer_pos;
+
+ /**
+ * Number of bytes in the buffer (<= buffer_size)
+ */
int64_t buffer_bytes;
+
+ /**
+ * Allocated size of the buffer
+ */
int64_t buffer_size;
};
+/**
+ * Creates a new bfds (buffered file data source).
+ *
+ * @param data data buffer to use as a source (NULL if fd != -1)
+ * @param fd file descriptor to use as a source (-1 if data != NULL)
+ * @param fsize size of the file (or the buffer)
+ * @return newly allocated bfds
+ */
struct BufferedFileDataSource *
bfds_new (const unsigned char *data, int fd, int64_t fsize);
+/**
+ * Deallocates a bfds.
+ *
+ * @param bfds bfds to deallocate
+ */
void
bfds_delete (struct BufferedFileDataSource *bfds);
+/**
+ * Makes bfds seek to @pos and read a chunk of bytes there.
+ * Changes bfds->fpos, bfds->buffer_bytes and bfds->buffer_pos.
+ * Does almost nothing for memory-backed bfds.
+ *
+ * @param bfds bfds
+ * @param pos position
+ * @return 0 on success, -1 on error
+ */
int
bfds_pick_next_buffer_at (struct BufferedFileDataSource *bfds, int64_t pos);
+/**
+ * Makes bfds seek to @pos in @whence mode.
+ * Will try to seek within the buffer, will move the buffer location if
+ * the seek request falls outside of the buffer range.
+ *
+ * @param bfds bfds
+ * @param pos position to seek to
+ * @param whence one of the seek constants (SEEK_CUR, SEEK_SET, SEEK_END)
+ * @return new absolute position
+ */
int64_t
bfds_seek (struct BufferedFileDataSource *bfds, int64_t pos, int whence);
+/**
+ * Fills @buf_ptr with a pointer to a chunk of data.
+ * Same as read() but there's no need to allocate or de-allocate the
+ * memory (since data IS already in memory).
+ * Will seek if necessary. Will fail if @count exceeds buffer size.
+ *
+ * @param bfds bfds
+ * @param buf_ptr location to store data pointer
+ * @param count number of bytes to read
+ * @return number of bytes (<= count) available at location pointed by buf_ptr
+ */
int64_t
bfds_read (struct BufferedFileDataSource *bfds, unsigned char **buf_ptr,
int64_t count);
@@ -1477,41 +1700,87 @@
COMP_TYPE_BZ2 = 2
};
+/**
+ * An object from which uncompressed data can be read
+ */
struct CompressedFileSource
{
+ /**
+ * The type of compression used in the source
+ */
enum ExtractorCompressionType compression_type;
+ /**
+ * The source of data
+ */
struct BufferedFileDataSource *bfds;
+ /**
+ * Size of the source (same as bfds->fsize)
+ */
int64_t fsize;
+ /**
+ * Position within the source
+ */
int64_t fpos;
+ /**
+ * Total size of the uncompressed data. Remains -1 until
+ * decompression is finished.
+ */
int64_t uncompressed_size;
+ /*
unsigned char *buffer;
int64_t buffer_bytes;
int64_t buffer_len;
+ */
#if WINDOWS
+ /**
+ * W32 handle of the shm into which data is uncompressed
+ */
HANDLE shm;
#else
+ /**
+ * POSIX id of the shm into which data is uncompressed
+ */
int shm;
#endif
+ /**
+ * Name of the shm
+ */
char shm_name[MAX_SHM_NAME + 1];
+ /**
+ * Pointer to the mapped region of the shm (covers the whole shm)
+ */
void *shm_ptr;
+ /**
+ * Position within shm
+ */
int64_t shm_pos;
- size_t shm_buf_pos;
+ /**
+ * Allocated size of the shm
+ */
int64_t shm_size;
+ /**
+ * Number of bytes in shm (<= shm_size)
+ */
size_t shm_buf_size;
#if HAVE_ZLIB
+ /**
+ * ZLIB stream object
+ */
z_stream strm;
- int ret;
- size_t pos;
+ /**
+ * Length of gzip header (may be 0, in which case zlib parses the header)
+ */
int gzip_header_length;
#endif
#if HAVE_LIBBZ2
+ /**
+ * BZ2 stream object
+ */
bz_stream bstrm;
- int bret;
- size_t bpos;
#endif
};
@@ -1558,13 +1827,10 @@
cfs->fpos = cfs->gzip_header_length;
cfs->shm_pos = 0;
- cfs->shm_buf_pos = 0;
cfs->shm_buf_size = 0;
#if HAVE_ZLIB
z_stream strm;
- cfs->ret = 0;
- cfs->pos = 0;
#endif
return 1;
}
@@ -1575,6 +1841,14 @@
return -1;
}
+/**
+ * Resets the compression stream to begin uncompressing
+ * from the beginning. Used at initialization time, and when
+ * seeking backward.
+ *
+ * @param cfs cfs to reset
+ * @return 1 on success, -1 on error
+ */
int
cfs_reset_stream (struct CompressedFileSource *cfs)
{
@@ -1687,10 +1961,11 @@
return cfs_reset_stream_zlib (cfs);
}
-int
+static int
cfs_deinit_decompressor_zlib (struct CompressedFileSource *cfs)
{
inflateEnd (&cfs->strm);
+ return 1;
}
static int
@@ -1705,6 +1980,15 @@
return -1;
}
+/**
+ * Initializes decompression object. Might report metadata about
+ * the compressed stream, if available. Resets the stream to the beginning.
+ *
+ * @param cfs cfs to initialize
+ * @param proc callback for metadata
+ * @param proc_cls callback cls
+ * @return 1 on success, -1 on error
+ */
static int
cfs_init_decompressor (struct CompressedFileSource *cfs,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
@@ -1719,6 +2003,12 @@
}
}
+/**
+ * Deinitializes decompression object.
+ *
+ * @param cfs cfs to deinitialize
+ * @return 1 on success, -1 on error
+ */
static int
cfs_deinit_decompressor (struct CompressedFileSource *cfs)
{
@@ -1733,6 +2023,16 @@
}
}
+/**
+ * Allocates and initializes new cfs object.
+ *
+ * @param bfds data source to use
+ * @param fsize size of the source
+ * @param compression_type type of compression used
+ * @param proc metadata callback
+ * @param proc_cls callback cls
+ * @return newly allocated cfs on success, NULL on error
+ */
struct CompressedFileSource *
cfs_new (struct BufferedFileDataSource *bfds, int64_t fsize, enum
ExtractorCompressionType compression_type, EXTRACTOR_MetaDataProcessor proc,
void *proc_cls)
{
@@ -1761,6 +2061,10 @@
return cfs;
}
+/**
+ * Data is read from the source and fed into the decompressor
+ * in chunks this big.
+ */
#define COM_CHUNK_SIZE (10*1024)
int
@@ -1801,6 +2105,17 @@
return -1;
}
+/**
+ * Re-fills shm with new uncompressed data, preserving the last
+ * @preserve bytes of existing data as the first @preserve bytes
+ * of the new data.
+ * Does the actual decompression. Will set uncompressed_size on
+ * the end of compressed stream.
+ *
+ * @param cfs cfs to read from
+ * @param preserve number of bytes to preserve (0 to discard all old data)
+ * @return number of bytes in shm. 0 if no more data can be uncompressed.
+ */
int64_t
cfs_read (struct CompressedFileSource *cfs, int64_t preserve)
{
@@ -1844,6 +2159,15 @@
return -1;
}
+/**
+ * Moves the buffer to @position in the uncompressed stream. If position
+ * requires seeking backwards beyond the boundaries of the buffer, resets the
+ * stream and repeats decompression from the beginning to @position.
+ *
+ * @param cfs cfs to seek on
+ * @param position new starting point for the buffer
+ * @return new absolute buffer position, -1 on error or EOS
+ */
int64_t
cfs_seek (struct CompressedFileSource *cfs, int64_t position)
{
@@ -1920,8 +2244,17 @@
return result;
}
+/**
+ * Initializes plugin state. Calls init_state_method()
+ * directly or indirectly.
+ *
+ * @param plugin plugin to initialize
+ * @param operation_mode operation mode
+ * @param shm_name name of the shm/file
+ * @param fsize file size (may be -1)
+ */
static void
-init_plugin_state (struct EXTRACTOR_PluginList *plugin, uint8_t
operation_mode, int fd, const char *shm_name, int64_t fsize)
+init_plugin_state (struct EXTRACTOR_PluginList *plugin, uint8_t
operation_mode, const char *shm_name, int64_t fsize)
{
int write_result;
int init_state_size;
@@ -1970,6 +2303,12 @@
}
}
+/**
+ * Discards plugin state. Calls discard_state_method()
+ * directly or indirectly.
+ *
+ * @param plugin plugin whose state is to be discarded
+ */
static void
discard_plugin_state (struct EXTRACTOR_PluginList *plugin)
{
@@ -2002,6 +2341,17 @@
}
}
+/**
+ * Forces plugin to move the buffer window to @pos.
+ *
+ * @param plugin plugin context
+ * @param pos position to move to
+ * @param want_start 1 if the caller is interested in the beginning of the
+ * window, 0 if the caller is interested in its end. Window position
+ * must be aligned to page size, and this parameter controls the
+ * direction of window shift. 0 is used mostly by SEEK_END.
+ * @return 0 on success, -1 on error
+ */
static int
pl_pick_next_buffer_at (struct EXTRACTOR_PluginList *plugin, int64_t pos,
uint8_t want_start)
{
@@ -2114,6 +2464,7 @@
int64_t old_pos;
old_pos = plugin->fpos + plugin->shm_pos;
plugin->seek_request = pos;
+ /* Recurse into the request loop to wait for a shm update */
while (plugin->fpos != pos)
{
plugin->waiting_for_update = 1;
@@ -2127,18 +2478,28 @@
{
if (pos < plugin->fpos)
{
- if (1 != cfs_reset_stream (plugin->state))
+ if (1 != cfs_reset_stream (plugin->pass_cfs))
return -1;
}
while (plugin->fpos < pos && plugin->fpos >= 0)
- plugin->fpos = cfs_seek (plugin->state, pos);
- plugin->fsize = ((struct CompressedFileSource
*)plugin->state)->uncompressed_size;
+ plugin->fpos = cfs_seek (plugin->pass_cfs, pos);
+ plugin->fsize = ((struct CompressedFileSource
*)plugin->pass_cfs)->uncompressed_size;
plugin->shm_pos = pos - plugin->fpos;
}
return 0;
}
}
+/**
+ * Moves current absolute buffer position to @pos in @whence mode.
+ * Will move the logical position without shifting the buffer, if possible.
+ * Will not move beyond the end of file.
+ *
+ * @param plugin plugin context
+ * @param pos position to move to
+ * @param whence seek mode (SEEK_CUR, SEEK_SET, SEEK_END)
+ * @return new absolute position, -1 on error
+ */
int64_t
pl_seek (struct EXTRACTOR_PluginList *plugin, int64_t pos, int whence)
{
@@ -2203,6 +2564,17 @@
return plugin->fpos + plugin->shm_pos;
}
+/**
+ * Fills @data with a pointer to the data buffer.
+ * Equivalent to read(), except you don't have to allocate and free
+ * a buffer, since the data is already in memory.
+ * Will move the buffer, if necessary
+ *
+ * @param plugin plugin context
+ * @param data location to store data pointer
+ * @param count number of bytes to read
+ * @return number of bytes (<= count) available in @data, -1 on error
+ */
int64_t
pl_read (struct EXTRACTOR_PluginList *plugin, unsigned char **data, size_t
count)
{
@@ -2226,6 +2598,17 @@
}
}
+/**
+ * Transmits information about updated shm to plugin.
+ * For OPMODE_DECOMPRESS only.
+ *
+ * @param plugin plugin context
+ * @param position current absolute position in uncompressed stream
+ * @param map_size number of bytes that are available in shm
+ * @param fsize total size of the uncompressed stream (might be -1)
+ * @param operation_mode mode of operation
+ * @return 0 on success, 1 on error
+ */
static int
give_shm_to_plugin (struct EXTRACTOR_PluginList *plugin, int64_t position,
size_t map_size, int64_t fsize, uint8_t operation_mode)
{
@@ -2272,6 +2655,14 @@
}
}
+/**
+ * Calls _extract_method of in-process plugin.
+ *
+ * @param plugin plugin context
+ * @param shm_ptr pointer to the data buffer
+ * @param proc metadata callback
+ * @param proc_cls callback cls
+ */
static void
ask_in_process_plugin (struct EXTRACTOR_PluginList *plugin, void *shm_ptr,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
@@ -2297,6 +2688,14 @@
}
#if !WINDOWS
+/**
+ * Receive @size bytes from plugin, store them in @buf
+ *
+ * @param plugin plugin context
+ * @param buf buffer to fill
+ * @param size number of bytes to read
+ * @return number of bytes read, 0 on EOS, < 0 on error
+ */
int
plugin_read (struct EXTRACTOR_PluginList *plugin, unsigned char *buf, size_t
size)
{
@@ -2312,6 +2711,14 @@
return read_count;
}
#else
+/**
+ * Receive @size bytes from plugin, store them in @buf
+ *
+ * @param plugin plugin context
+ * @param buf buffer to fill
+ * @param size number of bytes to read
+ * @return number of bytes read, 0 on EOS, < 0 on error
+ */
int
plugin_read (struct EXTRACTOR_PluginList *plugin, unsigned char *buf, size_t
size)
{
@@ -2329,6 +2736,14 @@
}
#endif
+/**
+ * Receive a reply from plugin (seek request, metadata and done message)
+ *
+ * @param plugin plugin context
+ * @param proc metadata callback
+ * @param proc_cls callback cls
+ * @return 0 on success, -1 on error
+ */
static int
receive_reply (struct EXTRACTOR_PluginList *plugin,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
@@ -2397,6 +2812,19 @@
}
#if !WINDOWS
+/**
+ * Wait for one of the plugins to reply.
+ * Selects on plugin output pipes, runs receive_reply()
+ * on each activated pipe until it gets a seek request
+ * or a done message. Called repeatedly until all pipes are dry or broken.
+ *
+ * @param plugins to select upon
+ * @param proc metadata callback
+ * @param proc_cls callback cls
+ * @return number of dry/broken pipes since last call, -1 on error or if no plugins reply in 10 seconds.
+ */
static int
wait_for_reply (struct EXTRACTOR_PluginList *plugins,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
@@ -2464,6 +2892,21 @@
return result;
}
#else
+/**
+ * Wait for one of the plugins to reply.
+ * Selects on plugin output pipes, runs receive_reply()
+ * on each activated pipe until it gets a seek request
+ * or a done message. Called repeatedly until all pipes are dry or broken.
+ * This W32 version of wait_for_reply() can't select on more than 64 plugins
+ * at once (returns -1 if there are more than 64 plugins).
+ *
+ * @param plugins to select upon
+ * @param proc metadata callback
+ * @param proc_cls callback cls
+ * @return number of dry/broken pipes since last call, -1 on error or if no plugins reply in 10 seconds.
+ */
static int
wait_for_reply (struct EXTRACTOR_PluginList *plugins,
EXTRACTOR_MetaDataProcessor proc, void *proc_cls)
{
@@ -2563,6 +3006,16 @@
#endif
+/**
+ * Checks the seek requests that plugins made, finds the one with
+ * smallest offset from the beginning of the stream, and satisfies it.
+ *
+ * @param plugins to check
+ * @param cfs compressed file source to seek in
+ * @param current_position current stream position
+ * @param map_size number of bytes currently buffered
+ * @return new stream position, -1 on error
+ */
static int64_t
seek_to_new_position (struct EXTRACTOR_PluginList *plugins, struct
CompressedFileSource *cfs, int64_t current_position, int64_t map_size)
{
@@ -2617,9 +3070,9 @@
* @param plugins the list of plugins to use
* @param data data to process, or NULL if fds is not -1
* @param fd file to read data from, or -1 if data is not NULL
- * @param fsize size of data or size of file
- * @param buffer a buffer with data alteady read from the file (if fd != -1)
- * @param buffer_size size of buffer
+ * @param filename name of the file to which fd belongs
+ * @param cfs compressed file source for compressed stream (may be NULL)
+ * @param fsize size of the file or data buffer
* @param proc function to call for each meta data item found
* @param proc_cls cls argument to proc
*/
@@ -2700,21 +3153,21 @@
if (operation_mode == OPMODE_DECOMPRESS)
{
for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- init_plugin_state (ppos, operation_mode, -1, cfs->shm_name, -1);
+ init_plugin_state (ppos, operation_mode, cfs->shm_name, -1);
}
else if (operation_mode == OPMODE_FILE)
{
for (ppos = plugins; NULL != ppos; ppos = ppos->next)
#if !WINDOWS
- init_plugin_state (ppos, operation_mode, fd, filename, fsize);
+ init_plugin_state (ppos, operation_mode, filename, fsize);
#else
- init_plugin_state (ppos, operation_mode, fd, shm_name, fsize);
+ init_plugin_state (ppos, operation_mode, shm_name, fsize);
#endif
}
else
{
for (ppos = plugins; NULL != ppos; ppos = ppos->next)
- init_plugin_state (ppos, operation_mode, -1, shm_name, fsize);
+ init_plugin_state (ppos, operation_mode, shm_name, fsize);
}
if (operation_mode == OPMODE_FILE || operation_mode == OPMODE_MEMORY)
@@ -2749,7 +3202,7 @@
{
/* Pass this way. we'll need it to call cfs functions later on */
/* This is a special case */
- ppos->state = cfs;
+ ppos->pass_cfs = cfs;
ask_in_process_plugin (ppos, cfs->shm_ptr, proc, proc_cls);
}
while (plugins_not_ready > 0 && !kill_plugins)
Modified: Extractor/src/main/extractor_plugins.h
===================================================================
--- Extractor/src/main/extractor_plugins.h 2012-04-12 16:44:17 UTC (rev
20970)
+++ Extractor/src/main/extractor_plugins.h 2012-04-12 16:44:30 UTC (rev
20971)
@@ -101,6 +101,10 @@
#else
HANDLE cpipe_in;
#endif
+
+ /**
+ * Pipe used by plugin to read from its parent.
+ */
int pipe_in;
/**
@@ -110,36 +114,71 @@
int64_t seek_request;
#if !WINDOWS
+ /**
+ * ID of the shm object (in OPMODE_FILE: descriptor of the opened source file)
+ */
int shm_id;
#else
+ /**
+ * Handle of the shm object
+ */
HANDLE map_handle;
#endif
- void *state;
+ /**
+ * Used to pass cfs pointer to in-process plugin in OPMODE_DECOMPRESS
+ */
+ void *pass_cfs;
+ /**
+ * Uncompressed stream size. Initially -1, until file is fully decompressed
+ * (for sources that are not compressed it is set from the start).
+ */
int64_t fsize;
+ /**
+ * Absolute position within the stream
+ */
int64_t fpos;
+ /**
+ * Pointer to the shared memory segment
+ */
unsigned char *shm_ptr;
+ /**
+ * Number of bytes in the segment
+ */
int64_t map_size;
+ /**
+ * Position within the segment
+ */
int64_t shm_pos;
+#if !WINDOWS
/**
* Pipe used to read information about extracted meta data from
* the plugin child process. -1 if not initialized.
*/
-#if !WINDOWS
int cpipe_out;
#else
+ /**
+ * Pipe used to read information about extracted meta data from
+ * the plugin child process. -1 if not initialized.
+ */
HANDLE cpipe_out;
#endif
#if !WINDOWS
+ /**
+ * Page size. Mmap offset is a multiple of this number.
+ */
long allocation_granularity;
#else
+ /**
+ * Allocation granularity. Mapping offsets must be a multiple of this number.
+ */
DWORD allocation_granularity;
#endif
@@ -160,7 +199,15 @@
unsigned char *ov_write_buffer;
#endif
+ /**
+ * Mode of operation. One of the OPMODE_* constants
+ */
uint8_t operation_mode;
+
+ /**
+ * 1 while plugin is in a recursive process_requests() call waiting for
+ * a shm update, 2 once that update has arrived, 0 otherwise
+ */
int waiting_for_update;
};
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [GNUnet-SVN] r20971 - Extractor/src/main,
gnunet <=