diff --git a/package-lock.json b/package-lock.json index 74cf668..afc6ca1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,7 @@ "version": "0.0.1", "license": "MIT", "dependencies": { + "apache-arrow": "^21.1.0", "cmake-js": "^8.0.0", "node-addon-api": "^6.0.0" }, @@ -30,6 +31,36 @@ "node": ">=18.0.0" } }, + "node_modules/@swc/helpers": { + "version": "0.5.21", + "resolved": "https://registry.npmjs.org/@swc/helpers/-/helpers-0.5.21.tgz", + "integrity": "sha512-jI/VAmtdjB/RnI8GTnokyX7Ug8c+g+ffD6QRLa6XQewtnGyukKkKSk3wLTM3b5cjt1jNh9x0jfVlagdN2gDKQg==", + "license": "Apache-2.0", + "dependencies": { + "tslib": "^2.8.0" + } + }, + "node_modules/@types/command-line-args": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/@types/command-line-args/-/command-line-args-5.2.3.tgz", + "integrity": "sha512-uv0aG6R0Y8WHZLTamZwtfsDLVRnOa+n+n5rEvFWL5Na5gZ8V2Teab/duDPFzIIIhs9qizDpcavCusCLJZu62Kw==", + "license": "MIT" + }, + "node_modules/@types/command-line-usage": { + "version": "5.0.4", + "resolved": "https://registry.npmjs.org/@types/command-line-usage/-/command-line-usage-5.0.4.tgz", + "integrity": "sha512-BwR5KP3Es/CSht0xqBcUXS3qCAUVXwpRKsV2+arxeb65atasuXG9LykC9Ab10Cw3s2raH92ZqOeILaQbsB2ACg==", + "license": "MIT" + }, + "node_modules/@types/node": { + "version": "24.12.4", + "resolved": "https://registry.npmjs.org/@types/node/-/node-24.12.4.tgz", + "integrity": "sha512-GUUEShf+PBCGW2KaXwcIt3Yk+e3pkKwWKb9GSyM9WQVE+ep2jzmHdGsHzu4wgcZy5fN9FBdVzjpBQsYlpfpgLA==", + "license": "MIT", + "dependencies": { + "undici-types": "~7.16.0" + } + }, "node_modules/ansi-colors": { "version": "4.1.3", "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz", @@ -78,6 +109,26 @@ "node": ">= 8" } }, + "node_modules/apache-arrow": { + "version": "21.1.0", + "resolved": "https://registry.npmjs.org/apache-arrow/-/apache-arrow-21.1.0.tgz", + "integrity": 
"sha512-kQrYLxhC+NTVVZ4CCzGF6L/uPVOzJmD1T3XgbiUnP7oTeVFOFgEUu6IKNwCDkpFoBVqDKQivlX4RUFqqnWFlEA==", + "license": "Apache-2.0", + "dependencies": { + "@swc/helpers": "^0.5.11", + "@types/command-line-args": "^5.2.3", + "@types/command-line-usage": "^5.0.4", + "@types/node": "^24.0.3", + "command-line-args": "^6.0.1", + "command-line-usage": "^7.0.1", + "flatbuffers": "^25.1.24", + "json-bignum": "^0.0.3", + "tslib": "^2.6.2" + }, + "bin": { + "arrow2csv": "bin/arrow2csv.js" + } + }, "node_modules/argparse": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", @@ -85,6 +136,15 @@ "dev": true, "license": "Python-2.0" }, + "node_modules/array-back": { + "version": "6.2.3", + "resolved": "https://registry.npmjs.org/array-back/-/array-back-6.2.3.tgz", + "integrity": "sha512-SGDvmg6QTYiTxCBkYVmThcoa67uLl35pyzRHdpCGBOcqFy6BtwnphoFPk7LhJshD+Yk1Kt35WGWeZPTgwR4Fhw==", + "license": "MIT", + "engines": { + "node": ">=12.17" + } + }, "node_modules/assertion-error": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", @@ -181,7 +241,6 @@ "version": "4.1.2", "resolved": "https://registry.npmjs.org/chalk/-/chalk-4.1.2.tgz", "integrity": "sha512-oKnbhFyRIXpUuez8iBMmyEa4nbj4IOQyuhc/wy9kY7/WVPcwIO9VA668Pu8RkO7+0G76SLROeyw9CpQ061i4mA==", - "dev": true, "license": "MIT", "dependencies": { "ansi-styles": "^4.1.0", @@ -194,11 +253,25 @@ "url": "https://github.com/chalk/chalk?sponsor=1" } }, + "node_modules/chalk-template": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/chalk-template/-/chalk-template-0.4.0.tgz", + "integrity": "sha512-/ghrgmhfY8RaSdeo43hNXxpoHAtxdbskUHjPpfqUWGttFgycUhYPGx3YZBCnUCvOa7Doivn1IZec3DEGFoMgLg==", + "license": "MIT", + "dependencies": { + "chalk": "^4.1.2" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/chalk/chalk-template?sponsor=1" + } + }, "node_modules/chalk/node_modules/supports-color": { "version": 
"7.2.0", "resolved": "https://registry.npmjs.org/supports-color/-/supports-color-7.2.0.tgz", "integrity": "sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==", - "dev": true, "license": "MIT", "dependencies": { "has-flag": "^4.0.0" @@ -309,6 +382,44 @@ "integrity": "sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==", "license": "MIT" }, + "node_modules/command-line-args": { + "version": "6.0.2", + "resolved": "https://registry.npmjs.org/command-line-args/-/command-line-args-6.0.2.tgz", + "integrity": "sha512-AIjYVxrV9X752LmPDLbVYv8aMCuHPSLZJXEo2qo/xJfv+NYhaZ4sMSF01rM+gHPaMgvPM0l5D/F+Qx+i2WfSmQ==", + "license": "MIT", + "dependencies": { + "array-back": "^6.2.3", + "find-replace": "^5.0.2", + "lodash.camelcase": "^4.3.0", + "typical": "^7.3.0" + }, + "engines": { + "node": ">=12.20" + }, + "peerDependencies": { + "@75lb/nature": "latest" + }, + "peerDependenciesMeta": { + "@75lb/nature": { + "optional": true + } + } + }, + "node_modules/command-line-usage": { + "version": "7.0.4", + "resolved": "https://registry.npmjs.org/command-line-usage/-/command-line-usage-7.0.4.tgz", + "integrity": "sha512-85UdvzTNx/+s5CkSgBm/0hzP80RFHAa7PsfeADE5ezZF3uHz3/Tqj9gIKGT9PTtpycc3Ua64T0oVulGfKxzfqg==", + "license": "MIT", + "dependencies": { + "array-back": "^6.2.2", + "chalk-template": "^0.4.0", + "table-layout": "^4.1.1", + "typical": "^7.3.0" + }, + "engines": { + "node": ">=12.20.0" + } + }, "node_modules/debug": { "version": "4.4.3", "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.3.tgz", @@ -412,6 +523,23 @@ "node": ">=8" } }, + "node_modules/find-replace": { + "version": "5.0.2", + "resolved": "https://registry.npmjs.org/find-replace/-/find-replace-5.0.2.tgz", + "integrity": "sha512-Y45BAiE3mz2QsrN2fb5QEtO4qb44NcS7en/0y9PEVsg351HsLeVclP8QPMH79Le9sH3rs5RSwJu99W0WPZO43Q==", + "license": "MIT", + "engines": { + "node": ">=14" + }, + "peerDependencies": { + "@75lb/nature": "latest" 
+ }, + "peerDependenciesMeta": { + "@75lb/nature": { + "optional": true + } + } + }, "node_modules/find-up": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/find-up/-/find-up-5.0.0.tgz", @@ -439,6 +567,12 @@ "flat": "cli.js" } }, + "node_modules/flatbuffers": { + "version": "25.9.23", + "resolved": "https://registry.npmjs.org/flatbuffers/-/flatbuffers-25.9.23.tgz", + "integrity": "sha512-MI1qs7Lo4Syw0EOzUl0xjs2lsoeqFku44KpngfIduHBYvzm8h2+7K8YMQh1JtVVVrUvhLpNwqVi4DERegUJhPQ==", + "license": "Apache-2.0" + }, "node_modules/fs-extra": { "version": "11.3.4", "resolved": "https://registry.npmjs.org/fs-extra/-/fs-extra-11.3.4.tgz", @@ -538,7 +672,6 @@ "version": "4.0.0", "resolved": "https://registry.npmjs.org/has-flag/-/has-flag-4.0.0.tgz", "integrity": "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==", - "dev": true, "license": "MIT", "engines": { "node": ">=8" @@ -679,6 +812,14 @@ "js-yaml": "bin/js-yaml.js" } }, + "node_modules/json-bignum": { + "version": "0.0.3", + "resolved": "https://registry.npmjs.org/json-bignum/-/json-bignum-0.0.3.tgz", + "integrity": "sha512-2WHyXj3OfHSgNyuzDbSxI1w2jgw5gkWSWhS7Qg4bWXx1nLk3jnbwfUeS0PSba3IzpTUWdHxBieELUzXRjQB2zg==", + "engines": { + "node": ">=0.8" + } + }, "node_modules/jsonfile": { "version": "6.2.0", "resolved": "https://registry.npmjs.org/jsonfile/-/jsonfile-6.2.0.tgz", @@ -707,6 +848,12 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "node_modules/lodash.camelcase": { + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz", + "integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==", + "license": "MIT" + }, "node_modules/log-symbols": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/log-symbols/-/log-symbols-4.1.0.tgz", @@ -1101,6 +1248,19 @@ "url": "https://github.com/chalk/supports-color?sponsor=1" } }, + 
"node_modules/table-layout": { + "version": "4.1.1", + "resolved": "https://registry.npmjs.org/table-layout/-/table-layout-4.1.1.tgz", + "integrity": "sha512-iK5/YhZxq5GO5z8wb0bY1317uDF3Zjpha0QFFLA8/trAoiLbQD0HUbMesEaxyzUgDxi2QlcbM8IvqOlEjgoXBA==", + "license": "MIT", + "dependencies": { + "array-back": "^6.2.2", + "wordwrapjs": "^5.1.0" + }, + "engines": { + "node": ">=12.17" + } + }, "node_modules/tar": { "version": "7.5.13", "resolved": "https://registry.npmjs.org/tar/-/tar-7.5.13.tgz", @@ -1140,6 +1300,12 @@ "node": ">=8.0" } }, + "node_modules/tslib": { + "version": "2.8.1", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz", + "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", + "license": "0BSD" + }, "node_modules/type-detect": { "version": "4.1.0", "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.1.0.tgz", @@ -1150,6 +1316,21 @@ "node": ">=4" } }, + "node_modules/typical": { + "version": "7.3.0", + "resolved": "https://registry.npmjs.org/typical/-/typical-7.3.0.tgz", + "integrity": "sha512-ya4mg/30vm+DOWfBg4YK3j2WD6TWtRkCbasOJr40CseYENzCUby/7rIvXA99JGsQHeNxLbnXdyLLxKSv3tauFw==", + "license": "MIT", + "engines": { + "node": ">=12.17" + } + }, + "node_modules/undici-types": { + "version": "7.16.0", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-7.16.0.tgz", + "integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==", + "license": "MIT" + }, "node_modules/universalify": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/universalify/-/universalify-2.0.1.tgz", @@ -1180,6 +1361,15 @@ "node": "^20.17.0 || >=22.9.0" } }, + "node_modules/wordwrapjs": { + "version": "5.1.1", + "resolved": "https://registry.npmjs.org/wordwrapjs/-/wordwrapjs-5.1.1.tgz", + "integrity": "sha512-0yweIbkINJodk27gX9LBGMzyQdBDan3s/dEAiwBOj+Mf0PPyWL6/rikalkv8EeD0E8jm4o5RXEOrFTP3NXbhJg==", + "license": "MIT", + 
"engines": { + "node": ">=12.17" + } + }, "node_modules/workerpool": { "version": "6.5.1", "resolved": "https://registry.npmjs.org/workerpool/-/workerpool-6.5.1.tgz", diff --git a/package.json b/package.json index 71f710c..63d51d1 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "tmp": "^0.2.3" }, "dependencies": { + "apache-arrow": "^21.1.0", "cmake-js": "^8.0.0", "node-addon-api": "^6.0.0" } diff --git a/src_cpp/include/node_connection.h b/src_cpp/include/node_connection.h index 7a2b638..dbddf17 100644 --- a/src_cpp/include/node_connection.h +++ b/src_cpp/include/node_connection.h @@ -32,8 +32,10 @@ class NodeConnection : public Napi::ObjectWrap { void SetQueryTimeout(const Napi::CallbackInfo& info); Napi::Value ExecuteAsync(const Napi::CallbackInfo& info); Napi::Value QueryAsync(const Napi::CallbackInfo& info); + Napi::Value QueryArrowAsync(const Napi::CallbackInfo& info); Napi::Value ExecuteSync(const Napi::CallbackInfo& info); Napi::Value QuerySync(const Napi::CallbackInfo& info); + Napi::Value QueryArrowSync(const Napi::CallbackInfo& info); Napi::Value CreateArrowTableSync(const Napi::CallbackInfo& info); Napi::Value CreateArrowRelTableSync(const Napi::CallbackInfo& info); Napi::Value DropArrowTableSync(const Napi::CallbackInfo& info); @@ -188,5 +190,40 @@ class ConnectionQueryAsyncWorker : public Napi::AsyncWorker { std::optional progressCallback; }; +class ConnectionQueryArrowAsyncWorker : public Napi::AsyncWorker { +public: + ConnectionQueryArrowAsyncWorker(Napi::Function& callback, + std::shared_ptr& connection, std::shared_ptr& database, + std::string statement, int64_t chunkSize, NodeQueryResult* nodeQueryResult) + : Napi::AsyncWorker(callback), connection(connection), database(database), + statement(std::move(statement)), chunkSize(chunkSize), nodeQueryResult(nodeQueryResult) {} + + ~ConnectionQueryArrowAsyncWorker() override = default; + + void Execute() override { + try { + auto result = connection->queryAsArrow(statement, chunkSize); + 
if (!result->isSuccess()) { + SetError(result->getErrorMessage()); + return; + } + nodeQueryResult->AdoptQueryResult(std::move(result), connection, database); + } catch (const std::exception& exc) { + SetError(std::string(exc.what())); + } + } + + void OnOK() override { Callback().Call({Env().Null()}); } + + void OnError(Napi::Error const& error) override { Callback().Call({error.Value()}); } + +private: + std::shared_ptr connection; + std::shared_ptr database; + std::string statement; + int64_t chunkSize; + NodeQueryResult* nodeQueryResult; +}; + } // namespace main } // namespace lbug diff --git a/src_cpp/include/node_query_result.h b/src_cpp/include/node_query_result.h index b259207..5e32fbf 100644 --- a/src_cpp/include/node_query_result.h +++ b/src_cpp/include/node_query_result.h @@ -43,6 +43,7 @@ class NodeQueryResult : public Napi::ObjectWrap { Napi::Value GetColumnNamesSync(const Napi::CallbackInfo& info); Napi::Value GetQuerySummarySync(const Napi::CallbackInfo& info); Napi::Value GetQuerySummaryAsync(const Napi::CallbackInfo& info); + Napi::Value GetCSRSync(const Napi::CallbackInfo& info); void PopulateColumnNames(); void Close(const Napi::CallbackInfo& info); void Close(); diff --git a/src_cpp/node_connection.cpp b/src_cpp/node_connection.cpp index 7ae3234..9f8ed92 100644 --- a/src_cpp/node_connection.cpp +++ b/src_cpp/node_connection.cpp @@ -82,8 +82,10 @@ Napi::Object NodeConnection::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("initSync", &NodeConnection::InitSync), InstanceMethod("executeAsync", &NodeConnection::ExecuteAsync), InstanceMethod("queryAsync", &NodeConnection::QueryAsync), + InstanceMethod("queryArrowAsync", &NodeConnection::QueryArrowAsync), InstanceMethod("executeSync", &NodeConnection::ExecuteSync), InstanceMethod("querySync", &NodeConnection::QuerySync), + InstanceMethod("queryArrowSync", &NodeConnection::QueryArrowSync), InstanceMethod("createArrowTableSync", &NodeConnection::CreateArrowTableSync), 
InstanceMethod("createArrowRelTableSync", &NodeConnection::CreateArrowRelTableSync), InstanceMethod("dropArrowTableSync", &NodeConnection::DropArrowTableSync), @@ -226,6 +228,37 @@ Napi::Value NodeConnection::QueryAsync(const Napi::CallbackInfo& info) { return info.Env().Undefined(); } +Napi::Value NodeConnection::QueryArrowAsync(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + auto statement = info[0].As().Utf8Value(); + auto chunkSize = info[1].As().Int64Value(); + auto nodeQueryResult = Napi::ObjectWrap::Unwrap(info[2].As()); + auto callback = info[3].As(); + auto asyncWorker = new ConnectionQueryArrowAsyncWorker( + callback, connection, database, statement, chunkSize, nodeQueryResult); + asyncWorker->Queue(); + return info.Env().Undefined(); +} + +Napi::Value NodeConnection::QueryArrowSync(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + auto statement = info[0].As().Utf8Value(); + auto chunkSize = info[1].As().Int64Value(); + auto nodeQueryResult = Napi::ObjectWrap::Unwrap(info[2].As()); + try { + auto result = connection->queryAsArrow(statement, chunkSize); + if (!result->isSuccess()) { + Napi::Error::New(env, result->getErrorMessage()).ThrowAsJavaScriptException(); + } + nodeQueryResult->AdoptQueryResult(std::move(result), connection, database); + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + Napi::Value NodeConnection::CreateArrowTableSync(const Napi::CallbackInfo& info) { Napi::Env env = info.Env(); Napi::HandleScope scope(env); diff --git a/src_cpp/node_query_result.cpp b/src_cpp/node_query_result.cpp index 931b5c5..3ab1a47 100644 --- a/src_cpp/node_query_result.cpp +++ b/src_cpp/node_query_result.cpp @@ -5,6 +5,7 @@ #include "include/node_util.h" #include "main/lbug.h" +#include "main/query_result/arrow_query_result.h" using namespace lbug::main; @@ 
-28,6 +29,7 @@ Napi::Object NodeQueryResult::Init(Napi::Env env, Napi::Object exports) { InstanceMethod("getColumnNamesSync", &NodeQueryResult::GetColumnNamesSync), InstanceMethod("getQuerySummaryAsync", &NodeQueryResult::GetQuerySummaryAsync), InstanceMethod("getQuerySummarySync", &NodeQueryResult::GetQuerySummarySync), + InstanceMethod("getCSRSync", &NodeQueryResult::GetCSRSync), InstanceMethod("close", &NodeQueryResult::Close)}); constructor = Napi::Persistent(t); @@ -247,6 +249,51 @@ Napi::Value NodeQueryResult::GetColumnNamesSync(const Napi::CallbackInfo& info) return env.Undefined(); } +namespace { + +struct CSRArrayBufferHolder { + explicit CSRArrayBufferHolder(ArrowQueryResult::CSRArrowArray array) + : array{std::move(array)} {} + ArrowQueryResult::CSRArrowArray array; +}; + +Napi::BigUint64Array WrapCSRArray(Napi::Env env, ArrowQueryResult::CSRArrowArray array) { + auto length = static_cast(array.array.length); + auto* data = const_cast(array.array.buffers[1]); + auto* holder = new CSRArrayBufferHolder(std::move(array)); + auto buffer = Napi::ArrayBuffer::New(env, data, length * sizeof(uint64_t), + [](Napi::Env, void*, CSRArrayBufferHolder* holder) { delete holder; }, holder); + return Napi::BigUint64Array::New(env, length, buffer, 0, napi_biguint64_array); +} + +} // namespace + +Napi::Value NodeQueryResult::GetCSRSync(const Napi::CallbackInfo& info) { + Napi::Env env = info.Env(); + Napi::HandleScope scope(env); + try { + auto& queryResult = GetQueryResult(); + auto* arrowQueryResult = dynamic_cast(&queryResult); + if (arrowQueryResult == nullptr || !arrowQueryResult->hasCSRMetadata()) { + throw std::runtime_error("CSR export is only supported for Arrow query results " + "with native CSR metadata."); + } + auto csr = arrowQueryResult->getCSRArrowArrays(); + Napi::Object result = Napi::Object::New(env); + result.Set("indptr", WrapCSRArray(env, std::move(csr.indptr))); + result.Set("indices", WrapCSRArray(env, std::move(csr.indices))); + if 
(csr.edgeIDs.has_value()) { + result.Set("edgeIds", WrapCSRArray(env, std::move(*csr.edgeIDs))); + } else { + result.Set("edgeIds", env.Null()); + } + return result; + } catch (const std::exception& exc) { + Napi::Error::New(env, std::string(exc.what())).ThrowAsJavaScriptException(); + } + return env.Undefined(); +} + void NodeQueryResult::PopulateColumnNames() { if (this->columnNames != nullptr) { return; diff --git a/src_js/connection.js b/src_js/connection.js index 2cf665d..ccbce2b 100644 --- a/src_js/connection.js +++ b/src_js/connection.js @@ -2,6 +2,7 @@ const LbugNative = require("./lbug_native.js"); const QueryResult = require("./query_result.js"); +const { ArrowQueryResult } = require("./query_result.js"); const PreparedStatement = require("./prepared_statement.js"); class Connection { @@ -320,6 +321,59 @@ class Connection { return this._unwrapMultipleQueryResultsSync(nodeQueryResult); } + /** + * Execute a query with the native Arrow result collector. + * @param {String} statement the statement to execute. + * @param {Number} chunkSize native Arrow chunk size. + * @returns {Promise} a promise that resolves to the Arrow query result. + */ + queryArrow(statement, chunkSize = 1000) { + return new Promise((resolve, reject) => { + if (typeof statement !== "string") { + return reject(new Error("statement must be a string.")); + } + if (typeof chunkSize !== "number" || chunkSize <= 0) { + return reject(new Error("chunkSize must be a positive number.")); + } + this._getConnection() + .then((connection) => { + const nodeQueryResult = new LbugNative.NodeQueryResult(); + try { + connection.queryArrowAsync(statement, chunkSize, nodeQueryResult, (err) => { + if (err) { + return reject(err); + } + return resolve(new ArrowQueryResult(this, nodeQueryResult, chunkSize)); + }); + } catch (e) { + return reject(e); + } + }) + .catch((err) => { + return reject(err); + }); + }); + } + + /** + * Execute a query synchronously with the native Arrow result collector. 
+ * @param {String} statement the statement to execute. + * @param {Number} chunkSize native Arrow chunk size. + * @returns {lbug.ArrowQueryResult} the Arrow query result. + */ + queryArrowSync(statement, chunkSize = 1000) { + if (typeof statement !== "string") { + throw new Error("statement must be a string."); + } + if (typeof chunkSize !== "number" || chunkSize <= 0) { + throw new Error("chunkSize must be a positive number."); + } + const connection = this._getConnectionSync(); + const nodeQueryResult = new LbugNative.NodeQueryResult(); + connection.queryArrowSync(statement, chunkSize, nodeQueryResult); + return new ArrowQueryResult(this, nodeQueryResult, chunkSize); + } + /** * Create an Arrow memory-backed node table from Arrow C Data Interface pointers. * Ownership of the schema and arrays is transferred to Ladybug. diff --git a/src_js/index.js b/src_js/index.js index 1b960d0..2f48dcc 100644 --- a/src_js/index.js +++ b/src_js/index.js @@ -4,6 +4,7 @@ const Connection = require("./connection.js"); const Database = require("./database.js"); const PreparedStatement = require("./prepared_statement.js"); const QueryResult = require("./query_result.js"); +const { ArrowQueryResult } = require("./query_result.js"); function json(value) { const stringValue = typeof value === "string" ? 
value : JSON.stringify(value); @@ -20,6 +21,7 @@ module.exports = { Database, PreparedStatement, QueryResult, + ArrowQueryResult, json, get VERSION() { return Database.getVersion(); diff --git a/src_js/index.mjs b/src_js/index.mjs index 428ca92..6aa7d30 100644 --- a/src_js/index.mjs +++ b/src_js/index.mjs @@ -5,6 +5,7 @@ export const Database = lbug.Database; export const Connection = lbug.Connection; export const PreparedStatement = lbug.PreparedStatement; export const QueryResult = lbug.QueryResult; +export const ArrowQueryResult = lbug.ArrowQueryResult; export const json = lbug.json; export const VERSION = lbug.VERSION; export const STORAGE_VERSION = lbug.STORAGE_VERSION; diff --git a/src_js/lbug.d.ts b/src_js/lbug.d.ts index 265b8bd..88760b7 100644 --- a/src_js/lbug.d.ts +++ b/src_js/lbug.d.ts @@ -1,4 +1,3 @@ - /** * A nullable type that can be either T or null. */ @@ -29,6 +28,15 @@ export type ProgressCallback = ( */ export type NativePointer = bigint | object; +/** + * Zero-copy CSR arrays returned by an Arrow query result. + */ +export interface CSRResult { + indptr: BigUint64Array; + indices: BigUint64Array; + edgeIds: BigUint64Array | null; +} + /** * Represents a node ID in the graph database. */ @@ -288,6 +296,22 @@ export class Connection { */ querySync(statement: string): QueryResult | QueryResult[]; + /** + * Execute a query with the native Arrow result collector. + * @param statement The statement to execute + * @param chunkSize Native Arrow chunk size + * @returns Promise that resolves to the Arrow query result + */ + queryArrow(statement: string, chunkSize?: number): Promise; + + /** + * Execute a query synchronously with the native Arrow result collector. + * @param statement The statement to execute + * @param chunkSize Native Arrow chunk size + * @returns The Arrow query result + */ + queryArrowSync(statement: string, chunkSize?: number): ArrowQueryResult; + /** * Create an Arrow memory-backed node table from Arrow C Data Interface pointers. 
class ArrowQueryResult extends QueryResult {
  /**
   * Internal constructor; instances are created by `Connection.queryArrow`
   * and `Connection.queryArrowSync`.
   * @param {Connection} connection the connection that produced the result.
   * @param {LbugNative.NodeQueryResult} queryResult the native query result object.
   * @param {Number} chunkSize chunk size the native Arrow result was produced with.
   */
  constructor(connection, queryResult, chunkSize) {
    super(connection, queryResult);
    // Recorded on the instance; nothing in this class reads it back.
    this._chunkSize = chunkSize;
  }

  /**
   * Return the native CSR arrays of this result.
   * Fails if the result has been closed (see `_checkClosed`).
   * @returns {{indptr: BigUint64Array, indices: BigUint64Array, edgeIds: BigUint64Array|null}}
   */
  csr() {
    this._checkClosed();
    const nativeResult = this._queryResult;
    return nativeResult.getCSRSync();
  }
}
/**
 * Convert an iterable of numeric values (typically a BigUint64Array) to plain Numbers.
 * @param {Iterable<bigint|number>} values values to convert.
 * @returns {number[]} the converted values, in order.
 * @throws {RangeError} if a value is not exactly representable as a safe integer,
 *   so a lossy conversion cannot make a comparison pass by accident.
 */
function toNumbers(values) {
  return Array.from(values, (value) => {
    const asNumber = Number(value);
    if (!Number.isSafeInteger(asNumber)) {
      throw new RangeError(`CSR value ${value} cannot be represented exactly as a Number.`);
    }
    return asNumber;
  });
}

/**
 * Shared CSR traversal: for each source row, visit its slice of `indices`
 * delimited by consecutive `indptr` entries and emit one row per entry.
 * @param {{indptr: Iterable, indices: Iterable}} csr the CSR arrays.
 * @param {number[]|null} edgeIds converted edge ids, or null to omit them.
 * @returns {number[][]} `[src, edgeId, dst]` rows, or `[src, dst]` when edgeIds is null.
 */
function reconstructRows(csr, edgeIds) {
  const indptr = toNumbers(csr.indptr);
  const indices = toNumbers(csr.indices);
  const rows = [];
  for (let srcRowId = 0; srcRowId < indptr.length - 1; srcRowId++) {
    for (let idx = indptr[srcRowId]; idx < indptr[srcRowId + 1]; idx++) {
      rows.push(
        edgeIds === null ? [srcRowId, indices[idx]] : [srcRowId, edgeIds[idx], indices[idx]],
      );
    }
  }
  return rows;
}

/**
 * Expand a CSR result into `[srcRowId, edgeId, dstRowId]` triples.
 * @param {{indptr: BigUint64Array, indices: BigUint64Array, edgeIds: BigUint64Array}} csr
 * @returns {number[][]}
 * @throws {TypeError} if `csr.edgeIds` is null or undefined.
 */
function reconstructWithEdgeIds(csr) {
  if (csr.edgeIds == null) {
    throw new TypeError("csr.edgeIds is missing; use reconstructWithoutEdgeIds instead.");
  }
  return reconstructRows(csr, toNumbers(csr.edgeIds));
}

/**
 * Expand a CSR result into `[srcRowId, dstRowId]` pairs, ignoring any edge ids.
 * @param {{indptr: BigUint64Array, indices: BigUint64Array}} csr
 * @returns {number[][]}
 */
function reconstructWithoutEdgeIds(csr) {
  return reconstructRows(csr, null);
}
assert.deepEqual(reconstructWithoutEdgeIds(csr), rows); + }); + + itWhenCSRMetadataIsSupported("should support the async queryArrow API", async function () { + const query = ` + MATCH (a:person)-[b:knows]->(c:person) + RETURN a.rowid, b.rowid, c.rowid + `; + + const result = await conn.queryArrow(query, 8); + const csr = result.csr(); + + assert.instanceOf(csr.indptr, BigUint64Array); + assert.instanceOf(csr.indices, BigUint64Array); + assert.instanceOf(csr.edgeIds, BigUint64Array); + }); + + itWhenCSRMetadataIsSupported("should allow CSR arrays to be wrapped by apache-arrow vectors", function () { + const query = ` + MATCH (a:person)-[b:knows]->(c:person) + RETURN a.rowid, b.rowid, c.rowid + `; + const csr = conn.queryArrowSync(query, 8).csr(); + + const indptr = makeVector(csr.indptr); + const indices = makeVector(csr.indices); + const edgeIds = makeVector(csr.edgeIds); + + assert.equal(indptr.length, csr.indptr.length); + assert.equal(indices.length, csr.indices.length); + assert.equal(edgeIds.length, csr.edgeIds.length); + assert.equal(indptr.type.toString(), "Uint64"); + assert.equal(indices.type.toString(), "Uint64"); + assert.equal(edgeIds.type.toString(), "Uint64"); + }); + + it("should reject csr() for Arrow results without CSR metadata", function () { + const result = conn.queryArrowSync("MATCH (a:person) RETURN a.fName", 8); + + assert.throws( + () => result.csr(), + /CSR export is only supported/ + ); + }); +});