◐ Shell
clean mode source ↗

bpo-5038: Use iterable objects in case of mmap file source · Pull Request #11904 · python/cpython

Expand Up @@ -7,6 +7,7 @@ import threading import unittest import hashlib import mmap
from test import support
Expand Down Expand Up @@ -153,6 +154,25 @@ def _return_auth_challenge(self, request_handler): request_handler.wfile.write(b"Proxy Authentication Required.") return False
def _read_post_body(self, request_handler): encoding = request_handler.headers.get("Transfer-Encoding") if encoding is None: length = int(request_handler.headers.get("Content-Length")) if encoding == "chunked": request = b"" while True: line = request_handler.rfile.readline() length = int(line, 16) if length == 0: request_handler.rfile.readline() break request = request + request_handler.rfile.read(length) request_handler.rfile.read(2) else: request = self.rfile.read(length)
return request
def handle_request(self, request_handler): """Performs digest authentication on the given HTTP request handler. Returns True if authentication was successful, False Expand All @@ -162,6 +182,9 @@ def handle_request(self, request_handler): disabled and this method will always return True. """
if request_handler.is_POST == True: request_handler.post_body = self._read_post_body(request_handler)
if len(self._users) == 0: return True
Expand Down Expand Up @@ -238,7 +261,39 @@ def do_GET(self): # Request Unauthorized self.do_AUTHHEAD()

def do_POST(self): encoding = self.headers.get("Transfer-Encoding") if encoding is None: length = int(self.headers.get("Content-Length")) if encoding == "chunked": request = b"" while True: line = self.rfile.readline() length = int(line, 16) if length == 0: self.rfile.readline() break request = request + self.rfile.read(length) self.rfile.read(2) length = len(request) else: request = self.rfile.read(length) if length > 0: if not self.headers.get("Authorization", ""): self.do_AUTHHEAD() self.wfile.write(b"No Auth header received") return elif self.headers.get( "Authorization", "") == "Basic " + self.ENCODED_AUTH: self.send_response(200, "OK") self.end_headers() self.wfile.write(request) else: self.do_AUTHHEAD() return else: self.send_response(400, "Empty request data") self.end_headers()
# Proxy test infrastructure
Expand All @@ -264,6 +319,7 @@ def do_GET(self): (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( self.path, "http") self.short_path = path self.is_POST = False if self.digest_auth_handler.handle_request(self): self.send_response(200, "OK") self.send_header("Content-Type", "text/html") Expand All @@ -273,6 +329,22 @@ def do_GET(self): self.wfile.write(b"Our apologies, but our server is down due to " b"a sudden zombie invasion.")
def do_POST(self): (scm, netloc, path, params, query, fragment) = urllib.parse.urlparse( self.path, "http") self.short_path = path self.is_POST = True if self.digest_auth_handler.handle_request(self): request = self.post_body if len(request) > 0: self.send_response(200, "OK") self.send_header("Content-Type", "text/plain") self.end_headers() self.wfile.write(request) else: self.send_response(400, "Empty request data") self.end_headers()
# Test cases
class BasicAuthTests(unittest.TestCase): Expand Down Expand Up @@ -314,6 +386,21 @@ def test_basic_auth_httperror(self): urllib.request.install_opener(urllib.request.build_opener(ah)) self.assertRaises(urllib.error.HTTPError, urllib.request.urlopen, self.server_url)
def test_basic_auth_mmap_post(self): data = "field=value".encode("ascii") mem = mmap.mmap(-1, len(data)) mem[:] = data auth_handler = urllib.request.HTTPBasicAuthHandler() auth_handler.add_password(realm=self.REALM, uri=self.server_url, user=self.USER, passwd=self.PASSWD) opener = urllib.request.build_opener(auth_handler) urllib.request.install_opener(opener) try: response = urllib.request.urlopen(self.server_url, mem) except urllib.error.HTTPError: self.fail("Basic auth failed for the url: %s" % self.server_url) response_data = response.read() self.assertEqual(response_data, data)
class ProxyAuthTests(unittest.TestCase): URL = "http://localhost" Expand Down Expand Up @@ -392,6 +479,15 @@ def test_proxy_qop_auth_int_works_or_throws_urlerror(self): pass result.close()
def test_proxy_auth_mmap_post(self): data = "field=value".encode("ascii") mem = mmap.mmap(-1, len(data)) mem[:] = data self.proxy_digest_handler.add_password(self.REALM, self.URL, self.USER, self.PASSWD) response = self.opener.open(self.URL, mem) self.assertEqual(response.read(), data)

def GetRequestHandler(responses):
Expand Down