-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(spanner): add support of using multiplexed session with ReadOnlyTransactions #10269
base: main
Are you sure you want to change the base?
Conversation
rahul2393
commented
May 28, 2024
•
edited
Loading
edited
- Enhanced support for multiplexed sessions in the Spanner client, improving session management and efficiency.
- Added metrics tracking for multiplexed sessions, providing better insights into usage patterns.
- Adjusted testing conditions for session pools and metrics to accurately reflect multiplexing configurations.
- Expanded test coverage to include scenarios for multiplexed sessions, ensuring robust session management functionality.
- Added automation script for testing processes in the CI environment, improving integration testing efficiency.
09cb87e
to
cf1ac68
Compare
cf1ac68
to
cab67e9
Compare
09d5513
to
89c64c2
Compare
b381f6c
to
703ffae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(First pass, continuing review tomorrow.)
spanner/integration_test.go
Outdated
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" { | ||
return "true" | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not very important, but this seems a bit strange. Why are we returning a standardized string from this method, instead of a bool?
spanner/kokoro/presubmit.sh
Outdated
# See the License for the specific language governing permissions and | ||
# limitations under the License.. | ||
|
||
# TODO(deklerk): Add integration tests when it's secure to do so. b/64723143 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove this TODO now ;-)
spanner/session.go
Outdated
if sh.client != nil { | ||
sh.client = nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: just remove the nil check, it is safe to always set it to nil
spanner/session.go
Outdated
@@ -149,6 +160,10 @@ func (sh *sessionHandle) getClient() *vkit.Client { | |||
if sh.session == nil { | |||
return nil | |||
} | |||
if sh.session.isMultiplexed { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be safer/easier to read if we just check for if sh.client != nil
. That also allows us to use the same field for regular sessions if that would ever be handy, and it is clear to anyone reading the code.
spanner/session.go
Outdated
if sh.client != nil { | ||
sh.client = nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here also: Just remove the if check and always set to nil
spanner/session.go
Outdated
@@ -570,12 +595,20 @@ type sessionPool struct { | |||
// idleList caches idle session IDs. Session IDs in this list can be | |||
// allocated for use. | |||
idleList list.List | |||
// multiplexedSessions contains the multiplexed sessions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
// multiplexedSessions contains the multiplexed sessions | |
// multiplexedSession contains the multiplexed session |
spanner/session.go
Outdated
// multiplexedSessionCreationError is the last error that occurred during multiplexed session | ||
// creation and is propagated to any waiters waiting for a session. | ||
multiplexedSessionCreationError error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Adding a comment for now, maybe it turns out to be void after reading the rest of the code)
We should only propagate this error to any waiters for the first creation of a multiplexed session. If refreshing a multiplexed session fails, then we should not propagate that error to the application, as the multiplexed session is likely to be usable for many days still.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, if refreshing fails it will use existing multiplexed session. Added tests in TestMultiplexSessionWorker
0da6099
to
2e24f7e
Compare
spanner/session.go
Outdated
attrs := p.otConfig.attributeMap | ||
for _, attr := range attributes { | ||
attrs = append(attrs, attr) | ||
} | ||
m.Add(ctx, val, metric.WithAttributes(attrs...)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of creating a new attribute map here every time, can we just have two attribute maps set on otConfig
? One with multiplexed=true
and one with multiplexed=false
. This method is called for every transaction that is executed, and it feels a bit wasteful to create this map over and over again, when there are only two possible values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added separate maps, and updated tests
spanner/session.go
Outdated
@@ -813,13 +859,34 @@ func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels | |||
return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p) | |||
} | |||
|
|||
func (p *sessionPool) getMultiplexedSession(ctx context.Context) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name of this method seems confusing to me. It is named get...
, but it calls a method called executeCreateMultiplexedSessions(...)
spanner/session.go
Outdated
if err != nil { | ||
return err | ||
} | ||
go p.sc.executeCreateMultiplexedSessions(ctx, client, p.sc.sessionLabels, p.sc.md, p) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rename this method:
go p.sc.executeCreateMultiplexedSessions(ctx, client, p.sc.sessionLabels, p.sc.md, p) | |
go p.sc.executeCreateMultiplexedSession(ctx, client, p.sc.sessionLabels, p.sc.md, p) |
spanner/session.go
Outdated
// sessionReady is executed by the SessionClient when a session has been | ||
// created and is ready to use. This method will add the new session to the | ||
// pool and decrease the number of sessions that is being created. | ||
func (p *sessionPool) sessionReady(s *session) { | ||
p.mu.Lock() | ||
defer p.mu.Unlock() | ||
// Clear any session creation error. | ||
if s.isMultiplexed { | ||
s.pool = p | ||
p.hc.register(s) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for self: This seems to register the multiplexed session with the normal health checker. That can be OK, as long as we treat it as a multiplexed session (e.g. no pings)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not registering now
spanner/session.go
Outdated
p.multiplexedSessionCreationError = nil | ||
p.recordStat(context.Background(), SessionsCount, 1, tagNumSessions, tag.Tag{Key: tagKeyIsMultiplexed, Value: "true"}) | ||
close(p.mayGetMultiplexedSession) | ||
p.mayGetMultiplexedSession = make(chan struct{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we re-creating the channel here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed
spanner/sessionclient.go
Outdated
sc.mu.Unlock() | ||
if closed { | ||
err := spannerErrorf(codes.Canceled, "Session client closed") | ||
trace.TracePrintf(ctx, nil, "Session client closed while creating multiplexed sessions: %v", err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trace.TracePrintf(ctx, nil, "Session client closed while creating multiplexed sessions: %v", err) | |
trace.TracePrintf(ctx, nil, "Session client closed while creating a multiplexed session: %v", err) |
var mdForGFELatency metadata.MD | ||
response, err := client.CreateSession(contextWithOutgoingMetadata(ctx, sc.md, sc.disableRouteToLeader), &sppb.CreateSessionRequest{ | ||
Database: sc.database, | ||
// Multiplexed sessions do not support labels. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Either remove the labels from the input arguments of this method, or combine this method with the createSession(..)
method and only conditionally add labels in the combined method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed the labels
@@ -192,7 +205,7 @@ func TestOTMetrics_SessionPool_SessionsCount(t *testing.T) { | |||
|
|||
client.Single().ReadRow(context.Background(), "Users", spanner.Key{"alice"}, []string{"email"}) | |||
|
|||
attributesNumInUseSessions := append(getAttributes(client.ClientID()), attribute.Key("type").String("num_in_use_sessions")) | |||
//attributesNumInUseSessions := append(getAttributes(client.ClientID()), attribute.Key("type").String("num_in_use_sessions")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: remove commented code
@@ -5208,7 +5208,7 @@ func TestClient_CloseWithUnresponsiveBackend(t *testing.T) { | |||
server.TestSpanner.Freeze() | |||
defer server.TestSpanner.Unfreeze() | |||
|
|||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) | |||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should have tests that verify that:
- Multiplexed sessions are used for single-use and multi-use read-only transactions (when enabled).
- Regular sessions are used for read/write transactions (when multiplexed sessions are enabled).
- Only one multiplexed session is created, even if there are more than 1 goroutine trying to execute a read-only transaction directly at startup before the pool has been initialized.
- That multiplexed sessions are refreshed at the configured interval.
- That if replacing a multiplexed session after the configured interval fails, it does not affect the existing multiplexed session. (That is: If the initial session creation succeeds, and replacing the session after 7 days fails, then the original session should continue to be used.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1,2,3 => Added in TestClient_MultiplexedSession
4,5 => Added in TestMultiplexSessionWorker
spanner/session.go
Outdated
@@ -1462,6 +1666,11 @@ func (hc *healthChecker) markDone(s *session) { | |||
// healthCheck checks the health of the session and pings it if needed. | |||
func (hc *healthChecker) healthCheck(s *session) { | |||
defer hc.markDone(s) | |||
// If the session is multiplexed and has been idle for more than 7 days, | |||
if s.isMultiplexed && s.createTime.Add(multiplexSessionIdleTime).Before(time.Now()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be part of the standard health-check feature for regular sessions. This method is called by a worker that looks for sessions that need a ping. That logic is not the right logic that should determine whether we need to check whether the multiplexed session should be refreshed. It is probably better to put it in a separate method that only takes care of refreshing multiplexed sessions.
Also, we should ensure that:
- The method (by default) does not need to be called very often. We are checking whether the session is more than 7 days old. That is something that can be checked at an interval of 10 minutes or something like that.
- We should only have one
createMultiplexedSession
call in-flight at any time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Created separate worker
multiplexSessionWorker
to refresh session with 10 minute interval. - Added test
TestClient_MultiplexedSession
to validate one in-flight reqcreateMultiplexedSession
at any time.
8a0e84c
to
fcb133a
Compare
spanner/client_test.go
Outdated
}, | ||
validate: func(server InMemSpannerServer) { | ||
// Validate the multiplexed session is used | ||
expected := map[string]interface{}{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here and in a couple of the other validations: Can we replace the expected
map with just a simple uint
variable, as that is the only thing that is put into the map?
spanner/client_test.go
Outdated
if !isMultiplexEnabled { | ||
expected["SessionsCount"] = uint(25) // BatchCreateSession request from regular session pool | ||
} | ||
if !testEqual(expected["SessionsCount"], server.TotalSessionsCreated()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check is going to be flaky in its current form (at least, I hope it will be), because:
- We always initiate the creation of both regular and multiplexed sessions if multiplexed sessions are enabled directly at startup.
- We by default create 4 * 25 regular sessions. This happens in the background.
- If multiplexed sessions are enabled, then we also create one multiplexed session. This happens in the background.
- Depending on what request is handled when, and whether mux sessions are enabled, then the number of sessions created will be 1, 25, 26, 50, 51, 75, 76, 100 or 101.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We always initiate the creation of both regular and multiplexed sessions if multiplexed sessions are enabled directly at startup.
Not true, regular sessions will only be created when number of requests waiting for regular session > minOpened, value for minOpened is 0 for setupMockedTestServer
so as long as in test we only use ReadOnly txn it won't trigger regular session creation
We by default create 4 * 25 regular sessions. This happens in the background.
Not true, we only create regular sessions when there are pending requests for regular session given MinOpened=0 config, which is the case here.
Depending on what request is handled when, and whether mux sessions are enabled, then the number of sessions created will be 1, 25, 26, 50, 51, 75, 76, 100 or 101.
For the test cases either it will be 1(in case test is making R/O txn only) or 26(in case its doing R/W txn)
spanner/client_test.go
Outdated
}, | ||
validate: func(server InMemSpannerServer) { | ||
// Validate the regular session is used | ||
if !testEqual(uint(25), server.TotalSessionsCreated()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that also here the number of sessions is not deterministic, as we have 4 BatchCreateSessions requests running in parallel.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that also here the number of sessions is not deterministic, as we have 4 BatchCreateSessions requests running in parallel.
No, it will only create that many sessions which are needed which is 25(when mux disabled), 1 when enabled (because MinOpened=0 here)
spanner/integration_test.go
Outdated
@@ -386,6 +388,13 @@ func getInstanceConfig() string { | |||
return os.Getenv("GCLOUD_TESTS_GOLANG_SPANNER_INSTANCE_CONFIG") | |||
} | |||
|
|||
func getMultiplexEnableFlag() bool { | |||
if os.Getenv("GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS") == "true" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: You can just do return os.Getenv("...") == "true"
spanner/oc_test.go
Outdated
"open_session_count": "25", | ||
}, | ||
"true": { | ||
"open_session_count": "1", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This here also surprises me a bit. Are we not creating any regular sessions if multiplexed sessions are enabled? I would have expected us to create both in parallel, as we need regular sessions for read/write transactions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since MinOpened in the test is 0 no we don't create regular session at start we wait for requests which need it.
spanner/session.go
Outdated
// multiplexedSessionInUse is the number of transactions using multiplexed sessions. | ||
multiplexedSessionInUse uint64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
// multiplexedSessionInUse is the number of transactions using multiplexed sessions. | |
multiplexedSessionInUse uint64 | |
// numTransactionsUsingMultiplexedSession is the number of transactions using the multiplexed session. | |
numTransactionsUsingMultiplexedSession uint64 |
spanner/session.go
Outdated
// maxMultiplexedSessionInUse is the maximum number of multiplexed sessions in use concurrently in the | ||
// current 10 minute interval. | ||
maxMultiplexedSessionInUse uint64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here as above regarding naming.
But: Do we need these two numbers? What do they tell us and/or the user?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed this since it is not adding any real benefit to customer other than drop in InUse and maxInUse metrics on customer when using multiplexed session.
spanner/session.go
Outdated
if isMultiplexed { | ||
// Ignore the error if multiplexed session already present | ||
if p.multiplexedSession != nil { | ||
p.multiplexedSession.checkingHealth = false |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope
if p.multiplexedSession != nil { | ||
p.multiplexedSession.checkingHealth = false | ||
p.multiplexedSessionCreationError = nil | ||
p.mayGetMultiplexedSession <- true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed? If there already was a multiplexed session, then there should be no waiters, meaning that we should also not need to send this signal, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes this is to unblock maintainer to continue check, if we don't send here it will keep on waiting and maintainer will go blocked
spanner/session.go
Outdated
@@ -813,13 +882,53 @@ func (p *sessionPool) growPoolLocked(numSessions uint64, distributeOverChannels | |||
return p.sc.batchCreateSessions(int32(numSessions), distributeOverChannels, p) | |||
} | |||
|
|||
func (p *sessionPool) createMultiplexedSession(ctx context.Context) error { | |||
for c := range p.multiplexedSessionsReq { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It took some time for me to understand what this does, but I think I now understand it:
- During session pool initialization, we call
go createMultiplexedSession(..)
. - That goroutine is then blocked on this
for
statement until there is a request for a multiplexed session. - The request can come from a read-only transaction (if multiplexed sessions are enabled), or from the background worker keeping the multiplexed session fresh.
That means that:
- The multiplexed session creation is always on the critical path of the first read-only transaction.
- The goroutine is stuck here forever if there is never a request for a multiplexed session.
I don't think that is what we would want. Instead, we should try to:
- Always start the creation of a multiplexed session as soon as possible if multiplexed sessions are enabled. This means that the creation will not be on the critical path of the first read-only transaction.
- Not call this method at all if multiplexed sessions are disabled.
I think that many of my above comments are also a direct consequence of this behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always start the creation of a multiplexed session as soon as possible if multiplexed sessions are enabled. This means that the creation will not be on the critical path of the first read-only transaction.
I updated to trigger createMultiplexedSession upon session pool init
Not call this method at all if multiplexed sessions are disabled.
Updated, now we spawn this background thread only when mux is enabled
90fff40
to
c58eff5
Compare