
runQueryStream keeps loading entities into memory, not respecting stream backpressure #859

Open
kirillgroshkov opened this issue Jul 22, 2021 · 0 comments
Labels
  • api: datastore - Issues related to the googleapis/nodejs-datastore API.
  • priority: p3 - Desirable enhancement or fix. May not be included in next release.
  • type: feature request - ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@kirillgroshkov

There is a runQueryStream method that returns a Transform, which can be used in a pipeline.

We have a script that needs to load a large number of entities from Datastore (>100K), where each Entity has an id and a string field (s) containing a rather long string (say, 500 Kb). The script needs to load all Entities and dump them into an ndjson file. For the reproduction the ndjson part is not important; we can consume the data with a "void consumer".

What we expect is that we create a Node.js pipeline, pipe the Datastore stream into our "consumer", and Node.js handles backpressure for us. Backpressure means that if the Consumer is slower than the Producer (which is true in our case), Node.js will pause pulling data from the Producer (the runQueryStream call) and not go out of memory.

The pipeline itself works, as proven by our logging. If I create an artificial Consumer that just does a setTimeout of 100ms and then passes the data through (after which it is discarded and garbage-collected), our logging confirms that backpressure works and no more objects flow in than the Consumer can consume. We make our Consumer sequential (non-parallel) for the minimal reproduction.

What happens in reality is that the Datastore library keeps downloading data from the Cloud (as seen in Network activity at a rate of ~9 Mb/s) without passing this data on to the Node.js stream, which leads to an OOM explosion.

Expected behavior is that runQueryStream should pause downloading data from the Cloud to respect Node's backpressure.
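
To illustrate what we'd expect, here is a minimal sketch (TypeScript, and explicitly not the library's actual implementation) of a producer that respects backpressure: an object-mode Readable that only fetches the next batch from Datastore when Node calls _read(), i.e. when its internal buffer has drained. The backpressuredQueryStream name, the batch size and the cursor handling are assumptions made for illustration only.

import { Datastore, Query } from '@google-cloud/datastore'
import { Readable } from 'stream'

// Sketch only: fetch one batch per _read() call via runQuery + cursor,
// so no new data is requested while the downstream consumer is still busy.
export function backpressuredQueryStream(
  ds: Datastore,
  baseQuery: Query,
  batchSize = 100,
): Readable {
  let cursor: string | undefined
  let done = false
  let inFlight = false

  return new Readable({
    objectMode: true,
    async read() {
      if (done || inFlight) return
      inFlight = true
      try {
        let q = baseQuery.limit(batchSize)
        if (cursor) q = q.start(cursor)

        const [entities, info] = await ds.runQuery(q)
        cursor = info.endCursor
        if (entities.length === 0 || info.moreResults === Datastore.NO_MORE_RESULTS) {
          done = true
        }

        // push() fills the internal buffer; Node will only call read() again
        // once the downstream consumer has drained it
        for (const entity of entities) this.push(entity)
        if (done) this.push(null)
      } catch (err) {
        this.destroy(err as Error)
      } finally {
        inFlight = false
      }
    },
  })
}

This ignores details a real implementation would need (retries, deadlines, key handling); it's only meant to show the pull-based semantics we expect.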

Here I'm sharing a somewhat minimal repro that I created. Please tell me if you need it to be even more minimal.

Importantly, I've tried this both with the legacy grpc library (which we use by default) and with @grpc/grpc-js. The latter gives a different network rate (~5 Mb/s instead of ~9 Mb/s), but behaves similarly in terms of not respecting backpressure.

Minimal repro:

import {
  transformLogProgress,
} from '@naturalcycles/nodejs-lib'
import { datastoreDB } from '@src/db/datastore/datastore.service'
import { pipeline, Transform, Writable } from 'stream'
import { promisify } from 'util'

const _pipeline = promisify(pipeline);

(async () => {
  const ds = datastoreDB.ds()

  await _pipeline([
    ds.runQueryStream(ds.createQuery('UserFertilityCache')),

    // This logs every 100th object, plus some memory and speed metrics
    transformLogProgress({
      logEvery: 100,
    }),

    // This is a fake Consumer that introduces a 100ms delay before passing the data further
    new Transform({
      objectMode: true,
      transform(chunk: any, _encoding, cb) {
        setTimeout(() => cb(null, chunk), 100)
      },
    }),

    // This is a "no-op" consumer that just discards the data
    new Writable({
      objectMode: true,
      write(chunk, _encoding, cb) {
        cb()
      },
    }),
  ])
})()

Runtime logs:

node 14.17.3, NODE_OPTIONS: --max-old-space-size=2000
{ progress: 0, heapUsed: 67, rss: 143, rps10: 0, rpsTotal: 0 }
{ progress: 100, heapUsed: 99, rss: 214, rps10: 6, rpsTotal: 11 }
{ progress: 200, heapUsed: 172, rss: 306, rps10: 7, rpsTotal: 11 }
{ progress: 300, heapUsed: 232, rss: 374, rps10: 8, rpsTotal: 11 }
{ progress: 400, heapUsed: 325, rss: 486, rps10: 8, rpsTotal: 10 }
{ progress: 500, heapUsed: 429, rss: 612, rps10: 8, rpsTotal: 10 }
{ progress: 600, heapUsed: 526, rss: 707, rps10: 9, rpsTotal: 10 }
{ progress: 700, heapUsed: 620, rss: 815, rps10: 9, rpsTotal: 10 }
{ progress: 800, heapUsed: 716, rss: 915, rps10: 9, rpsTotal: 10 }
{ progress: 900, heapUsed: 823, rss: 1036, rps10: 9, rpsTotal: 10 }
{ progress: 1000, heapUsed: 920, rss: 1136, rps10: 10, rpsTotal: 10 }
2021-07-22 11:43:17 progress took 1m40s so far to process 1000 rows, ~36K/hour
{ progress: 1100, heapUsed: 998, rss: 1227, rps10: 10, rpsTotal: 10 }
{ progress: 1200, heapUsed: 1089, rss: 1328, rps10: 10, rpsTotal: 10 }
{ progress: 1300, heapUsed: 1190, rss: 1434, rps10: 10, rpsTotal: 10 }
{ progress: 1400, heapUsed: 1280, rss: 1526, rps10: 10, rpsTotal: 10 }
{ progress: 1500, heapUsed: 1366, rss: 1621, rps10: 10, rpsTotal: 10 }
{ progress: 1600, heapUsed: 1448, rss: 1719, rps10: 10, rpsTotal: 10 }
{ progress: 1700, heapUsed: 1554, rss: 1826, rps10: 10, rpsTotal: 10 }
{ progress: 1800, heapUsed: 1656, rss: 1932, rps10: 10, rpsTotal: 10 }
{ progress: 1900, heapUsed: 1755, rss: 2032, rps10: 10, rpsTotal: 10 }
{ progress: 2000, heapUsed: 1852, rss: 2132, rps10: 10, rpsTotal: 10 }
2021-07-22 11:44:58 progress took 3m21s so far to process 2000 rows, ~36K/hour

<--- Last few GCs --->

[18012:0x110008000]   210698 ms: Scavenge 1927.8 (2003.9) -> 1927.5 (2004.9) MB, 2.4 / 0.0 ms  (average mu = 0.973, current mu = 0.961) allocation failure 
[18012:0x110008000]   210913 ms: Scavenge 1931.4 (2005.6) -> 1930.9 (2006.1) MB, 2.8 / 0.0 ms  (average mu = 0.973, current mu = 0.961) allocation failure 
[18012:0x110008000]   210917 ms: Scavenge 1931.5 (2014.2) -> 1931.0 (2016.5) MB, 3.5 / 0.0 ms  (average mu = 0.973, current mu = 0.961) task 


<--- JS stacktrace --->

FATAL ERROR: MarkCompactCollector: young object promotion failed Allocation failed - JavaScript heap out of memory
 1: 0x1012da6a5 node::Abort() (.cold.1) [/usr/local/bin/node]
 2: 0x1000a6309 node::Abort() [/usr/local/bin/node]
 3: 0x1000a646f node::OnFatalError(char const*, char const*) [/usr/local/bin/node]
 4: 0x1001e8f17 v8::Utils::ReportOOMFailure(v8::internal::Isolate*, char const*, bool) [/usr/local/bin/node]
 5: 0x1001e8eb3 v8::internal::V8::FatalProcessOutOfMemory(v8::internal::Isolate*, char const*, bool) [/usr/local/bin/node]
 6: 0x100395db5 v8::internal::Heap::FatalProcessOutOfMemory(char const*) [/usr/local/bin/node]
 7: 0x1003f1d13 v8::internal::EvacuateNewSpaceVisitor::Visit(v8::internal::HeapObject, int) [/usr/local/bin/node]
 8: 0x1003d957b void v8::internal::LiveObjectVisitor::VisitBlackObjectsNoFail<v8::internal::EvacuateNewSpaceVisitor, v8::internal::MajorNonAtomicMarkingState>(v8::internal::MemoryChunk*, v8::internal::MajorNonAtomicMarkingState*, v8::internal::EvacuateNewSpaceVisitor*, v8::internal::LiveObjectVisitor::IterationMode) [/usr/local/bin/node]
 9: 0x1003d90c5 v8::internal::FullEvacuator::RawEvacuatePage(v8::internal::MemoryChunk*, long*) [/usr/local/bin/node]
10: 0x1003d8e06 v8::internal::Evacuator::EvacuatePage(v8::internal::MemoryChunk*) [/usr/local/bin/node]
11: 0x1003f673e v8::internal::PageEvacuationTask::RunInParallel(v8::internal::ItemParallelJob::Task::Runner) [/usr/local/bin/node]
12: 0x1003b07b2 v8::internal::ItemParallelJob::Task::RunInternal() [/usr/local/bin/node]
13: 0x1003b0c38 v8::internal::ItemParallelJob::Run() [/usr/local/bin/node]
14: 0x1003dae75 void v8::internal::MarkCompactCollectorBase::CreateAndExecuteEvacuationTasks<v8::internal::FullEvacuator, v8::internal::MarkCompactCollector>(v8::internal::MarkCompactCollector*, v8::internal::ItemParallelJob*, v8::internal::MigrationObserver*, long) [/usr/local/bin/node]
15: 0x1003daa76 v8::internal::MarkCompactCollector::EvacuatePagesInParallel() [/usr/local/bin/node]
16: 0x1003c62a7 v8::internal::MarkCompactCollector::Evacuate() [/usr/local/bin/node]
17: 0x1003c3b3b v8::internal::MarkCompactCollector::CollectGarbage() [/usr/local/bin/node]
18: 0x10039647b v8::internal::Heap::MarkCompact() [/usr/local/bin/node]
19: 0x100392a69 v8::internal::Heap::PerformGarbageCollection(v8::internal::GarbageCollector, v8::GCCallbackFlags) [/usr/local/bin/node]
20: 0x1003908b0 v8::internal::Heap::CollectGarbage(v8::internal::AllocationSpace, v8::internal::GarbageCollectionReason, v8::GCCallbackFlags) [/usr/local/bin/node]
21: 0x10039ef9a v8::internal::Heap::AllocateRawWithLightRetrySlowPath(int, v8::internal::AllocationType, v8::internal::AllocationOrigin, v8::internal::AllocationAlignment) [/usr/local/bin/node]
22: 0x10039f021 v8::internal::Heap::AllocateRawWithRetryOrFailSlowPath(int, v8::internal::AllocationType, v8::internal::AllocationOrigin, v8::internal::AllocationAlignment) [/usr/local/bin/node]
23: 0x1003683ed v8::internal::FactoryBase<v8::internal::Factory>::NewRawOneByteString(int, v8::internal::AllocationType) [/usr/local/bin/node]
24: 0x10036e89e v8::internal::Factory::NewStringFromUtf8(v8::internal::Vector<char const> const&, v8::internal::AllocationType) [/usr/local/bin/node]
25: 0x10020b395 v8::String::NewFromUtf8(v8::Isolate*, char const*, v8::NewStringType, int) [/usr/local/bin/node]
26: 0x100161b23 node::StringBytes::Encode(v8::Isolate*, char const*, unsigned long, node::encoding, v8::Local<v8::Value>*) [/usr/local/bin/node]
27: 0x100087e6a void node::Buffer::(anonymous namespace)::StringSlice<(node::encoding)1>(v8::FunctionCallbackInfo<v8::Value> const&) [/usr/local/bin/node]
28: 0x100a0bacd Builtins_CallApiCallback [/usr/local/bin/node]
error Command failed with signal "SIGABRT".

The Network tab consistently shows a "maximum" rate of ~9 Mb/s, regardless of backpressure or the delay setting in the Consumer (e.g. setting the timeout to 200ms or 1000ms doesn't change the network rate):

[Screenshot: Network activity at a steady ~9 Mb/s]

Environment details

  • OS: Linux (server), MacOS Big Sur (dev machine)
  • Node.js version: 14.17.3
  • npm version: 6.14.13
  • @google-cloud/datastore version: 6.4.6

Steps to reproduce

See the minimal-repro code.

product-auto-label bot added the api: datastore label on Jul 22, 2021
crwilcox added the priority: p3 and type: feature request labels on Jul 22, 2021
kirillgroshkov added a commit to NaturalCycles/datastore-lib that referenced this issue Oct 7, 2021
Works around this issue: googleapis/nodejs-datastore#859
by implementing a custom Readable that respects backpressure.
To avoid getting too slow, it allows setting `rssLimitMB`.
Until `rssLimitMB` is reached it works like the official library ("downloads the whole internet"),
but gently pauses as soon as `rssLimitMB` is reached.
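
Very roughly, the idea behind that workaround looks like the sketch below (this is not the actual datastore-lib code; the rssLimitedStream name, the 1-second polling and the plain pause()/resume() handling are assumptions): let the official stream run freely, but pause it whenever process RSS exceeds rssLimitMB and resume once it drops below again.

import { Readable } from 'stream'

// Sketch only: wrap the official runQueryStream output and pause it while
// process RSS is above rssLimitMB, resuming once RSS (eventually, after GC)
// drops back below the limit.
export function rssLimitedStream(source: NodeJS.ReadableStream, rssLimitMB: number): Readable {
  const out = new Readable({ objectMode: true, read() {} })
  const rssMB = () => process.memoryUsage().rss / 1024 / 1024

  source.on('data', chunk => {
    out.push(chunk)
    if (rssMB() > rssLimitMB) {
      source.pause()
      const timer = setInterval(() => {
        if (rssMB() < rssLimitMB) {
          clearInterval(timer)
          source.resume()
        }
      }, 1000)
    }
  })
  source.on('end', () => out.push(null))
  source.on('error', err => out.destroy(err))

  return out
}

Hypothetically, one would then put rssLimitedStream(ds.runQueryStream(query), 1000) at the head of the pipeline instead of the raw runQueryStream result.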