Skip to content

Commit 91de4b9

Browse files
authored
Cursor: avoid closing connection twice if error received after destroy() (#2836)
* Cursor: avoid closing connection twice if error received after destroy() Includes test case from @nathanjcochran * Resolve lint violations * revert fix to check tests fail without it * Re-introdce fix This reverts commit 5f5d42a. --------- Co-authored-by: alxndrsn <alxndrsn>
1 parent ebba3d8 commit 91de4b9

File tree

2 files changed

+84
-0
lines changed

2 files changed

+84
-0
lines changed

packages/pg-cursor/index.js

+3
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,9 @@ class Cursor extends EventEmitter {
151151
}
152152

153153
handleError(msg) {
154+
// If this cursor has already closed, don't try to handle the error.
155+
if (this.state === 'done') return
156+
154157
// If we're in an initialized state we've never been submitted
155158
// and don't have a connection instance reference yet.
156159
// This can happen if you queue a stream and close the client before

packages/pg-query-stream/test/error.ts

+81
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,85 @@ describe('error recovery', () => {
8989
await client.end()
9090
})
9191
})
92+
93+
it('should work if used after timeout error', async () => {
94+
const pool = new Pool({ max: 1, connectionTimeoutMillis: 400, statement_timeout: 400 })
95+
96+
const res1 = await pool.query('SELECT 1 AS a')
97+
assert.deepStrictEqual(res1.rows, [{ a: 1 }])
98+
99+
const query = new QueryStream('SELECT 2 AS b')
100+
const client = await pool.connect()
101+
const stream = await client.query(query)
102+
103+
await assert.rejects(() => pool.query('SELECT TRUE'), { message: 'timeout exceeded when trying to connect' })
104+
105+
await stream.destroy()
106+
await client.release()
107+
108+
const res2 = await pool.query('SELECT 4 AS d')
109+
assert.deepStrictEqual(res2.rows, [{ d: 4 }])
110+
111+
await pool.end()
112+
})
113+
114+
it('should work if used after syntax error', async () => {
115+
const pool = new Pool({ max: 1, statement_timeout: 100 }) // statement_timeout is required here, so maybe this is just another timeout error?
116+
117+
const res1 = await pool.query('SELECT 1 AS a')
118+
assert.deepStrictEqual(res1.rows, [{ a: 1 }])
119+
120+
const query = new QueryStream('SELECT 2 AS b')
121+
const client = await pool.connect()
122+
const stream = await client.query(query)
123+
124+
await new Promise((resolve) => setTimeout(resolve, 10))
125+
126+
await stream.destroy()
127+
await client.release()
128+
129+
const res2 = await pool.query('SELECT 4 AS d')
130+
assert.deepStrictEqual(res2.rows, [{ d: 4 }])
131+
132+
await pool.end()
133+
})
134+
135+
it('should work after cancelling query', async () => {
136+
const pool = new Pool()
137+
const conn = await pool.connect()
138+
139+
// Get connection PID for sake of pg_cancel_backend() call
140+
const result = await conn.query('SELECT pg_backend_pid() AS pid;')
141+
const { pid } = result.rows[0]
142+
143+
const stream = conn.query(new QueryStream('SELECT pg_sleep(10);'))
144+
stream.on('data', (chunk) => {
145+
// Switches stream into readableFlowing === true mode
146+
})
147+
stream.on('error', (err) => {
148+
// Errors are expected due to pg_cancel_backend() call
149+
})
150+
151+
// Create a promise that is resolved when the stream is closed
152+
const closed = new Promise((res) => {
153+
stream.on('close', res)
154+
})
155+
156+
// Wait 100ms before cancelling the query
157+
await new Promise((res) => setTimeout(res, 100))
158+
159+
// Cancel pg_sleep(10) query
160+
await pool.query('SELECT pg_cancel_backend($1);', [pid])
161+
162+
// Destroy stream and wait for it to be closed
163+
stream.destroy()
164+
await closed
165+
166+
// Subsequent query on same connection should succeed
167+
const res = await conn.query('SELECT 1 AS a;')
168+
assert.deepStrictEqual(res.rows, [{ a: 1 }])
169+
170+
conn.release()
171+
await pool.end()
172+
})
92173
})

0 commit comments

Comments
 (0)