gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[GNUnet-SVN] r14992 - gnunet/src/datastore


From: gnunet
Subject: [GNUnet-SVN] r14992 - gnunet/src/datastore
Date: Fri, 15 Apr 2011 15:08:23 +0200

Author: grothoff
Date: 2011-04-15 15:08:23 +0200 (Fri, 15 Apr 2011)
New Revision: 14992

Modified:
   gnunet/src/datastore/plugin_datastore_mysql.c
   gnunet/src/datastore/plugin_datastore_postgres.c
Log:
fixes

Modified: gnunet/src/datastore/plugin_datastore_mysql.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_mysql.c       2011-04-15 11:49:51 UTC (rev 14991)
+++ gnunet/src/datastore/plugin_datastore_mysql.c       2011-04-15 13:08:23 UTC (rev 14992)
@@ -208,6 +208,8 @@
   unsigned int count;
 
   int end_it;
+
+  int one_shot;
 };
 
 
@@ -284,9 +286,12 @@
 #define SELECT_ENTRY_BY_HASH_VHASH_AND_TYPE "SELECT type,prio,anonLevel,expire,hash,value,uid FROM gn090 FORCE INDEX (idx_hash_vhash) WHERE hash=? AND vhash=? AND type=? ORDER BY uid ASC LIMIT 1 OFFSET ?"
   struct GNUNET_MysqlStatementHandle *select_entry_by_hash_vhash_and_type;
 
-#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=? LIMIT 1"
+#define UPDATE_ENTRY "UPDATE gn090 SET prio=prio+?,expire=IF(expire>=?,expire,?) WHERE uid=?"
   struct GNUNET_MysqlStatementHandle *update_entry;
 
+#define DEC_REPL "UPDATE gn090 SET repl=GREATEST (0, repl - 1) WHERE uid=?"
+  struct GNUNET_MysqlStatementHandle *dec_repl;
+
 #define SELECT_SIZE "SELECT SUM(BIT_LENGTH(value) DIV 8) FROM gn090"
   struct GNUNET_MysqlStatementHandle *get_size;
 
@@ -866,144 +871,6 @@
 
 
 /**
- * Continuation of "mysql_next_request".
- *
- * @param next_cls the next context
- * @param tc the task context (unused)
- */
-static void 
-mysql_next_request_cont (void *next_cls,
-                        const struct GNUNET_SCHEDULER_TaskContext *tc)
-{
-  struct NextRequestClosure *nrc = next_cls;
-  struct Plugin *plugin;
-  int ret;
-  unsigned int type;
-  unsigned int priority;
-  unsigned int anonymity;
-  unsigned long long exp;
-  unsigned long hashSize;
-  unsigned long size;
-  unsigned long long uid;
-  char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
-  GNUNET_HashCode key;
-  struct GNUNET_TIME_Absolute expiration;
-  MYSQL_BIND *rbind = nrc->rbind;
-
-  plugin = nrc->plugin;
-  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
-  plugin->next_task_nc = NULL;
-
-  if (GNUNET_YES == nrc->end_it) 
-    goto END_SET;
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->now = GNUNET_TIME_absolute_get ();
-  hashSize = sizeof (GNUNET_HashCode);
-  memset (nrc->rbind, 0, sizeof (nrc->rbind));
-  rbind = nrc->rbind;
-  rbind[0].buffer_type = MYSQL_TYPE_LONG;
-  rbind[0].buffer = &type;
-  rbind[0].is_unsigned = 1;
-  rbind[1].buffer_type = MYSQL_TYPE_LONG;
-  rbind[1].buffer = &priority;
-  rbind[1].is_unsigned = 1;
-  rbind[2].buffer_type = MYSQL_TYPE_LONG;
-  rbind[2].buffer = &anonymity;
-  rbind[2].is_unsigned = 1;
-  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[3].buffer = &exp;
-  rbind[3].is_unsigned = 1;
-  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[4].buffer = &key;
-  rbind[4].buffer_length = hashSize;
-  rbind[4].length = &hashSize;
-  rbind[5].buffer_type = MYSQL_TYPE_BLOB;
-  rbind[5].buffer = value;
-  rbind[5].buffer_length = size = sizeof (value);
-  rbind[5].length = &size;
-  rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
-  rbind[6].buffer = &uid;
-  rbind[6].is_unsigned = 1;
-
-  if (GNUNET_OK != nrc->prep (nrc->prep_cls,
-                             nrc))
-    goto END_SET;
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  GNUNET_assert (size <= sizeof(value));
-  if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
-       (hashSize != sizeof (GNUNET_HashCode)) )
-    {
-      GNUNET_break (0);
-      goto END_SET;
-    }    
-#if DEBUG_MYSQL
-  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
-             "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
-             (unsigned int) size,
-             GNUNET_h2s (&key),
-             priority,
-             anonymity,
-             exp);
-#endif
-  expiration.abs_value = exp;
-  ret = nrc->dviter (nrc->dviter_cls, (nrc->end_it == GNUNET_YES) ? NULL : nrc,
-                    &key,
-                    size, value,
-                    type, priority, anonymity, expiration,
-                    uid);
-  if (ret == GNUNET_SYSERR)
-    {
-      nrc->end_it = GNUNET_YES;
-      return;
-    }
-  if (ret == GNUNET_NO)
-    {
-      do_delete_entry (plugin, uid);
-      if (size != 0)
-       plugin->env->duc (plugin->env->cls,
-                         - size);
-    }
-  return;
- END_SET:
-  /* call dviter with "end of set" */
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->dviter (nrc->dviter_cls, 
-              NULL, NULL, 0, NULL, 0, 0, 0, 
-              GNUNET_TIME_UNIT_ZERO_ABS, 0);
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  nrc->prep (nrc->prep_cls, NULL);
-  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
-  GNUNET_free (nrc);
-}
-
-
-/**
- * Function invoked on behalf of a "PluginIterator"
- * asking the database plugin to call the iterator
- * with the next item.
- *
- * @param next_cls whatever argument was given
- *        to the PluginIterator as "next_cls".
- * @param end_it set to GNUNET_YES if we
- *        should terminate the iteration early
- *        (iterator should be still called once more
- *         to signal the end of the iteration).
- */
-static void 
-mysql_plugin_next_request (void *next_cls,
-                          int end_it)
-{
-  struct NextRequestClosure *nrc = next_cls;
-
-  if (GNUNET_YES == end_it)
-    nrc->end_it = GNUNET_YES;
-  nrc->plugin->next_task_nc = nrc;
-  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
-                                                    nrc);
-}  
-
-
-/**
  * Get an estimate of how much space the database is
  * currently using.
  *
@@ -1167,6 +1034,152 @@
 }
 
 
+
+
+/**
+ * Continuation of "mysql_next_request".
+ *
+ * @param next_cls the next context
+ * @param tc the task context (unused)
+ */
+static void 
+mysql_next_request_cont (void *next_cls,
+                        const struct GNUNET_SCHEDULER_TaskContext *tc)
+{
+  struct NextRequestClosure *nrc = next_cls;
+  struct Plugin *plugin;
+  int ret;
+  unsigned int type;
+  unsigned int priority;
+  unsigned int anonymity;
+  unsigned long long exp;
+  unsigned long hashSize;
+  unsigned long size;
+  unsigned long long uid;
+  char value[GNUNET_DATASTORE_MAX_VALUE_SIZE];
+  GNUNET_HashCode key;
+  struct GNUNET_TIME_Absolute expiration;
+  MYSQL_BIND *rbind = nrc->rbind;
+
+  plugin = nrc->plugin;
+  plugin->next_task = GNUNET_SCHEDULER_NO_TASK;
+  plugin->next_task_nc = NULL;
+
+  if (GNUNET_YES == nrc->end_it) 
+    goto END_SET;
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->now = GNUNET_TIME_absolute_get ();
+  hashSize = sizeof (GNUNET_HashCode);
+  memset (nrc->rbind, 0, sizeof (nrc->rbind));
+  rbind = nrc->rbind;
+  rbind[0].buffer_type = MYSQL_TYPE_LONG;
+  rbind[0].buffer = &type;
+  rbind[0].is_unsigned = 1;
+  rbind[1].buffer_type = MYSQL_TYPE_LONG;
+  rbind[1].buffer = &priority;
+  rbind[1].is_unsigned = 1;
+  rbind[2].buffer_type = MYSQL_TYPE_LONG;
+  rbind[2].buffer = &anonymity;
+  rbind[2].is_unsigned = 1;
+  rbind[3].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[3].buffer = &exp;
+  rbind[3].is_unsigned = 1;
+  rbind[4].buffer_type = MYSQL_TYPE_BLOB;
+  rbind[4].buffer = &key;
+  rbind[4].buffer_length = hashSize;
+  rbind[4].length = &hashSize;
+  rbind[5].buffer_type = MYSQL_TYPE_BLOB;
+  rbind[5].buffer = value;
+  rbind[5].buffer_length = size = sizeof (value);
+  rbind[5].length = &size;
+  rbind[6].buffer_type = MYSQL_TYPE_LONGLONG;
+  rbind[6].buffer = &uid;
+  rbind[6].is_unsigned = 1;
+
+  if (GNUNET_OK != nrc->prep (nrc->prep_cls,
+                             nrc))
+    goto END_SET;
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_assert (size <= sizeof(value));
+  if ( (rbind[4].buffer_length != sizeof (GNUNET_HashCode)) ||
+       (hashSize != sizeof (GNUNET_HashCode)) )
+    {
+      GNUNET_break (0);
+      goto END_SET;
+    }    
+#if DEBUG_MYSQL
+  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+             "Found %u-byte value under key `%s' with prio %u, anon %u, expire %llu selecting from gn090 table\n",
+             (unsigned int) size,
+             GNUNET_h2s (&key),
+             priority,
+             anonymity,
+             exp);
+#endif
+  expiration.abs_value = exp;
+  ret = nrc->dviter (nrc->dviter_cls, 
+                    (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
+                    &key,
+                    size, value,
+                    type, priority, anonymity, expiration,
+                    uid);
+  if (ret == GNUNET_SYSERR)
+    {
+      nrc->end_it = GNUNET_YES;
+      return;
+    }
+  if (ret == GNUNET_NO)
+    {
+      do_delete_entry (plugin, uid);
+      if (size != 0)
+       plugin->env->duc (plugin->env->cls,
+                         - size);
+    }
+  if (nrc->one_shot == GNUNET_YES)
+    GNUNET_free (nrc);
+  return;
+ END_SET:
+  /* call dviter with "end of set" */
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->dviter (nrc->dviter_cls, 
+              NULL, NULL, 0, NULL, 0, 0, 0, 
+              GNUNET_TIME_UNIT_ZERO_ABS, 0);
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  nrc->prep (nrc->prep_cls, NULL);
+  GNUNET_assert (nrc->plugin->next_task == GNUNET_SCHEDULER_NO_TASK);
+  GNUNET_free (nrc);
+}
+
+
+/**
+ * Function invoked on behalf of a "PluginIterator"
+ * asking the database plugin to call the iterator
+ * with the next item.
+ *
+ * @param next_cls whatever argument was given
+ *        to the PluginIterator as "next_cls".
+ * @param end_it set to GNUNET_YES if we
+ *        should terminate the iteration early
+ *        (iterator should be still called once more
+ *         to signal the end of the iteration).
+ */
+static void 
+mysql_plugin_next_request (void *next_cls,
+                          int end_it)
+{
+  struct NextRequestClosure *nrc = next_cls;
+
+  if (GNUNET_YES == end_it)
+    nrc->end_it = GNUNET_YES;
+  nrc->plugin->next_task_nc = nrc;
+  nrc->plugin->next_task = GNUNET_SCHEDULER_add_now (&mysql_next_request_cont,
+                                                    nrc);
+}  
+
+
+/**
+ * Context for 'get_statement_prepare'.
+ */
 struct GetContext
 {
   GNUNET_HashCode key;
@@ -1466,7 +1479,6 @@
 {
   struct Plugin *plugin = cls;
 
-  nrc->end_it = GNUNET_YES;
   return prepared_statement_run_select (plugin,
                                        plugin->select_replication, 
                                        7, nrc->rbind, 
@@ -1475,7 +1487,93 @@
 }
 
 
+
 /**
+ * Context for 'repl_iter' function.
+ */
+struct ReplCtx
+{
+  
+  /**
+   * Plugin handle.
+   */
+  struct Plugin *plugin;
+  
+  /**
+   * Function to call for the result (or the NULL).
+   */
+  PluginIterator iter;
+  
+  /**
+   * Closure for iter.
+   */
+  void *iter_cls;
+};
+
+
+/**
+ * Wrapper for the iterator for 'mysql_plugin_replication_get'.
+ * Decrements the replication counter and calls the original
+ * iterator.
+ *
+ * @param cls closure
+ * @param next_cls closure to pass to the "next" function.
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ *         (continue on call to "next", of course),
+ *         GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+repl_iter (void *cls,
+          void *next_cls,
+          const GNUNET_HashCode *key,
+          uint32_t size,
+          const void *data,
+          enum GNUNET_BLOCK_Type type,
+          uint32_t priority,
+          uint32_t anonymity,
+          struct GNUNET_TIME_Absolute expiration, 
+          uint64_t uid)
+{
+  struct ReplCtx *rc = cls;
+  struct Plugin *plugin = rc->plugin;
+  unsigned long long oid;
+  int ret;
+
+  ret = rc->iter (rc->iter_cls,
+                 next_cls, key,
+                 size, data, 
+                 type, priority, anonymity, expiration,
+                 uid);
+  if (NULL != key)
+    {
+      oid = (unsigned long long) uid;
+      ret = prepared_statement_run (plugin,
+                                   plugin->dec_repl,
+                                   NULL,
+                                   MYSQL_TYPE_LONGLONG, &oid, GNUNET_YES, 
+                                   -1);
+      if (ret == GNUNET_SYSERR)
+       {
+         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+                     "Failed to reduce replication counter\n");
+         return GNUNET_SYSERR;
+       }
+    }
+  return ret;
+}
+
+
+/**
  * Get a random item for replication.  Returns a single, not expired, random item
  * from those with the highest replication counters.  The item's 
  * replication counter is decremented by one IF it was positive before.
@@ -1490,18 +1588,23 @@
                              PluginIterator iter, void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure nrc;
-
-  memset (&nrc, 0, sizeof (nrc));
-  nrc.plugin = plugin;
-  nrc.now = GNUNET_TIME_absolute_get ();
-  nrc.prep = &replication_prepare;
-  nrc.prep_cls = plugin;
-  nrc.type = 0;
-  nrc.dviter = iter;
-  nrc.dviter_cls = iter_cls;
-  nrc.end_it = GNUNET_NO;
-  mysql_next_request_cont (&nrc, NULL);
+  struct NextRequestClosure *nrc;
+  struct ReplCtx rc;
+  
+  rc.plugin = plugin;
+  rc.iter = iter;
+  rc.iter_cls = iter_cls;
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->now = GNUNET_TIME_absolute_get ();
+  nrc->prep = &replication_prepare;
+  nrc->prep_cls = plugin;
+  nrc->type = 0;
+  nrc->dviter = &repl_iter;
+  nrc->dviter_cls = &rc;
+  nrc->end_it = GNUNET_NO;
+  nrc->one_shot = GNUNET_YES;
+  mysql_next_request_cont (nrc, NULL);
 }
 
 
@@ -1522,7 +1625,6 @@
 
   if (NULL == nrc)
     return GNUNET_NO;
-  nrc->end_it = GNUNET_YES;
   nt = (long long) nrc->now.abs_value;
   return prepared_statement_run_select
     (plugin,
@@ -1547,18 +1649,19 @@
                             PluginIterator iter, void *iter_cls)
 {
   struct Plugin *plugin = cls;
-  struct NextRequestClosure nrc;
+  struct NextRequestClosure *nrc;
 
-  memset (&nrc, 0, sizeof (nrc));
-  nrc.plugin = plugin;
-  nrc.now = GNUNET_TIME_absolute_get ();
-  nrc.prep = &expiration_prepare;
-  nrc.prep_cls = plugin;
-  nrc.type = 0;
-  nrc.dviter = iter;
-  nrc.dviter_cls = iter_cls;
-  nrc.end_it = GNUNET_NO;
-  mysql_next_request_cont (&nrc, NULL);
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->plugin = plugin;
+  nrc->now = GNUNET_TIME_absolute_get ();
+  nrc->prep = &expiration_prepare;
+  nrc->prep_cls = plugin;
+  nrc->type = 0;
+  nrc->dviter = iter;
+  nrc->dviter_cls = iter_cls;
+  nrc->end_it = GNUNET_NO;
+  nrc->one_shot = GNUNET_YES;
+  mysql_next_request_cont (nrc, NULL);
 }
 
 
@@ -1639,6 +1742,7 @@
       || PINIT (plugin->count_entry_by_hash_vhash_and_type,
                 COUNT_ENTRY_BY_HASH_VHASH_AND_TYPE)
       || PINIT (plugin->update_entry, UPDATE_ENTRY)
+      || PINIT (plugin->dec_repl, DEC_REPL)
       || PINIT (plugin->zero_iter, SELECT_IT_NON_ANONYMOUS) 
       || PINIT (plugin->select_expiration, SELECT_IT_EXPIRATION) 
       || PINIT (plugin->select_replication, SELECT_IT_REPLICATION) )

Modified: gnunet/src/datastore/plugin_datastore_postgres.c
===================================================================
--- gnunet/src/datastore/plugin_datastore_postgres.c    2011-04-15 11:49:51 UTC (rev 14991)
+++ gnunet/src/datastore/plugin_datastore_postgres.c    2011-04-15 13:08:23 UTC (rev 14992)
@@ -1,6 +1,6 @@
 /*
      This file is part of GNUnet
-     (C) 2009, 2010 Christian Grothoff (and other contributing authors)
+     (C) 2009, 2010, 2011 Christian Grothoff (and other contributing authors)
 
      GNUnet is free software; you can redistribute it and/or modify
      it under the terms of the GNU General Public License as published
@@ -30,45 +30,6 @@
 
 #define DEBUG_POSTGRES GNUNET_NO
 
-#define SELECT_IT_LOW_PRIORITY "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                               "WHERE (prio = $1 AND oid > $2) "                       \
-                               "ORDER BY prio ASC,oid ASC LIMIT 1) "\
-                               "UNION "\
-                               "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                               "WHERE (prio > $1 AND oid != $2)"\
-                               "ORDER BY prio ASC,oid ASC LIMIT 1)"\
-                               "ORDER BY prio ASC,oid ASC LIMIT 1"
-
-#define SELECT_IT_NON_ANONYMOUS "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                                "WHERE (prio = $1 AND oid < $2)"\
-                                " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
-                                "UNION "\
-                                "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                                "WHERE (prio < $1 AND oid != $2)"\
-                                " AND anonLevel=0 ORDER BY prio DESC,oid DESC LIMIT 1) "\
-                                "ORDER BY prio DESC,oid DESC LIMIT 1"
-
-#define SELECT_IT_EXPIRATION_TIME "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                                  "WHERE (expire = $1 AND oid > $2) "\
-                                  "ORDER BY expire ASC,oid ASC LIMIT 1) "\
-                                  "UNION "\
-                                  "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                                  "WHERE (expire > $1 AND oid != $2) "         \
-                                  "ORDER BY expire ASC,oid ASC LIMIT 1)"\
-                                  "ORDER BY expire ASC,oid ASC LIMIT 1"
-
-
-#define SELECT_IT_MIGRATION_ORDER "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                                  "WHERE (expire = $1 AND oid < $2)"\
-                                  " AND expire > $3 AND type!=3"\
-                                  " ORDER BY expire DESC,oid DESC LIMIT 1) "\
-                                  "UNION "\
-                                  "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "\
-                                  "WHERE (expire < $1 AND oid != $2)"          \
-                                  " AND expire > $3 AND type!=3"\
-                                  " ORDER BY expire DESC,oid DESC LIMIT 1)"\
-                                  "ORDER BY expire DESC,oid DESC LIMIT 1"
-
 /**
  * After how many ms "busy" should a DB operation fail for good?
  * A low value makes sure that we are more responsive to requests
@@ -140,7 +101,7 @@
   /**
    * Number of entries found so far
    */
-  long long count;
+  unsigned long long count;
   
   /**
    * Offset this iteration starts at.
@@ -153,26 +114,16 @@
   uint64_t blimit_off;
   
   /**
-   *  Overall number of matching entries.
+   * Current total number of entries found so far, big-endian.
    */
-  unsigned long long total;
+  uint64_t bcount;
   
   /**
-   * Expiration value of previous result (possible parameter), big-endian.
+   *  Overall number of matching entries.
    */
-  uint64_t blast_expire;
+  unsigned long long total;
   
   /**
-   * Row ID of last result (possible paramter), big-endian.
-   */
-  uint32_t blast_rowid;
-  
-  /**
-   * Priority of last result (possible parameter), big-endian.
-   */
-  uint32_t blast_prio;
-  
-  /**
    * Type of block (possible paramter), big-endian.
    */
   uint32_t btype;
@@ -181,6 +132,11 @@
    * Flag set to GNUNET_YES to stop iteration.
    */
   int end_it;
+
+  /**
+   * Flag to indicate that there should only be one result.
+   */
+  int one_shot;
 };
 
 
@@ -336,6 +292,7 @@
   GNUNET_free_non_null (conninfo);
   ret = PQexec (plugin->dbh,
                 "CREATE TABLE gn090 ("
+                "  repl INTEGER NOT NULL DEFAULT 0,"
                 "  type INTEGER NOT NULL DEFAULT 0,"
                 "  prio INTEGER NOT NULL DEFAULT 0,"
                 "  anonLevel INTEGER NOT NULL DEFAULT 0,"
@@ -385,7 +342,6 @@
         }
     }
   PQclear (ret);
-#if 1
   ret = PQexec (plugin->dbh,
                 "ALTER TABLE gn090 ALTER value SET STORAGE EXTERNAL");
   if (GNUNET_OK != 
@@ -421,44 +377,43 @@
       return GNUNET_SYSERR;
     }
   PQclear (ret);
-#endif
   if ((GNUNET_OK !=
        pq_prepare (plugin,
                   "getvt",
                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
                    "WHERE hash=$1 AND vhash=$2 AND type=$3 "
-                   "AND oid > $4 ORDER BY oid ASC LIMIT 1 OFFSET $5",
-                   5,
+                  "ORDER BY oid ASC LIMIT 1 OFFSET $4",
+                   4,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
                   "gett",
                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1 AND type=$2"
-                   "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
-                   4,
+                   "WHERE hash=$1 AND type=$2 "
+                  "ORDER BY oid ASC LIMIT 1 OFFSET $3",
+                   3,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
                   "getv",
                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1 AND vhash=$2"
-                   "AND oid > $3 ORDER BY oid ASC LIMIT 1 OFFSET $4",
-                   4,
+                   "WHERE hash=$1 AND vhash=$2 "
+                  "ORDER BY oid ASC LIMIT 1 OFFSET $3",
+                   3,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
                   "get",
                    "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
-                   "WHERE hash=$1"
-                   "AND oid > $2 ORDER BY oid ASC LIMIT 1 OFFSET $3",
-                   3,
+                   "WHERE hash=$1 "
+                  "ORDER BY oid ASC LIMIT 1 OFFSET $2",
+                   2,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
                   "put",
-                   "INSERT INTO gn090 (type, prio, anonLevel, expire, hash, vhash, value) "
-                   "VALUES ($1, $2, $3, $4, $5, $6, $7)",
+                   "INSERT INTO gn090 (repl, type, prio, anonLevel, expire, hash, vhash, value) "
+                   "VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
                    8,
                    __LINE__)) ||
       (GNUNET_OK !=
@@ -470,32 +425,42 @@
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
-                  "select_low_priority",
-                   SELECT_IT_LOW_PRIORITY,
-                   2,
+                  "decrepl",
+                   "UPDATE gn090 SET repl = GREATEST (repl - 1, 0) "
+                   "WHERE oid = $1",
+                   1,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
                   "select_non_anonymous",
-                   SELECT_IT_NON_ANONYMOUS,
-                   2,
+                  "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
+                  "WHERE anonLevel = 0 ORDER BY oid DESC LIMIT 1 OFFSET $1",
+                   1,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
-                  "select_expiration_time",
-                   SELECT_IT_EXPIRATION_TIME,
-                   2,
+                  "select_expiration_order",
+                  "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
+                  "WHERE expire < $1 ORDER BY prio ASC LIMIT 1) "
+                  "UNION "
+                  "(SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 "
+                  "ORDER BY prio ASC LIMIT 1) "                        
+                  "ORDER BY expire ASC LIMIT 1",
+                   1,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
-                  "select_migration_order",
-                   SELECT_IT_MIGRATION_ORDER,
-                   3,
+                  "select_replication_order",
+                  "SELECT type, prio, anonLevel, expire, hash, value, oid FROM gn090 " \
+                  "ORDER BY repl DESC,RANDOM() LIMIT 1",
+                   0,
                    __LINE__)) ||
       (GNUNET_OK !=
        pq_prepare (plugin,
                   "delrow",
-                   "DELETE FROM gn090 " "WHERE oid=$1", 1, __LINE__)))
+                   "DELETE FROM gn090 " "WHERE oid=$1", 
+                  1,
+                  __LINE__)))
     {
       PQfinish (plugin->dbh);
       plugin->dbh = NULL;
@@ -610,8 +575,10 @@
   uint32_t btype = htonl (type);
   uint32_t bprio = htonl (priority);
   uint32_t banon = htonl (anonymity);
+  uint32_t brepl = htonl (replication);
   uint64_t bexpi = GNUNET_TIME_absolute_hton (expiration).abs_value__;
   const char *paramValues[] = {
+    (const char *) &brepl,
     (const char *) &btype,
     (const char *) &bprio,
     (const char *) &banon,
@@ -621,6 +588,7 @@
     (const char *) data
   };
   int paramLengths[] = {
+    sizeof (brepl),
     sizeof (btype),
     sizeof (bprio),
     sizeof (banon),
@@ -629,11 +597,11 @@
     sizeof (GNUNET_HashCode),
     size
   };
-  const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1 };
+  const int paramFormats[] = { 1, 1, 1, 1, 1, 1, 1, 1 };
 
   GNUNET_CRYPTO_hash (data, size, &vhash);
   ret = PQexecPrepared (plugin->dbh,
-                        "put", 7, paramValues, paramLengths, paramFormats, 1);
+                        "put", 8, paramValues, paramLengths, paramFormats, 1);
   if (GNUNET_OK != check_result (plugin, ret,
                                  PGRES_COMMAND_OK,
                                  "PQexecPrepared", "put", __LINE__))
@@ -649,6 +617,7 @@
   return GNUNET_OK;
 }
 
+
 /**
  * Function invoked on behalf of a "PluginIterator"
  * asking the database plugin to call the iterator
@@ -690,15 +659,11 @@
                 GNUNET_TIME_UNIT_ZERO_ABS, 0);
       GNUNET_free (nrc);
       return;
-    }
-  
-  if (nrc->count == 0)
-    nrc->blimit_off = GNUNET_htonll (nrc->off);
-  else
-    nrc->blimit_off = GNUNET_htonll (0);
-  if (nrc->count + nrc->off == nrc->total)
-    nrc->blast_rowid = htonl (0); /* back to start */
-  
+    }  
+  if (nrc->off == nrc->total)
+    nrc->off = 0;
+  nrc->blimit_off = GNUNET_htonll (nrc->off);
+  nrc->bcount = GNUNET_htonll ((uint64_t) nrc->count);
   res = PQexecPrepared (plugin->dbh,
                        nrc->pname,
                        nrc->nparams,
@@ -773,14 +738,10 @@
   priority = ntohl (*(uint32_t *) PQgetvalue (res, 0, 1));
   anonymity = ntohl ( *(uint32_t *) PQgetvalue (res, 0, 2));
   expiration_time.abs_value = GNUNET_ntohll (*(uint64_t *) PQgetvalue (res, 0, 3));
-  memcpy (&key, PQgetvalue (res, 0, 4), sizeof (GNUNET_HashCode));
+  memcpy (&key, 
+         PQgetvalue (res, 0, 4), 
+         sizeof (GNUNET_HashCode));
   size = PQgetlength (res, 0, 5);
-
-  nrc->blast_prio = htonl (priority);
-  nrc->blast_expire = GNUNET_htonll (expiration_time.abs_value);
-  nrc->blast_rowid = htonl (rowid);
-  nrc->count++;
-
 #if DEBUG_POSTGRES
   GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG,
                   "datastore-postgres",
@@ -789,7 +750,7 @@
                   (unsigned int) type);
 #endif
   iret = nrc->iter (nrc->iter_cls,
-                   nrc,
+                   (nrc->one_shot == GNUNET_YES) ? NULL : nrc,
                    &key,
                    size,
                    PQgetvalue (res, 0, 5),
@@ -799,6 +760,11 @@
                    expiration_time,
                    rowid);
   PQclear (res);
+  if (iret != GNUNET_NO)
+    {
+      nrc->count++;
+      nrc->off++;
+    }
   if (iret == GNUNET_SYSERR)
     {
 #if DEBUG_POSTGRES
@@ -828,6 +794,8 @@
 #endif
        }
     }
+  if (nrc->one_shot == GNUNET_YES) 
+    GNUNET_free (nrc);
 }
 
 
@@ -858,183 +826,6 @@
 
 
 /**
- * Update the priority for a particular key in the datastore.  If
- * the expiration time in value is different than the time found in
- * the datastore, the higher value should be kept.  For the
- * anonymity level, the lower value is to be used.  The specified
- * priority should be added to the existing priority, ignoring the
- * priority in value.
- *
- * Note that it is possible for multiple values to match this put.
- * In that case, all of the respective values are updated.
- *
- * @param cls our "struct Plugin*"
- * @param uid unique identifier of the datum
- * @param delta by how much should the priority
- *     change?  If priority + delta < 0 the
- *     priority should be set to 0 (never go
- *     negative).
- * @param expire new expiration time should be the
- *     MAX of any existing expiration time and
- *     this value
- * @param msg set to error message
- * @return GNUNET_OK on success
- */
-static int
-postgres_plugin_update (void *cls,
-                       uint64_t uid,
-                       int delta, struct GNUNET_TIME_Absolute expire,
-                       char **msg)
-{
-  struct Plugin *plugin = cls;
-  PGresult *ret;
-  int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
-  uint32_t boid = htonl ( (uint32_t) uid);
-  uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
-  const char *paramValues[] = {
-    (const char *) &bdelta,
-    (const char *) &bexpire,
-    (const char *) &boid,
-  };
-  int paramLengths[] = {
-    sizeof (bdelta),
-    sizeof (bexpire),
-    sizeof (boid),
-  };
-  const int paramFormats[] = { 1, 1, 1 };
-
-  ret = PQexecPrepared (plugin->dbh,
-                        "update",
-                        3, paramValues, paramLengths, paramFormats, 1);
-  if (GNUNET_OK != check_result (plugin,
-                                ret,
-                                 PGRES_COMMAND_OK,
-                                 "PQexecPrepared", "update", __LINE__))
-    return GNUNET_SYSERR;
-  PQclear (ret);
-  return GNUNET_OK;
-}
-
-
-/**
- * Call a method for each key in the database and
- * call the callback method on it.
- *
- * @param plugin global context
- * @param type entries of which type should be considered?
- * @param is_asc ascending or descending iteration?
- * @param iter_select which SELECT method should be used?
- * @param iter maybe NULL (to just count); iter
- *     should return GNUNET_SYSERR to abort the
- *     iteration, GNUNET_NO to delete the entry and
- *     continue and GNUNET_OK to continue iterating
- * @param iter_cls closure for 'iter'
- */
-static void
-postgres_iterate (struct Plugin *plugin,
-                 unsigned int type,
-                  int is_asc,
-                  unsigned int iter_select,
-                  PluginIterator iter, void *iter_cls)
-{
-  struct NextRequestClosure *nrc;
-
-  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
-  nrc->count = UINT32_MAX;
-  nrc->plugin = plugin;
-  nrc->iter = iter;
-  nrc->iter_cls = iter_cls;
-  if (is_asc)
-    {
-      nrc->blast_prio = htonl (0);
-      nrc->blast_rowid = htonl (0);
-      nrc->blast_expire = htonl (0);
-    }
-  else
-    {
-      nrc->blast_prio = htonl (0x7FFFFFFFL);
-      nrc->blast_rowid = htonl (0xFFFFFFFF);
-      nrc->blast_expire = GNUNET_htonll (0x7FFFFFFFFFFFFFFFLL);
-    }
-  switch (iter_select)
-    {
-    case 0:
-      nrc->pname = "select_low_priority";
-      nrc->nparams = 2;
-      nrc->paramValues[0] = (const char *) &nrc->blast_prio;
-      nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
-      nrc->paramLengths[0] = sizeof (nrc->blast_prio);
-      nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
-      break;
-    case 1:
-      nrc->pname = "select_non_anonymous";
-      nrc->nparams = 2;
-      nrc->paramValues[0] = (const char *) &nrc->blast_prio;
-      nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
-      nrc->paramLengths[0] = sizeof (nrc->blast_prio);
-      nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
-      break;
-    case 2:
-      nrc->pname = "select_expiration_time";
-      nrc->nparams = 2;
-      nrc->paramValues[0] = (const char *) &nrc->blast_expire;
-      nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
-      nrc->paramLengths[0] = sizeof (nrc->blast_expire);
-      nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
-      break;
-    case 3:
-      nrc->pname = "select_migration_order";
-      nrc->nparams = 3;
-      nrc->paramValues[0] = (const char *) &nrc->blast_expire;
-      nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
-      nrc->paramValues[2] = (const char *) &nrc->bnow;
-      nrc->paramLengths[0] = sizeof (nrc->blast_expire);
-      nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
-      nrc->paramLengths[2] = sizeof (nrc->bnow);
-      break;
-    default:
-      GNUNET_break (0);
-      iter (iter_cls, 
-           NULL, NULL, 0, NULL, 0, 0, 0, 
-           GNUNET_TIME_UNIT_ZERO_ABS, 0);
-      GNUNET_free (nrc);
-      return;
-    }
-  nrc->bnow = GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()).abs_value__;
-  postgres_plugin_next_request (nrc,
-                               GNUNET_NO);
-}
-
-
-/**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
- *
- * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
- */
-static void
-postgres_plugin_iter_low_priority (void *cls,
-                                  enum GNUNET_BLOCK_Type type,
-                                  PluginIterator iter,
-                                  void *iter_cls)
-{
-  struct Plugin *plugin = cls;
-  
-  postgres_iterate (plugin,
-                   type, 
-                   GNUNET_YES, 0, 
-                   iter, iter_cls);
-}
-
-
-
-
-/**
  * Iterate over the results for a particular key
  * in the datastore.
  *
@@ -1063,12 +854,7 @@
   const int paramFormats[] = { 1, 1, 1, 1, 1 };
   PGresult *ret;
 
-  if (key == NULL)
-    {
-      postgres_plugin_iter_low_priority (plugin, type, 
-                                        iter, iter_cls);
-      return;
-    }
+  GNUNET_assert (key != NULL);
   nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
   nrc->plugin = plugin;
   nrc->iter = iter;
@@ -1087,11 +873,9 @@
           nrc->paramLengths[1] = sizeof (nrc->vhash);
           nrc->paramValues[2] = (const char *) &nrc->btype;
           nrc->paramLengths[2] = sizeof (nrc->btype);
-          nrc->paramValues[3] = (const char *) &nrc->blast_rowid;
-          nrc->paramLengths[3] = sizeof (nrc->blast_rowid);
-          nrc->paramValues[4] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[4] = sizeof (nrc->blimit_off);
-          nrc->nparams = 5;
+          nrc->paramValues[3] = (const char *) &nrc->blimit_off;
+          nrc->paramLengths[3] = sizeof (nrc->blimit_off);
+          nrc->nparams = 4;
           nrc->pname = "getvt";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2 AND type=$3",
@@ -1105,11 +889,9 @@
         {
           nrc->paramValues[1] = (const char *) &nrc->btype;
           nrc->paramLengths[1] = sizeof (nrc->btype);
-          nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
-          nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
-          nrc->paramValues[3] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[3] = sizeof (nrc->blimit_off);
-          nrc->nparams = 4;
+          nrc->paramValues[2] = (const char *) &nrc->blimit_off;
+          nrc->paramLengths[2] = sizeof (nrc->blimit_off);
+          nrc->nparams = 3;
           nrc->pname = "gett";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND type=$2",
@@ -1126,11 +908,9 @@
         {
           nrc->paramValues[1] = (const char *) &nrc->vhash;
           nrc->paramLengths[1] = sizeof (nrc->vhash);
-          nrc->paramValues[2] = (const char *) &nrc->blast_rowid;
-          nrc->paramLengths[2] = sizeof (nrc->blast_rowid);
-          nrc->paramValues[3] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[3] = sizeof (nrc->blimit_off);
-          nrc->nparams = 4;
+          nrc->paramValues[2] = (const char *) &nrc->blimit_off;
+          nrc->paramLengths[2] = sizeof (nrc->blimit_off);
+          nrc->nparams = 3;
           nrc->pname = "getv";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1 AND vhash=$2",
@@ -1142,11 +922,9 @@
         }
       else
         {
-          nrc->paramValues[1] = (const char *) &nrc->blast_rowid;
-          nrc->paramLengths[1] = sizeof (nrc->blast_rowid);
-          nrc->paramValues[2] = (const char *) &nrc->blimit_off;
-          nrc->paramLengths[2] = sizeof (nrc->blimit_off);
-          nrc->nparams = 3;
+          nrc->paramValues[1] = (const char *) &nrc->blimit_off;
+          nrc->paramLengths[1] = sizeof (nrc->blimit_off);
+          nrc->nparams = 2;
           nrc->pname = "get";
           ret = PQexecParams (plugin->dbh,
                               "SELECT count(*) FROM gn090 WHERE hash=$1",
@@ -1200,6 +978,131 @@
 
 
 /**
+ * Select a subset of the items in the datastore and call
+ * the given iterator for each of them.
+ *
+ * @param cls our "struct Plugin*"
+ * @param type entries of which type should be considered?
+ *        Use 0 for any type.
+ * @param iter function to call on each matching value;
+ *        will be called once with a NULL value at the end
+ * @param iter_cls closure for iter
+ */
+static void
+postgres_plugin_iter_zero_anonymity (void *cls,
+                                    enum GNUNET_BLOCK_Type type,
+                                    PluginIterator iter,
+                                    void *iter_cls)
+{
+  struct Plugin *plugin = cls;
+  struct NextRequestClosure *nrc;
+
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->btype = htonl ((uint32_t) type);
+  nrc->plugin = plugin;
+  nrc->iter = iter;
+  nrc->iter_cls = iter_cls;
+  nrc->pname = "select_non_anonymous";
+  nrc->nparams = 1;
+  nrc->paramLengths[0] = sizeof (nrc->bcount);
+  nrc->paramValues[0] = (const char*) &nrc->bcount;
+  postgres_plugin_next_request (nrc,
+                               GNUNET_NO);
+}
+
+/**
+ * Context for 'repl_iter' function.
+ */
+struct ReplCtx
+{
+  
+  /**
+   * Plugin handle.
+   */
+  struct Plugin *plugin;
+  
+  /**
+   * Function to call for the result (or the NULL).
+   */
+  PluginIterator iter;
+  
+  /**
+   * Closure for iter.
+   */
+  void *iter_cls;
+};
+
+
+/**
+ * Wrapper for the iterator for 'postgres_plugin_replication_get'.
+ * Decrements the replication counter and calls the original
+ * iterator.
+ *
+ * @param cls closure
+ * @param next_cls closure to pass to the "next" function.
+ * @param key key for the content
+ * @param size number of bytes in data
+ * @param data content stored
+ * @param type type of the content
+ * @param priority priority of the content
+ * @param anonymity anonymity-level for the content
+ * @param expiration expiration time for the content
+ * @param uid unique identifier for the datum;
+ *        maybe 0 if no unique identifier is available
+ *
+ * @return GNUNET_SYSERR to abort the iteration, GNUNET_OK to continue
+ *         (continue on call to "next", of course),
+ *         GNUNET_NO to delete the item and continue (if supported)
+ */
+static int
+repl_iter (void *cls,
+          void *next_cls,
+          const GNUNET_HashCode *key,
+          uint32_t size,
+          const void *data,
+          enum GNUNET_BLOCK_Type type,
+          uint32_t priority,
+          uint32_t anonymity,
+          struct GNUNET_TIME_Absolute expiration, 
+          uint64_t uid)
+{
+  struct ReplCtx *rc = cls;
+  struct Plugin *plugin = rc->plugin;
+  int ret;
+  PGresult *qret;
+  uint32_t boid;
+
+  ret = rc->iter (rc->iter_cls,
+                 next_cls, key,
+                 size, data, 
+                 type, priority, anonymity, expiration,
+                 uid);
+  if (NULL != key)
+    {
+      boid = htonl ( (uint32_t) uid);
+      const char *paramValues[] = {
+       (const char *) &boid,
+      };
+      int paramLengths[] = {
+       sizeof (boid),
+      };
+      const int paramFormats[] = { 1 };
+      qret = PQexecPrepared (plugin->dbh,
+                           "decrepl",
+                           1, paramValues, paramLengths, paramFormats, 1);
+      if (GNUNET_OK != check_result (plugin,
+                                    qret,
+                                    PGRES_COMMAND_OK,
+                                    "PQexecPrepared", 
+                                    "decrepl", __LINE__))
+       return GNUNET_SYSERR;
+      PQclear (qret);
+    }
+  return ret;
+}
+
+
+/**
  * Get a random item for replication.  Returns a single, not expired, random item
  * from those with the highest replication counters.  The item's 
  * replication counter is decremented by one IF it was positive before.
@@ -1213,9 +1116,21 @@
 postgres_plugin_replication_get (void *cls,
                                 PluginIterator iter, void *iter_cls)
 {
-  /* FIXME: not implemented! */
-  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
-       GNUNET_TIME_UNIT_ZERO_ABS, 0);
+  struct Plugin *plugin = cls;
+  struct NextRequestClosure *nrc;
+  struct ReplCtx rc;
+
+  rc.plugin = plugin;
+  rc.iter = iter;
+  rc.iter_cls = iter_cls;
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->one_shot = GNUNET_YES;
+  nrc->plugin = plugin;
+  nrc->iter = &repl_iter;
+  nrc->iter_cls = &rc;
+  nrc->pname = "select_replication_order";
+  nrc->nparams = 0;
+  postgres_next_request_cont (nrc, NULL);
 }
 
 
@@ -1231,34 +1146,80 @@
 postgres_plugin_expiration_get (void *cls,
                                PluginIterator iter, void *iter_cls)
 {
-  /* FIXME: not implemented! */
-  iter (iter_cls, NULL, NULL, 0, NULL, 0, 0, 0, 
-       GNUNET_TIME_UNIT_ZERO_ABS, 0);
+  struct Plugin *plugin = cls;
+  struct NextRequestClosure *nrc;
+  uint64_t btime;
+  
+  btime = GNUNET_htonll (GNUNET_TIME_absolute_get ().abs_value);
+  nrc = GNUNET_malloc (sizeof (struct NextRequestClosure));
+  nrc->one_shot = GNUNET_YES;
+  nrc->plugin = plugin;
+  nrc->iter = iter;
+  nrc->iter_cls = iter_cls;
+  nrc->pname = "select_expiration_order";
+  nrc->nparams = 1;
+  nrc->paramValues[0] = (const char *) &btime;
+  nrc->paramLengths[0] = sizeof (btime);
+  postgres_next_request_cont (nrc, NULL);
 }
 
 
 /**
- * Select a subset of the items in the datastore and call
- * the given iterator for each of them.
+ * Update the priority for a particular key in the datastore.  If
+ * the expiration time in value is different than the time found in
+ * the datastore, the higher value should be kept.  For the
+ * anonymity level, the lower value is to be used.  The specified
+ * priority should be added to the existing priority, ignoring the
+ * priority in value.
  *
+ * Note that it is possible for multiple values to match this put.
+ * In that case, all of the respective values are updated.
+ *
  * @param cls our "struct Plugin*"
- * @param type entries of which type should be considered?
- *        Use 0 for any type.
- * @param iter function to call on each matching value;
- *        will be called once with a NULL value at the end
- * @param iter_cls closure for iter
+ * @param uid unique identifier of the datum
+ * @param delta by how much should the priority
+ *     change?  If priority + delta < 0 the
+ *     priority should be set to 0 (never go
+ *     negative).
+ * @param expire new expiration time should be the
+ *     MAX of any existing expiration time and
+ *     this value
+ * @param msg set to error message
+ * @return GNUNET_OK on success
  */
-static void
-postgres_plugin_iter_zero_anonymity (void *cls,
-                                    enum GNUNET_BLOCK_Type type,
-                                    PluginIterator iter,
-                                    void *iter_cls)
+static int
+postgres_plugin_update (void *cls,
+                       uint64_t uid,
+                       int delta, struct GNUNET_TIME_Absolute expire,
+                       char **msg)
 {
   struct Plugin *plugin = cls;
+  PGresult *ret;
+  int32_t bdelta = (int32_t) htonl ((uint32_t) delta);
+  uint32_t boid = htonl ( (uint32_t) uid);
+  uint64_t bexpire = GNUNET_TIME_absolute_hton (expire).abs_value__;
+  const char *paramValues[] = {
+    (const char *) &bdelta,
+    (const char *) &bexpire,
+    (const char *) &boid,
+  };
+  int paramLengths[] = {
+    sizeof (bdelta),
+    sizeof (bexpire),
+    sizeof (boid),
+  };
+  const int paramFormats[] = { 1, 1, 1 };
 
-  postgres_iterate (plugin, 
-                   type, GNUNET_NO, 1,
-                   iter, iter_cls);
+  ret = PQexecPrepared (plugin->dbh,
+                        "update",
+                        3, paramValues, paramLengths, paramFormats, 1);
+  if (GNUNET_OK != check_result (plugin,
+                                ret,
+                                 PGRES_COMMAND_OK,
+                                 "PQexecPrepared", "update", __LINE__))
+    return GNUNET_SYSERR;
+  PQclear (ret);
+  return GNUNET_OK;
 }
 
 




reply via email to

[Prev in Thread] Current Thread [Next in Thread]