[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[libeufin] 05/08: background jobs
From: |
gnunet |
Subject: |
[libeufin] 05/08: background jobs |
Date: |
Wed, 08 Feb 2023 14:32:21 +0100 |
This is an automated email from the git hooks/post-receive script.
ms pushed a commit to branch master
in repository libeufin.
commit c696a3748b72eb5eb83f576885b3ea28d9c814bc
Author: MS <ms@taler.net>
AuthorDate: Wed Feb 8 14:15:19 2023 +0100
background jobs
fix invocation of root coroutine.
---
nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 10 ++-
.../main/kotlin/tech/libeufin/nexus/Scheduling.kt | 88 ++++++++++------------
2 files changed, 47 insertions(+), 51 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index 6795f8e0..f09a152b 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -30,6 +30,10 @@ import com.github.ajalt.clikt.parameters.types.int
import execThrowableOrTerminate
import com.github.ajalt.clikt.core.*
import com.github.ajalt.clikt.parameters.options.*
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.newSingleThreadContext
import startServer
import tech.libeufin.nexus.iso20022.parseCamtMessage
import tech.libeufin.nexus.server.client
@@ -70,10 +74,8 @@ class Serve : CliktCommand("Run nexus HTTP server") {
private val logLevel by option()
override fun run() {
setLogLevel(logLevel)
- execThrowableOrTerminate {
- dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME))
- }
- startOperationScheduler(client)
+ execThrowableOrTerminate {
dbCreateTables(getDbConnFromEnv(NEXUS_DB_ENV_VAR_NAME)) }
+ CoroutineScope(Dispatchers.IO).launch(fallback) {
startOperationScheduler(client) }
if (withUnixSocket != null) {
startServer(
withUnixSocket!!,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
index d4420db6..86c86a37 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Scheduling.kt
@@ -24,9 +24,8 @@ import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.HttpClient
-import kotlinx.coroutines.CoroutineExceptionHandler
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.*
+import kotlinx.coroutines.GlobalScope.coroutineContext
import kotlinx.coroutines.time.delay
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.nexus.bankaccount.fetchBankAccountTransactions
@@ -36,6 +35,7 @@ import java.lang.IllegalArgumentException
import java.time.Duration
import java.time.Instant
import java.time.ZonedDateTime
+import kotlin.coroutines.coroutineContext
import kotlin.system.exitProcess
private data class TaskSchedule(
@@ -96,59 +96,53 @@ object NexusCron {
CronParser(cronDefinition)
}
}
-/**
- * Fails whenever a unmanaged Throwable reaches the root coroutine.
- */
+
+// Fails whenever a unmanaged Throwable reaches the root coroutine.
val fallback = CoroutineExceptionHandler { _, err ->
logger.error(err.stackTraceToString())
exitProcess(1)
}
-fun startOperationScheduler(httpClient: HttpClient) {
- GlobalScope.launch(fallback) {
- while (true) {
- // First, assign next execution time stamps to all tasks that need
them
- transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
- }.forEach {
- val cron = try {
- NexusCron.parser.parse(it.taskCronspec)
- } catch (e: IllegalArgumentException) {
- logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
- return@forEach
- }
- val zonedNow = ZonedDateTime.now()
- val et = ExecutionTime.forCron(cron)
- val next = et.nextExecution(zonedNow)
- logger.info("scheduling task ${it.taskName} at $next (now
is $zonedNow)")
- it.nextScheduledExecutionSec = next.get().toEpochSecond()
+suspend fun startOperationScheduler(httpClient: HttpClient) {
+ while (true) {
+ // First, assign next execution time stamps to all tasks that need them
+ transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec.isNull()
+ }.forEach {
+ val cron = try {
+ NexusCron.parser.parse(it.taskCronspec)
+ } catch (e: IllegalArgumentException) {
+ logger.error("invalid cronspec in schedule
${it.resourceType}/${it.resourceId}/${it.taskName}")
+ return@forEach
}
+ val zonedNow = ZonedDateTime.now()
+ val et = ExecutionTime.forCron(cron)
+ val next = et.nextExecution(zonedNow)
+ logger.info("scheduling task ${it.taskName} at $next (now is
$zonedNow)")
+ it.nextScheduledExecutionSec = next.get().toEpochSecond()
}
-
- val nowSec = Instant.now().epochSecond
- // Second, find tasks that are due
- val dueTasks = transaction {
- NexusScheduledTaskEntity.find {
- NexusScheduledTasksTable.nextScheduledExecutionSec lessEq
nowSec
- }.map {
- TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
- }
+ }
+ val nowSec = Instant.now().epochSecond
+ // Second, find tasks that are due
+ val dueTasks = transaction {
+ NexusScheduledTaskEntity.find {
+ NexusScheduledTasksTable.nextScheduledExecutionSec lessEq
nowSec
+ }.map {
+ TaskSchedule(it.id.value, it.taskName, it.taskType,
it.resourceType, it.resourceId, it.taskParams)
}
- // Execute those due tasks
- dueTasks.forEach {
- runTask(httpClient, it)
- transaction {
- val t = NexusScheduledTaskEntity.findById(it.taskId)
- if (t != null) {
- // Reset next scheduled execution
- t.nextScheduledExecutionSec = null
- t.prevScheduledExecutionSec = nowSec
- }
+ } // Execute those due tasks
+ dueTasks.forEach {
+ runTask(httpClient, it)
+ transaction {
+ val t = NexusScheduledTaskEntity.findById(it.taskId)
+ if (t != null) {
+ // Reset next scheduled execution
+ t.nextScheduledExecutionSec = null
+ t.prevScheduledExecutionSec = nowSec
}
}
-
- // Wait a bit
- delay(Duration.ofSeconds(1))
}
+ // Wait a bit
+ delay(Duration.ofSeconds(1))
}
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
- [libeufin] branch master updated (64c2d251 -> 6726bb63), gnunet, 2023/02/08
- [libeufin] 05/08: background jobs,
gnunet <=
- [libeufin] 02/08: EBICS subscriber creation., gnunet, 2023/02/08
- [libeufin] 01/08: help message, gnunet, 2023/02/08
- [libeufin] 03/08: comments, gnunet, 2023/02/08
- [libeufin] 04/08: Tests environment., gnunet, 2023/02/08
- [libeufin] 06/08: implementing #7521, gnunet, 2023/02/08
- [libeufin] 07/08: tests: needed to specify a correct EBICS host ID., gnunet, 2023/02/08
- [libeufin] 08/08: testing the implementation of #7521, gnunet, 2023/02/08