Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce Java 21 Virtual Thread Pinning in IO operations #2553

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.NonNull;

Expand Down Expand Up @@ -66,6 +67,7 @@ boolean isOpen() {
private final AtomicReference<Throwable> shortCircuitFailure;
private final ApiFutureCallback<T> shortCircuitRegistrationCallback;

private final ReentrantLock lock;
private volatile State state;

private AsyncAppendingQueue(
Expand All @@ -85,26 +87,32 @@ private AsyncAppendingQueue(
shortCircuitFailure.compareAndSet(null, throwable);
}
};
lock = new ReentrantLock();
}

synchronized AsyncAppendingQueue<T> append(ApiFuture<T> value) throws ShortCircuitException {
checkState(state.isOpen(), "already closed");
Throwable throwable = shortCircuitFailure.get();
if (throwable != null) {
ShortCircuitException shortCircuitException = new ShortCircuitException(throwable);
finalResult.cancel(true);
throw shortCircuitException;
AsyncAppendingQueue<T> append(ApiFuture<T> value) throws ShortCircuitException {
lock.lock();
try {
checkState(state.isOpen(), "already closed");
Throwable throwable = shortCircuitFailure.get();
if (throwable != null) {
ShortCircuitException shortCircuitException = new ShortCircuitException(throwable);
finalResult.cancel(true);
throw shortCircuitException;
}
checkNotNull(value, "value must not be null");

Element<T> newElement = newElement(value);
queue.offer(newElement);
boolean isFull = queue.size() == maxElementsPerCompact;
if (isFull) {
Element<T> compact = compact(exec);
queue.offer(compact);
}
return this;
} finally {
lock.unlock();
}
checkNotNull(value, "value must not be null");

Element<T> newElement = newElement(value);
queue.offer(newElement);
boolean isFull = queue.size() == maxElementsPerCompact;
if (isFull) {
Element<T> compact = compact(exec);
queue.offer(compact);
}
return this;
}

ApiFuture<T> getResult() {
Expand All @@ -116,35 +124,40 @@ T await() {
}

@Override
public synchronized void close() {
if (!state.isOpen()) {
return;
}
state = State.CLOSING;

if (queue.isEmpty()) {
NoSuchElementException neverAppendedTo = new NoSuchElementException("Never appended to");
finalResult.setException(neverAppendedTo);
throw neverAppendedTo;
} else {
Element<T> transform = compact(exec);

ApiFutures.addCallback(
transform.getValue(),
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable err) {
finalResult.setException(err);
}

@Override
public void onSuccess(T t) {
finalResult.set(t);
}
},
exec);
public void close() {
lock.lock();
try {
if (!state.isOpen()) {
return;
}
state = State.CLOSING;

if (queue.isEmpty()) {
NoSuchElementException neverAppendedTo = new NoSuchElementException("Never appended to");
finalResult.setException(neverAppendedTo);
throw neverAppendedTo;
} else {
Element<T> transform = compact(exec);

ApiFutures.addCallback(
transform.getValue(),
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable err) {
finalResult.setException(err);
}

@Override
public void onSuccess(T t) {
finalResult.set(t);
}
},
exec);
}
state = State.CLOSED;
} finally {
lock.unlock();
}
state = State.CLOSED;
}

@NonNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.locks.ReentrantLock;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
Expand All @@ -41,42 +42,64 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
private int chunkSize = _2MiB;
private BufferHandle bufferHandle;
private LazyReadChannel<?, T> lazyReadChannel;
protected final ReentrantLock lock;

protected BaseStorageReadChannel(Decoder<T, BlobInfo> objectDecoder) {
this.objectDecoder = objectDecoder;
this.result = SettableApiFuture.create();
this.open = true;
this.byteRangeSpec = ByteRangeSpec.nullRange();
this.lock = new ReentrantLock();
}

@Override
public final synchronized void setChunkSize(int chunkSize) {
StorageException.wrapIOException(() -> maybeResetChannel(true));
this.chunkSize = chunkSize;
public final void setChunkSize(int chunkSize) {
lock.lock();
try {
StorageException.wrapIOException(() -> maybeResetChannel(true));
this.chunkSize = chunkSize;
} finally {
lock.unlock();
}
}

@Override
public final synchronized boolean isOpen() {
return open;
public final boolean isOpen() {
lock.lock();
try {
return open;
} finally {
lock.unlock();
}
}

@Override
public final synchronized void close() {
open = false;
if (internalGetLazyChannel().isOpen()) {
ReadableByteChannel channel = internalGetLazyChannel().getChannel();
StorageException.wrapIOException(channel::close);
public final void close() {
lock.lock();
try {
open = false;
if (internalGetLazyChannel().isOpen()) {
ReadableByteChannel channel = internalGetLazyChannel().getChannel();
StorageException.wrapIOException(channel::close);
}
} finally {
lock.unlock();
}
}

@Override
public final synchronized StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
public final StorageReadChannel setByteRangeSpec(ByteRangeSpec byteRangeSpec) {
requireNonNull(byteRangeSpec, "byteRangeSpec must be non null");
if (!this.byteRangeSpec.equals(byteRangeSpec)) {
StorageException.wrapIOException(() -> maybeResetChannel(false));
this.byteRangeSpec = byteRangeSpec;
lock.lock();
try {
if (!this.byteRangeSpec.equals(byteRangeSpec)) {
StorageException.wrapIOException(() -> maybeResetChannel(false));
this.byteRangeSpec = byteRangeSpec;
}
return this;
} finally {
lock.unlock();
}
return this;
}

@Override
Expand All @@ -85,42 +108,47 @@ public final ByteRangeSpec getByteRangeSpec() {
}

@Override
public final synchronized int read(ByteBuffer dst) throws IOException {
// BlobReadChannel only considered itself closed if close had been called on it.
if (!open) {
throw new ClosedChannelException();
}
long diff = byteRangeSpec.length();
// the check on beginOffset >= 0 used to be a precondition on seek(long)
// move it here to preserve existing behavior while allowing new negative offsets
if (diff <= 0 && byteRangeSpec.beginOffset() >= 0) {
return -1;
}
public final int read(ByteBuffer dst) throws IOException {
lock.lock();
try {
// trap if the fact that tmp is already closed, and instead return -1
ReadableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return -1;
}
int read = tmp.read(dst);
if (read != -1) {
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
// BlobReadChannel only considered itself closed if close had been called on it.
if (!open) {
throw new ClosedChannelException();
}
return read;
} catch (StorageException e) {
if (e.getCode() == 416) {
// HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
// BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
// Emulate that same behavior here to preserve behavior compatibility, though this should
// be removed in the next major version.
long diff = byteRangeSpec.length();
// the check on beginOffset >= 0 used to be a precondition on seek(long)
// move it here to preserve existing behavior while allowing new negative offsets
if (diff <= 0 && byteRangeSpec.beginOffset() >= 0) {
return -1;
} else {
throw new IOException(e);
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
try {
// trap if the fact that tmp is already closed, and instead return -1
ReadableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return -1;
}
int read = tmp.read(dst);
if (read != -1) {
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
}
return read;
} catch (StorageException e) {
if (e.getCode() == 416) {
// HttpStorageRpc turns 416 into a null etag with an empty byte array, leading
// BlobReadChannel to believe it read 0 bytes, returning -1 and leaving the channel open.
// Emulate that same behavior here to preserve behavior compatibility, though this should
// be removed in the next major version.
return -1;
} else {
throw new IOException(e);
}
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(StorageException.coalesce(e));
}
} finally {
lock.unlock();
}
}

Expand Down
Loading
Loading