◐ Shell
clean mode source ↗

[3.9] bpo-27334: roll back transaction if sqlite3 context manager fails to commit (GH-26202) by erlend-aasland · Pull Request #27944 · python/cpython

Expand Up @@ -21,11 +21,13 @@ # misrepresented as being the original software. # 3. This notice may not be removed or altered from any source distribution.
import subprocess import threading import unittest import sqlite3 as sqlite import sys
from test.support import TESTFN, unlink from test.support import SHORT_TIMEOUT, TESTFN, unlink

class ModuleTests(unittest.TestCase): Expand Down Expand Up @@ -954,6 +956,77 @@ def CheckOnConflictReplace(self): self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])

class MultiprocessTests(unittest.TestCase): CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000. # Defaults to 30 ms
def tearDown(self): unlink(TESTFN)
def test_ctx_mgr_rollback_if_commit_failed(self): # bpo-27334: ctx manager does not rollback if commit fails SCRIPT = f"""if 1: import sqlite3 def wait(): print("started") assert "database is locked" in input()
cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT}) cx.create_function("wait", 0, wait) with cx: cx.execute("create table t(t)") try: # execute two transactions; both will try to lock the db cx.executescript(''' -- start a transaction and wait for parent begin transaction; select * from t; select wait(); rollback;
-- start a new transaction; would fail if parent holds lock begin transaction; select * from t; rollback; ''') finally: cx.close() """
# spawn child process proc = subprocess.Popen( [sys.executable, "-c", SCRIPT], encoding="utf-8", bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) self.addCleanup(proc.communicate)
# wait for child process to start self.assertEqual("started", proc.stdout.readline().strip())
cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT) try: # context manager should correctly release the db lock with cx: cx.execute("insert into t values('test')") except sqlite.OperationalError as exc: proc.stdin.write(str(exc)) else: proc.stdin.write("no error") finally: cx.close()
# terminate child process self.assertIsNone(proc.returncode) try: proc.communicate(input="end", timeout=SHORT_TIMEOUT) except subprocess.TimeoutExpired: proc.kill() proc.communicate() raise self.assertEqual(proc.returncode, 0)

def suite(): module_suite = unittest.makeSuite(ModuleTests, "Check") connection_suite = unittest.makeSuite(ConnectionTests, "Check") Expand All @@ -965,10 +1038,11 @@ def suite(): closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") uninit_con_suite = unittest.makeSuite(UninitialisedConnectionTests) multiproc_con_suite = unittest.makeSuite(MultiprocessTests) return unittest.TestSuite(( module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, on_conflict_suite, uninit_con_suite, on_conflict_suite, uninit_con_suite, multiproc_con_suite, ))
def test(): Expand Down