[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[Guile-commits] 22/23: add examples/ethreads/memcached-{client, server}
From: |
Andy Wingo |
Subject: |
[Guile-commits] 22/23: add examples/ethreads/memcached-{client, server} |
Date: |
Thu, 24 Mar 2016 14:26:04 +0000 |
wingo pushed a commit to branch wip-ethreads
in repository guile.
commit 5cb29b08504ba07c18002d3dab9302505e4593a2
Author: Andy Wingo <address@hidden>
Date: Fri Mar 30 19:36:24 2012 +0200
add examples/ethreads/memcached-{client,server}
* examples/ethreads/memcached-client.scm:
* examples/ethreads/memcached-server.scm: Two new ethreads examples.
Not really optimized, no consideration for memory use, but perhaps
instructive.
---
examples/ethreads/memcached-client.scm | 156 ++++++++++++++++++++++++++++++++
examples/ethreads/memcached-server.scm | 156 ++++++++++++++++++++++++++++++++
2 files changed, 312 insertions(+), 0 deletions(-)
diff --git a/examples/ethreads/memcached-client.scm b/examples/ethreads/memcached-client.scm
new file mode 100644
index 0000000..bf9752b
--- /dev/null
+++ b/examples/ethreads/memcached-client.scm
@@ -0,0 +1,156 @@
+;;; Simple memcached client implementation
+
+;; Copyright (C) 2012 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this library; if not, write to the Free Software
+;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+;; 02110-1301 USA
+
+(use-modules (rnrs bytevectors)
+ (ice-9 eports)
+ (ice-9 ethreads)
+ (ice-9 match))
+
+;; Report a failure in the conversation with the server: format MSG
+;; with ARGS onto the current error port, terminate the line, close
+;; EPORT, and finally call `suspend'.
+(define (server-error eport msg . args)
+  (let ((err (current-error-port)))
+    (apply format err msg args)
+    (newline err))
+  (close-eport eport)
+  (suspend))
+
+;; Read one line (at most 512 latin-1 characters) from EPORT and return
+;; it without its terminator.  A line ends at LF, CR, or CRLF.  An
+;; over-long line or EOF before a terminator is reported through
+;; `server-error'.
+(define (read-line eport)
+  (define (eol-char? ch)
+    (if (memv ch '(#\newline #\return)) #t #f))
+  ;; When the delimiter was CR and the next character is LF, consume
+  ;; the LF too so that CRLF counts as a single terminator.
+  (define (skip-lf-after-cr delim)
+    (if (eqv? delim #\return)
+        (if (eqv? (lookahead-latin1-char eport) #\newline)
+            (get-latin1-char eport))))
+  (call-with-values
+      (lambda ()
+        ;; Restrict to 512 chars to avoid denial of service attacks.
+        (get-latin1-string-delimited eport eol-char? #:max-chars 512))
+    (lambda (line delim)
+      (if (not delim)
+          (server-error eport "Line too long: ~S" line)
+          (if (eof-object? delim)
+              (server-error eport "EOF while reading line: ~S" line)
+              (begin
+                (skip-lf-after-cr delim)
+                line))))))
+
+;; Convert the string VAL to an exact non-negative integer and return
+;; it.  Anything else is reported through `server-error' on EPORT.
+(define (parse-int eport val)
+  (let ((n (string->number val)))
+    (or (and n (integer? n) (exact? n) (>= n 0))
+        (server-error eport "Expected a non-negative integer: ~s" val))
+    n))
+
+;; An item is a two-slot vector: slot 0 holds the flags, slot 1 holds
+;; the payload bytevector.
+(define (make-item flags bv)
+  (list->vector (list flags bv)))
+
+(define (item-flags item)
+  (vector-ref item 0))
+
+(define (item-bv item)
+  (vector-ref item 1))
+
+;; Issue a memcached "get" for KEYS over EPORT and collect the reply.
+;;
+;; Returns an association list of (KEY . ITEM) pairs, in the order the
+;; server sent them; keys the server does not report are simply absent.
+;; Any malformed or unexpected response is reported via `server-error'.
+(define (get eport . keys)
+  (put-utf8-string eport "get ")
+  (put-utf8-string eport (string-join keys " "))
+  (put-utf8-string eport "\r\n")
+  (drain-output eport)
+  (let lp ((vals '()))
+    (let ((line (read-line eport)))
+      (match (string-split line #\space)
+        (("VALUE" key flags len)
+         (let* ((flags (parse-int eport flags))
+                (len (parse-int eport len)))
+           (unless (member key keys)
+             (server-error eport "Unknown key: ~a" key))
+           (when (assoc key vals)
+             (server-error eport "Already have response for key: ~a" key))
+           (let ((bv (get-bytevector-n eport len)))
+             ;; Guard against a non-bytevector result (e.g. an EOF
+             ;; object) before asking for its length, and report the
+             ;; number of bytes actually read, not the bytevector.
+             (unless (and (bytevector? bv)
+                          (= (bytevector-length bv) len))
+               (server-error eport "Expected ~A bytes, got ~A" len
+                             (if (bytevector? bv)
+                                 (bytevector-length bv)
+                                 0)))
+             ;; The data block is always followed by CRLF.
+             (unless (eqv? (get-latin1-char eport) #\return)
+               (server-error eport "Expected \\r"))
+             (unless (eqv? (get-latin1-char eport) #\newline)
+               (server-error eport "Expected \\n"))
+             (lp (acons key (make-item flags bv) vals)))))
+        (("END")
+         (reverse vals))
+        (_
+         (server-error eport "Bad line: ~A" line))))))
+
+;; Store BYTES (a bytevector) under KEY with the given FLAGS and
+;; EXPTIME by sending a memcached "set" command over EPORT.
+;;
+;; When NOREPLY? is true the command carries the "noreply" token; the
+;; memcached protocol then has the server send no response, so we
+;; return #t right after flushing instead of blocking on a line that
+;; will never arrive.  Otherwise the reply line is read: "STORED" and
+;; "NOT_STORED" both yield #t, anything else goes to `server-error'.
+(define* (set eport key flags exptime bytes #:key noreply?)
+  (put-utf8-string eport "set ")
+  (put-utf8-string eport key)
+  (put-utf8-char eport #\space)
+  (put-utf8-string eport (number->string flags))
+  (put-utf8-char eport #\space)
+  (put-utf8-string eport (number->string exptime))
+  (put-utf8-char eport #\space)
+  (put-utf8-string eport (number->string (bytevector-length bytes)))
+  (when noreply?
+    (put-utf8-string eport " noreply"))
+  (put-utf8-string eport "\r\n")
+  (put-bytevector eport bytes)
+  (put-utf8-string eport "\r\n")
+  (drain-output eport)
+  (if noreply?
+      #t
+      (let ((line (read-line eport)))
+        (match line
+          ("STORED" #t)
+          ("NOT_STORED" #t)
+          (_
+           (server-error eport "Unexpected response from server: ~A"
+                         line))))))
+
+;; Create a socket matching ADDRINFO, wrap it in an eport, connect it
+;; to the address in ADDRINFO, and return the eport.
+(define (connect addrinfo)
+  (let ((eport (file-port->eport (socket (addrinfo:fam addrinfo)
+                                         (addrinfo:socktype addrinfo)
+                                         (addrinfo:protocol addrinfo)))))
+    ;; Disable Nagle's algorithm.  We buffer ourselves.  TCP_NODELAY
+    ;; must be set to a non-zero value for that; 0 leaves Nagle on.
+    (setsockopt (eport-fd eport) IPPROTO_TCP TCP_NODELAY 1)
+    (connect-eport eport (addrinfo:addr addrinfo))
+    eport))
+
+;; Number of client loops that have started and not yet finished.
+(define *active-clients* 0)
+
+;; Body of one test client.  Connects using ADDRINFO, then performs
+;; NUM-CONNECTIONS rounds of "set test-N to the round number, read it
+;; back, compare".  A missing or mismatching value is reported through
+;; `server-error'.  When the last running client finishes, the process
+;; exits with status 0.
+(define (client-loop addrinfo n num-connections)
+  (set! *active-clients* (+ *active-clients* 1))
+  (let* ((eport (connect addrinfo))
+         (key (string-append "test-" (number->string n))))
+    (do ((m 0 (1+ m)))
+        ((>= m num-connections))
+      (let ((expected (string->utf8 (number->string m))))
+        (set eport key 0 0 expected)
+        (let ((item (assoc-ref (get eport key) key)))
+          (unless item
+            (server-error eport "Not found: ~A" key))
+          (unless (equal? (item-bv item) expected)
+            (server-error eport "Bad response: ~A (expected ~A)"
+                          (item-bv item) expected)))))
+    (close-eport eport))
+  (set! *active-clients* (- *active-clients* 1))
+  (if (zero? *active-clients*)
+      (exit 0)))
+
+;; Spawn NUM-CLIENTS client threads, each running `client-loop' with
+;; NUM-CONNECTIONS rounds against localhost port 11211, then enter the
+;; scheduler with `run'.
+(define (run-memcached-test num-clients num-connections)
+  ;; The getaddrinfo call blocks, unfortunately.  Call it once before
+  ;; spawning clients.
+  (let ((addrinfo (car (getaddrinfo "localhost" (number->string 11211)))))
+    (do ((n 0 (1+ n)))
+        ((>= n num-clients))
+      (spawn (lambda ()
+               (client-loop addrinfo n num-connections)))))
+  (run))
+
+;; Script entry point: every command-line argument after the program
+;; name is converted with `string->number' and passed positionally to
+;; `run-memcached-test', which takes NUM-CLIENTS and NUM-CONNECTIONS.
+(apply run-memcached-test (map string->number (cdr (program-arguments))))
diff --git a/examples/ethreads/memcached-server.scm b/examples/ethreads/memcached-server.scm
new file mode 100644
index 0000000..a5e28a8
--- /dev/null
+++ b/examples/ethreads/memcached-server.scm
@@ -0,0 +1,156 @@
+;;; Simple memcached server implementation
+
+;; Copyright (C) 2012 Free Software Foundation, Inc.
+
+;; This library is free software; you can redistribute it and/or
+;; modify it under the terms of the GNU Lesser General Public
+;; License as published by the Free Software Foundation; either
+;; version 3 of the License, or (at your option) any later version.
+;;
+;; This library is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this library; if not, write to the Free Software
+;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+;; 02110-1301 USA
+
+(use-modules (rnrs bytevectors)
+ (ice-9 eports)
+ (ice-9 ethreads)
+ (ice-9 match))
+
+;; Create a stream socket for FAMILY, mark it SO_REUSEADDR and
+;; close-on-exec, bind it to ADDR and PORT, and return it.
+;;
+;; The socket is created with FAMILY rather than a hard-coded PF_INET,
+;; so the socket always matches the address family given to `bind'.
+(define (make-default-socket family addr port)
+  (let ((sock (socket family SOCK_STREAM 0)))
+    (setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
+    (fcntl sock F_SETFD FD_CLOEXEC)
+    (bind sock family addr port)
+    sock))
+
+;; Tell the client what went wrong and drop the connection: MSG is
+;; formatted with ARGS, written to EPORT followed by CRLF and flushed;
+;; then EPORT is closed and `suspend' is called.
+(define (client-error eport msg . args)
+  (let ((text (apply format #f msg args)))
+    (put-utf8-string eport text))
+  (put-utf8-string eport "\r\n")
+  (drain-output eport)
+  (close-eport eport)
+  (suspend))
+
+;; Read one line (at most 512 latin-1 characters) from EPORT and return
+;; it without its terminator.  A line ends at LF, CR, or CRLF.  An
+;; over-long line or EOF before a terminator is reported through
+;; `client-error'.
+(define (read-line eport)
+  (define (eol-char? ch)
+    (if (memv ch '(#\newline #\return)) #t #f))
+  ;; When the delimiter was CR and the next character is LF, consume
+  ;; the LF too so that CRLF counts as a single terminator.
+  (define (skip-lf-after-cr delim)
+    (if (eqv? delim #\return)
+        (if (eqv? (lookahead-latin1-char eport) #\newline)
+            (get-latin1-char eport))))
+  (call-with-values
+      (lambda ()
+        ;; Restrict to 512 chars to avoid denial of service attacks.
+        (get-latin1-string-delimited eport eol-char? #:max-chars 512))
+    (lambda (line delim)
+      (if (not delim)
+          (client-error eport "Line too long: ~S" line)
+          (if (eof-object? delim)
+              (client-error eport "EOF while reading line: ~S" line)
+              (begin
+                (skip-lf-after-cr delim)
+                line))))))
+
+;; Convert the string VAL to an exact non-negative integer and return
+;; it.  Anything else is reported through `client-error' on EPORT.
+(define (parse-int eport val)
+  (let ((n (string->number val)))
+    (or (and n (integer? n) (exact? n) (>= n 0))
+        (client-error eport "Expected a non-negative integer: ~s" val))
+    n))
+
+;; An item is a two-slot vector: slot 0 holds the flags, slot 1 holds
+;; the payload bytevector.
+(define (make-item flags bv)
+  (list->vector (list flags bv)))
+
+(define (item-flags item)
+  (vector-ref item 0))
+
+(define (item-bv item)
+  (vector-ref item 1))
+
+;; Dispatch table for protocol verbs.  Each `define-command' form
+;; stores its handler here with `hashq-set!', keyed by the command
+;; name as a symbol.
+(define *commands* (make-hash-table))
+
+;; (define-command (NAME EPORT STORE . PAT) BODY ...)
+;;
+;; Expands to a procedure NAME of three arguments (EPORT STORE ARGS)
+;; that matches ARGS against the `match' pattern PAT and evaluates the
+;; BODY forms when it matches.  When ARGS does not match, the handler
+;; calls `client-error' with a "Bad line" message built from NAME and
+;; ARGS.  The procedure is also registered in *commands* under the
+;; symbol NAME.
+(define-syntax-rule (define-command (name eport store . pat)
+ body body* ...)
+ (begin
+ (define (name eport store args)
+ (match args
+ (pat body body* ...)
+ (else
+ (client-error eport "Bad line: ~A ~S" 'name (string-join args " ")))))
+ (hashq-set! *commands* 'name name)))
+
+;; "get KEY ...": for every requested key present in STORE, write a
+;; "VALUE <key> <flags> <length>" header line, the payload bytes and a
+;; CRLF.  Keys without an entry are skipped.  The reply always ends
+;; with "END".
+(define-command (get eport store key* ...)
+  (let next ((pending key*))
+    (if (null? pending)
+        (put-utf8-string eport "END\r\n")
+        (let* ((key (car pending))
+               (item (hash-ref store key)))
+          (when item
+            (let ((payload (item-bv item)))
+              (put-utf8-string eport "VALUE ")
+              (put-utf8-string eport key)
+              (put-utf8-char eport #\space)
+              (put-utf8-string eport (number->string (item-flags item)))
+              (put-utf8-char eport #\space)
+              (put-utf8-string eport
+                               (number->string (bytevector-length payload)))
+              (put-utf8-char eport #\return)
+              (put-utf8-char eport #\newline)
+              (put-bytevector eport payload)
+              (put-utf8-string eport "\r\n")))
+          (next (cdr pending))))))
+
+;; "set KEY FLAGS EXPTIME BYTES [noreply]": read BYTES bytes of payload
+;; from EPORT and store them in STORE under KEY together with FLAGS.
+;; EXPTIME is validated but otherwise unused.
+;;
+;; NOREPLY is bound to ("noreply") when the client sent that token and
+;; to () otherwise.  Under the memcached protocol "noreply" means the
+;; server must not send a response, so "STORED" is written only when
+;; the token is absent.
+(define-command (set eport store key flags exptime bytes
+                     . (and noreply (or ("noreply") ())))
+  (let* ((flags (parse-int eport flags))
+         (exptime (parse-int eport exptime))
+         (bytes (parse-int eport bytes)))
+    (let* ((bv (get-bytevector-n eport bytes))
+           ;; Guard against a non-bytevector result (e.g. an EOF
+           ;; object) before asking for its length.
+           (got (if (bytevector? bv) (bytevector-length bv) 0)))
+      (unless (and (bytevector? bv) (= got bytes))
+        (client-error eport "Tried to read ~A bytes, only read ~A"
+                      bytes got))
+      (hash-set! store key (make-item flags bv))
+      ;; Consume the line terminator after the data block, tolerating
+      ;; a missing CR or LF.
+      (when (eqv? (lookahead-latin1-char eport) #\return)
+        (get-latin1-char eport))
+      (when (eqv? (lookahead-latin1-char eport) #\newline)
+        (get-latin1-char eport)))
+    (when (null? noreply)
+      (put-utf8-string eport "STORED\r\n"))))
+
+;; Serve one client connection.  Repeatedly read a request line from
+;; EPORT, look its first word up in *commands*, and run the handler
+;; with the remaining words.  Output is flushed after each request.
+;; The loop ends, closing EPORT, once the client has reached EOF.
+;; Unknown verbs are reported through `client-error'.
+(define (client-loop eport store)
+  (define (handle-request)
+    (let* ((words (string-split (read-line eport) #\space))
+           (verb (string->symbol (car words)))
+           (handler (hashq-ref *commands* verb)))
+      (if (not handler)
+          (client-error eport "Bad command: ~a" verb))
+      (handler eport store (cdr words))))
+  (let serve ()
+    (handle-request)
+    (drain-output eport)
+    (cond
+     ((eof-object? (lookahead-u8 eport))
+      (close-eport eport))
+     (else
+      (serve)))))
+
+;; Accept connections on ESOCKET forever, spawning a `client-loop'
+;; thread for each accepted client.  All clients share STORE.
+(define (socket-loop esocket store)
+  (let loop ()
+    (let ((client (accept-eport esocket)))
+      ;; Disable Nagle's algorithm.  We buffer ourselves.  TCP_NODELAY
+      ;; must be set to a non-zero value for that; 0 leaves Nagle on.
+      (setsockopt (eport-fd client) IPPROTO_TCP TCP_NODELAY 1)
+      (spawn (lambda () (client-loop client store)))
+      (loop))))
+
+;; Start the server.  By default it listens on the loopback address,
+;; port 11211; HOST, FAMILY, ADDR, PORT or a ready-made bound SOCKET
+;; may be supplied as keyword arguments.  SIGPIPE is ignored, an
+;; accept thread is spawned with a fresh hash table as the store, and
+;; the scheduler is entered with `run'.
+(define* (run-memcached #:key
+                        (host #f)
+                        (family AF_INET)
+                        (addr (if host
+                                  (inet-pton family host)
+                                  INADDR_LOOPBACK))
+                        (port 11211)
+                        (socket (make-default-socket family addr port)))
+  (define (accept-thread)
+    (socket-loop (file-port->eport socket) (make-hash-table)))
+  (listen socket 128)
+  (sigaction SIGPIPE SIG_IGN)
+  (spawn accept-thread)
+  (run))
+
+;; Script entry point: start the server with all keyword defaults.
+(run-memcached)
- [Guile-commits] 15/23: (web server ethreads): more use of latin1 accessors, (continued)
- [Guile-commits] 15/23: (web server ethreads): more use of latin1 accessors, Andy Wingo, 2016/03/24
- [Guile-commits] 01/23: add (ice-9 nio), Andy Wingo, 2016/03/24
- [Guile-commits] 20/23: eports: nonblocking connect-eport, Andy Wingo, 2016/03/24
- [Guile-commits] 14/23: refactoring to (web server ethreads) read-http-line, Andy Wingo, 2016/03/24
- [Guile-commits] 02/23: add (ice-9 eports), Andy Wingo, 2016/03/24
- [Guile-commits] 17/23: getsockopt: allow raw file descriptors, Andy Wingo, 2016/03/24
- [Guile-commits] 16/23: eports: add put-utf8-char, put-utf8-string, Andy Wingo, 2016/03/24
- [Guile-commits] 03/23: add (ice-9 epoll), Andy Wingo, 2016/03/24
- [Guile-commits] 21/23: eports tweak, Andy Wingo, 2016/03/24
- [Guile-commits] 07/23: add (web server ethreads), Andy Wingo, 2016/03/24
- [Guile-commits] 22/23: add examples/ethreads/memcached-{client, server},
Andy Wingo <=