diff --git a/README.md b/README.md index af1c9dc..8559f21 100644 --- a/README.md +++ b/README.md @@ -169,10 +169,10 @@ and `{}` for a mutually exclusive keyword. | Show DML Execution Plan | `EXPLAIN {INSERT\|UPDATE\|DELETE} ...;` | | | Show Query Execution Plan with Stats | `EXPLAIN ANALYZE SELECT ...;` | | | Show DML Execution Plan with Stats | `EXPLAIN ANALYZE {INSERT\|UPDATE\|DELETE} ...;` | | -| Start Read-Write Transaction | `BEGIN [RW] [PRIORITY {HIGH\|MEDIUM\|LOW}];` | See [Request Priority](#request-priority) for details on the priority. | +| Start Read-Write Transaction | `BEGIN [RW] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG ];` | See [Request Priority](#request-priority) for details on the priority. The tag you set is used as both transaction tag and request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).| | Commit Read-Write Transaction | `COMMIT;` | | | Rollback Read-Write Transaction | `ROLLBACK;` | | -| Start Read-Only Transaction | `BEGIN RO [{\|}] [PRIORITY {HIGH\|MEDIUM\|LOW}];` | `` and `` is used for stale read. See [Request Priority](#request-priority) for details on the priority. | +| Start Read-Only Transaction | `BEGIN RO [{\|}] [PRIORITY {HIGH\|MEDIUM\|LOW}] [TAG ];` | `` and `` is used for stale read. See [Request Priority](#request-priority) for details on the priority. The tag you set is used as request tag. See also [Transaction Tags and Request Tags](#transaction-tags-and-request-tags).| | End Read-Only Transaction | `CLOSE;` | | | Exit CLI | `EXIT;` | | @@ -259,6 +259,46 @@ BEGIN RO 2021-04-01T23:47:44+00:00 PRIORITY MEDIUM; Note that transaction-level priority takes precedence over command-level priority. +## Transaction Tags and Request Tags + +In a read-write transaction, you can add a tag following `BEGIN RW TAG `. +spanner-cli adds the tag set in `BEGIN RW TAG` as a transaction tag. +The tag will also be used as request tags within the transaction. + +``` +# Read-write transaction +# transaction_tag = tx1 ++--------------------+ +| BEGIN RW TAG tx1; | +| | +| SELECT val | +| FROM tab1 +-----request_tag = tx1 +| WHERE id = 1; | +| | +| UPDATE tab1 | +| SET val = 10 +-----request_tag = tx1 +| WHERE id = 1; | +| | +| COMMIT; | ++--------------------+ +``` + +In a read-only transaction, you can add a tag following `BEGIN RO TAG `. +Since read-only transaction doesn't support transaction tag, spanner-cli adds the tag set in `BEGIN RO TAG` as request tags. +``` +# Read-only transaction +# transaction_tag = N/A ++--------------------+ +| BEGIN RO TAG tx2; | +| | +| SELECT SUM(val) | +| FROM tab1 +-----request_tag = tx2 +| WHERE id = 1; | +| | +| CLOSE; | ++--------------------+ +``` + ## Using with the Cloud Spanner Emulator This tool supports the [Cloud Spanner Emulator](https://cloud.google.com/spanner/docs/emulator) via the [`SPANNER_EMULATOR_HOST` environment variable](https://cloud.google.com/spanner/docs/emulator#client-libraries). diff --git a/session.go b/session.go index 4291db7..b15dca8 100644 --- a/session.go +++ b/session.go @@ -59,6 +59,7 @@ type Session struct { } type transactionContext struct { + tag string priority pb.RequestOptions_Priority sendHeartbeat bool // Becomes true only after a user-driven query is executed on the transaction. rwTxn *spanner.ReadWriteStmtBasedTransaction @@ -109,7 +110,7 @@ func (s *Session) InReadOnlyTransaction() bool { } // BeginReadWriteTransaction starts read-write transaction. -func (s *Session) BeginReadWriteTransaction(priority pb.RequestOptions_Priority) error { +func (s *Session) BeginReadWriteTransaction(priority pb.RequestOptions_Priority, tag string) error { if s.InReadWriteTransaction() { return errors.New("read-write transaction is already running") } @@ -122,12 +123,14 @@ func (s *Session) BeginReadWriteTransaction(priority pb.RequestOptions_Priority) opts := spanner.TransactionOptions{ CommitOptions: spanner.CommitOptions{ReturnCommitStats: true}, CommitPriority: priority, + TransactionTag: tag, } txn, err := spanner.NewReadWriteStmtBasedTransactionWithOptions(s.ctx, s.client, opts) if err != nil { return err } s.tc = &transactionContext{ + tag: tag, priority: priority, rwTxn: txn, } @@ -163,7 +166,7 @@ func (s *Session) RollbackReadWriteTransaction() error { } // BeginReadOnlyTransaction starts read-only transaction and returns the snapshot timestamp for the transaction if successful. -func (s *Session) BeginReadOnlyTransaction(typ timestampBoundType, staleness time.Duration, timestamp time.Time, priority pb.RequestOptions_Priority) (time.Time, error) { +func (s *Session) BeginReadOnlyTransaction(typ timestampBoundType, staleness time.Duration, timestamp time.Time, priority pb.RequestOptions_Priority, tag string) (time.Time, error) { if s.InReadOnlyTransaction() { return time.Time{}, errors.New("read-only transaction is already running") } @@ -193,6 +196,7 @@ func (s *Session) BeginReadOnlyTransaction(typ timestampBoundType, staleness tim } s.tc = &transactionContext{ + tag: tag, priority: priority, roTxn: txn, } @@ -252,11 +256,13 @@ func (s *Session) RunAnalyzeQuery(stmt spanner.Statement) (*pb.QueryPlan, error) func (s *Session) runQueryWithOptions(stmt spanner.Statement, opts spanner.QueryOptions) (*spanner.RowIterator, *spanner.ReadOnlyTransaction) { if s.InReadWriteTransaction() { + opts.RequestTag = s.tc.tag iter := s.tc.rwTxn.QueryWithOptions(s.ctx, stmt, opts) s.tc.sendHeartbeat = true return iter, nil } if s.InReadOnlyTransaction() { + opts.RequestTag = s.tc.tag return s.tc.roTxn.QueryWithOptions(s.ctx, stmt, opts), s.tc.roTxn } @@ -272,7 +278,8 @@ func (s *Session) RunUpdate(stmt spanner.Statement) (int64, error) { } opts := spanner.QueryOptions{ - Priority: s.currentPriority(), + Priority: s.currentPriority(), + RequestTag: s.tc.tag, } rowCount, err := s.tc.rwTxn.UpdateWithOptions(s.ctx, stmt, opts) s.tc.sendHeartbeat = true diff --git a/session_test.go b/session_test.go index 8a14acc..8f6b625 100644 --- a/session_test.go +++ b/session_test.go @@ -70,7 +70,7 @@ func TestRequestPriority(t *testing.T) { } // Read-Write Transaction. - if err := session.BeginReadWriteTransaction(test.transactionPriority); err != nil { + if err := session.BeginReadWriteTransaction(test.transactionPriority, ""); err != nil { t.Fatalf("failed to begin read write transaction: %v", err) } iter, _ := session.RunQuery(spanner.NewStatement("SELECT * FROM t1")) @@ -87,7 +87,7 @@ func TestRequestPriority(t *testing.T) { } // Read-Only Transaction. - if _, err := session.BeginReadOnlyTransaction(strong, 0, time.Now(), test.transactionPriority); err != nil { + if _, err := session.BeginReadOnlyTransaction(strong, 0, time.Now(), test.transactionPriority, ""); err != nil { t.Fatalf("failed to begin read only transaction: %v", err) } iter, _ = session.RunQueryWithStats(spanner.NewStatement("SELECT * FROM t1")) diff --git a/statement.go b/statement.go index 38f68de..f4f2584 100644 --- a/statement.go +++ b/statement.go @@ -108,8 +108,8 @@ var ( pdmlRe = regexp.MustCompile(`(?is)^PARTITIONED\s+((?:INSERT|UPDATE|DELETE)\s+.+$)`) // Transaction - beginRwRe = regexp.MustCompile(`(?is)^BEGIN(?:\s+RW)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`) - beginRoRe = regexp.MustCompile(`(?is)^BEGIN\s+RO(?:\s+([^\s]+))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?$`) + beginRwRe = regexp.MustCompile(`(?is)^BEGIN(?:\s+RW)?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?(?:\s+TAG\s+(.+))?$`) + beginRoRe = regexp.MustCompile(`(?is)^BEGIN\s+RO(?:\s+([^\s]+))?(?:\s+PRIORITY\s+(HIGH|MEDIUM|LOW))?(?:\s+TAG\s+(.+))?$`) commitRe = regexp.MustCompile(`(?is)^COMMIT$`) rollbackRe = regexp.MustCompile(`(?is)^ROLLBACK$`) closeRe = regexp.MustCompile(`(?is)^CLOSE$`) @@ -919,22 +919,26 @@ func runInNewOrExistRwTxForExplain(session *Session, f func() (affected int64, p type BeginRwStatement struct { Priority pb.RequestOptions_Priority + Tag string } func newBeginRwStatement(input string) (*BeginRwStatement, error) { matched := beginRwRe.FindStringSubmatch(input) - if matched[1] == "" { - return &BeginRwStatement{}, nil + stmt := &BeginRwStatement{} + + if matched[1] != "" { + priority, err := parsePriority(matched[1]) + if err != nil { + return nil, err + } + stmt.Priority = priority } - priority, err := parsePriority(matched[1]) - if err != nil { - return nil, err + if matched[2] != "" { + stmt.Tag = matched[2] } - return &BeginRwStatement{ - Priority: priority, - }, nil + return stmt, nil } func (s *BeginRwStatement) Execute(session *Session) (*Result, error) { @@ -945,7 +949,7 @@ func (s *BeginRwStatement) Execute(session *Session) (*Result, error) { return nil, errors.New("you're in read-only transaction. Please finish the transaction by 'CLOSE;'") } - if err := session.BeginReadWriteTransaction(s.Priority); err != nil { + if err := session.BeginReadWriteTransaction(s.Priority, s.Tag); err != nil { return nil, err } @@ -1004,6 +1008,7 @@ type BeginRoStatement struct { Staleness time.Duration Timestamp time.Time Priority pb.RequestOptions_Priority + Tag string } func newBeginRoStatement(input string) (*BeginRoStatement, error) { @@ -1035,6 +1040,10 @@ func newBeginRoStatement(input string) (*BeginRoStatement, error) { stmt.Priority = priority } + if matched[3] != "" { + stmt.Tag = matched[3] + } + return stmt, nil } @@ -1048,7 +1057,7 @@ func (s *BeginRoStatement) Execute(session *Session) (*Result, error) { close.Execute(session) } - ts, err := session.BeginReadOnlyTransaction(s.TimestampBoundType, s.Staleness, s.Timestamp, s.Priority) + ts, err := session.BeginReadOnlyTransaction(s.TimestampBoundType, s.Staleness, s.Timestamp, s.Priority, s.Tag) if err != nil { return nil, err } diff --git a/statement_test.go b/statement_test.go index 7ce9633..1944d82 100644 --- a/statement_test.go +++ b/statement_test.go @@ -210,6 +210,57 @@ func TestBuildStatement(t *testing.T) { Priority: pb.RequestOptions_PRIORITY_LOW, }, }, + { + desc: "BEGIN statement with TAG", + input: "BEGIN TAG app=spanner-cli,env=test", + want: &BeginRwStatement{ + Tag: "app=spanner-cli,env=test", + }, + }, + { + desc: "BEGIN RW statement with TAG", + input: "BEGIN RW TAG app=spanner-cli,env=test", + want: &BeginRwStatement{ + Tag: "app=spanner-cli,env=test", + }, + }, + { + desc: "BEGIN PRIORITY statement with TAG", + input: "BEGIN PRIORITY MEDIUM TAG app=spanner-cli,env=test", + want: &BeginRwStatement{ + Priority: pb.RequestOptions_PRIORITY_MEDIUM, + Tag: "app=spanner-cli,env=test", + }, + }, + { + desc: "BEGIN statement with TAG whitespace", + input: "BEGIN TAG app=spanner-cli env=test", + want: &BeginRwStatement{ + Tag: "app=spanner-cli env=test", + }, + }, + { + desc: "BEGIN RW statement with TAG whitespace", + input: "BEGIN RW TAG app=spanner-cli env=test", + want: &BeginRwStatement{ + Tag: "app=spanner-cli env=test", + }, + }, + { + desc: "BEGIN PRIORITY statement with TAG whitespace", + input: "BEGIN PRIORITY MEDIUM TAG app=spanner-cli env=test", + want: &BeginRwStatement{ + Priority: pb.RequestOptions_PRIORITY_MEDIUM, + Tag: "app=spanner-cli env=test", + }, + }, + { + desc: "BEGIN statement with TAG quoted", + input: "BEGIN TAG app=\"spanner-cli\" env='dev'", + want: &BeginRwStatement{ + Tag: "app=\"spanner-cli\" env='dev'", + }, + }, { desc: "BEGIN RO statement", input: "BEGIN RO", @@ -240,6 +291,52 @@ func TestBuildStatement(t *testing.T) { Priority: pb.RequestOptions_PRIORITY_HIGH, }, }, + { + desc: "BEGIN RO statement with TAG", + input: "BEGIN RO TAG app=spanner-cli,env=test", + want: &BeginRoStatement{ + TimestampBoundType: strong, + Tag: "app=spanner-cli,env=test", + }, + }, + { + desc: "BEGIN RO staleness statement with TAG", + input: "BEGIN RO 10 TAG app=spanner-cli,env=test", + want: &BeginRoStatement{ + Staleness: time.Duration(10 * time.Second), + TimestampBoundType: exactStaleness, + Tag: "app=spanner-cli,env=test", + }, + }, + { + desc: "BEGIN RO read timestamp statement with TAG", + input: "BEGIN RO 2020-03-30T22:54:44.834017+09:00 TAG app=spanner-cli,env=test", + want: &BeginRoStatement{ + Timestamp: timestamp, + TimestampBoundType: readTimestamp, + Tag: "app=spanner-cli,env=test", + }, + skipLowerCase: true, + }, + { + desc: "BEGIN RO PRIORITY statement with TAG", + input: "BEGIN RO PRIORITY LOW TAG app=spanner-cli,env=test", + want: &BeginRoStatement{ + TimestampBoundType: strong, + Priority: pb.RequestOptions_PRIORITY_LOW, + Tag: "app=spanner-cli,env=test", + }, + }, + { + desc: "BEGIN RO staleness with PRIORITY statement with TAG", + input: "BEGIN RO 10 PRIORITY HIGH TAG app=spanner-cli,env=test", + want: &BeginRoStatement{ + Staleness: time.Duration(10 * time.Second), + TimestampBoundType: exactStaleness, + Priority: pb.RequestOptions_PRIORITY_HIGH, + Tag: "app=spanner-cli,env=test", + }, + }, { desc: "COMMIT statement", input: "COMMIT",