Update test_mmap.py from Cpython v3.11.2 by Masorubka1 · Pull Request #4841 · RustPython/RustPython
@@ -1,11 +1,15 @@
from test.support import (requires, _2G, _4G, gc_collect, cpython_only)
from test.support import (
requires, _2G, _4G, gc_collect, cpython_only, is_emscripten
)
from test.support.import_helper import import_module
from test.support.os_helper import TESTFN, unlink
import unittest
import os
import re
import itertools
import random
import socket
import string
import sys
import weakref
Expand All
@@ -14,6 +18,16 @@
PAGESIZE = mmap.PAGESIZE
tagname_prefix = f'python_{os.getpid()}_test_mmap' def random_tagname(length=10): suffix = ''.join(random.choices(string.ascii_uppercase, k=length)) return f'{tagname_prefix}_{suffix}'
# Python's mmap module dup()s the file descriptor. Emscripten's FS layer # does not materialize file changes through a dupped fd to a new mmap. if is_emscripten: raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")
class MmapTests(unittest.TestCase):
Expand Down
Expand Up
@@ -609,21 +623,23 @@ def test_tagname(self):
data1 = b"0123456789"
data2 = b"abcdefghij"
assert len(data1) == len(data2)
tagname1 = random_tagname()
tagname2 = random_tagname()
# Test same tag m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname=tagname1) m1[:] = data1 m2 = mmap.mmap(-1, len(data2), tagname="foo") m2 = mmap.mmap(-1, len(data2), tagname=tagname1) m2[:] = data2 self.assertEqual(m1[:], data2) self.assertEqual(m2[:], data2) m2.close() m1.close()
# Test different tag m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname=tagname1) m1[:] = data1 m2 = mmap.mmap(-1, len(data2), tagname="boo") m2 = mmap.mmap(-1, len(data2), tagname=tagname2) m2[:] = data2 self.assertEqual(m1[:], data1) self.assertEqual(m2[:], data2)Expand All
@@ -634,17 +650,18 @@ def test_tagname(self):
@unittest.skipUnless(os.name == 'nt', 'requires Windows')
def test_sizeof(self):
m1 = mmap.mmap(-1, 100)
tagname = "foo"
tagname = random_tagname()
m2 = mmap.mmap(-1, 100, tagname=tagname)
self.assertEqual(sys.getsizeof(m2),
sys.getsizeof(m1) + len(tagname) + 1)
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_crasher_on_windows(self): # Should not crash (Issue 1733986) m = mmap.mmap(-1, 1000, tagname="foo") tagname = random_tagname() m = mmap.mmap(-1, 1000, tagname=tagname) try: mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size except: pass m.close()Expand Down
Expand Up
@@ -707,7 +724,6 @@ def test_write_returning_the_number_of_bytes_written(self):
self.assertEqual(mm.write(b"yz"), 2)
self.assertEqual(mm.write(b"python"), 6)
@unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows') def test_resize_past_pos(self): m = mmap.mmap(-1, 8192) self.addCleanup(m.close)Expand Down
Expand Up
@@ -797,6 +813,80 @@ def test_madvise(self):
self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, 2), None)
self.assertEqual(m.madvise(mmap.MADV_NORMAL, 0, size), None)
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_up_when_mapped_to_pagefile(self): """If the mmap is backed by the pagefile ensure a resize up can happen and that the original data is still in place """ start_size = PAGESIZE new_size = 2 * start_size data = bytes(random.getrandbits(8) for _ in range(start_size))
m = mmap.mmap(-1, start_size) m[:] = data m.resize(new_size) self.assertEqual(len(m), new_size) self.assertEqual(m[:start_size], data[:start_size])
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_down_when_mapped_to_pagefile(self): """If the mmap is backed by the pagefile ensure a resize down up can happen and that a truncated form of the original data is still in place """ start_size = PAGESIZE new_size = start_size // 2 data = bytes(random.getrandbits(8) for _ in range(start_size))
m = mmap.mmap(-1, start_size) m[:] = data m.resize(new_size) self.assertEqual(len(m), new_size) self.assertEqual(m[:new_size], data[:new_size])
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_fails_if_mapping_held_elsewhere(self): """If more than one mapping is held against a named file on Windows, neither mapping can be resized """ start_size = 2 * PAGESIZE reduced_size = PAGESIZE
f = open(TESTFN, 'wb+') f.truncate(start_size) try: m1 = mmap.mmap(f.fileno(), start_size) m2 = mmap.mmap(f.fileno(), start_size) with self.assertRaises(OSError): m1.resize(reduced_size) with self.assertRaises(OSError): m2.resize(reduced_size) m2.close() m1.resize(reduced_size) self.assertEqual(m1.size(), reduced_size) self.assertEqual(os.stat(f.fileno()).st_size, reduced_size) finally: f.close()
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_succeeds_with_error_for_second_named_mapping(self): """If a more than one mapping exists of the same name, none of them can be resized: they'll raise an Exception and leave the original mapping intact """ start_size = 2 * PAGESIZE reduced_size = PAGESIZE tagname = random_tagname() data_length = 8 data = bytes(random.getrandbits(8) for _ in range(data_length))
m1 = mmap.mmap(-1, start_size, tagname=tagname) m2 = mmap.mmap(-1, start_size, tagname=tagname) m1[:data_length] = data self.assertEqual(m2[:data_length], data) with self.assertRaises(OSError): m1.resize(reduced_size) self.assertEqual(m1.size(), start_size) self.assertEqual(m1[:data_length], data) self.assertEqual(m2[:data_length], data)
class LargeMmapTests(unittest.TestCase):
Expand Down
PAGESIZE = mmap.PAGESIZE
tagname_prefix = f'python_{os.getpid()}_test_mmap' def random_tagname(length=10): suffix = ''.join(random.choices(string.ascii_uppercase, k=length)) return f'{tagname_prefix}_{suffix}'
# Python's mmap module dup()s the file descriptor. Emscripten's FS layer # does not materialize file changes through a dupped fd to a new mmap. if is_emscripten: raise unittest.SkipTest("incompatible with Emscripten's mmap emulation.")
class MmapTests(unittest.TestCase):
# Test same tag m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname=tagname1) m1[:] = data1 m2 = mmap.mmap(-1, len(data2), tagname="foo") m2 = mmap.mmap(-1, len(data2), tagname=tagname1) m2[:] = data2 self.assertEqual(m1[:], data2) self.assertEqual(m2[:], data2) m2.close() m1.close()
# Test different tag m1 = mmap.mmap(-1, len(data1), tagname="foo") m1 = mmap.mmap(-1, len(data1), tagname=tagname1) m1[:] = data1 m2 = mmap.mmap(-1, len(data2), tagname="boo") m2 = mmap.mmap(-1, len(data2), tagname=tagname2) m2[:] = data2 self.assertEqual(m1[:], data1) self.assertEqual(m2[:], data2)
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_crasher_on_windows(self): # Should not crash (Issue 1733986) m = mmap.mmap(-1, 1000, tagname="foo") tagname = random_tagname() m = mmap.mmap(-1, 1000, tagname=tagname) try: mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size mmap.mmap(-1, 5000, tagname=tagname)[:] # same tagname, but larger size except: pass m.close()
@unittest.skipIf(os.name == 'nt', 'cannot resize anonymous mmaps on Windows') def test_resize_past_pos(self): m = mmap.mmap(-1, 8192) self.addCleanup(m.close)
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_up_when_mapped_to_pagefile(self): """If the mmap is backed by the pagefile ensure a resize up can happen and that the original data is still in place """ start_size = PAGESIZE new_size = 2 * start_size data = bytes(random.getrandbits(8) for _ in range(start_size))
m = mmap.mmap(-1, start_size) m[:] = data m.resize(new_size) self.assertEqual(len(m), new_size) self.assertEqual(m[:start_size], data[:start_size])
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_down_when_mapped_to_pagefile(self): """If the mmap is backed by the pagefile ensure a resize down up can happen and that a truncated form of the original data is still in place """ start_size = PAGESIZE new_size = start_size // 2 data = bytes(random.getrandbits(8) for _ in range(start_size))
m = mmap.mmap(-1, start_size) m[:] = data m.resize(new_size) self.assertEqual(len(m), new_size) self.assertEqual(m[:new_size], data[:new_size])
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_fails_if_mapping_held_elsewhere(self): """If more than one mapping is held against a named file on Windows, neither mapping can be resized """ start_size = 2 * PAGESIZE reduced_size = PAGESIZE
f = open(TESTFN, 'wb+') f.truncate(start_size) try: m1 = mmap.mmap(f.fileno(), start_size) m2 = mmap.mmap(f.fileno(), start_size) with self.assertRaises(OSError): m1.resize(reduced_size) with self.assertRaises(OSError): m2.resize(reduced_size) m2.close() m1.resize(reduced_size) self.assertEqual(m1.size(), reduced_size) self.assertEqual(os.stat(f.fileno()).st_size, reduced_size) finally: f.close()
@unittest.skipUnless(os.name == 'nt', 'requires Windows') def test_resize_succeeds_with_error_for_second_named_mapping(self): """If a more than one mapping exists of the same name, none of them can be resized: they'll raise an Exception and leave the original mapping intact """ start_size = 2 * PAGESIZE reduced_size = PAGESIZE tagname = random_tagname() data_length = 8 data = bytes(random.getrandbits(8) for _ in range(data_length))
m1 = mmap.mmap(-1, start_size, tagname=tagname) m2 = mmap.mmap(-1, start_size, tagname=tagname) m1[:data_length] = data self.assertEqual(m2[:data_length], data) with self.assertRaises(OSError): m1.resize(reduced_size) self.assertEqual(m1.size(), start_size) self.assertEqual(m1[:data_length], data) self.assertEqual(m2[:data_length], data)
class LargeMmapTests(unittest.TestCase):