gnunet-svn
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[taler-cashless2ecash] 03/03: fix: long-poll


From: gnunet
Subject: [taler-cashless2ecash] 03/03: fix: long-poll
Date: Tue, 09 Apr 2024 22:43:45 +0200

This is an automated email from the git hooks/post-receive script.

joel-haeberli pushed a commit to branch master
in repository cashless2ecash.

commit 470230fc83bc8ada370d6422e506d51f899d1a8d
Author: Joel-Haeberli <haebu@rubigen.ch>
AuthorDate: Tue Apr 9 22:43:26 2024 +0200

    fix: long-poll
---
 c2ec/bank-integration.go     |   3 +-
 c2ec/c2ec-config.yaml        |   2 +-
 c2ec/db.go                   |   2 +-
 c2ec/db/0000-c2ec_schema.sql |   2 +-
 c2ec/main.go                 |   2 +-
 c2ec/postgres.go             |  89 ++++++++++++++++++++++++++++++-------------
 simulation/c2ec-simulation   | Bin 7562494 -> 7572674 bytes
 simulation/http-util.go      |   2 +-
 simulation/sim-terminal.go   |  49 +++++++++++++++++++++---
 simulation/sim-wallet.go     |  15 +++++---
 10 files changed, 121 insertions(+), 45 deletions(-)

diff --git a/c2ec/bank-integration.go b/c2ec/bank-integration.go
index 8dfb40d..ecae2d5 100644
--- a/c2ec/bank-integration.go
+++ b/c2ec/bank-integration.go
@@ -193,9 +193,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) {
                statusChannel := make(chan WithdrawalOperationStatus)
                errChan := make(chan error)
 
-               // listen for status change in goroutine
                 go DB.ListenForWithdrawalStatusChange(timeoutCtx, WithdrawalIdentifier(wopid), statusChannel, errChan)
-
                for {
                        select {
                        case <-timeoutCtx.Done():
@@ -222,6 +220,7 @@ func handleWithdrawalStatus(res http.ResponseWriter, req *http.Request) {
                                return
                        case <-statusChannel:
                                 getWithdrawalOrWriteError(wopid, res, req.RequestURI)
+                               return
                        }
                }
        }
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
index 1ee8ffc..6ae068e 100644
--- a/c2ec/c2ec-config.yaml
+++ b/c2ec/c2ec-config.yaml
@@ -1,7 +1,7 @@
 c2ec:
   prod: false
   host: "localhost"
-  port: 8081
+  port: 8082
   unix-domain-socket: false
   unix-socket-path: "c2ec.sock"
   fail-on-missing-attestors: false # forced if prod=true
diff --git a/c2ec/db.go b/c2ec/db.go
index a65ad55..0a799d1 100644
--- a/c2ec/db.go
+++ b/c2ec/db.go
@@ -155,5 +155,5 @@ type C2ECDatabase interface {
                wopid WithdrawalIdentifier,
                out chan WithdrawalOperationStatus,
                errs chan error,
-       ) (WithdrawalOperationStatus, error)
+       )
 }
diff --git a/c2ec/db/0000-c2ec_schema.sql b/c2ec/db/0000-c2ec_schema.sql
index 061989c..cdf81e7 100644
--- a/c2ec/db/0000-c2ec_schema.sql
+++ b/c2ec/db/0000-c2ec_schema.sql
@@ -81,7 +81,7 @@ CREATE TABLE IF NOT EXISTS withdrawal (
     wopid BYTEA CHECK (LENGTH(wopid)=32) NOT NULL,
     reserve_pub_key BYTEA CHECK (LENGTH(reserve_pub_key)=32) NOT NULL,
     registration_ts INT8 NOT NULL,
-    amount taler_amount_currency NOT NULL,
+    amount taler_amount_currency,
     fees taler_amount_currency,
     withdrawal_status withdrawal_operation_status NOT NULL DEFAULT 'pending',
     terminal_id INT8 NOT NULL REFERENCES terminal(terminal_id),
diff --git a/c2ec/main.go b/c2ec/main.go
index fc3b869..c6e149c 100644
--- a/c2ec/main.go
+++ b/c2ec/main.go
@@ -127,7 +127,7 @@ func setupAttestors(cfg *C2ECConfig) error {
                        if cfg.Server.IsProd || cfg.Server.StrictAttestors {
                                panic("no provider entry for " + provider.Name)
                        } else {
-                               fmt.Println("non-strict attestor initialization. skipping", provider)
+                               LogWarn("non-strict attestor initialization. skipping", provider.Name)
                                continue
                        }
                }
diff --git a/c2ec/postgres.go b/c2ec/postgres.go
index 2627905..8fc277b 100644
--- a/c2ec/postgres.go
+++ b/c2ec/postgres.go
@@ -3,10 +3,12 @@ package main
 import (
        "bytes"
        "context"
+       "encoding/base32"
        "encoding/base64"
        "errors"
        "fmt"
        "math"
+       "strconv"
        "time"
 
        "github.com/jackc/pgx/v5"
@@ -18,7 +20,7 @@ import (
 const PS_ASC_SELECTOR = "ASC"
 const PS_DESC_SELECTOR = "DESC"
 
-const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + "  (" +
+const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" +
        WITHDRAWAL_FIELD_NAME_WOPID + "," +
        WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," +
        WITHDRAWAL_FIELD_NAME_STATUS + "," +
@@ -116,12 +118,17 @@ func (db *C2ECPostgres) RegisterWithdrawal(
        terminalId uint64,
 ) error {
 
+       resPubKeyBytes, err := base32.HexEncoding.DecodeString(string(resPubKey))
+       if err != nil {
+               return err
+       }
+
        ts := time.Now()
        res, err := db.pool.Query(
                db.ctx,
                PS_INSERT_WITHDRAWAL,
                wopid,
-               resPubKey,
+               resPubKeyBytes,
                SELECTED,
                ts.Unix(),
                terminalId,
@@ -130,7 +137,13 @@ func (db *C2ECPostgres) RegisterWithdrawal(
                LogError("postgres", err)
                return err
        }
-       res.Close()
+       defer res.Close()
+       if res.Err() != nil {
+               LogError("postgres", err)
+               return err
+       }
+       LogInfo("postgres", "query="+PS_INSERT_WITHDRAWAL)
+       LogInfo("postgres", "registered withdrawal successfully. affected rows="+strconv.Itoa(int(res.CommandTag().RowsAffected())))
        return nil
 }
 
@@ -159,6 +172,7 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid string) (*Withdrawal, error)
                if len(withdrawals) < 1 {
                        return nil, nil
                }
+               LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID)
                return withdrawals[0], nil
        }
 }
@@ -187,6 +201,7 @@ func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*Withd
                if len(withdrawals) < 1 {
                        return nil, nil
                }
+               LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID)
                return withdrawals[0], nil
        }
 }
@@ -210,6 +225,7 @@ func (db *C2ECPostgres) NotifyPayment(
                return err
        }
        res.Close()
+       LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION)
        return nil
 }
 
@@ -234,6 +250,7 @@ func (db *C2ECPostgres) GetAttestableWithdrawals() ([]*Withdrawal, error) {
                        return nil, err
                }
 
+               LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS)
                return withdrawals, nil
        }
 }
@@ -260,6 +277,7 @@ func (db *C2ECPostgres) FinaliseWithdrawal(
                return err
        }
        res.Close()
+       LogInfo("postgres", "query="+PS_FINALISE_PAYMENT)
        return nil
 }
 
@@ -320,6 +338,7 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr
                        return nil, err
                }
 
+               LogInfo("postgres", "query="+PS_CONFIRMED_TRANSACTIONS)
                return withdrawals, nil
        }
 }
@@ -350,6 +369,7 @@ func (db *C2ECPostgres) GetTerminalProviderByName(name string) (*Provider, error
                        return nil, nil
                }
 
+               LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_NAME)
                return provider[0], nil
        }
 }
@@ -380,6 +400,7 @@ func (db *C2ECPostgres) GetTerminalProviderByPaytoTargetType(paytoTargetType str
                        return nil, nil
                }
 
+               LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE)
                return provider[0], nil
        }
 }
@@ -410,6 +431,7 @@ func (db *C2ECPostgres) GetTerminalById(id int) (*Terminal, error) {
                        return nil, nil
                }
 
+               LogInfo("postgres", "query="+PS_GET_TERMINAL_BY_ID)
                return terminals[0], nil
        }
 }
@@ -439,6 +461,7 @@ func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error)
                if len(transfers) < 1 {
                        return nil, nil
                }
+               LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
                return transfers[0], nil
        }
 
@@ -457,6 +480,7 @@ func (db *C2ECPostgres) AddTransfer(requestId HashCode, requestHash string) erro
                return err
        }
        res.Close()
+       LogInfo("postgres", "query="+PS_ADD_TRANSFER)
        return nil
 }
 
@@ -465,38 +489,45 @@ func (db *C2ECPostgres) ListenForWithdrawalStatusChange(
        wopid WithdrawalIdentifier,
        out chan WithdrawalOperationStatus,
        errs chan error,
-) (WithdrawalOperationStatus, error) {
+) {
 
        pgNotification := make(chan *pgconn.Notification)
-       channel := "w_" + base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes())
-       listener := newChannelListener(db.pool.Config().ConnConfig, channel, pgNotification)
 
        go func() {
+
+               connstr := PostgresConnectionString(&CONFIG.Database)
+               cfg, err := pgx.ParseConfig(connstr)
+               if err != nil {
+                       errs <- err
+               }
+
+               channel := "w_" + base64.StdEncoding.EncodeToString(bytes.NewBufferString(string(wopid)).Bytes())
+               listener := newChannelListener(cfg, channel, pgNotification)
                LogInfo("postgres", fmt.Sprintf("listening for %s", wopid))
+
                if err := listener.Listen(ctx); err != nil {
                        LogError("postgres", err)
                        errs <- err
                }
-               // close the channel we send results, because listener has finished.
-               close(pgNotification)
        }()
 
-       select {
-       case e := <-errs:
-               LogError("postgres", e)
-               return "", e
-       case <-ctx.Done():
-               err := ctx.Err()
-               msg := "context sent done signal while listening for status change"
-               if err != nil {
-                       LogError("postgres", err)
-               } else {
+       for {
+               select {
+               case e := <-errs:
+                       LogError("postgres", e)
+                       errs <- e
+               case <-ctx.Done():
+                       err := ctx.Err()
+                       msg := "context sent done signal while listening for status change"
+                       if err != nil {
+                               LogError("postgres", err)
+                       }
                        LogWarn("postgres", msg)
+                       errs <- errors.New(msg)
+               case n := <-pgNotification:
+                       LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload))
+                       out <- WithdrawalOperationStatus(n.Payload)
                }
-               return "", errors.New(msg)
-       case n := <-pgNotification:
-               LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload))
-               return WithdrawalOperationStatus(n.Payload), nil
        }
 }
 
@@ -509,16 +540,22 @@ func newChannelListener(
 
        listener := &pgxlisten.Listener{
                Connect: func(ctx context.Context) (*pgx.Conn, error) {
+                       LogInfo("postgres", "connecting to the database")
                        return pgx.ConnectConfig(ctx, cfg)
                },
        }
 
         listener.Handle(cn, pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
-               select {
-               case out <- notification:
-               case <-ctx.Done():
+               LogInfo("postgres", fmt.Sprintf("handling postgres notification. channel=%s", notification.Channel))
+               for {
+                       select {
+                       case out <- notification:
+                               LogInfo("postgres", fmt.Sprintf("received notification. channel=%s, notification=%s", notification.Channel, notification.Payload))
+                               return nil
+                       case <-ctx.Done():
+                               return ctx.Err()
+                       }
                }
-               return nil
        }))
 
        return listener
diff --git a/simulation/c2ec-simulation b/simulation/c2ec-simulation
index cf0d4fb..14a6430 100755
Binary files a/simulation/c2ec-simulation and b/simulation/c2ec-simulation differ
diff --git a/simulation/http-util.go b/simulation/http-util.go
index 2f9e34a..b2fe628 100644
--- a/simulation/http-util.go
+++ b/simulation/http-util.go
@@ -178,7 +178,7 @@ func HttpGet[T any](
        }
        req.Header.Add("Accept", codec.HttpApplicationContentHeader())
 
-       fmt.Printf("requesting %s\n", url)
+       fmt.Printf("requesting GET %s\n", url)
        res, err := http.DefaultClient.Do(req)
        if err != nil {
                return nil, -1, err
diff --git a/simulation/sim-terminal.go b/simulation/sim-terminal.go
index 1d48dd0..ecc1813 100644
--- a/simulation/sim-terminal.go
+++ b/simulation/sim-terminal.go
@@ -1,10 +1,12 @@
 package main
 
 import (
+       "bytes"
        "crypto/rand"
        "encoding/base64"
        "errors"
        "fmt"
+       "net/http"
        "strconv"
        "time"
 )
@@ -17,7 +19,7 @@ const TERMINAL_USER_ID = TERMINAL_PROVIDER + "-1"
 // retrieved from the cli tool when added the terminal
 const TERMINAL_ACCESS_TOKEN = "secret"
 
-const SIM_TERMINAL_LONG_POLL_MS_STR = "20000" // 20 seconds
+const SIM_TERMINAL_LONG_POLL_MS_STR = "5000" // 20 seconds
 
 const QR_CODE_CONTENT_BASE = "taler://withdraw/localhost:8081/c2ec/"
 
@@ -43,8 +45,11 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical
        // -> start long polling always before showing the QR code
        awaitSelection := make(chan *C2ECWithdrawalStatus)
        longPollFailed := make(chan error)
+
+       out <- &SimulatedPhysicalInteraction{Msg: uri}
+
+       fmt.Println("now sending long poll request to c2ec from terminal and await parameter selection")
        go func() {
-               // long poll for parameter selection notification by c2ec
 
                url := FormatUrl(
                        C2EC_BANK_WITHDRAWAL_STATUS_URL,
@@ -68,8 +73,6 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical
                awaitSelection <- response
        }()
 
-       out <- &SimulatedPhysicalInteraction{Msg: uri}
-
        for {
                select {
                case w := <-awaitSelection:
@@ -78,12 +81,46 @@ func Terminal(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysical
                        if !DISABLE_DELAYS {
                                
time.Sleep(time.Duration(TERMINAL_ACCEPT_CARD_DELAY_MS))
                        }
-                       fmt.Println("the card was tead by the terminal. simulating the payment using the providers backend. delay:", PROVIDER_BACKEND_PAYMENT_DELAY_MS)
+                       fmt.Println("card accepted. terminal waits for response of provider backend.")
                        if !DISABLE_DELAYS {
                                
time.Sleep(time.Duration(PROVIDER_BACKEND_PAYMENT_DELAY_MS))
                        }
-                       // sending payment notification now...
 
+                       fmt.Println("payment was processed at the provider backend. sending payment notification.")
+                       paymentNotification := &C2ECPaymentNotification{
+                               ProviderTransactionId: "simulation-transaction-id-0",
+                               Amount: Amount{
+                                       Currency: "CHF",
+                                       Fraction: 10,
+                                       Value:    10,
+                               },
+                               Fees: Amount{
+                                       Currency: "CHF",
+                                       Fraction: 10,
+                                       Value:    0,
+                               },
+                       }
+                       cdc := NewJsonCodec[C2ECPaymentNotification]()
+                       pnbytes, err := cdc.EncodeToBytes(paymentNotification)
+                       if err != nil {
+                               fmt.Println("failed serializing payment notification")
+                               kill <- err
+                       }
+                       paymentUrl := FormatUrl(
+                               C2EC_BANK_WITHDRAWAL_PAYMENT_URL,
+                               map[string]string{"wopid": wopid},
+                               map[string]string{},
+                       )
+                       _, err = http.Post(
+                               paymentUrl,
+                               cdc.HttpApplicationContentHeader(),
+                               bytes.NewReader(pnbytes),
+                       )
+                       if err != nil {
+                               fmt.Println("error on POST request:", err.Error())
+                               kill <- err
+                       }
+                       fmt.Println("Terminal flow ended")
                case f := <-longPollFailed:
                         fmt.Println("long-polling for selection failed... error:", err.Error())
                        kill <- f
diff --git a/simulation/sim-wallet.go b/simulation/sim-wallet.go
index 3a4413a..5162a3c 100644
--- a/simulation/sim-wallet.go
+++ b/simulation/sim-wallet.go
@@ -13,7 +13,7 @@ import (
        "time"
 )
 
-const SIM_WALLET_LONG_POLL_MS_STR = "20000" // 20 seconds
+const SIM_WALLET_LONG_POLL_MS_STR = "5000" // 20 seconds
 
 func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalInteraction, kill chan error) {
 
@@ -60,6 +60,11 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn
                bytes.NewReader(regByte.Bytes()),
        )
 
+       if err != nil {
+               fmt.Println("error on POST request:", err.Error())
+               kill <- err
+       }
+
        if res.StatusCode != 204 {
                 fmt.Println("response status from registration:", res.StatusCode)
                 kill <- errors.New("failed registering the withdrawal parameters")
@@ -70,9 +75,9 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn
        // Start long poll for confirmed or abort
        awaitConfirmationOrAbortion := make(chan *C2ECWithdrawalStatus)
        longPollFailed := make(chan error)
-       go func() {
-               // long poll for parameter selection notification by c2ec
+       // long poll for parameter selection notification by c2ec
 
+       go func() {
                url := FormatUrl(
                        C2EC_BANK_WITHDRAWAL_STATUS_URL,
                        map[string]string{"wopid": wopid},
@@ -91,7 +96,6 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn
                         longPollFailed <- errors.New("status of withdrawal status response was " + strconv.Itoa(status))
                        return
                }
-
                awaitConfirmationOrAbortion <- response
        }()
 
@@ -108,11 +112,10 @@ func Wallet(in chan *SimulatedPhysicalInteraction, out chan *SimulatedPhysicalIn
                                os.Exit(0)
                        }
                case f := <-longPollFailed:
-                       fmt.Println("long-polling for selection failed... error:", err.Error())
+                       fmt.Println("long-polling for selection failed... error:", f.Error())
                        kill <- f
                }
        }
-
 }
 
 // returns wopid.

-- 
To stop receiving notification emails like this one, please contact
gnunet@gnunet.org.



reply via email to

[Prev in Thread] Current Thread [Next in Thread]