◐ Shell
clean mode source ↗

bpo-24905: Support BLOB incremental I/O in sqlite module by palaviv · Pull Request #271 · python/cpython

Expand Up @@ -511,6 +511,177 @@ def CheckLastRowIDInsertOR(self): self.assertEqual(results, expected)

class BlobTests(unittest.TestCase): def setUp(self): self.cx = sqlite.connect(":memory:") self.cx.execute("create table test(id integer primary key, blob_col blob)") self.blob_data = b"a" * 100 self.cx.execute("insert into test(blob_col) values (?)", (self.blob_data, )) self.blob = self.cx.open_blob("test", "blob_col", 1) self.second_data = b"b" * 100
def tearDown(self): self.blob.close() self.cx.close()
def CheckLength(self): self.assertEqual(len(self.blob), 100)
def CheckTell(self): self.assertEqual(self.blob.tell(), 0)
def CheckSeekFromBlobStart(self): self.blob.seek(10) self.assertEqual(self.blob.tell(), 10) self.blob.seek(10, 0) self.assertEqual(self.blob.tell(), 10)
def CheckSeekFromCurrentPosition(self): self.blob.seek(10, 1) self.blob.seek(10, 1) self.assertEqual(self.blob.tell(), 20)
def CheckSeekFromBlobEnd(self): self.blob.seek(-10, 2) self.assertEqual(self.blob.tell(), 90)
def CheckBlobSeekOverBlobSize(self): with self.assertRaises(ValueError): self.blob.seek(1000)
def CheckBlobSeekUnderBlobSize(self): with self.assertRaises(ValueError): self.blob.seek(-10)
def CheckBlobRead(self): self.assertEqual(self.blob.read(), self.blob_data)
def CheckBlobReadSize(self): self.assertEqual(len(self.blob.read(10)), 10)
def CheckBlobReadAdvanceOffset(self): self.blob.read(10) self.assertEqual(self.blob.tell(), 10)
def CheckBlobReadStartAtOffset(self): self.blob.seek(10) self.blob.write(self.second_data[:10]) self.blob.seek(10) self.assertEqual(self.blob.read(10), self.second_data[:10])
def CheckBlobWrite(self): self.blob.write(self.second_data) self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], self.second_data)
def CheckBlobWriteAtOffset(self): self.blob.seek(50) self.blob.write(self.second_data[:50]) self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], self.blob_data[:50] + self.second_data[:50])
def CheckBlobWriteAdvanceOffset(self): self.blob.write(self.second_data[:50]) self.assertEqual(self.blob.tell(), 50)
def CheckBlobWriteMoreThenBlobSize(self): with self.assertRaises(ValueError): self.blob.write(b"a" * 1000)
def CheckBlobReadAfterRowChange(self): self.cx.execute("UPDATE test SET blob_col='aaaa' where id=1") with self.assertRaises(sqlite.OperationalError): self.blob.read()
def CheckBlobWriteAfterRowChange(self): self.cx.execute("UPDATE test SET blob_col='aaaa' where id=1") with self.assertRaises(sqlite.OperationalError): self.blob.write(b"aaa")
def CheckBlobWriteWhenReadOnly(self): read_only_blob = \ self.cx.open_blob("test", "blob_col", 1, readonly=True) with self.assertRaises(sqlite.OperationalError): read_only_blob.write(b"aaa") read_only_blob.close()
def CheckBlobOpenWithBadDb(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("test", "blob_col", 1, dbname="notexisintg")
def CheckBlobOpenWithBadTable(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("notexisintg", "blob_col", 1)
def CheckBlobOpenWithBadColumn(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("test", "notexisting", 1)
def CheckBlobOpenWithBadRow(self): with self.assertRaises(sqlite.OperationalError): self.cx.open_blob("test", "blob_col", 2)
def CheckBlobGetItem(self): self.assertEqual(self.blob[5], b"a")
def CheckBlobGetItemIndexOutOfRange(self): with self.assertRaises(IndexError): self.blob[105] with self.assertRaises(IndexError): self.blob[-105]
def CheckBlobGetItemNegativeIndex(self): self.assertEqual(self.blob[-5], b"a")
def CheckBlobGetItemInvalidIndex(self): with self.assertRaises(TypeError): self.blob[b"a"]
def CheckBlobGetSlice(self): self.assertEqual(self.blob[5:10], b"aaaaa")
def CheckBlobGetSliceNegativeIndex(self): self.assertEqual(self.blob[5:-5], self.blob_data[5:-5])
def CheckBlobGetSliceInvalidIndex(self): with self.assertRaises(TypeError): self.blob[5:b"a"]
def CheckBlobGetSliceWithSkip(self): self.blob.write(b"abcdefghij") self.assertEqual(self.blob[0:10:2], b"acegi")
def CheckBlobSetItem(self): self.blob[0] = b"b" self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"b" + self.blob_data[1:])
def CheckBlobSetSlice(self): self.blob[0:5] = b"bbbbb" self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"bbbbb" + self.blob_data[5:])
def CheckBlobSetSliceWithSkip(self): self.blob[0:10:2] = b"bbbbb" self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], b"bababababa" + self.blob_data[10:])
def CheckBlobGetEmptySlice(self): self.assertEqual(self.blob[5:5], b"")
def CheckBlobSetSliceWrongLength(self): with self.assertRaises(IndexError): self.blob[5:10] = b"a"
def CheckBlobConcatNotSupported(self): with self.assertRaises(SystemError): self.blob + self.blob
def CheckBlobRepeateNotSupported(self): with self.assertRaises(SystemError): self.blob * 5
def CheckBlobContainsNotSupported(self): with self.assertRaises(SystemError): b"aaaaa" in self.blob
@unittest.skipUnless(threading, 'This test requires threading.') class ThreadTests(unittest.TestCase): def setUp(self): self.con = sqlite.connect(":memory:") Expand Down Expand Up @@ -768,6 +939,15 @@ def CheckClosedCurExecute(self): with self.assertRaises(sqlite.ProgrammingError): cur.execute("select 4")
def CheckClosedBlobRead(self): con = sqlite.connect(":memory:") con.execute("create table test(id integer primary key, blob_col blob)") con.execute("insert into test(blob_col) values (zeroblob(100))") blob = con.open_blob("test", "blob_col", 1) con.close() with self.assertRaises(sqlite.ProgrammingError): blob.read()
def CheckClosedCreateFunction(self): con = sqlite.connect(":memory:") con.close() Expand Down Expand Up @@ -921,6 +1101,69 @@ def CheckOnConflictReplace(self): self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')])


class ClosedBlobTests(unittest.TestCase): def setUp(self): self.cx = sqlite.connect(":memory:") self.cx.execute("create table test(id integer primary key, blob_col blob)") self.cx.execute("insert into test(blob_col) values (zeroblob(100))")
def tearDown(self): self.cx.close()
def CheckClosedRead(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.read()
def CheckClosedWrite(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.write(b"aaaaaaaaa")
def CheckClosedSeek(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.seek(10)
def CheckClosedTell(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.tell()
def CheckClosedClose(self): self.blob = self.cx.open_blob("test", "blob_col", 1) self.blob.close() with self.assertRaises(sqlite.ProgrammingError): self.blob.close()

class BlobContextManagerTests(unittest.TestCase): def setUp(self): self.cx = sqlite.connect(":memory:") self.cx.execute("create table test(id integer primary key, blob_col blob)") self.cx.execute("insert into test(blob_col) values (zeroblob(100))")
def tearDown(self): self.cx.close()
def CheckContextExecute(self): data = b"a" * 100 with self.cx.open_blob("test", "blob_col", 1) as blob: blob.write(data) self.assertEqual(self.cx.execute("select blob_col from test").fetchone()[0], data)
def CheckContextCloseBlob(self): with self.cx.open_blob("test", "blob_col", 1) as blob: blob.seek(10) with self.assertRaises(sqlite.ProgrammingError): blob.close()

def suite(): module_suite = unittest.makeSuite(ModuleTests, "Check") connection_suite = unittest.makeSuite(ConnectionTests, "Check") Expand All @@ -931,11 +1174,14 @@ def suite(): closed_con_suite = unittest.makeSuite(ClosedConTests, "Check") closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check") on_conflict_suite = unittest.makeSuite(SqliteOnConflictTests, "Check") blob_suite = unittest.makeSuite(BlobTests, "Check") closed_blob_suite = unittest.makeSuite(ClosedBlobTests, "Check") blob_context_manager_suite = unittest.makeSuite(BlobContextManagerTests, "Check") return unittest.TestSuite(( module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, on_conflict_suite, )) module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite, on_conflict_suite, blob_suite, closed_blob_suite, blob_context_manager_suite, ))
def test(): runner = unittest.TextTestRunner() Expand Down