Skip to content

Commit

Permalink
Support Transaction Tags and Request Tags (#128)
Browse files Browse the repository at this point in the history
* Add transaction tags support for Read-Write Transaction

* Add request tags support for Read-only transaction and Read-write transaction.

* Add Transaction Tags and Request Tags section in README

* Fix: Handle transaction tags and request tags as one tag
  • Loading branch information
takabow authored Apr 20, 2022
1 parent e536f72 commit 8ac33c9
Show file tree
Hide file tree
Showing 5 changed files with 172 additions and 19 deletions.
44 changes: 42 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ and `{}` for a mutually exclusive keyword.
| Show DML Execution Plan | `EXPLAIN {INSERT\|UPDATE\|DELETE} ...;` | |
| Show Query Execution Plan with Stats | `EXPLAIN ANALYZE SELECT ...;` | |
| Show DML Execution Plan with Stats | `EXPLAIN ANALYZE {INSERT\|UPDATE\|DELETE} ...;` | |
| Start Read-Write Transaction | `BEGIN [RW] [PRIORITY {HIGH\|MEDIUM\|LOW}];` | See [Request Priority](#request-priority) for details on the priority. |
| Start Read-Write Transaction | `BEGIN [RW] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG <tag>];` | See [Request Priority](#request-priority) for details on the priority. The tag you set is used as both transaction tag and request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).|
| Commit Read-Write Transaction | `COMMIT;` | |
| Rollback Read-Write Transaction | `ROLLBACK;` | |
| Start Read-Only Transaction | `BEGIN RO [{<seconds>\|<RFC3339-formatted time>}] [PRIORITY {HIGH\|MEDIUM\|LOW}];` | `<seconds>` and `<RFC3339-formatted time>` is used for stale read. See [Request Priority](#request-priority) for details on the priority. |
| Start Read-Only Transaction | `BEGIN RO [{<seconds>\|<RFC3339-formatted time>}] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG <tag>];` | `<seconds>` and `<RFC3339-formatted time>` is used for stale read. See [Request Priority](#request-priority) for details on the priority. The tag you set is used as request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).|
| End Read-Only Transaction | `CLOSE;` | |
| Exit CLI | `EXIT;` | |

Expand Down Expand Up @@ -259,6 +259,46 @@ BEGIN RO 2021-04-01T23:47:44+00:00 PRIORITY MEDIUM;

Note that transaction-level priority takes precedence over command-level priority.

## Transaction Tags and Request Tags

In a read-write transaction, you can add a tag following `BEGIN RW TAG <tag>`.
spanner-cli adds the tag set in `BEGIN RW TAG` as a transaction tag.
The tag will also be used as request tags within the transaction.

```
# Read-write transaction
# transaction_tag = tx1
+--------------------+
| BEGIN RW TAG tx1; |
| |
| SELECT val |
| FROM tab1 +-----request_tag = tx1
| WHERE id = 1; |
| |
| UPDATE tab1 |
| SET val = 10 +-----request_tag = tx1
| WHERE id = 1; |
| |
| COMMIT; |
+--------------------+
```

In a read-only transaction, you can add a tag following `BEGIN RO TAG <tag>`.
Since read-only transaction doesn't support transaction tag, spanner-cli adds the tag set in `BEGIN RO TAG` as request tags.
```
# Read-only transaction
# transaction_tag = N/A
+--------------------+
| BEGIN RO TAG tx2; |
| |
| SELECT SUM(val) |
| FROM tab1 +-----request_tag = tx2
| WHERE id = 1; |
| |
| CLOSE; |
+--------------------+
```

## Using with the Cloud Spanner Emulator

This tool supports the [Cloud Spanner Emulator](https://cloud.google.com/spanner/docs/emulator) via the [`SPANNER_EMULATOR_HOST` environment variable](https://cloud.google.com/spanner/docs/emulator#client-libraries).
Expand Down
13 changes: 10 additions & 3 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type Session struct {
}

type transactionContext struct {
tag string
priority pb.RequestOptions_Priority
sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction.
rwTxn *spanner.ReadWriteStmtBasedTransaction
Expand Down Expand Up @@ -109,7 +110,7 @@ func (s *Session) InReadOnlyTransaction() bool {
}

// BeginReadWriteTransaction starts read-write transaction.
func (s *Session) BeginReadWriteTransaction(priority pb.RequestOptions_Priority) error {
func (s *Session) BeginReadWriteTransaction(priority pb.RequestOptions_Priority, tag string) error {
if s.InReadWriteTransaction() {
return errors.New("read-write transaction is already running")
}
Expand All @@ -122,12 +123,14 @@ func (s *Session) BeginReadWriteTransaction(priority pb.RequestOptions_Priority)
opts := spanner.TransactionOptions{
CommitOptions: spanner.CommitOptions{ReturnCommitStats: true},
CommitPriority: priority,
TransactionTag: tag,
}
txn, err := spanner.NewReadWriteStmtBasedTransactionWithOptions(s.ctx, s.client, opts)
if err != nil {
return err
}
s.tc = &transactionContext{
tag: tag,
priority: priority,
rwTxn: txn,
}
Expand Down Expand Up @@ -163,7 +166,7 @@ func (s *Session) RollbackReadWriteTransaction() error {
}

// BeginReadOnlyTransaction starts read-only transaction and returns the snapshot timestamp for the transaction if successful.
func (s *Session) BeginReadOnlyTransaction(typ timestampBoundType, staleness time.Duration, timestamp time.Time, priority pb.RequestOptions_Priority) (time.Time, error) {
func (s *Session) BeginReadOnlyTransaction(typ timestampBoundType, staleness time.Duration, timestamp time.Time, priority pb.RequestOptions_Priority, tag string) (time.Time, error) {
if s.InReadOnlyTransaction() {
return time.Time{}, errors.New("read-only transaction is already running")
}
Expand Down Expand Up @@ -193,6 +196,7 @@ func (s *Session) BeginReadOnlyTransaction(typ timestampBoundType, staleness tim
}

s.tc = &transactionContext{
tag: tag,
priority: priority,
roTxn: txn,
}
Expand Down Expand Up @@ -252,11 +256,13 @@ func (s *Session) RunAnalyzeQuery(stmt spanner.Statement) (*pb.QueryPlan, error)

func (s *Session) runQueryWithOptions(stmt spanner.Statement, opts spanner.QueryOptions) (*spanner.RowIterator, *spanner.ReadOnlyTransaction) {
if s.InReadWriteTransaction() {
opts.RequestTag = s.tc.tag
iter := s.tc.rwTxn.QueryWithOptions(s.ctx, stmt, opts)
s.tc.sendHeartbeat = true
return iter, nil
}
if s.InReadOnlyTransaction() {
opts.RequestTag = s.tc.tag
return s.tc.roTxn.QueryWithOptions(s.ctx, stmt, opts), s.tc.roTxn
}

Expand All @@ -272,7 +278,8 @@ func (s *Session) RunUpdate(stmt spanner.Statement) (int64, error) {
}

opts := spanner.QueryOptions{
Priority: s.currentPriority(),
Priority: s.currentPriority(),
RequestTag: s.tc.tag,
}
rowCount, err := s.tc.rwTxn.UpdateWithOptions(s.ctx, stmt, opts)
s.tc.sendHeartbeat = true
Expand Down
4 changes: 2 additions & 2 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestRequestPriority(t *testing.T) {
}

// Read-Write Transaction.
if err := session.BeginReadWriteTransaction(test.transactionPriority); err != nil {
if err := session.BeginReadWriteTransaction(test.transactionPriority, ""); err != nil {
t.Fatalf("failed to begin read write transaction: %v", err)
}
iter, _ := session.RunQuery(spanner.NewStatement("SELECT * FROM t1"))
Expand All @@ -87,7 +87,7 @@ func TestRequestPriority(t *testing.T) {
}

// Read-Only Transaction.
if _, err := session.BeginReadOnlyTransaction(strong, 0, time.Now(), test.transactionPriority); err != nil {
if _, err := session.BeginReadOnlyTransaction(strong, 0, time.Now(), test.transactionPriority, ""); err != nil {
t.Fatalf("failed to begin read only transaction: %v", err)
}
iter, _ = session.RunQueryWithStats(spanner.NewStatement("SELECT * FROM t1"))
Expand Down
33 changes: 21 additions & 12 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ var (
pdmlRe = regexp.MustCompile(`(?is)^PARTITIONED\s+((?:INSERT|UPDATE|DELETE)\s+.+$)`)

// Transaction
beginRwRe = regexp.MustCompile(`(?is)^BEGIN(?:\s+RW)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`)
beginRoRe = regexp.MustCompile(`(?is)^BEGIN\s+RO(?:\s+([^\s]+))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`)
beginRwRe = regexp.MustCompile(`(?is)^BEGIN(?:\s+RW)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?(?:\s+TAG\s+(.+))?$`)
beginRoRe = regexp.MustCompile(`(?is)^BEGIN\s+RO(?:\s+([^\s]+))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?(?:\s+TAG\s+(.+))?$`)
commitRe = regexp.MustCompile(`(?is)^COMMIT$`)
rollbackRe = regexp.MustCompile(`(?is)^ROLLBACK$`)
closeRe = regexp.MustCompile(`(?is)^CLOSE$`)
Expand Down Expand Up @@ -919,22 +919,26 @@ func runInNewOrExistRwTxForExplain(session *Session, f func() (affected int64, p

type BeginRwStatement struct {
Priority pb.RequestOptions_Priority
Tag string
}

func newBeginRwStatement(input string) (*BeginRwStatement, error) {
matched := beginRwRe.FindStringSubmatch(input)
if matched[1] == "" {
return &BeginRwStatement{}, nil
stmt := &BeginRwStatement{}

if matched[1] != "" {
priority, err := parsePriority(matched[1])
if err != nil {
return nil, err
}
stmt.Priority = priority
}

priority, err := parsePriority(matched[1])
if err != nil {
return nil, err
if matched[2] != "" {
stmt.Tag = matched[2]
}

return &BeginRwStatement{
Priority: priority,
}, nil
return stmt, nil
}

func (s *BeginRwStatement) Execute(session *Session) (*Result, error) {
Expand All @@ -945,7 +949,7 @@ func (s *BeginRwStatement) Execute(session *Session) (*Result, error) {
return nil, errors.New("you're in read-only transaction. Please finish the transaction by 'CLOSE;'")
}

if err := session.BeginReadWriteTransaction(s.Priority); err != nil {
if err := session.BeginReadWriteTransaction(s.Priority, s.Tag); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1004,6 +1008,7 @@ type BeginRoStatement struct {
Staleness time.Duration
Timestamp time.Time
Priority pb.RequestOptions_Priority
Tag string
}

func newBeginRoStatement(input string) (*BeginRoStatement, error) {
Expand Down Expand Up @@ -1035,6 +1040,10 @@ func newBeginRoStatement(input string) (*BeginRoStatement, error) {
stmt.Priority = priority
}

if matched[3] != "" {
stmt.Tag = matched[3]
}

return stmt, nil
}

Expand All @@ -1048,7 +1057,7 @@ func (s *BeginRoStatement) Execute(session *Session) (*Result, error) {
close.Execute(session)
}

ts, err := session.BeginReadOnlyTransaction(s.TimestampBoundType, s.Staleness, s.Timestamp, s.Priority)
ts, err := session.BeginReadOnlyTransaction(s.TimestampBoundType, s.Staleness, s.Timestamp, s.Priority, s.Tag)
if err != nil {
return nil, err
}
Expand Down
97 changes: 97 additions & 0 deletions statement_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,57 @@ func TestBuildStatement(t *testing.T) {
Priority: pb.RequestOptions_PRIORITY_LOW,
},
},
{
desc: "BEGIN statement with TAG",
input: "BEGIN TAG app=spanner-cli,env=test",
want: &BeginRwStatement{
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "BEGIN RW statement with TAG",
input: "BEGIN RW TAG app=spanner-cli,env=test",
want: &BeginRwStatement{
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "BEGIN PRIORITY statement with TAG",
input: "BEGIN PRIORITY MEDIUM TAG app=spanner-cli,env=test",
want: &BeginRwStatement{
Priority: pb.RequestOptions_PRIORITY_MEDIUM,
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "BEGIN statement with TAG whitespace",
input: "BEGIN TAG app=spanner-cli env=test",
want: &BeginRwStatement{
Tag: "app=spanner-cli env=test",
},
},
{
desc: "BEGIN RW statement with TAG whitespace",
input: "BEGIN RW TAG app=spanner-cli env=test",
want: &BeginRwStatement{
Tag: "app=spanner-cli env=test",
},
},
{
desc: "BEGIN PRIORITY statement with TAG whitespace",
input: "BEGIN PRIORITY MEDIUM TAG app=spanner-cli env=test",
want: &BeginRwStatement{
Priority: pb.RequestOptions_PRIORITY_MEDIUM,
Tag: "app=spanner-cli env=test",
},
},
{
desc: "BEGIN statement with TAG quoted",
input: "BEGIN TAG app=\"spanner-cli\" env='dev'",
want: &BeginRwStatement{
Tag: "app=\"spanner-cli\" env='dev'",
},
},
{
desc: "BEGIN RO statement",
input: "BEGIN RO",
Expand Down Expand Up @@ -240,6 +291,52 @@ func TestBuildStatement(t *testing.T) {
Priority: pb.RequestOptions_PRIORITY_HIGH,
},
},
{
desc: "BEGIN RO statement with TAG",
input: "BEGIN RO TAG app=spanner-cli,env=test",
want: &BeginRoStatement{
TimestampBoundType: strong,
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "BEGIN RO staleness statement with TAG",
input: "BEGIN RO 10 TAG app=spanner-cli,env=test",
want: &BeginRoStatement{
Staleness: time.Duration(10 * time.Second),
TimestampBoundType: exactStaleness,
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "BEGIN RO read timestamp statement with TAG",
input: "BEGIN RO 2020-03-30T22:54:44.834017+09:00 TAG app=spanner-cli,env=test",
want: &BeginRoStatement{
Timestamp: timestamp,
TimestampBoundType: readTimestamp,
Tag: "app=spanner-cli,env=test",
},
skipLowerCase: true,
},
{
desc: "BEGIN RO PRIORITY statement with TAG",
input: "BEGIN RO PRIORITY LOW TAG app=spanner-cli,env=test",
want: &BeginRoStatement{
TimestampBoundType: strong,
Priority: pb.RequestOptions_PRIORITY_LOW,
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "BEGIN RO staleness with PRIORITY statement with TAG",
input: "BEGIN RO 10 PRIORITY HIGH TAG app=spanner-cli,env=test",
want: &BeginRoStatement{
Staleness: time.Duration(10 * time.Second),
TimestampBoundType: exactStaleness,
Priority: pb.RequestOptions_PRIORITY_HIGH,
Tag: "app=spanner-cli,env=test",
},
},
{
desc: "COMMIT statement",
input: "COMMIT",
Expand Down

0 comments on commit 8ac33c9

Please sign in to comment.