bpo-5038: Use iterable objects in case of mmap file source · Pull Request #11904 · python/cpython
from test import support
def _read_post_body(self, request_handler): encoding = request_handler.headers.get("Transfer-Encoding") if encoding is None: length = int(request_handler.headers.get("Content-Length")) if encoding == "chunked": request = b"" while True: line = request_handler.rfile.readline() length = int(line, 16) if length == 0: request_handler.rfile.readline() break request = request + request_handler.rfile.read(length) request_handler.rfile.read(2) else: request = self.rfile.read(length)
return request
def handle_request(self, request_handler): """Performs digest authentication on the given HTTP request handler. Returns True if authentication was successful, False
if request_handler.is_POST == True: request_handler.post_body = self._read_post_body(request_handler)
if len(self._users) == 0: return True
def do_POST(self): encoding = self.headers.get("Transfer-Encoding") if encoding is None: length = int(self.headers.get("Content-Length")) if encoding == "chunked": request = b"" while True: line = self.rfile.readline() length = int(line, 16) if length == 0: self.rfile.readline() break request = request + self.rfile.read(length) self.rfile.read(2) length = len(request) else: request = self.rfile.read(length) if length > 0: if not self.headers.get("Authorization", ""): self.do_AUTHHEAD() self.wfile.write(b"No Auth header received") return elif self.headers.get( "Authorization", "") == "Basic " + self.ENCODED_AUTH: self.send_response(200, "OK") self.end_headers() self.wfile.write(request) else: self.do_AUTHHEAD() return else: self.send_response(400, "Empty request data") self.end_headers()
# Proxy test infrastructure
def do_POST(self): (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( self.path, "http") self.short_path = path self.is_POST = True if self.digest_auth_handler.handle_request(self): request = self.post_body if len(request) > 0: self.send_response(200, "OK") self.send_header("Content-Type", "text/plain") self.end_headers() self.wfile.write(request) else: self.send_response(400, "Empty request data") self.end_headers()
# Test cases
class BasicAuthTests(unittest.TestCase):
def test_basic_auth_mmap_post(self): data = "field=value".encode("ascii") mem = mmap.mmap(-1, len(data)) mem[:] = data auth_handler = urllib.request.HTTPBasicAuthHandler() auth_handler.add_password(realm=self.REALM, uri=self.server_url, user=self.USER, passwd=self.PASSWD) opener = urllib.request.build_opener(auth_handler) urllib.request.install_opener(opener) try: response = urllib.request.urlopen(self.server_url, mem) except urllib.error.HTTPError: self.fail("Basic auth failed for the url: %s" % self.server_url) response_data = response.read() self.assertEqual(response_data, data)
class ProxyAuthTests(unittest.TestCase): URL = "http://localhost"
def test_proxy_auth_mmap_post(self): data = "field=value".encode("ascii") mem = mmap.mmap(-1, len(data)) mem[:] = data self.proxy_digest_handler.add_password(self.REALM, self.URL, self.USER, self.PASSWD) response = self.opener.open(self.URL, mem) self.assertEqual(response.read(), data)
def GetRequestHandler(responses):