(declare (unit pthreads) (uses srfi-18 srfi-34 srfi-35) (uses atomic) ; init thread extnsns (uses library) ; gc (fixnum-arithmetic) (disable-interrupts) (usual-integrations) (no-bound-checks) (no-procedure-checks-for-usual-bindings) (bound-to-procedure ) (foreign-declare #< #endif #include #include #ifndef NO_THREAD_LOOP #define THREAD_POOL_SIZE 5 #include #include #include typedef int (*askemos_request_function_t)(void *); typedef struct _askemos_pool_entry { askemos_request_function_t function; void *data; void *callback; } askemos_pool_entry_t; struct askemos_pool { pthread_mutex_t mutex; pthread_cond_t has_job; pthread_cond_t has_space; unsigned short int total, next, free; askemos_pool_entry_t *r; }; static void * worker_thread_loop(void *arg); static void askemos_pool_entry_init(struct askemos_pool * pool, askemos_pool_entry_t * r); static int askemos_pool_init(struct askemos_pool * pool) { int i; pool->next = 0; pool->total = pool->free = 50; pthread_mutex_init(&pool->mutex, NULL); pthread_cond_init(&pool->has_job, NULL); pthread_cond_init(&pool->has_space, NULL); pool->r = malloc(sizeof(askemos_pool_entry_t) * pool->total); for (i = 0; i < pool->total; ++i) { pool->r[i].function = NULL; pool->r[i].data = NULL; pool->r[i].callback = NULL; } for (i = 0; i < THREAD_POOL_SIZE; ++i) { int e; pthread_t thread; pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, /* PTHREAD_CREATE_DETACHED */ PTHREAD_CREATE_JOINABLE); e = pthread_create(&thread, &attr, worker_thread_loop, pool); pthread_attr_destroy(&attr); } return 0; } static int askemos_pool_send(struct askemos_pool * pool, askemos_request_function_t function, void *data, void *callback) { askemos_pool_entry_t *result = NULL; pthread_mutex_lock(&pool->mutex); do { if (pool->free) { result = &pool->r[pool->next]; pool->next = (pool->next + 1) % pool->total; --pool->free; } else { pthread_mutex_unlock(&pool->mutex); return 1; /* fprintf(stderr, "DANGER: chicken waiting on thread pool space\n"); pthread_cond_wait(&pool->has_space, &pool->mutex); */ } } while( result == NULL ); result->function = function; result->data = data; result->callback = callback; pthread_mutex_unlock(&pool->mutex); pthread_cond_signal(&pool->has_job); return 0; } /* askemos_pool_put returns an entry into the queue (LI) and returns * the result to rscheme. The latter is questionable, but we avoid to * take yet another lock around the interpreter callback. */ static void askemos_pool_receive(struct askemos_pool * pool, askemos_request_function_t *function, void **data, void **callback) { askemos_pool_entry_t *result = NULL; pthread_mutex_lock(&pool->mutex); do { if (pool->free != pool->total) { unsigned short int target = (pool->next + pool->free) % pool->total; result = &pool->r[target]; ++pool->free; *function = result->function; *data = result->data; *callback = result->callback; } else { pthread_cond_wait(&pool->has_job, &pool->mutex); } } while( result == NULL ); pthread_mutex_unlock(&pool->mutex); pthread_cond_signal(&pool->has_space); } static struct askemos_pool *request_pool = NULL; int start_asynchronous_request(askemos_request_function_t function, void *data, void *callback) { if( request_pool == NULL ) { fprintf(stderr, "thread pool not initialised\n"); exit(1); } return askemos_pool_send(request_pool, function, data, callback); } static pthread_mutex_t callback_mutex; #ifdef ___CHICKEN static int the_interrupt_pipe[2] = {0, 0}; static void *the_callback = NULL; static void *the_callback_result = NULL; static C_word the_result = C_SCHEME_FALSE; static void *integer_result = NULL; static int C_notify_external_interrupt() { static char buf[1] = { (char) 254 }; if(write(the_interrupt_pipe[1], buf, 1)) ; } #endif static void * worker_thread_loop(void *arg) { struct askemos_pool *pool = arg; askemos_request_function_t function; void *data; void *callback; int result; // pthread_cleanup_push(worker_thread_unlock, ressources); while (1) { askemos_pool_receive(request_pool, &function, &data, &callback); result = (*function)(data); pthread_mutex_lock(&callback_mutex); the_result = result; the_callback = callback; the_callback_result = integer_result; /* CHICKEN_interrupt(1); */ C_notify_external_interrupt(); } // pthread_cleanup_pop(1); return NULL; } #ifdef ___CHICKEN void C_interrupt_call(void *callback, void *result, void* value) { pthread_mutex_lock(&callback_mutex); the_result = (C_word) value; the_callback_result = result; the_callback = callback; /* CHICKEN_interrupt(1); */ C_notify_external_interrupt(); } #endif void askemos_pre_init(void *intres) { pthread_mutex_init(&callback_mutex, NULL); request_pool = malloc(sizeof(struct askemos_pool)); askemos_pool_init(request_pool); integer_result=intres; #ifdef ___CHICKEN if( pipe(the_interrupt_pipe) == -1 ) fprintf(stderr, "Failed to open interrupt pipe\n"); #endif } #else /* NO_THREAD_LOOP */ typedef int (*askemos_request_function_t)(void *); int start_asynchronous_request(askemos_request_function_t function, void *data, void *callback){} void askemos_pre_init(void *intres) { fprintf(stderr, "thread pool not initialised\n"); } #endif /* int test_askemos_thread_sleep(void *data) { int time = (int) data; sleep(time); return time; } */ EOF ) ) (module pthreads ( external-wait pthread-pool-load ) (import scheme (except chicken with-exception-handler condition?) foreign lolevel extras (except srfi-18 raise with-exception-handler) srfi-34 srfi-35 atomic) (define pthread-pool-load (make-semaphore 'pthread-pool-load 40)) (define-foreign-variable interrupt-callback c-pointer "the_callback") (define callback-result (foreign-lambda* int ((c-pointer result)) "return(* (int *) result);")) (define callback-result-root (make-gc-root callback-result)) (define (make-gc-root obj) ((foreign-lambda* c-pointer ((scheme-object obj)) "C_GC_ROOT *r=CHICKEN_new_gc_root();" "CHICKEN_gc_root_set(r, obj);" "return(r);") obj)) ((foreign-lambda* void ((c-pointer f)) "askemos_pre_init(f);") callback-result-root) (define (handle-callback) (if interrupt-callback (let ((cb ((foreign-lambda* scheme-object () "return(CHICKEN_gc_root_ref(the_callback));"))) (rc (((foreign-lambda* scheme-object () "return(CHICKEN_gc_root_ref(the_callback_result));")) ((foreign-lambda* c-pointer () "return(&the_result);"))))) (set! interrupt-callback #f) ((foreign-lambda* void () ;; "CHICKEN_interrupt(0);" "pthread_mutex_unlock(&callback_mutex);")) (thread-start! (make-thread (lambda () (cb rc)) 'handle-callback))) ((foreign-lambda* void () "fprintf(stderr, \"Ignored callback -- does this mess up things?\\n\");"))) ) (define external-wait (thread-start! (make-thread (lambda () (let ((fd ((foreign-lambda* int () "return(the_interrupt_pipe[0]);")))) (do () (#f) (thread-wait-for-i/o! fd) ((foreign-lambda* void () "static int buf[1]; int r = read(the_interrupt_pipe[0], buf, 1);")) (handle-callback)))) "external-wait"))) ) ;; module pthreads