gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r11071 - in gnunet/src: fs include


From: gnunet
Subject: [GNUnet-SVN] r11071 - in gnunet/src: fs include
Date: Mon, 26 Apr 2010 18:06:20 +0200

Author: grothoff
Date: 2010-04-26 18:06:20 +0200 (Mon, 26 Apr 2010)
New Revision: 11071

Modified:
   gnunet/src/fs/fs.c
   gnunet/src/fs/fs.h
   gnunet/src/fs/fs_download.c
   gnunet/src/include/gnunet_fs_service.h
Log:
towards job queuing

Modified: gnunet/src/fs/fs.c
===================================================================
--- gnunet/src/fs/fs.c  2010-04-26 15:31:07 UTC (rev 11070)
+++ gnunet/src/fs/fs.c  2010-04-26 16:06:20 UTC (rev 11071)
@@ -30,6 +30,161 @@
 
 
 /**
+ * Start the given job: connect to the "fs" service, hand the connection
+ * to the job's start callback, update the per-category counter, record
+ * the start time and move the entry from the pending DLL to the tail of
+ * the running DLL.
+ *
+ * If the connection cannot be established, the failure is reported via
+ * GNUNET_break and the entry is left untouched on the pending DLL.
+ *
+ * @param qe job to start
+ */
+static void
+start_job (struct GNUNET_FS_QueueEntry *qe)
+{
+  qe->client = GNUNET_CLIENT_connect (qe->h->sched, "fs", qe->h->cfg);
+  if (qe->client == NULL)
+    {
+      GNUNET_break (0);
+      return;
+    }
+  qe->start (qe->cls, qe->client);
+  switch (qe->category)
+    {
+    case GNUNET_FS_QC_DOWNLOAD:
+      qe->h->active_downloads++;
+      break;
+    case GNUNET_FS_QC_PROBE:
+      qe->h->active_probes++;
+      break;
+    }
+  qe->start_time = GNUNET_TIME_absolute_get ();
+  GNUNET_CONTAINER_DLL_remove (qe->h->pending_head,
+                              qe->h->pending_tail,
+                              qe);
+  /* the entry now belongs to the running list, so it must be linked
+     through running_head/running_tail (not pending_head) */
+  GNUNET_CONTAINER_DLL_insert_after (qe->h->running_head,
+                                    qe->h->running_tail,
+                                    qe->h->running_tail,
+                                    qe);
+}
+
+
+/**
+ * Stop the given job: clear the entry's client handle, invoke the job's
+ * stop callback, update the per-category counter, add the duration of
+ * the current run to the accumulated run time and move the entry from
+ * the running DLL to the tail of the pending DLL.
+ *
+ * @param qe job to stop
+ */
+static void
+stop_job (struct GNUNET_FS_QueueEntry *qe)
+{
+  qe->client = NULL;
+  qe->stop (qe->cls);
+  switch (qe->category)
+    {
+    case GNUNET_FS_QC_DOWNLOAD:
+      qe->h->active_downloads--;
+      break;
+    case GNUNET_FS_QC_PROBE:
+      qe->h->active_probes--;
+      break;
+    }
+  /* account for the run that just ended */
+  qe->run_time = GNUNET_TIME_relative_add (qe->run_time,
+                                          GNUNET_TIME_absolute_get_duration (qe->start_time));
+  GNUNET_CONTAINER_DLL_remove (qe->h->running_head,
+                              qe->h->running_tail,
+                              qe);
+  GNUNET_CONTAINER_DLL_insert_after (qe->h->pending_head,
+                                    qe->h->pending_tail,
+                                    qe->h->pending_tail,
+                                    qe);
+}
+
+
+/**
+ * Process the jobs in the job queue, possibly starting some
+ * and stopping others.  The current implementation simply tries to
+ * start every pending job, beginning at the head of the pending list,
+ * and gives up at the first job that cannot be started.
+ *
+ * @param cls the 'struct GNUNET_FS_Handle'
+ * @param tc scheduler context (not used)
+ */
+static void
+process_job_queue (void *cls,
+                  const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct GNUNET_FS_Handle *h = cls;
+  struct GNUNET_FS_QueueEntry *qe;
+
+  /* this task is running now, so it is no longer scheduled */
+  h->queue_job = GNUNET_SCHEDULER_NO_TASK;
+  /* FIXME: stupid implementation that just starts everything follows... */
+  while (NULL != (qe = h->pending_head))
+    {
+      start_job (qe);
+      /* start_job leaves the entry at the head of the pending list if
+         it could not connect to the service; stop here instead of
+         retrying the same entry forever */
+      if (qe == h->pending_head)
+        break;
+    }
+
+  /* FIXME: possibly re-schedule queue-job! */
+}
+
+/**
+ * Add a job to the queue.  A new queue entry is allocated, appended to
+ * the tail of the pending list, and the queue-processing task is
+ * (re)scheduled via GNUNET_SCHEDULER_add_now.
+ *
+ * @param h handle to the overall FS state
+ * @param start function to call to begin the job
+ * @param stop function to call to pause the job, or on dequeue (if the
+ *        job was running)
+ * @param cls closure for start and stop
+ * @param cat category of the job
+ * @return queue handle
+ */
+struct GNUNET_FS_QueueEntry *
+GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
+                 GNUNET_FS_QueueStart start,
+                 GNUNET_FS_QueueStop stop,
+                 void *cls,
+                 enum GNUNET_FS_QueueCategory cat)
+{
+  struct GNUNET_FS_QueueEntry *entry;
+
+  entry = GNUNET_malloc (sizeof (*entry));
+  entry->h = h;
+  entry->cls = cls;
+  entry->start = start;
+  entry->stop = stop;
+  entry->category = cat;
+  entry->queue_time = GNUNET_TIME_absolute_get ();
+  GNUNET_CONTAINER_DLL_insert_after (h->pending_head,
+                                    h->pending_tail,
+                                    h->pending_tail,
+                                    entry);
+  /* replace any queue-processing task that is already scheduled */
+  if (GNUNET_SCHEDULER_NO_TASK != h->queue_job)
+    {
+      GNUNET_SCHEDULER_cancel (h->sched, h->queue_job);
+    }
+  h->queue_job = GNUNET_SCHEDULER_add_now (h->sched,
+                                          &process_job_queue,
+                                          h);
+  return entry;
+}
+
+
+/**
+ * Dequeue a job from the queue.  A job that is currently running (its
+ * client handle is set) is stopped first and the queue-processing task
+ * is (re)scheduled; the entry is then unlinked from the pending list
+ * and freed.
+ *
+ * @param qh handle for the job
+ */
+void
+GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh)
+{
+  struct GNUNET_FS_Handle *h = qh->h;
+
+  if (NULL != qh->client)
+    {
+      if (GNUNET_SCHEDULER_NO_TASK != h->queue_job)
+        {
+          GNUNET_SCHEDULER_cancel (h->sched, h->queue_job);
+        }
+      h->queue_job = GNUNET_SCHEDULER_add_now (h->sched,
+                                              &process_job_queue,
+                                              h);
+      /* stop_job moves the entry back onto the pending list */
+      stop_job (qh);
+    }
+  GNUNET_CONTAINER_DLL_remove (h->pending_head,
+                              h->pending_tail,
+                              qh);
+  GNUNET_free (qh);
+}
+
+
+/**
  * Setup a connection to the file-sharing service.
  *
  * @param sched scheduler to use
@@ -97,6 +252,9 @@
 {
   // FIXME: serialize state!? (or is it always serialized???)
   // FIXME: terminate receive-loop with client  
+  if (h->queue_job != GNUNET_SCHEDULER_NO_TASK)
+    GNUNET_SCHEDULER_cancel (h->sched,
+                            h->queue_job);
   GNUNET_CLIENT_disconnect (h->client, GNUNET_NO);
   GNUNET_free (h->client_name);
   GNUNET_free (h);

Modified: gnunet/src/fs/fs.h
===================================================================
--- gnunet/src/fs/fs.h  2010-04-26 15:31:07 UTC (rev 11070)
+++ gnunet/src/fs/fs.h  2010-04-26 16:06:20 UTC (rev 11071)
@@ -446,6 +446,131 @@
 
 
 /**
+ * Signature of the 'start' callback of a queue entry.
+ *
+ * The job is now ready to run and should use the given client
+ * handle to communicate with the FS service.
+ *
+ * @param cls closure (the 'cls' that was passed to GNUNET_FS_queue_)
+ * @param client handle to use for FS communication
+ */
+typedef void (*GNUNET_FS_QueueStart)(void *cls,
+                                    struct GNUNET_CLIENT_Connection *client);
+
+
+/**
+ * Signature of the 'stop' callback of a queue entry.
+ *
+ * The job must now stop running and should destroy the client handle
+ * as soon as possible (ideally prior to returning).
+ *
+ * @param cls closure (the 'cls' that was passed to GNUNET_FS_queue_)
+ */
+typedef void (*GNUNET_FS_QueueStop)(void *cls);
+
+/**
+ * Categories of jobs in the FS queue.  The category of a job selects
+ * which counter of active jobs in the 'struct GNUNET_FS_Handle' is
+ * updated when the job is started or stopped.
+ */
+enum GNUNET_FS_QueueCategory 
+  {
+    /**
+     * File download (counted in 'active_downloads').
+     */
+    GNUNET_FS_QC_DOWNLOAD,
+
+    /**
+     * Availability probe (related to search; counted in
+     * 'active_probes').
+     */
+    GNUNET_FS_QC_PROBE
+
+  };
+
+/**
+ * Entry in the job queue.  Each entry is a member of either the
+ * pending or the running DLL of its 'struct GNUNET_FS_Handle'.
+ */
+struct GNUNET_FS_QueueEntry
+{
+  /**
+   * Next entry in the DLL this entry is currently part of.
+   */
+  struct GNUNET_FS_QueueEntry *next;
+
+  /**
+   * Previous entry in the DLL this entry is currently part of.
+   */
+  struct GNUNET_FS_QueueEntry *prev;
+
+  /**
+   * Function to call when the job is started.
+   */
+  GNUNET_FS_QueueStart start;
+
+  /**
+   * Function to call when the job needs to stop (or is done / dequeued).
+   */
+  GNUNET_FS_QueueStop stop;
+
+  /**
+   * Closure for start and stop.
+   */
+  void *cls;
+
+  /**
+   * Handle to FS primary context.
+   */
+  struct GNUNET_FS_Handle *h;
+
+  /**
+   * Client handle, or NULL if job is not running.
+   */
+  struct GNUNET_CLIENT_Connection *client;
+
+  /**
+   * Time the job was originally queued.
+   */
+  struct GNUNET_TIME_Absolute queue_time;
+
+  /**
+   * Time the job was started last.
+   */
+  struct GNUNET_TIME_Absolute start_time;
+
+  /**
+   * Total amount of time the job has been running (except for the
+   * current run); updated each time the job is stopped.
+   */
+  struct GNUNET_TIME_Relative run_time;
+
+  /**
+   * What type of job is this?
+   */
+  enum GNUNET_FS_QueueCategory category;
+
+};
+
+
+
+
+/**
+ * Add a job to the queue.
+ *
+ * @param h handle to the overall FS state
+ * @param start function to call to begin the job
+ * @param stop function to call to pause the job, or on dequeue (if the
+ *        job was running)
+ * @param cls closure for start and stop
+ * @param cat category of the job
+ * @return queue handle
+ */
+struct GNUNET_FS_QueueEntry *
+GNUNET_FS_queue_ (struct GNUNET_FS_Handle *h,
+                 GNUNET_FS_QueueStart start,
+                 GNUNET_FS_QueueStop stop,
+                 void *cls,
+                 enum GNUNET_FS_QueueCategory cat);
+
+
+/**
+ * Dequeue a job from the queue.  If the job is running, its stop
+ * function is called first; the queue entry is freed and must not be
+ * used afterwards.
+ *
+ * @param qh handle for the job
+ */
+void
+GNUNET_FS_dequeue_ (struct GNUNET_FS_QueueEntry *qh);
+
+
+/**
  * Master context for most FS operations.
  */
 struct GNUNET_FS_Handle
@@ -481,13 +606,43 @@
   struct GNUNET_CLIENT_Connection *client;
 
   /**
-   * How many downloads probing availability
-   * of search results do we have running
-   * right now?
+   * Head of DLL of running jobs.
    */
+  struct GNUNET_FS_QueueEntry *running_head;
+
+  /**
+   * Tail of DLL of running jobs.
+   */
+  struct GNUNET_FS_QueueEntry *running_tail;
+
+  /**
+   * Head of DLL of pending jobs.
+   */
+  struct GNUNET_FS_QueueEntry *pending_head;
+
+  /**
+   * Tail of DLL of pending jobs.
+   */
+  struct GNUNET_FS_QueueEntry *pending_tail;
+
+  /**
+   * Task that processes the jobs in the running and pending queues
+   * (and moves jobs around as needed).
+   */
+  GNUNET_SCHEDULER_TaskIdentifier queue_job;
+
+  /**
+   * How many downloads probing availability of search results do we
+   * have running right now?
+   */
   unsigned int active_probes;
 
   /**
+   * How many actual downloads do we have running right now?
+   */
+  unsigned int active_downloads;
+
+  /**
    * General flags.
    */
   enum GNUNET_FS_Flags flags;

Modified: gnunet/src/fs/fs_download.c
===================================================================
--- gnunet/src/fs/fs_download.c 2010-04-26 15:31:07 UTC (rev 11070)
+++ gnunet/src/fs/fs_download.c 2010-04-26 16:06:20 UTC (rev 11071)
@@ -1396,26 +1396,31 @@
              dc->treedepth);
 #endif
   // FIXME: make persistent
+  pi.status = GNUNET_FS_STATUS_DOWNLOAD_START;
+  make_download_status (&pi, dc);
+  pi.value.download.specifics.start.meta = meta;
+  dc->client_info = dc->h->upcb (dc->h->upcb_cls,
+                                &pi);
+
   
-  // FIXME: bound parallelism here!
+  // FIXME: bound parallelism here
   client = GNUNET_CLIENT_connect (h->sched,
                                  "fs",
                                  h->cfg);
+  GNUNET_assert (NULL != client);
   dc->client = client;
-  schedule_block_download (dc, 
-                          &dc->uri->data.chk.chk,
-                          0, 
-                          1 /* 0 == CHK, 1 == top */);
   GNUNET_CLIENT_receive (client,
                         &receive_results,
                         dc,
                         GNUNET_TIME_UNIT_FOREVER_REL);
-  pi.status = GNUNET_FS_STATUS_DOWNLOAD_START;
+  pi.status = GNUNET_FS_STATUS_DOWNLOAD_ACTIVE;
   make_download_status (&pi, dc);
-  pi.value.download.specifics.start.meta = meta;
   dc->client_info = dc->h->upcb (dc->h->upcb_cls,
                                 &pi);
-
+  schedule_block_download (dc, 
+                          &dc->uri->data.chk.chk,
+                          0, 
+                          1 /* 0 == CHK, 1 == top */);
   return dc;
 }
 

Modified: gnunet/src/include/gnunet_fs_service.h
===================================================================
--- gnunet/src/include/gnunet_fs_service.h      2010-04-26 15:31:07 UTC (rev 11070)
+++ gnunet/src/include/gnunet_fs_service.h      2010-04-26 16:06:20 UTC (rev 11071)
@@ -557,6 +557,18 @@
   GNUNET_FS_STATUS_DOWNLOAD_STOPPED,
 
   /**
+   * Notification that this download is now actively being
+   * pursued (as opposed to waiting in the queue).
+   */
+  GNUNET_FS_STATUS_DOWNLOAD_ACTIVE,
+
+  /**
+   * Notification that this download is no longer actively
+   * being pursued (back in the queue).
+   */
+  GNUNET_FS_STATUS_DOWNLOAD_INACTIVE,
+
+  /**
    * First event generated when a client requests 
    * a search to begin or when a namespace result
    * automatically triggers the search for updates.





reply via email to

[Prev in Thread] Current Thread [Next in Thread]