Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions hana/lib/HANAService.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class HANAService extends SQLService {
try {
await require('@sap/cds-mtxs/lib').xt.serviceManager.get(tenant, { disableCache: true, invalidCredentials: credentials, retryUntil: deadline })
} catch (smErr) {
smErr.cause = err
throw new Error(`Failed connecting to pool - could not get valid credentials from Service Manager`, { cause: smErr })
smErr.cause = err
throw new Error(`Failed connecting to pool - could not get valid credentials from Service Manager`, { cause: smErr })
}
if (Date.now() < deadline) return create(tenant, start)
else throw new Error(`Pool exceeded for '${tenant}' within ${acquireTimeoutMillis}ms`, { cause: err })
Expand Down Expand Up @@ -212,6 +212,19 @@ class HANAService extends SQLService {
return new this.class.InsertResults(cqn, results)
}

async onUPSERT({ query, data }) {
const { sql, entries, cqn } = this.cqn2sql(query, data)
if (!sql) return // Do nothing when there is nothing to be done
const ps = await this.prepare(sql)
// HANA driver supports batch execution
const results = await (entries
? this.server.major <= 2
? entries.reduce((l, c) => l.then(() => this.ensureDBC() && ps.run(c)), Promise.resolve(0))
: entries.length > 1 ? this.ensureDBC() && await ps.runBatch(entries) : this.ensureDBC() && await ps.run(entries[0])
: this.ensureDBC() && ps.run())
return results.changes ?? results
}

async onNOTFOUND(req, next) {
try {
return await next()
Expand Down Expand Up @@ -309,11 +322,8 @@ class HANAService extends SQLService {
}

// prepare and exec are both implemented inside the drivers
prepare(sql, hasBlobs) {
const stmt = this.ensureDBC().prepare(sql, hasBlobs)
// we store the statements, to release them on commit/rollback all at once
this.dbc.statements.push(stmt)
return stmt
async prepare(sql, hasBlobs) {
return this.ensureDBC().prepare(sql, hasBlobs)
}

exec(sql) {
Expand Down Expand Up @@ -1302,7 +1312,7 @@ SELECT ${mixing} FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(${extraction}) ERROR ON E
async onSIMPLE({ query, data, event }) {
const { sql, values } = this.cqn2sql(query, data)
try {
let ps = await this.prepare(sql)
const ps = await this.prepare(sql)
return (this.ensureDBC() && await ps.run(values)).changes
} catch (err) {
// Allow drop to fail when the view or table does not exist
Expand Down Expand Up @@ -1360,25 +1370,16 @@ SELECT ${mixing} FROM JSON_TABLE(SRC.JSON, '$' COLUMNS(${extraction}) ERROR ON E

onBEGIN() {
DEBUG?.('BEGIN')
if (this.dbc) this.dbc.statements = []
return this.dbc?.begin()
}

onCOMMIT() {
DEBUG?.('COMMIT')
this.dbc?.statements?.forEach(stmt => stmt
.then(stmt => stmt.drop())
.catch(() => { })
)
return this.dbc?.commit()
}

onROLLBACK() {
DEBUG?.('ROLLBACK')
this.dbc?.statements?.forEach(stmt => stmt
.then(stmt => stmt.drop())
.catch(() => { })
)
return this.dbc?.rollback()
}

Expand Down
72 changes: 49 additions & 23 deletions hana/lib/drivers/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,73 @@ class HANADriver {
constructor(creds) {
this._creds = creds
this.connected = false
this.statements = {}
}

/**
* Generic prepare implementation for the native HANA drivers
* @param {string} sql The SQL string to be prepared
* @returns {import('@cap-js/db-service/lib/SQLService').PreparedStatement}
*/
async prepare(sql) {
const prep = module.exports.prom(
_prepare(sql, detached) {
let prep = (!detached && this.statements[sql]) || module.exports.prom(
this._native,
'prepare',
)(sql).then(stmt => {
stmt._parentConnection = this._native
return stmt
})
if (!detached) {
const release = {} // TODO: for node@22 use Promise.withResolvers()
release.promise = new Promise((resolve, reject) => {
release.resolve = resolve
release.reject = reject
})
release.promise.catch(() => { })
this.statements[sql] = release.promise
prep.catch(release.reject)
prep = prep.then(stmt => {
stmt.release = () => release.resolve(stmt)
return stmt
})
}

return prep
}

/**
* Generic prepare implementation for the native HANA drivers
* @param {string} sql The SQL string to be prepared
* @returns {import('@cap-js/db-service/lib/SQLService').PreparedStatement}
*/
async prepare(sql) {
return {
_prep: prep,
run: async params => {
const { values, streams } = this._extractStreams(params)
const stmt = await prep
let changes = await module.exports.prom(stmt, 'exec')(values)
await this._sendStreams(stmt, streams)
return { changes }
const stmt = await this._prepare(sql)
try {
let changes = await module.exports.prom(stmt, 'exec')(values)
await this._sendStreams(stmt, streams)
return { changes }
} finally { stmt.release() }
},
runBatch: async params => {
const stmt = await prep
const changes = await module.exports.prom(stmt, 'exec')(params)
return { changes: !Array.isArray(changes) ? changes : changes.reduce((l, c) => l + c, 0) }
const stmt = await this._prepare(sql)
try {
const changes = await module.exports.prom(stmt, 'exec')(params)
return { changes: !Array.isArray(changes) ? changes : changes.reduce((l, c) => l + c, 0) }
} finally { stmt.release() }
},
get: async params => {
const stmt = await prep
return (await module.exports.prom(stmt, 'exec')(params))[0]
const stmt = await this._prepare(sql)
try {
const ret = (await module.exports.prom(stmt, 'exec')(params))[0]
return ret
} finally { stmt.release() }
},
all: async params => {
const stmt = await prep
return module.exports.prom(stmt, 'exec')(params)
const stmt = await this._prepare(sql)
try {
const ret = await module.exports.prom(stmt, 'exec')(params)
stmt.release()
return ret
} finally { stmt.release() }
},
drop: async () => {
const stmt = await prep
return stmt.drop()
}
}
}

Expand Down
41 changes: 23 additions & 18 deletions hana/lib/drivers/hana-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class HANAClientDriver extends driver {
// It would be required to allow using getDate() on previous rows
if (hasBlobs) {
ret.all = async (values) => {
const stmt = await ret._prep
const stmt = await this._prepare(sql, true)
// Create result set
const reset = async function () {
if (this) await prom(this, 'close')()
Expand Down Expand Up @@ -120,26 +120,30 @@ class HANAClientDriver extends driver {

ret.run = async params => {
const { values, streams } = this._extractStreams(params)
const stmt = await ret._prep
let changes = await prom(stmt, 'exec')(values)
await this._sendStreams(stmt, streams)
// REVISIT: hana-client does not return any changes when doing an update with streams
// This causes the best assumption to be that the changes are one
// To get the correct information it is required to send a count with the update where clause
if (streams.length && changes === 0) {
changes = 1
}
return { changes }
const stmt = await this._prepare(sql)
try {
let changes = await prom(stmt, 'exec')(values)
await this._sendStreams(stmt, streams)
// REVISIT: hana-client does not return any changes when doing an update with streams
// This causes the best assumption to be that the changes are one
// To get the correct information it is required to send a count with the update where clause
if (streams.length && changes === 0) {
changes = 1
}
return { changes }
} finally { stmt.release() }
}

ret.proc = async (data, outParameters) => {
const stmt = await ret._prep
const rows = await prom(stmt, 'execQuery')(data)
return this._getResultForProcedure(rows, outParameters, stmt)
const stmt = await this._prepare(sql)
try {
const rows = await prom(stmt, 'execQuery')(data)
return this._getResultForProcedure(rows, outParameters, stmt)
} finally { stmt.release() }
}

ret.stream = async (values, one, objectMode) => {
const stmt = await ret._prep
const stmt = await this._prepare(sql, true)
values = Array.isArray(values) ? values : []
// Uses the native exec method instead of executeQuery to initialize a full stream
// As executeQuery does not request the whole result set at once
Expand All @@ -163,12 +167,13 @@ class HANAClientDriver extends driver {
if (rs.isNull(0)) return null
return Readable.from(streamBlob(rs, undefined, 0), { objectMode: false })
}
return rsIterator(rs, one, objectMode)
return rsIterator(rs, one, objectMode, () => { stmt.drop() })
}
return ret
}

async validate() {
if (Object.keys(this.statements).length > 10_000) return false
return this._native.state() === 'connected'
}

Expand Down Expand Up @@ -236,7 +241,7 @@ class HANAClientDriver extends driver {

HANAClientDriver.pool = true

async function rsIterator(rs, one, objectMode) {
async function rsIterator(rs, one, objectMode, onDone) {
rs._rowPosition = -1
rs.nextAsync = prom(rs, 'next')
rs.getValueAsync = prom(rs, 'getValue')
Expand Down Expand Up @@ -314,7 +319,7 @@ async function rsIterator(rs, one, objectMode) {
}
}

return resultSetStream(state, one, objectMode)
return resultSetStream(state, one, objectMode, onDone)
}

async function* streamBlob(rs, rowIndex = -1, columnIndex, binaryBuffer) {
Expand Down
63 changes: 33 additions & 30 deletions hana/lib/drivers/hdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class HDBDriver extends driver {
}

async validate() {
if (Object.keys(this.statements).length > 10_000) return false
return this._native.readyState === 'connected'
}

Expand Down Expand Up @@ -88,34 +89,36 @@ class HDBDriver extends driver {

if (hasBlobs) {
ret.all = async (values) => {
const stmt = await ret._prep
// Create result set
const rs = await prom(stmt, 'execute')(values)
const cols = rs.metadata.map(b => b.columnName)
const stream = rs.createReadStream()

const result = []
for await (const row of stream) {
const obj = {}
for (let i = 0; i < cols.length; i++) {
const col = cols[i]
// hdb returns large strings as streams sometimes
if (col === '_json_' && typeof row[col] === 'object') {
obj[col] = await text(row[col].createReadStream())
continue
const stmt = await this._prepare(sql)
try {
// Create result set
const rs = await prom(stmt, 'execute')(values)
const cols = rs.metadata.map(b => b.columnName)
const stream = rs.createReadStream()

const result = []
for await (const row of stream) {
const obj = {}
for (let i = 0; i < cols.length; i++) {
const col = cols[i]
// hdb returns large strings as streams sometimes
if (col === '_json_' && typeof row[col] === 'object') {
obj[col] = await text(row[col].createReadStream())
continue
}
obj[col] = i > 3
? row[col] === null
? null
: (
row[col].createReadStream?.()
|| row[col]
)
: row[col]
}
obj[col] = i > 3
? row[col] === null
? null
: (
row[col].createReadStream?.()
|| row[col]
)
: row[col]
result.push(obj)
}
result.push(obj)
}
return result
return result
} finally { stmt.release() }
}
}

Expand All @@ -125,9 +128,9 @@ class HDBDriver extends driver {
}

ret.stream = async (values, one, objectMode) => {
const stmt = await ret._prep
const stmt = await this._prepare(sql, true)
const rs = await prom(stmt, 'execute')(values || [])
return rsIterator(rs, one, objectMode)
return rsIterator(rs, one, objectMode, () => { stmt.drop() })
}
return ret
}
Expand Down Expand Up @@ -176,7 +179,7 @@ class HDBDriver extends driver {
}
}

async function rsIterator(rs, one, objectMode) {
async function rsIterator(rs, one, objectMode, onDone) {
// Raw binary data stream unparsed
const raw = rs.createBinaryStream()[Symbol.asyncIterator]()

Expand Down Expand Up @@ -397,7 +400,7 @@ async function rsIterator(rs, one, objectMode) {
}
}

return resultSetStream(state, one, objectMode)
return resultSetStream(state, one, objectMode, onDone)
}

const readString = function (state, isJson = false) {
Expand Down
4 changes: 3 additions & 1 deletion hana/lib/drivers/stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function rsNextRaw(state, next) {
return writeBlobs ? writeBlobs.then(_return) : _return()
}

async function rsIterator(state, one, objectMode) {
async function rsIterator(state, one, objectMode, onDone) {
const stream = state.stream = new Readable({
objectMode,
async read() {
Expand All @@ -160,6 +160,8 @@ async function rsIterator(state, one, objectMode) {
stream.push('[')
}

if (onDone) { stream.on('close', onDone) }

return stream
}

Expand Down
2 changes: 1 addition & 1 deletion hana/test/run.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ BEGIN
SELECT ERROR_CODE, ERROR_TEXT, :dur AS WAITED_SECONDS FROM M_PROCEDURE_ASYNC_EXECUTIONS WHERE ASYNC_CALL_ID = :ID;
END;`, [ASYNC_CALL_ID])
// Ensure that the procedure succeeded
expect(status.changes[1][0].ERROR_CODE).to.eq(0)
expect(status.changes[0].ERROR_CODE ?? status.changes[1][0].ERROR_CODE).to.eq(0)
throw new Error('ROLLBACK')
}).catch((err) => { if (err.message !== 'ROLLBACK') throw err })

Expand Down
1 change: 0 additions & 1 deletion hana/test/spatial.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ const cds = require('../../test/cds.js')
describe('Spatial Types', () => {
const { data, expect } = cds.test(__dirname + '/../../test/compliance/resources')
data.autoIsolation(true)
data.autoReset()

test('point', async () => {
const { HANA_ST } = cds.entities('edge.hana.literals')
Expand Down
Loading
Loading