diff --git a/datastore/query_test.go b/datastore/query_test.go index 36d72b40c8ac..6c191f09326a 100644 --- a/datastore/query_test.go +++ b/datastore/query_test.go @@ -763,8 +763,8 @@ func TestReadOptions(t *testing.T) { } // Test errors. for _, q := range []*Query{ - NewQuery("").Transaction(&Transaction{id: nil}), - NewQuery("").Transaction(&Transaction{id: tid}).EventualConsistency(), + NewQuery("").Transaction(&Transaction{id: nil, state: transactionStateExpired}), + NewQuery("").Transaction(&Transaction{id: tid, state: transactionStateInProgress}).EventualConsistency(), } { req := &pb.RunQueryRequest{} if err := q.toRunQueryRequest(req); err == nil { diff --git a/datastore/transaction.go b/datastore/transaction.go index cbd64b342527..647e50b27942 100644 --- a/datastore/transaction.go +++ b/datastore/transaction.go @@ -37,6 +37,14 @@ type transactionSettings struct { readOnly bool prevID []byte // ID of the transaction to retry readTime *timestamppb.Timestamp + + // When set, skips the initial BeginTransaction RPC call to obtain txn id and + // uses the piggybacked txn id from first read rpc call. + // If there are no read operations on transaction, BeginTransaction RPC call is made + // before rollback or commit + // Currently, this setting is set but unused + // TODO: b/291258189 - Use this setting + beginLater bool } // newTransactionSettings creates a transactionSettings with a given TransactionOption slice. @@ -91,6 +99,7 @@ var ReadOnly TransactionOption func init() { ReadOnly = readOnly{} + BeginLater = beginLater{} } type readOnly struct{} @@ -99,6 +108,25 @@ func (readOnly) apply(s *transactionSettings) { s.readOnly = true } +// BeginLater is a TransactionOption that can be used to improve transaction performance +// Currently, it is a no-op +// TODO: b/291258189 - Add implementation +var BeginLater TransactionOption + +type beginLater struct{} + +func (beginLater) apply(s *transactionSettings) { + s.beginLater = true +} + +type transactionState int + +const ( + transactionStateNotStarted transactionState = iota // Currently unused + transactionStateInProgress + transactionStateExpired +) + // Transaction represents a set of datastore operations to be committed atomically. // // Operations are enqueued by calling the Put and Delete methods on Transaction @@ -114,6 +142,8 @@ type Transaction struct { ctx context.Context mutations []*pb.Mutation // The mutations to apply. pending map[int]*PendingKey // Map from mutation index to incomplete keys pending transaction completion. + settings *transactionSettings + state transactionState } // NewTransaction starts a new transaction. @@ -167,6 +197,8 @@ func (c *Client) newTransaction(ctx context.Context, s *transactionSettings) (_ client: c, mutations: nil, pending: make(map[int]*PendingKey), + state: transactionStateInProgress, + settings: s, }, nil } @@ -223,7 +255,7 @@ func (t *Transaction) Commit() (c *Commit, err error) { t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Commit") defer func() { trace.EndSpan(t.ctx, err) }() - if t.id == nil { + if t.state == transactionStateExpired { return nil, errExpiredTransaction } req := &pb.CommitRequest{ @@ -237,7 +269,7 @@ func (t *Transaction) Commit() (c *Commit, err error) { if status.Code(err) == codes.Aborted { return nil, ErrConcurrentTransaction } - t.id = nil // mark the transaction as expired + t.state = transactionStateExpired // mark the transaction as expired if err != nil { return nil, err } @@ -264,15 +296,14 @@ func (t *Transaction) Rollback() (err error) { t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.Rollback") defer func() { trace.EndSpan(t.ctx, err) }() - if t.id == nil { + if t.state == transactionStateExpired { return errExpiredTransaction } - id := t.id - t.id = nil + t.state = transactionStateExpired _, err = t.client.client.Rollback(t.ctx, &pb.RollbackRequest{ ProjectId: t.client.dataset, DatabaseId: t.client.databaseID, - Transaction: id, + Transaction: t.id, }) return err } @@ -303,7 +334,7 @@ func (t *Transaction) GetMulti(keys []*Key, dst interface{}) (err error) { t.ctx = trace.StartSpan(t.ctx, "cloud.google.com/go/datastore.Transaction.GetMulti") defer func() { trace.EndSpan(t.ctx, err) }() - if t.id == nil { + if t.state == transactionStateExpired { return errExpiredTransaction } opts := &pb.ReadOptions{ @@ -336,7 +367,7 @@ func (t *Transaction) Put(key *Key, src interface{}) (*PendingKey, error) { // element of src in the same order. // TODO(jba): rewrite in terms of Mutate. func (t *Transaction) PutMulti(keys []*Key, src interface{}) (ret []*PendingKey, err error) { - if t.id == nil { + if t.state == transactionStateExpired { return nil, errExpiredTransaction } mutations, err := putMutations(keys, src) @@ -376,7 +407,7 @@ func (t *Transaction) Delete(key *Key) error { // DeleteMulti is a batch version of Delete. // TODO(jba): rewrite in terms of Mutate. func (t *Transaction) DeleteMulti(keys []*Key) (err error) { - if t.id == nil { + if t.state == transactionStateExpired { return errExpiredTransaction } mutations, err := deleteMutations(keys) @@ -396,7 +427,7 @@ func (t *Transaction) DeleteMulti(keys []*Key) (err error) { // // For an example, see Client.Mutate. func (t *Transaction) Mutate(muts ...*Mutation) ([]*PendingKey, error) { - if t.id == nil { + if t.state == transactionStateExpired { return nil, errExpiredTransaction } pmuts, err := mutationProtos(muts)