ssl module for windows by youknowone · Pull Request #6332 · RustPython/RustPython
if hostname.parse::<std::net::IpAddr>().is_ok() { return Err(vm.new_value_error("server_hostname cannot be an IP address")); } // IP addresses are allowed as server_hostname // SNI will not be sent for IP addresses
if hostname.contains('\0') { return Err(vm.new_type_error("embedded null character"));
// Load successfully found certificates for cert in result.certs { let is_ca = cert::is_ca_certificate(cert.as_ref()); if store.add(cert).is_ok() { *self.x509_cert_count.write() += 1; if is_ca { *self.ca_cert_count.write() += 1; #[cfg(windows)] { // Windows: Use schannel to load from both ROOT and CA stores use schannel::cert_store::CertStore;
let store_names = ["ROOT", "CA"]; let open_fns = [CertStore::open_current_user, CertStore::open_local_machine];
for store_name in store_names { for open_fn in &open_fns { if let Ok(cert_store) = open_fn(store_name) { for cert_ctx in cert_store.certs() { let der_bytes = cert_ctx.to_der(); let cert = rustls::pki_types::CertificateDer::from(der_bytes.to_vec()); let is_ca = cert::is_ca_certificate(cert.as_ref()); if store.add(cert).is_ok() { *self.x509_cert_count.write() += 1; if is_ca { *self.ca_cert_count.write() += 1; } } } } } } }
// If there were errors but some certs loaded, just continue // If NO certs loaded and there were errors, report the first error if *self.x509_cert_count.read() == 0 && !result.errors.is_empty() { return Err(vm.new_os_error(format!( "Failed to load native certificates: {}", result.errors[0] ))); if *self.x509_cert_count.read() == 0 { return Err(vm.new_os_error( "Failed to load certificates from Windows store".to_owned(), )); }
Ok(()) }
Ok(()) #[cfg(not(windows))] { let result = rustls_native_certs::load_native_certs();
// Load successfully found certificates for cert in result.certs { let is_ca = cert::is_ca_certificate(cert.as_ref()); if store.add(cert).is_ok() { *self.x509_cert_count.write() += 1; if is_ca { *self.ca_cert_count.write() += 1; } } }
// If there were errors but some certs loaded, just continue // If NO certs loaded and there were errors, report the first error if *self.x509_cert_count.read() == 0 && !result.errors.is_empty() { return Err(vm.new_os_error(format!( "Failed to load native certificates: {}", result.errors[0] ))); }
Ok(()) } }
#[pymethod]
// Create loader (without ca_certs_der - default certs don't go to get_ca_certs()) let mut lazy_ca_certs = Vec::new(); let mut loader = cert::CertLoader::new(&mut store, &mut lazy_ca_certs); #[cfg(windows)] { // Windows: Load system certificates first, then additionally load from env // see: test_load_default_certs_env_windows let _ = self.load_system_certificates(&mut store, vm);
// Try Python os.environ first (allows runtime env changes) // This checks SSL_CERT_FILE and SSL_CERT_DIR from Python's os.environ let loaded = self.try_load_from_python_environ(&mut loader, vm)?; let mut lazy_ca_certs = Vec::new(); let mut loader = cert::CertLoader::new(&mut store, &mut lazy_ca_certs); let _ = self.try_load_from_python_environ(&mut loader, vm)?; }
// Fallback to system certificates if environment variables didn't provide any if !loaded { let _ = self.load_system_certificates(&mut store, vm); #[cfg(not(windows))] { // Non-Windows: Try env vars first; only fallback to system certs if not set // see: test_load_default_certs_env let mut lazy_ca_certs = Vec::new(); let mut loader = cert::CertLoader::new(&mut store, &mut lazy_ca_certs); let loaded = self.try_load_from_python_environ(&mut loader, vm)?;
if !loaded { let _ = self.load_system_certificates(&mut store, vm); } }
// If no certificates were loaded from system, fallback to webpki-roots (Mozilla CA bundle)
// Check if it's a bare IP address (not allowed for SNI) if hostname.parse::<std::net::IpAddr>().is_ok() { return Err(vm.new_value_error("server_hostname cannot be an IP address")); } // IP addresses are allowed // SNI will not be sent for IP addresses
// Check for NULL bytes if hostname.contains('\0') {
// Unified write logic - no need to match on Client/Server anymore let mut writer = conn.writer(); writer .write_all(data_bytes.as_ref()) .map_err(|e| vm.new_os_error(format!("Write failed: {e}")))?; let is_bio = self.is_bio_mode(); let data: &[u8] = data_bytes.as_ref();
// Flush to get TLS-encrypted data (writer automatically flushed on drop) // Send encrypted data to socket if conn.wants_write() { let is_bio = self.is_bio_mode(); // Write data in chunks to avoid filling the internal TLS buffer // rustls has a limited internal buffer, so we need to flush periodically const CHUNK_SIZE: usize = 16384; // 16KB chunks (typical TLS record size) let mut written = 0;
if is_bio { // BIO mode: Write ALL pending TLS data to outgoing BIO // This prevents hangs where Python's ssl_io_loop waits for data self.write_pending_tls(conn, vm)?; } else { // Socket mode: Try once and may return SSLWantWriteError let mut buf = Vec::new(); conn.write_tls(&mut buf) .map_err(|e| vm.new_os_error(format!("TLS write failed: {e}")))?;
if !buf.is_empty() { // Wait for socket to be ready for writing let timed_out = self.sock_wait_for_io_impl(SelectKind::Write, vm)?; if timed_out { return Err(vm.new_os_error("Write operation timed out")); } while written < data.len() { let chunk_end = std::cmp::min(written + CHUNK_SIZE, data.len()); let chunk = &data[written..chunk_end];
// Write chunk to TLS layer { let mut writer = conn.writer(); use std::io::Write; writer .write_all(chunk) .map_err(|e| vm.new_os_error(format!("Write failed: {e}")))?; }
written = chunk_end;
// Send encrypted data to socket // Convert BlockingIOError to SSLWantWriteError match self.sock_send(buf, vm) { Ok(_) => {} Err(e) => { if is_blocking_io_error(&e, vm) { // Non-blocking socket would block - return SSLWantWriteError return Err(create_ssl_want_write_error(vm)); // Flush TLS data to socket after each chunk if conn.wants_write() { if is_bio { self.write_pending_tls(conn, vm)?; } else { // Socket mode: flush all pending TLS data while conn.wants_write() { let mut buf = Vec::new(); conn.write_tls(&mut buf) .map_err(|e| vm.new_os_error(format!("TLS write failed: {e}")))?;
if !buf.is_empty() { let timed_out = self.sock_wait_for_io_impl(SelectKind::Write, vm)?; if timed_out { return Err(vm.new_os_error("Write operation timed out")); }
match self.sock_send(buf, vm) { Ok(_) => {} Err(e) => { if is_blocking_io_error(&e, vm) { return Err(create_ssl_want_write_error(vm)); } return Err(e); } } return Err(e); } } }
#[cfg(not(any(target_os = "macos", target_os = "linux")))] #[cfg(windows)] let (default_cafile, default_capath) = { // Windows uses certificate store, not file paths // Return empty strings to avoid None being passed to os.path.isfile() (Some(""), Some("")) };
#[cfg(not(any(target_os = "macos", target_os = "linux", windows)))] let (default_cafile, default_capath): (Option<&str>, Option<&str>) = (None, None);
let tuple = vm.ctx.new_tuple(vec![
// Windows-specific certificate store enumeration functions #[cfg(windows)] #[pyfunction] fn enum_certificates(store_name: PyStrRef, vm: &VirtualMachine) -> PyResult<Vec<PyObjectRef>> { use schannel::{RawPointer, cert_context::ValidUses, cert_store::CertStore}; use windows_sys::Win32::Security::Cryptography;
// Try both Current User and Local Machine stores let open_fns = [CertStore::open_current_user, CertStore::open_local_machine]; let stores = open_fns .iter() .filter_map(|open| open(store_name.as_str()).ok()) .collect::<Vec<_>>();
// If no stores could be opened, raise OSError if stores.is_empty() { return Err(vm.new_os_error(format!( "failed to open certificate store {:?}", store_name.as_str() ))); }
let certs = stores.iter().flat_map(|s| s.certs()).map(|c| { let cert = vm.ctx.new_bytes(c.to_der().to_owned()); let enc_type = unsafe { let ptr = c.as_ptr() as *const Cryptography::CERT_CONTEXT; (*ptr).dwCertEncodingType }; let enc_type = match enc_type { Cryptography::X509_ASN_ENCODING => vm.new_pyobj("x509_asn"), Cryptography::PKCS_7_ASN_ENCODING => vm.new_pyobj("pkcs_7_asn"), other => vm.new_pyobj(other), }; let usage: PyObjectRef = match c.valid_uses() { Ok(ValidUses::All) => vm.ctx.new_bool(true).into(), Ok(ValidUses::Oids(oids)) => { match crate::builtins::PyFrozenSet::from_iter( vm, oids.into_iter().map(|oid| vm.ctx.new_str(oid).into()), ) { Ok(set) => set.into_ref(&vm.ctx).into(), Err(_) => vm.ctx.new_bool(true).into(), } } Err(_) => vm.ctx.new_bool(true).into(), }; Ok(vm.new_tuple((cert, enc_type, usage)).into()) }); certs.collect::<PyResult<Vec<_>>>() }
#[cfg(windows)] #[pyfunction] fn enum_crls(store_name: PyStrRef, vm: &VirtualMachine) -> PyResult<Vec<PyObjectRef>> { use windows_sys::Win32::Security::Cryptography::{ CRL_CONTEXT, CertCloseStore, CertEnumCRLsInStore, CertOpenSystemStoreW, X509_ASN_ENCODING, };
let store_name_wide: Vec<u16> = store_name .as_str() .encode_utf16() .chain(std::iter::once(0)) .collect();
// Open system store let store = unsafe { CertOpenSystemStoreW(0, store_name_wide.as_ptr()) };
if store.is_null() { return Err(vm.new_os_error(format!( "failed to open certificate store {:?}", store_name.as_str() ))); }
let mut result = Vec::new();
let mut crl_context: *const CRL_CONTEXT = std::ptr::null(); loop { crl_context = unsafe { CertEnumCRLsInStore(store, crl_context) }; if crl_context.is_null() { break; }
let crl = unsafe { &*crl_context }; let crl_bytes = unsafe { std::slice::from_raw_parts(crl.pbCrlEncoded, crl.cbCrlEncoded as usize) };
let enc_type = if crl.dwCertEncodingType == X509_ASN_ENCODING { vm.new_pyobj("x509_asn") } else { vm.new_pyobj(crl.dwCertEncodingType) };
result.push( vm.new_tuple((vm.ctx.new_bytes(crl_bytes.to_vec()), enc_type)) .into(), ); }
unsafe { CertCloseStore(store, 0) };
Ok(result) }
// Certificate type for SSL module (pure Rust implementation) #[pyattr] #[pyclass(module = "_ssl", name = "Certificate")]