[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[taler-cashless2ecash] branch master updated: fix: transfer
From: gnunet
Subject: [taler-cashless2ecash] branch master updated: fix: transfer
Date: Sat, 18 May 2024 09:25:29 +0200
This is an automated email from the git hooks/post-receive script.
joel-haeberli pushed a commit to branch master
in repository cashless2ecash.
The following commit(s) were added to refs/heads/master by this push:
new 5bef918 fix: transfer
5bef918 is described below
commit 5bef9183bf0fe7fd329e934b9e73b9c758ac8098
Author: Joel-Haeberli <haebu@rubigen.ch>
AuthorDate: Sat May 18 09:25:15 2024 +0200
fix: transfer
---
c2ec/api-wire-gateway.go | 2 +-
c2ec/c2ec-config.yaml | 2 +-
c2ec/db-postgres.go | 33 ++++++++++++++++++++++++++++++-
c2ec/db/test_c2ec_simulation.sql | 19 ------------------
c2ec/db/test_c2ec_simulation_rollback.sql | 20 -------------------
c2ec/exponential-backoff.go | 2 +-
c2ec/logger.go | 4 ++++
c2ec/main.go | 4 ++--
c2ec/proc-attestor.go | 24 +++++++++++-----------
c2ec/proc-retrier.go | 19 +++++++++++-------
c2ec/proc-transfer.go | 15 +++++++-------
c2ec/wallee-client.go | 4 ++++
12 files changed, 76 insertions(+), 72 deletions(-)
diff --git a/c2ec/api-wire-gateway.go b/c2ec/api-wire-gateway.go
index 4bb6abb..1f250a3 100644
--- a/c2ec/api-wire-gateway.go
+++ b/c2ec/api-wire-gateway.go
@@ -3,7 +3,7 @@ package main
import (
"errors"
"log"
- http "net/http"
+ "net/http"
"strconv"
"time"
)
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
index 1e98218..dc9e79e 100644
--- a/c2ec/c2ec-config.yaml
+++ b/c2ec/c2ec-config.yaml
@@ -8,7 +8,7 @@ c2ec:
fail-on-missing-attestors: false # forced if prod=true
credit-account: "payto://IBAN/CH50030202099498" # this account must be specified at the providers backends as well
currency: "CHF"
- max-retries: 3
+ max-retries: 100
retry-delay-ms: 1000
wire-gateway:
username: "wire"
diff --git a/c2ec/db-postgres.go b/c2ec/db-postgres.go
index 384022d..a92df66 100644
--- a/c2ec/db-postgres.go
+++ b/c2ec/db-postgres.go
@@ -140,6 +140,9 @@ const PS_GET_TRANSFERS_DESC_MAX = "SELECT * FROM " +
TRANSFER_TABLE_NAME +
" OFFSET ((SELECT COUNT(*) FROM " + TRANSFER_TABLE_NAME +
" WHERE " + TRANSFER_FIELD_NAME_STATUS + "=0)-1)" // TODO Timestamp based offset (-time since request)
+const PS_GET_TRANSFERS_BY_STATUS = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+ " WHERE " + TRANSFER_FIELD_NAME_STATUS + "=$1"
+
// Postgres implementation of the C2ECDatabase
type C2ECPostgres struct {
C2ECDatabase
@@ -686,7 +689,6 @@ func (db *C2ECPostgres) GetTransferById(requestUid []byte)
(*Transfer, error) {
LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
return transfer, nil
}
-
}
func (db *C2ECPostgres) AddTransfer(
@@ -808,6 +810,35 @@ func (db *C2ECPostgres) GetTransfers(start int, delta int)
([]*Transfer, error)
}
}
+func (db *C2ECPostgres) GetTransfersByState(status int) ([]*Transfer, error) {
+
+ if rows, err := db.pool.Query(
+ db.ctx,
+ PS_GET_TRANSFERS_BY_STATUS,
+ status,
+ ); err != nil {
+ LogError("postgres", err)
+ if rows != nil {
+ rows.Close()
+ }
+ return nil, err
+ } else {
+
+ defer rows.Close()
+
+ transfers, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[Transfer])
+ if err != nil {
+ LogError("postgres", err)
+ return nil, err
+ }
+
+ // this will fill up the logs...
+ // LogInfo("postgres", "query="+PS_GET_TRANSFERS_BY_STATUS)
+ // LogInfo("postgres", "size of transfer list="+strconv.Itoa(len(transfers)))
+ return removeNulls(transfers), nil
+ }
+}
+
// Sets up a a listener for the given channel.
// Notifications will be sent through the out channel.
func (db *C2ECPostgres) NewListener(
diff --git a/c2ec/db/test_c2ec_simulation.sql b/c2ec/db/test_c2ec_simulation.sql
deleted file mode 100644
index 4492798..0000000
--- a/c2ec/db/test_c2ec_simulation.sql
+++ /dev/null
@@ -1,19 +0,0 @@
-BEGIN;
-
-SET search_path TO c2ec;
-
-DROP TABLE IF EXISTS p_id;
-
-INSERT INTO provider (name, payto_target_type, backend_base_url,
backend_credentials)
- VALUES ('Simulation', 'void', 'will be simulated', 'no creds');
-
-SELECT provider_id INTO p_id FROM provider WHERE name = 'Simulation';
-
-INSERT INTO terminal (access_token, description, provider_id)
- VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 'this is a simulated
terminal', (SELECT * FROM p_id));
-
-DROP TABLE IF EXISTS p_id;
-
-COMMIT;
-
-SELECT * FROM provider;
diff --git a/c2ec/db/test_c2ec_simulation_rollback.sql b/c2ec/db/test_c2ec_simulation_rollback.sql
deleted file mode 100644
index ab31112..0000000
--- a/c2ec/db/test_c2ec_simulation_rollback.sql
+++ /dev/null
@@ -1,20 +0,0 @@
-BEGIN;
-
-SET search_path TO c2ec;
-
-DROP TABLE IF EXISTS p_r_id;
-DROP TABLE IF EXISTS t_r_id;
-
-SELECT provider_id INTO p_r_id FROM provider WHERE name = 'Simulation';
-SELECT terminal_id INTO t_r_id FROM terminal WHERE provider_id = (SELECT *
FROM p_r_id);
-
-DELETE FROM withdrawal WHERE terminal_id = (SELECT * FROM t_r_id);
-DELETE FROM terminal WHERE provider_id = (SELECT * FROM p_r_id);
-DELETE FROM provider WHERE provider_id = (SELECT * FROM p_r_id);
-
-DROP TABLE IF EXISTS p_r_id;
-DROP TABLE IF EXISTS t_r_id;
-
-COMMIT;
-
-SELECT * FROM provider;
\ No newline at end of file
diff --git a/c2ec/exponential-backoff.go b/c2ec/exponential-backoff.go
index b503ee6..074fb7a 100644
--- a/c2ec/exponential-backoff.go
+++ b/c2ec/exponential-backoff.go
@@ -61,7 +61,7 @@ func randomizeBackoff(backoff int64) int64 {
if subtracted < 0 {
return 0
}
- return backoff - randomizedThreshold
+ return subtracted
}
return backoff + randomizedThreshold
}
diff --git a/c2ec/logger.go b/c2ec/logger.go
index 89dd7e8..86b5d85 100644
--- a/c2ec/logger.go
+++ b/c2ec/logger.go
@@ -37,6 +37,10 @@ func LogInfo(src string, msg string) {
func logAppendError(src string, level LogLevel, err error) {
+ if err == nil {
+ fmt.Println("wanted to log from " + src + " but err was nil")
+ return
+ }
logAppend(src, level, err.Error())
}
diff --git a/c2ec/main.go b/c2ec/main.go
index 2287c30..e47cf1c 100644
--- a/c2ec/main.go
+++ b/c2ec/main.go
@@ -87,7 +87,7 @@ func main() {
transferCtx, transferCancel := context.WithCancel(context.Background())
defer transferCancel()
transferErrs := make(chan error)
- RunRefunder(transferCtx, transferErrs)
+ RunTransferrer(transferCtx, transferErrs)
LogInfo("main", "refunder is running")
router := http.NewServeMux()
@@ -127,7 +127,7 @@ func main() {
case <-transferCtx.Done():
transferCancel() // first run old cancellation function
transferCtx, transferCancel =
context.WithCancel(context.Background())
- RunRefunder(transferCtx, transferErrs)
+ RunTransferrer(transferCtx, transferErrs)
case attestationError := <-attestorErrs:
LogError("main-from-proc-attestor", attestationError)
case retryError := <-retryErrs:
diff --git a/c2ec/proc-attestor.go b/c2ec/proc-attestor.go
index 98d2b63..c535607 100644
--- a/c2ec/proc-attestor.go
+++ b/c2ec/proc-attestor.go
@@ -30,7 +30,7 @@ func RunAttestor(
func attestationCallback(notification *Notification, errs chan error) {
- LogInfo("attestor", fmt.Sprintf("retrieved information on channel=%s
with payload=%s", notification.Channel, notification.Payload))
+ LogInfo("proc-attestor", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
// The payload is formatted like:
"{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}"
// the validation is strict. This means, that the dispatcher emits an
error
@@ -60,7 +60,7 @@ func attestationCallback(notification *Notification, errs
chan error) {
transaction, err := client.GetTransaction(providerTransactionId)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
prepareRetryOrAbort(withdrawalRowId, errs)
return
}
@@ -80,7 +80,7 @@ func finaliseOrSetRetry(
if transaction == nil {
err := errors.New("transaction was nil. will set retry or
abort")
- LogError("attestor", err)
+ LogError("proc-attestor", err)
errs <- err
prepareRetryOrAbort(withdrawalRowId, errs)
return
@@ -94,7 +94,7 @@ func finaliseOrSetRetry(
err := DB.FinaliseWithdrawal(withdrawalRowId,
CONFIRMED, completionProof)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
prepareRetryOrAbort(withdrawalRowId, errs)
}
} else {
@@ -104,7 +104,7 @@ func finaliseOrSetRetry(
if transaction.AbortWithdrawal() {
err := DB.FinaliseWithdrawal(withdrawalRowId,
ABORTED, completionProof)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
prepareRetryOrAbort(withdrawalRowId,
errs)
return
}
@@ -130,26 +130,24 @@ func prepareRetryOrAbort(
withdrawal, err := DB.GetWithdrawalById(withdrawalRowId)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
errs <- err
return
}
- // TODO retry will not work like this at the moment
- execRetry := ShouldStartRetry(time.Unix(*withdrawal.LastRetryTs, 0),
int(withdrawal.RetryCounter), MAX_BACKOFF_MS)
- if !execRetry {
- LogInfo("attestor", fmt.Sprintf("max retries for withdrawal
with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId))
+ if withdrawal.RetryCounter >= CONFIG.Server.MaxRetries {
+
+ LogInfo("proc-attestor", fmt.Sprintf("max retries for withdrawal with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId))
err := DB.FinaliseWithdrawal(withdrawalRowId, ABORTED,
make([]byte, 0))
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
}
} else {
lastRetryTs := time.Now().Unix()
err := DB.SetLastRetry(withdrawalRowId, lastRetryTs)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
}
}
-
}
diff --git a/c2ec/proc-retrier.go b/c2ec/proc-retrier.go
index f00313e..24bd6b0 100644
--- a/c2ec/proc-retrier.go
+++ b/c2ec/proc-retrier.go
@@ -18,44 +18,49 @@ func RunRetrier(ctx context.Context, errs chan error) {
make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE),
errs,
)
+
+ go func() {
+ for {
+ time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
+
+ }
+ }()
}
func retryCallback(n *Notification, errs chan error) {
withdrawalId, err := strconv.Atoi(n.Payload)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
withdrawal, err := DB.GetWithdrawalById(withdrawalId)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
provider, err := DB.GetProviderByTerminal(withdrawal.TerminalId)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
err = DB.SetRetryCounter(withdrawalId, int(withdrawal.RetryCounter)+1)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
- time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
-
client := PROVIDER_CLIENTS[provider.Name]
transaction, err :=
client.GetTransaction(*withdrawal.ProviderTransactionId)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
diff --git a/c2ec/proc-transfer.go b/c2ec/proc-transfer.go
index 770bfa4..2b17a8a 100644
--- a/c2ec/proc-transfer.go
+++ b/c2ec/proc-transfer.go
@@ -20,7 +20,7 @@ const TRANSFER_STATUS_FAILED = -1
const MAX_TRANSFER_BACKOFF_MS = 24 * 60 * 60 * 1000 // 1 day
// Sets up and runs an attestor in the background. This must be called at
startup.
-func RunRefunder(
+func RunTransferrer(
ctx context.Context,
errs chan error,
) {
@@ -34,7 +34,6 @@ func RunRefunder(
)
go func() {
-
for {
time.Sleep(REFUND_RETRY_INTERVAL_SECONDS * time.Second)
executePendingTransfers(errs)
@@ -44,7 +43,7 @@ func RunRefunder(
func transferCallback(notification *Notification, errs chan error) {
- LogInfo("refunder", fmt.Sprintf("retrieved information on channel=%s
with payload=%s", notification.Channel, notification.Payload))
+ LogInfo("proc-transfer", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
transferRequestUidBase64 := notification.Payload
if transferRequestUidBase64 == "" {
@@ -60,7 +59,7 @@ func transferCallback(notification *Notification, errs chan
error) {
transfer, err := DB.GetTransferById(transferRequestUid)
if err != nil {
- LogError("refunder", err)
+ LogError("proc-transfer", err)
transferFailed(transfer, errs)
errs <- err
}
@@ -74,7 +73,7 @@ func transferCallback(notification *Notification, errs chan
error) {
provider, err :=
DB.GetTerminalProviderByPaytoTargetType(paytoTargetType)
if err != nil {
- LogError("refunder", err)
+ LogError("proc-transfer", err)
transferFailed(transfer, errs)
errs <- err
}
@@ -86,7 +85,7 @@ func transferCallback(notification *Notification, errs chan
error) {
err = client.Refund(tid)
if err != nil {
- LogError("refunder", err)
+ LogError("proc-transfer", err)
transferFailed(transfer, errs)
return
}
@@ -107,6 +106,7 @@ func executePendingTransfers(errs chan error) {
transfers, err := DB.GetTransfersByState(TRANSFER_STATUS_RETRY)
if err != nil {
LogError("proc-transfer", err)
+ errs <- err
return
}
@@ -115,6 +115,7 @@ func executePendingTransfers(errs chan error) {
shouldRetry := ShouldStartRetry(time.Unix(t.TransferTs, 0),
int(t.Retries), MAX_TRANSFER_BACKOFF_MS)
if !shouldRetry {
LogInfo("proc-transfer", fmt.Sprintf("not retrying
transfer %d, because backoff not yet exceeded", t.RowId))
+ continue
}
paytoTargetType, tid, err :=
ParsePaytoWalleeTransaction(t.CreditAccount)
@@ -140,11 +141,11 @@ func executePendingTransfers(errs chan error) {
err = client.Refund(tid)
if err != nil {
LogError("proc-transfer", err)
+ transferFailed(t, errs)
errs <- err
continue
}
}
- close(errs)
}
func transferFailed(
diff --git a/c2ec/wallee-client.go b/c2ec/wallee-client.go
index a459028..95a9292 100644
--- a/c2ec/wallee-client.go
+++ b/c2ec/wallee-client.go
@@ -147,6 +147,10 @@ func (w *WalleeClient) GetTransaction(transactionId
string) (ProviderTransaction
func (sc *WalleeClient) FormatPayto(w *Withdrawal) string {
+ if w == nil || w.ProviderTransactionId == nil {
+ LogError("wallee-client", errors.New("withdrawal or provider transaction identifier was nil"))
+ return ""
+ }
return fmt.Sprintf("payto://wallee-transaction/%s",
*w.ProviderTransactionId)
}
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [taler-cashless2ecash] branch master updated: fix: transfer,
gnunet <=