[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[taler-cashless2ecash] 03/03: fix: long-poll
From: |
gnunet |
Subject: |
[taler-cashless2ecash] 03/03: fix: long-poll |
Date: |
Tue, 09 Apr 2024 22:43:45 +0200 |
This is an automated email from the git hooks/post-receive script.
joel-haeberli pushed a commit to branch master
in repository cashless2ecash.
commit 470230fc83bc8ada370d6422e506d51f899d1a8d
Author: Joel-Haeberli <haebu@rubigen.ch>
AuthorDate: Tue Apr 9 22:43:26 2024 +0200
fix: long-poll
---
c2ec/bank-integration.go | 3 +-
c2ec/c2ec-config.yaml | 2 +-
c2ec/db.go | 2 +-
c2ec/db/0000-c2ec_schema.sql | 2 +-
c2ec/main.go | 2 +-
c2ec/postgres.go | 89 ++++++++++++++++++++++++++++++-------------
simulation/c2ec-simulation | Bin 7562494 -> 7572674 bytes
simulation/http-util.go | 2 +-
simulation/sim-terminal.go | 49 +++++++++++++++++++++---
simulation/sim-wallet.go | 15 +++++---
10 files changed, 121 insertions(+), 45 deletions(-)
diff --git a/c2ec/bank-integration.go b/c2ec/bank-integration.go
index 8dfb40d..ecae2d5 100644
--- a/c2ec/bank-integration.go
+++ b/c2ec/bank-integration.go
@@ -193,9 +193,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req
*http.Request) {
statusChannel := make(chan WithdrawalOperationStatus)
errChan := make(chan error)
- // listen for status change in goroutine
go DB.ListenForWithdrawalStatusChange(timeoutCtx,
WithdrawalIdentifier(wopid), statusChannel, errChan)
-
for {
select {
case <-timeoutCtx.Done():
@@ -222,6 +220,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req
*http.Request) {
return
case <-statusChannel:
getWithdrawalOrWriteError(wopid, res,
req.RequestURI)
+ return
}
}
}
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
index 1ee8ffc..6ae068e 100644
--- a/c2ec/c2ec-config.yaml
+++ b/c2ec/c2ec-config.yaml
@@ -1,7 +1,7 @@
c2ec:
prod: false
host: "localhost"
- port: 8081
+ port: 8082
unix-domain-socket: false
unix-socket-path: "c2ec.sock"
fail-on-missing-attestors: false # forced if prod=true
diff --git a/c2ec/db.go b/c2ec/db.go
index a65ad55..0a799d1 100644
--- a/c2ec/db.go
+++ b/c2ec/db.go
@@ -155,5 +155,5 @@ type C2ECDatabase interface {
wopid WithdrawalIdentifier,
out chan WithdrawalOperationStatus,
errs chan error,
- ) (WithdrawalOperationStatus, error)
+ )
}
diff --git a/c2ec/db/0000-c2ec_schema.sql b/c2ec/db/0000-c2ec_schema.sql
index 061989c..cdf81e7 100644
--- a/c2ec/db/0000-c2ec_schema.sql
+++ b/c2ec/db/0000-c2ec_schema.sql
@@ -81,7 +81,7 @@ CREATE TABLE IF NOT EXISTS withdrawal (
wopid BYTEA CHECK (LENGTH(wopid)=32) NOT NULL,
reserve_pub_key BYTEA CHECK (LENGTH(reserve_pub_key)=32) NOT NULL,
registration_ts INT8 NOT NULL,
- amount taler_amount_currency NOT NULL,
+ amount taler_amount_currency,
fees taler_amount_currency,
withdrawal_status withdrawal_operation_status NOT NULL DEFAULT 'pending',
terminal_id INT8 NOT NULL REFERENCES terminal(terminal_id),
diff --git a/c2ec/main.go b/c2ec/main.go
index fc3b869..c6e149c 100644
--- a/c2ec/main.go
+++ b/c2ec/main.go
@@ -127,7 +127,7 @@ func setupAttestors(cfg *C2ECConfig) error {
if cfg.Server.IsProd || cfg.Server.StrictAttestors {
panic("no provider entry for " + provider.Name)
} else {
- fmt.Println("non-strict attestor
initialization. skipping", provider)
+ LogWarn("non-strict attestor initialization.
skipping", provider.Name)
continue
}
}
diff --git a/c2ec/postgres.go b/c2ec/postgres.go
index 2627905..8fc277b 100644
--- a/c2ec/postgres.go
+++ b/c2ec/postgres.go
@@ -3,10 +3,12 @@ package main
import (
"bytes"
"context"
+ "encoding/base32"
"encoding/base64"
"errors"
"fmt"
"math"
+ "strconv"
"time"
"github.com/jackc/pgx/v5"
@@ -18,7 +20,7 @@ import (
const PS_ASC_SELECTOR = "ASC"
const PS_DESC_SELECTOR = "DESC"
-const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" +
+const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" +
WITHDRAWAL_FIELD_NAME_WOPID + "," +
WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," +
WITHDRAWAL_FIELD_NAME_STATUS + "," +
@@ -116,12 +118,17 @@ func (db *C2ECPostgres) RegisterWithdrawal(
terminalId uint64,
) error {
+ resPubKeyBytes, err :=
base32.HexEncoding.DecodeString(string(resPubKey))
+ if err != nil {
+ return err
+ }
+
ts := time.Now()
res, err := db.pool.Query(
db.ctx,
PS_INSERT_WITHDRAWAL,
wopid,
- resPubKey,
+ resPubKeyBytes,
SELECTED,
ts.Unix(),
terminalId,
@@ -130,7 +137,13 @@ func (db *C2ECPostgres) RegisterWithdrawal(
LogError("postgres", err)
return err
}
- res.Close()
+ defer res.Close()
+ if res.Err() != nil {
+ LogError("postgres", err)
+ return err
+ }
+ LogInfo("postgres", "query="+PS_INSERT_WITHDRAWAL)
+ LogInfo("postgres", "registered withdrawal successfully. affected
rows="+strconv.Itoa(int(res.CommandTag().RowsAffected())))
return nil
}
@@ -159,6 +172,7 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid string)
(*Withdrawal, error)
if len(withdrawals) < 1 {
return nil, nil
}
+ LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID)
return withdrawals[0], nil
}
}
@@ -187,6 +201,7 @@ func (db *C2ECPostgres)
GetWithdrawalByProviderTransactionId(tid string) (*Withd
if len(withdrawals) < 1 {
return nil, nil
}
+ LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID)
return withdrawals[0], nil
}
}
@@ -210,6 +225,7 @@ func (db *C2ECPostgres) NotifyPayment(
return err
}
res.Close()
+ LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION)
return nil
}
@@ -234,6 +250,7 @@ func (db *C2ECPostgres) GetAttestableWithdrawals()
([]*Withdrawal, error) {
return nil, err
}
+ LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS)
return withdrawals, nil
}
}
@@ -260,6 +277,7 @@ func (db *C2ECPostgres) FinaliseWithdrawal(
return err
}
res.Close()
+ LogInfo("postgres", "query="+PS_FINALISE_PAYMENT)
return nil
}
@@ -320,6 +338,7 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int,
delta int) ([]*Withdr
return nil, err
}
+ LogInfo("postgres", "query="+PS_CONFIRMED_TRANSACTIONS)
return withdrawals, nil
}
}
@@ -350,6 +369,7 @@ func (db *C2ECPostgres) GetTerminalProviderByName(name
string) (*Provider, error
return nil, nil
}
+ LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_NAME)
return provider[0], nil
}
}
@@ -380,6 +400,7 @@ func (db *C2ECPostgres)
GetTerminalProviderByPaytoTargetType(paytoTargetType str
return nil, nil
}
+ LogInfo("postgres",
"query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE)
return provider[0], nil
}
}
@@ -410,6 +431,7 @@ func (db *C2ECPostgres) GetTerminalById(id int) (*Terminal,
error) {
return nil, nil
}
+ LogInfo("postgres", "query="+PS_GET_TERMINAL_BY_ID)
return terminals[0], nil
}
}
@@ -439,6 +461,7 @@ func (db *C2ECPostgres) GetTransferById(requestUid
HashCode) (*Transfer, error)
if len(transfers) < 1 {
return nil, nil
}
+ LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
return transfers[0], nil
}
@@ -457,6 +480,7 @@ func (db *C2ECPostgres) AddTransfer(requestId HashCode,
requestHash string) erro
return err
}
res.Close()
+ LogInfo("postgres", "query="+PS_ADD_TRANSFER)
return nil
}
@@ -465,38 +489,45 @@ func (db *C2ECPostgres) ListenForWithdrawalStatusChange(
wopid WithdrawalIdentifier,
out chan WithdrawalOperationStatus,
errs chan error,
-) (WithdrawalOperationStatus, error) {
+) {
pgNotification := make(chan *pgconn.Notification)
- channel := "w_" +
base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes())
- listener := newChannelListener(db.pool.Config().ConnConfig, channel,
pgNotification)
go func() {
+
+ connstr := PostgresConnectionString(&CONFIG.Database)
+ cfg, err := pgx.ParseConfig(connstr)
+ if err != nil {
+ errs <- err
+ }
+
+ channel := "w_" +
base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes())
+ listener := newChannelListener(cfg, channel, pgNotification)
LogInfo("postgres", fmt.Sprintf("listening for %s", wopid))
+
if err := listener.Listen(ctx); err != nil {
LogError("postgres", err)
errs <- err
}
- // close the channel we send results, because listener has
finished.
- close(pgNotification)
}()
- select {
- case e := <-errs:
- LogError("postgres", e)
- return "", e
- case <-ctx.Done():
- err := ctx.Err()
- msg := "context sent done signal while listening for status
change"
- if err != nil {
- LogError("postgres", err)
- } else {
+ for {
+ select {
+ case e := <-errs:
+ LogError("postgres", e)
+ errs <- e
+ case <-ctx.Done():
+ err := ctx.Err()
+ msg := "context sent done signal while listening for
status change"
+ if err != nil {
+ LogError("postgres", err)
+ }
LogWarn("postgres", msg)
+ errs <- errors.New(msg)
+ case n := <-pgNotification:
+ LogInfo("postgres", fmt.Sprintf("received notification
for channel %s: %s", n.Channel, n.Payload))
+ out <- WithdrawalOperationStatus(n.Payload)
}
- return "", errors.New(msg)
- case n := <-pgNotification:
- LogInfo("postgres", fmt.Sprintf("received notification for
channel %s: %s", n.Channel, n.Payload))
- return WithdrawalOperationStatus(n.Payload), nil
}
}
@@ -509,16 +540,22 @@ func newChannelListener(
listener := &pgxlisten.Listener{
Connect: func(ctx context.Context) (*pgx.Conn, error) {
+ LogInfo("postgres", "connecting to the database")
return pgx.ConnectConfig(ctx, cfg)
},
}
listener.Handle(cn, pgxlisten.HandlerFunc(func(ctx context.Context,
notification *pgconn.Notification, conn *pgx.Conn) error {
- select {
- case out <- notification:
- case <-ctx.Done():
+ LogInfo("postgres", fmt.Sprintf("handling postgres
notification. channel=%s", notification.Channel))
+ for {
+ select {
+ case out <- notification:
+ LogInfo("postgres", fmt.Sprintf("received
notification. channel=%s, notification=%s", notification.Channel,
notification.Payload))
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
- return nil
}))
return listener
diff --git a/simulation/c2ec-simulation b/simulation/c2ec-simulation
index cf0d4fb..14a6430 100755
Binary files a/simulation/c2ec-simulation and b/simulation/c2ec-simulation
differ
diff --git a/simulation/http-util.go b/simulation/http-util.go
index 2f9e34a..b2fe628 100644
--- a/simulation/http-util.go
+++ b/simulation/http-util.go
@@ -178,7 +178,7 @@ func HttpGet[T any](
}
req.Header.Add("Accept", codec.HttpApplicationContentHeader())
- fmt.Printf("requesting %s\n", url)
+ fmt.Printf("requesting GET %s\n", url)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, -1, err
diff --git a/simulation/sim-terminal.go b/simulation/sim-terminal.go
index 1d48dd0..ecc1813 100644
--- a/simulation/sim-terminal.go
+++ b/simulation/sim-terminal.go
@@ -1,10 +1,12 @@
package main
import (
+ "bytes"
"crypto/rand"
"encoding/base64"
"errors"
"fmt"
+ "net/http"
"strconv"
"time"
)
@@ -17,7 +19,7 @@ const TERMINAL_USER_ID = TERMINAL_PROVIDER + "-1"
// retrieved from the cli tool when added the terminal
const TERMINAL_ACCESS_TOKEN = "secret"
-const SIM_TERMINAL_LONG_POLL_MS_STR = "20000" // 20 seconds
+const SIM_TERMINAL_LONG_POLL_MS_STR = "5000" // 20 seconds
const QR_CODE_CONTENT_BASE = "taler://withdraw/localhost:8081/c2ec/"
@@ -43,8 +45,11 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out
chan *SimulatedPhysical
// -> start long polling always before showing the QR code
awaitSelection := make(chan *C2ECWithdrawalStatus)
longPollFailed := make(chan error)
+
+ out <- &SimulatedPhysicalInteraction{Msg: uri}
+
+ fmt.Println("now sending long poll request to c2ec from terminal and
await parameter selection")
go func() {
- // long poll for parameter selection notification by c2ec
url := FormatUrl(
C2EC_BANK_WITHDRAWAL_STATUS_URL,
@@ -68,8 +73,6 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan
*SimulatedPhysical
awaitSelection <- response
}()
- out <- &SimulatedPhysicalInteraction{Msg: uri}
-
for {
select {
case w := <-awaitSelection:
@@ -78,12 +81,46 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out
chan *SimulatedPhysical
if !DISABLE_DELAYS {
time.Sleep(time.Duration(TERMINAL_ACCEPT_CARD_DELAY_MS))
}
- fmt.Println("the card was tead by the terminal.
simulating the payment using the providers backend. delay:",
PROVIDER_BACKEND_PAYMENT_DELAY_MS)
+ fmt.Println("card accepted. terminal waits for response
of provider backend.")
if !DISABLE_DELAYS {
time.Sleep(time.Duration(PROVIDER_BACKEND_PAYMENT_DELAY_MS))
}
- // sending payment notification now...
+ fmt.Println("payment was processed at the provider
backend. sending payment notification.")
+ paymentNotification := &C2ECPaymentNotification{
+ ProviderTransactionId:
"simulation-transaction-id-0",
+ Amount: Amount{
+ Currency: "CHF",
+ Fraction: 10,
+ Value: 10,
+ },
+ Fees: Amount{
+ Currency: "CHF",
+ Fraction: 10,
+ Value: 0,
+ },
+ }
+ cdc := NewJsonCodec[C2ECPaymentNotification]()
+ pnbytes, err := cdc.EncodeToBytes(paymentNotification)
+ if err != nil {
+ fmt.Println("failed serializing payment
notification")
+ kill <- err
+ }
+ paymentUrl := FormatUrl(
+ C2EC_BANK_WITHDRAWAL_PAYMENT_URL,
+ map[string]string{"wopid": wopid},
+ map[string]string{},
+ )
+ _, err = http.Post(
+ paymentUrl,
+ cdc.HttpApplicationContentHeader(),
+ bytes.NewReader(pnbytes),
+ )
+ if err != nil {
+ fmt.Println("error on POST request:",
err.Error())
+ kill <- err
+ }
+ fmt.Println("Terminal flow ended")
case f := <-longPollFailed:
fmt.Println("long-polling for selection failed...
error:", err.Error())
kill <- f
diff --git a/simulation/sim-wallet.go b/simulation/sim-wallet.go
index 3a4413a..5162a3c 100644
--- a/simulation/sim-wallet.go
+++ b/simulation/sim-wallet.go
@@ -13,7 +13,7 @@ import (
"time"
)
-const SIM_WALLET_LONG_POLL_MS_STR = "20000" // 20 seconds
+const SIM_WALLET_LONG_POLL_MS_STR = "5000" // 20 seconds
func Wallet(in chan *SimulatedPhysicalInteraction, out chan
*SimulatedPhysicalInteraction, kill chan error) {
@@ -60,6 +60,11 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan
*SimulatedPhysicalIn
bytes.NewReader(regByte.Bytes()),
)
+ if err != nil {
+ fmt.Println("error on POST request:", err.Error())
+ kill <- err
+ }
+
if res.StatusCode != 204 {
fmt.Println("response status from registration:",
res.StatusCode)
kill <- errors.New("failed registering the withdrawal
parameters")
@@ -70,9 +75,9 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan
*SimulatedPhysicalIn
// Start long poll for confirmed or abort
awaitConfirmationOrAbortion := make(chan *C2ECWithdrawalStatus)
longPollFailed := make(chan error)
- go func() {
- // long poll for parameter selection notification by c2ec
+ // long poll for parameter selection notification by c2ec
+ go func() {
url := FormatUrl(
C2EC_BANK_WITHDRAWAL_STATUS_URL,
map[string]string{"wopid": wopid},
@@ -91,7 +96,6 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan
*SimulatedPhysicalIn
longPollFailed <- errors.New("status of withdrawal
status response was " + strconv.Itoa(status))
return
}
-
awaitConfirmationOrAbortion <- response
}()
@@ -108,11 +112,10 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out
chan *SimulatedPhysicalIn
os.Exit(0)
}
case f := <-longPollFailed:
- fmt.Println("long-polling for selection failed...
error:", err.Error())
+ fmt.Println("long-polling for selection failed...
error:", f.Error())
kill <- f
}
}
-
}
// returns wopid.
--
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.