signal timer by youknowone · Pull Request #6535 · RustPython/RustPython
#[cfg(any(unix, windows))]
#[cfg(any(target_os = "linux", target_os = "android"))] mod ffi { unsafe extern "C" { pub fn getitimer(which: libc::c_int, curr_value: *mut libc::itimerval) -> libc::c_int; pub fn setitimer( which: libc::c_int, new_value: *const libc::itimerval, old_value: *mut libc::itimerval, ) -> libc::c_int; } }
#[pyattr] use crate::signal::NSIG;
// Interval timer constants #[cfg(all(unix, not(target_os = "android")))] #[pyattr] use libc::{ITIMER_PROF, ITIMER_REAL, ITIMER_VIRTUAL};
#[cfg(target_os = "android")] #[pyattr] const ITIMER_REAL: libc::c_int = 0; #[cfg(target_os = "android")] #[pyattr] const ITIMER_VIRTUAL: libc::c_int = 1; #[cfg(target_os = "android")] #[pyattr] const ITIMER_PROF: libc::c_int = 2;
#[cfg(unix)] #[pyattr(name = "ItimerError", once)] fn itimer_error(vm: &VirtualMachine) -> PyTypeRef { vm.ctx.new_exception_type( "signal", "ItimerError", Some(vec![vm.ctx.exceptions.os_error.to_owned()]), ) }
#[cfg(any(unix, windows))] pub(super) fn init_signal_handlers( module: &Py<crate::builtins::PyModule>,
#[cfg(unix)] #[pyfunction] fn pause(vm: &VirtualMachine) -> PyResult<()> { unsafe { libc::pause() }; signal::check_signals(vm)?; Ok(()) }
#[cfg(unix)] fn timeval_to_double(tv: &libc::timeval) -> f64 { tv.tv_sec as f64 + (tv.tv_usec as f64 / 1_000_000.0) }
#[cfg(unix)] fn double_to_timeval(val: f64) -> libc::timeval { libc::timeval { tv_sec: val.trunc() as _, tv_usec: ((val.fract()) * 1_000_000.0) as _, } }
#[cfg(unix)] fn itimerval_to_tuple(it: &libc::itimerval) -> (f64, f64) { ( timeval_to_double(&it.it_value), timeval_to_double(&it.it_interval), ) }
#[cfg(unix)] #[pyfunction] fn setitimer( which: i32, seconds: ArgIntoFloat, interval: OptionalArg<ArgIntoFloat>, vm: &VirtualMachine, ) -> PyResult<(f64, f64)> { let seconds: f64 = seconds.into(); let interval: f64 = interval.map(|v| v.into()).unwrap_or(0.0); let new = libc::itimerval { it_value: double_to_timeval(seconds), it_interval: double_to_timeval(interval), }; let mut old = std::mem::MaybeUninit::<libc::itimerval>::uninit(); #[cfg(any(target_os = "linux", target_os = "android"))] let ret = unsafe { ffi::setitimer(which, &new, old.as_mut_ptr()) }; #[cfg(not(any(target_os = "linux", target_os = "android")))] let ret = unsafe { libc::setitimer(which, &new, old.as_mut_ptr()) }; if ret != 0 { let err = std::io::Error::last_os_error(); let itimer_error = itimer_error(vm); return Err(vm.new_exception_msg(itimer_error, err.to_string())); } let old = unsafe { old.assume_init() }; Ok(itimerval_to_tuple(&old)) }
#[cfg(unix)] #[pyfunction] fn getitimer(which: i32, vm: &VirtualMachine) -> PyResult<(f64, f64)> { let mut old = std::mem::MaybeUninit::<libc::itimerval>::uninit(); #[cfg(any(target_os = "linux", target_os = "android"))] let ret = unsafe { ffi::getitimer(which, old.as_mut_ptr()) }; #[cfg(not(any(target_os = "linux", target_os = "android")))] let ret = unsafe { libc::getitimer(which, old.as_mut_ptr()) }; if ret != 0 { let err = std::io::Error::last_os_error(); let itimer_error = itimer_error(vm); return Err(vm.new_exception_msg(itimer_error, err.to_string())); } let old = unsafe { old.assume_init() }; Ok(itimerval_to_tuple(&old)) }
#[pyfunction] fn default_int_handler( _signum: PyObjectRef,