[3.9] bpo-27334: roll back transaction if sqlite3 context manager fails to commit (GH-26202) by erlend-aasland · Pull Request #27944 · python/cpython
import subprocess import threading import unittest import sqlite3 as sqlite import sys
from test.support import TESTFN, unlink from test.support import SHORT_TIMEOUT, TESTFN, unlink
class ModuleTests(unittest.TestCase):
class MultiprocessTests(unittest.TestCase): CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000. # Defaults to 30 ms
def tearDown(self): unlink(TESTFN)
def test_ctx_mgr_rollback_if_commit_failed(self): # bpo-27334: ctx manager does not rollback if commit fails SCRIPT = f"""if 1: import sqlite3 def wait(): print("started") assert "database is locked" in input()
cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT}) cx.create_function("wait", 0, wait) with cx: cx.execute("create table t(t)") try: # execute two transactions; both will try to lock the db cx.executescript(''' -- start a transaction and wait for parent begin transaction; select * from t; select wait(); rollback;
-- start a new transaction; would fail if parent holds lock begin transaction; select * from t; rollback; ''') finally: cx.close() """
# spawn child process proc = subprocess.Popen( [sys.executable, "-c", SCRIPT], encoding="utf-8", bufsize=0, stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) self.addCleanup(proc.communicate)
# wait for child process to start self.assertEqual("started", proc.stdout.readline().strip())
cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT) try: # context manager should correctly release the db lock with cx: cx.execute("insert into t values('test')") except sqlite.OperationalError as exc: proc.stdin.write(str(exc)) else: proc.stdin.write("no error") finally: cx.close()
# terminate child process self.assertIsNone(proc.returncode) try: proc.communicate(input="end", timeout=SHORT_TIMEOUT) except subprocess.TimeoutExpired: proc.kill() proc.communicate() raise self.assertEqual(proc.returncode, 0)
def suite(): module_suite = unittest.makeSuite(ModuleTests, "Check") connection_suite = unittest.makeSuite(ConnectionTests, "Check")
def test():