Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support of using multiplexed session with ReadOnlyTransactions #10269

Open
wants to merge 16 commits into
base: main
Choose a base branch
from

Conversation

rahul2393
Copy link
Contributor

@rahul2393 rahul2393 commented May 28, 2024

  • Enhanced support for multiplexed sessions in the Spanner client, improving session management and efficiency.
  • Added metrics tracking for multiplexed sessions, providing better insights into usage patterns.
  • Adjusted testing conditions for session pools and metrics to accurately reflect multiplexing configurations.
  • Expanded test coverage to include scenarios for multiplexed sessions, ensuring robust session management functionality.
  • Added automation script for testing processes in the CI environment, improving integration testing efficiency.

@rahul2393 rahul2393 requested review from a team as code owners May 28, 2024 05:40
@rahul2393 rahul2393 added the do not merge Indicates a pull request not ready for merge, due to either quality or timing. label May 28, 2024
@product-auto-label product-auto-label bot added the api: spanner Issues related to the Spanner API. label May 28, 2024
@rahul2393 rahul2393 force-pushed the multiplexed-sessions branch 4 times, most recently from 09cb87e to cf1ac68 Compare June 3, 2024 05:29
@rahul2393 rahul2393 removed the do not merge Indicates a pull request not ready for merge, due to either quality or timing. label Jun 4, 2024
@rahul2393 rahul2393 force-pushed the multiplexed-sessions branch 4 times, most recently from 09d5513 to 89c64c2 Compare June 5, 2024 09:23
@rahul2393 rahul2393 force-pushed the multiplexed-sessions branch 6 times, most recently from b381f6c to 703ffae Compare July 9, 2024 10:51
@rahul2393
Copy link
Contributor Author

/cc @olavloite @harshachinta

Copy link
Contributor

@olavloite olavloite left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(First pass, continuing review tomorrow.)

Comment on lines 392 to 394
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" {
return "true"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not very important, but this seems a bit strange. Why are we returning a standardized string from this method, instead of a bool?

# See the License for the specific language governing permissions and
# limitations under the License..

# TODO(deklerk): Add integration tests when it's secure to do so. b/64723143
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can remove this TODO now ;-)

Comment on lines 126 to 128
if sh.client != nil {
sh.client = nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: just remove the nil check, it is safe to always set it to nil

@@ -149,6 +160,10 @@ func (sh *sessionHandle) getClient() *vkit.Client {
if sh.session == nil {
return nil
}
if sh.session.isMultiplexed {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be safer/easier to read if we just check for if sh.client != nil. That also allows us to use the same field for regular sessions if that would ever be handy, and it is clear to anyone reading the code.

Comment on lines 203 to 205
if sh.client != nil {
sh.client = nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here also: Just remove the if check and always set to nil

@@ -570,12 +595,20 @@ type sessionPool struct {
// idleList caches idle session IDs. Session IDs in this list can be
// allocated for use.
idleList list.List
// multiplexedSessions contains the multiplexed sessions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// multiplexedSessions contains the multiplexed sessions
// multiplexedSession contains the multiplexed session

Comment on lines 609 to 621
// multiplexedSessionCreationError is the last error that occurred during multiplexed session
// creation and is propagated to any waiters waiting for a session.
multiplexedSessionCreationError error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Adding a comment for now, maybe it turns out to be void after reading the rest of the code)

We should only propagate this error to any waiters for the first creation of a multiplexed session. If refreshing a multiplexed session fails, then we should not propagate that error to the application, as the multiplexed session is likely to be usable for many days still.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if refreshing fails it will use existing multiplexed session. Added tests in TestMultiplexSessionWorker

@rahul2393 rahul2393 force-pushed the multiplexed-sessions branch 2 times, most recently from 0da6099 to 2e24f7e Compare July 19, 2024 05:40
Comment on lines 765 to 792
attrs := p.otConfig.attributeMap
for _, attr := range attributes {
attrs = append(attrs, attr)
}
m.Add(ctx, val, metric.WithAttributes(attrs...))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a new attribute map here every time, can we just have two attribute maps set on otConfig? One with multiplexed=true and one with multiplexed=false. This method is called for every transaction that is executed, and it feels a bit wasteful to create this map over and over again, when there are only two possible values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added separate maps, and updated tests

@@ -813,13 +859,34 @@ func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels
return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
}

func (p *sessionPool) getMultiplexedSession(ctx context.Context) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this method seems confusing to me. It is named get..., but it calls a method called executeCreateMultiplexedSessions(...)

if err != nil {
return err
}
go p.sc.executeCreateMultiplexedSessions(ctx, client, p.sc.sessionLabels, p.sc.md, p)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename this method:

Suggested change
go p.sc.executeCreateMultiplexedSessions(ctx, client, p.sc.sessionLabels, p.sc.md, p)
go p.sc.executeCreateMultiplexedSession(ctx, client, p.sc.sessionLabels, p.sc.md, p)

// sessionReady is executed by the SessionClient when a session has been
// created and is ready to use. This method will add the new session to the
// pool and decrease the number of sessions that is being created.
func (p *sessionPool) sessionReady(s *session) {
p.mu.Lock()
defer p.mu.Unlock()
// Clear any session creation error.
if s.isMultiplexed {
s.pool = p
p.hc.register(s)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note for self: This seems to register the multiplexed session with the normal health checker. That can be OK, as long as we treat it as a multiplexed session (e.g. no pings)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not registering now

p.multiplexedSessionCreationError = nil
p.recordStat(context.Background(), SessionsCount, 1, tagNumSessions, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"})
close(p.mayGetMultiplexedSession)
p.mayGetMultiplexedSession = make(chan struct{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we re-creating the channel here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed

sc.mu.Unlock()
if closed {
err := spannerErrorf(codes.Canceled, "Session client closed")
trace.TracePrintf(ctx, nil, "Session client closed while creating multiplexed sessions: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
trace.TracePrintf(ctx, nil, "Session client closed while creating multiplexed sessions: %v", err)
trace.TracePrintf(ctx, nil, "Session client closed while creating a multiplexed session: %v", err)

var mdForGFELatency metadata.MD
response, err := client.CreateSession(contextWithOutgoingMetadata(ctx, sc.md, sc.disableRouteToLeader), &sppb.CreateSessionRequest{
Database: sc.database,
// Multiplexed sessions do not support labels.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either remove the labels from the input arguments of this method, or combine this method with the createSession(..) method and only conditionally add labels in the combined method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the labels

@@ -192,7 +205,7 @@ func TestOTMetrics_SessionPool_SessionsCount(t *testing.T) {

client.Single().ReadRow(context.Background(), "Users", spanner.Key{"alice"}, []string{"email"})

attributesNumInUseSessions := append(getAttributes(client.ClientID()), attribute.Key("type").String("num_in_use_sessions"))
//attributesNumInUseSessions := append(getAttributes(client.ClientID()), attribute.Key("type").String("num_in_use_sessions"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove commented code

@@ -5208,7 +5208,7 @@ func TestClient_CloseWithUnresponsiveBackend(t *testing.T) {
server.TestSpanner.Freeze()
defer server.TestSpanner.Unfreeze()

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should have tests that verify that:

  1. Multiplexed sessions are used for single-use and multi-use read-only transactions (when enabled).
  2. Regular sessions are used for read/write transactions (when multiplexed sessions are enabled).
  3. Only one multiplexed session is created, even if there are more than 1 goroutine trying to execute a read-only transaction directly at startup before the pool has been initialized.
  4. That multiplexed sessions are refreshed at the configured interval.
  5. That if replacing a multiplexed session after the configured interval fails, it does not affect the existing multiplexed session. (That is: If the initial session creation succeeds, and replacing the session after 7 days fails, then the original session should continue to be used.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1,2,3 => Added in TestClient_MultiplexedSession
4,5 => Added in TestMultiplexSessionWorker

@@ -1462,6 +1666,11 @@ func (hc *healthChecker) markDone(s *session) {
// healthCheck checks the health of the session and pings it if needed.
func (hc *healthChecker) healthCheck(s *session) {
defer hc.markDone(s)
// If the session is multiplexed and has been idle for more than 7 days,
if s.isMultiplexed && s.createTime.Add(multiplexSessionIdleTime).Before(time.Now()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be part of the standard health-check feature for regular sessions. This method is called by a worker that looks for sessions that need a ping. That logic is not the right logic that should determine whether we need to check whether the multiplexed session should be refreshed. It is probably better to put it in a separate method that only takes care of refreshing multiplexed sessions.
Also, we should ensure that:

  1. The method (by default) does not need to be called very often. We are checking whether the session is more than 7 days old. That is something that can be checked at an interval of 10 minutes or something like that.
  2. We should only have one createMultiplexedSession call in-flight at any time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Created separate worker multiplexSessionWorker to refresh session with 10 minute interval.
  2. Added test TestClient_MultiplexedSession to validate one in-flight req createMultiplexedSession at any time.

@rahul2393 rahul2393 force-pushed the multiplexed-sessions branch 2 times, most recently from 8a0e84c to fcb133a Compare July 23, 2024 13:51
@rahul2393 rahul2393 requested a review from olavloite July 23, 2024 13:56
},
validate: func(server InMemSpannerServer) {
// Validate the multiplexed session is used
expected := map[string]interface{}{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and in a couple of the other validations: Can we replace the expected map with just a simple uint variable, as that is the only thing that is put into the map?

if !isMultiplexEnabled {
expected["SessionsCount"] = uint(25) // BatchCreateSession request from regular session pool
}
if !testEqual(expected["SessionsCount"], server.TotalSessionsCreated()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is going to be flaky in its current form (at least, I hope it will be), because:

  1. We always initiate the creation of both regular and multiplexed sessions if multiplexed sessions are enabled directly at startup.
  2. We by default create 4 * 25 regular sessions. This happens in the background.
  3. If multiplexed sessions are enabled, then we also create one multiplexed session. This happens in the background.
  4. Depending on what request is handled when, and whether mux sessions are enabled, then the number of sessions created will be 1, 25, 26, 50, 51, 75, 76, 100 or 101.

Copy link
Contributor Author

@rahul2393 rahul2393 Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. We always initiate the creation of both regular and multiplexed sessions if multiplexed sessions are enabled directly at startup.

Not true, regular sessions will only be created when number of requests waiting for regular session > minOpened, value for minOpened is 0 for setupMockedTestServer so as long as in test we only use ReadOnly txn it won't trigger regular session creation

We by default create 4 * 25 regular sessions. This happens in the background.

Not true, we only create regular sessions when there are pending requests for regular session given MinOpened=0 config, which is the case here.

Depending on what request is handled when, and whether mux sessions are enabled, then the number of sessions created will be 1, 25, 26, 50, 51, 75, 76, 100 or 101.

For the test cases either it will be 1(in case test is making R/O txn only) or 26(in case its doing R/W txn)

},
validate: func(server InMemSpannerServer) {
// Validate the regular session is used
if !testEqual(uint(25), server.TotalSessionsCreated()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that also here the number of sessions is not deterministic, as we have 4 BatchCreateSessions requests running in parallel.

Copy link
Contributor Author

@rahul2393 rahul2393 Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that also here the number of sessions is not deterministic, as we have 4 BatchCreateSessions requests running in parallel.

No, it will only create that many sessions which are needed which is 25(when mux disabled), 1 when enabled (because MinOpened=0 here)

@@ -386,6 +388,13 @@ func getInstanceConfig() string {
return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG")
}

func getMultiplexEnableFlag() bool {
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You can just do return os.Getenv("...") == "true"

"open_session_count": "25",
},
"true": {
"open_session_count": "1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This here also surprises me a bit. Are we not creating any regular sessions if multiplexed sessions are enabled? I would have expected us to create both in parallel, as we need regular sessions for read/write transactions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since MinOpened in the test is 0 no we don't create regular session at start we wait for requests which need it.

Comment on lines 643 to 644
// multiplexedSessionInUse is the number of transactions using multiplexed sessions.
multiplexedSessionInUse uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// multiplexedSessionInUse is the number of transactions using multiplexed sessions.
multiplexedSessionInUse uint64
// numTransactionsUsingMultiplexedSession is the number of transactions using the multiplexed session.
numTransactionsUsingMultiplexedSession uint64

Comment on lines 648 to 650
// maxMultiplexedSessionInUse is the maximum number of multiplexed sessions in use concurrently in the
// current 10 minute interval.
maxMultiplexedSessionInUse uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here as above regarding naming.

But: Do we need these two numbers? What do they tell us and/or the user?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this since it is not adding any real benefit to customer other than drop in InUse and maxInUse metrics on customer when using multiplexed session.

if isMultiplexed {
// Ignore the error if multiplexed session already present
if p.multiplexedSession != nil {
p.multiplexedSession.checkingHealth = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope

if p.multiplexedSession != nil {
p.multiplexedSession.checkingHealth = false
p.multiplexedSessionCreationError = nil
p.mayGetMultiplexedSession <- true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? If there already was a multiplexed session, then there should be no waiters, meaning that we should also not need to send this signal, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes this is to unblock maintainer to continue check, if we don't send here it will keep on waiting and maintainer will go blocked

@@ -813,13 +882,53 @@ func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels
return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p)
}

func (p *sessionPool) createMultiplexedSession(ctx context.Context) error {
for c := range p.multiplexedSessionsReq {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took some time for me to understand what this does, but I think I now understand it:

  1. During session pool initialization, we call go createMultiplexedSession(..).
  2. That goroutine is then blocked on this for statement until there is a request for a multiplexed session.
  3. The request can come from a read-only transaction (if multiplexed sessions are enabled), or from the background worker keeping the multiplexed session fresh.

That means that:

  1. The multiplexed session creation is always on the critical path of the first read-only transaction.
  2. The goroutine is stuck here forever if there is never a request for a multiplexed session.

I don't think that is what we would want. Instead, we should try to:

  1. Always start the creation of a multiplexed session as soon as possible if multiplexed sessions are enabled. This means that the creation will not be on the critical path of the first read-only transaction.
  2. Not call this method at all if multiplexed sessions are disabled.

I think that many of my above comments are also a direct consequence of this behavior.

Copy link
Contributor Author

@rahul2393 rahul2393 Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always start the creation of a multiplexed session as soon as possible if multiplexed sessions are enabled. This means that the creation will not be on the critical path of the first read-only transaction.

I updated to trigger createMultiplexedSession upon session pool init

Not call this method at all if multiplexed sessions are disabled.

Updated, now we spawn this background thread only when mux is enabled

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: spanner Issues related to the Spanner API.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants