gh-102765: Updated isdir/isfile/islink/exists to use Py_GetFileInformationByName in ntpath when available by finnagin · Pull Request #103485 · python/cpython
Shame that we have a whole lot more duplicated code now, but we don't really have any good alternatives in C without losing the straight-line execution.
Here's a getPosixFileType() function that returns the POSIX file type (e.g. S_IFDIR) and optionally the reparse tag. The latter is needed for isjunction() and possibly other tests that could be added later. It takes separate reparseDirectory and reparseFile parameters for the by-name fast path. This allows the fast path to skip reparsing a file reparse point when checking isdir() and skip reparsing a directory reparse point when checking isfile(). I implemented two helper functions, fileTypeFromDeviceType() and posixFileTypeFromFileInfo(), which could also be used more generally in the os.stat() implementation.
static DWORD fileTypeFromDeviceType(DWORD deviceType) { switch (deviceType) { case FILE_DEVICE_DISK: case FILE_DEVICE_VIRTUAL_DISK: case FILE_DEVICE_DFS: case FILE_DEVICE_CD_ROM: case FILE_DEVICE_CONTROLLER: case FILE_DEVICE_DATALINK: case FILE_DEVICE_DISK_FILE_SYSTEM: case FILE_DEVICE_CD_ROM_FILE_SYSTEM: return FILE_TYPE_DISK; case FILE_DEVICE_CONSOLE: case FILE_DEVICE_NULL: case FILE_DEVICE_KEYBOARD: case FILE_DEVICE_MODEM: case FILE_DEVICE_MOUSE: case FILE_DEVICE_PARALLEL_PORT: case FILE_DEVICE_PRINTER: case FILE_DEVICE_SCREEN: case FILE_DEVICE_SERIAL_PORT: case FILE_DEVICE_SOUND: return FILE_TYPE_CHAR; case FILE_DEVICE_NAMED_PIPE: return FILE_TYPE_PIPE; default: return FILE_TYPE_UNKNOWN; } } static unsigned int posixFileTypeFromFileInfo(DWORD fileType, DWORD fileAttributes, DWORD reparseTag) { if ((fileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) && reparseTag == IO_REPARSE_TAG_SYMLINK) { return S_IFLNK; } if (fileAttributes & FILE_ATTRIBUTE_DIRECTORY) { return S_IFDIR; } switch (fileType) { case FILE_TYPE_DISK: // BUGBUG: define S_IFBLK as 0x6000 in PC/pyconfig.h return fileAttributes ? S_IFREG : 0x6000; case FILE_TYPE_CHAR: return S_IFCHR; case FILE_TYPE_PIPE: // BUGBUG: define S_IFIFO as _S_IFIFO in PC/pyconfig.h. UCRT neglects // to define it for _CRT_INTERNAL_NONSTDC_NAMES in sys/stat.h. return _S_IFIFO; } return 0; } static BOOL getPosixFileType(path_t *path, int *pPosixFileType, DWORD *pReparseTag, BOOL reparseDirectory, BOOL reparseFile) { HANDLE hfile; DWORD fileType = 0; DWORD fileAttributes = 0; DWORD reparseTag = 0; int posixFileType = 0; BOOL reparse = reparseDirectory || reparseFile; BOOL queryByHandle = TRUE; BOOL closeFile = TRUE; BOOL result = FALSE; if (path->wide) { FILE_STAT_BASIC_INFORMATION statInfo; if (_Py_GetFileInformationByName(path->wide, FileStatBasicByNameInfo, &statInfo, sizeof(statInfo))) { if (statInfo.FileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { if (statInfo.FileAttributes & FILE_ATTRIBUTE_DIRECTORY) { if (!reparseDirectory) { queryByHandle = FALSE; } } else if (!reparseFile) { queryByHandle = FALSE; } } else { queryByHandle = FALSE; } if (!queryByHandle) { result = TRUE; fileType = fileTypeFromDeviceType(statInfo.DeviceType); fileAttributes = statInfo.FileAttributes; if (fileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { reparseTag = statInfo.ReparseTag; } } } else { switch(GetLastError()) { case ERROR_FILE_NOT_FOUND: case ERROR_PATH_NOT_FOUND: case ERROR_NOT_READY: case ERROR_BAD_NET_NAME: case ERROR_BAD_NETPATH: case ERROR_BAD_PATHNAME: case ERROR_INVALID_NAME: case ERROR_FILENAME_EXCED_RANGE: queryByHandle = FALSE; break; } } } if (queryByHandle) { if (path->fd != -1) { closeFile = FALSE; hfile = _Py_get_osfhandle_noraise(path->fd); } else { DWORD flags = FILE_FLAG_BACKUP_SEMANTICS; if (!reparse) { flags |= FILE_FLAG_OPEN_REPARSE_POINT; } hfile = CreateFileW(path->wide, FILE_READ_ATTRIBUTES, 0, NULL, OPEN_EXISTING, flags, NULL); } if (hfile != INVALID_HANDLE_VALUE) { fileType = GetFileType(hfile); if (fileType != FILE_TYPE_UNKNOWN || GetLastError() == NO_ERROR) { result = TRUE; } if (closeFile || fileType == FILE_TYPE_DISK) { FILE_ATTRIBUTE_TAG_INFO fati; FILE_BASIC_INFO fbi; if (GetFileInformationByHandleEx(hfile, FileAttributeTagInfo, &fati, sizeof(fati))) { fileAttributes = fati.FileAttributes; if (fileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) { reparseTag = fati.ReparseTag; } } else if (GetFileInformationByHandleEx(hfile, FileBasicInfo, &fbi, sizeof(fbi))) { fileAttributes = fbi.FileAttributes; } } if (closeFile) { CloseHandle(hfile); } } else if (path->wide) { int status; STRUCT_STAT st; switch (GetLastError()) { case ERROR_ACCESS_DENIED: case ERROR_SHARING_VIOLATION: case ERROR_CANT_ACCESS_FILE: case ERROR_INVALID_PARAMETER: if (reparse) { status = STAT(path->wide, &st); } else { status = LSTAT(path->wide, &st); } if (status == 0) { result = TRUE; posixFileType = st.st_mode & S_IFMT; reparseTag = st.st_reparse_tag; } } } } if (result) { if (posixFileType) { *pPosixFileType = posixFileType; } else { *pPosixFileType = posixFileTypeFromFileInfo( fileType, fileAttributes, reparseTag); } if (pReparseTag) { *pReparseTag = reparseTag; } } return result; }
Example usage:
static PyObject * os__path_isdir_impl(PyObject *module, PyObject *path) { BOOL success; int fileType; path_t _path = PATH_T_INITIALIZE("isdir", "path", 0, 1); if (!path_converter(path, &_path)) { path_cleanup(&_path); if (PyErr_ExceptionMatches(PyExc_ValueError)) { PyErr_Clear(); Py_RETURN_FALSE; } return NULL; } Py_BEGIN_ALLOW_THREADS success = getPosixFileType(&_path, &fileType, NULL, TRUE, FALSE); Py_END_ALLOW_THREADS path_cleanup(&_path); if (success && fileType == S_IFDIR) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } static PyObject * os__path_isfile_impl(PyObject *module, PyObject *path) { BOOL success; int fileType; path_t _path = PATH_T_INITIALIZE("isfile", "path", 0, 1); if (!path_converter(path, &_path)) { path_cleanup(&_path); if (PyErr_ExceptionMatches(PyExc_ValueError)) { PyErr_Clear(); Py_RETURN_FALSE; } return NULL; } Py_BEGIN_ALLOW_THREADS success = getPosixFileType(&_path, &fileType, NULL, FALSE, TRUE); Py_END_ALLOW_THREADS path_cleanup(&_path); if (success && fileType == S_IFREG) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } static PyObject * os__path_exists_impl(PyObject *module, PyObject *path) { BOOL success; int fileType; path_t _path = PATH_T_INITIALIZE("exists", "path", 0, 1); if (!path_converter(path, &_path)) { path_cleanup(&_path); if (PyErr_ExceptionMatches(PyExc_ValueError)) { PyErr_Clear(); Py_RETURN_FALSE; } return NULL; } Py_BEGIN_ALLOW_THREADS success = getPosixFileType(&_path, &fileType, NULL, TRUE, TRUE); Py_END_ALLOW_THREADS path_cleanup(&_path); if (success) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } static PyObject * os__path_islink_impl(PyObject *module, PyObject *path) { BOOL success; int fileType; path_t _path = PATH_T_INITIALIZE("islink", "path", 0, 1); if (!path_converter(path, &_path)) { path_cleanup(&_path); if (PyErr_ExceptionMatches(PyExc_ValueError)) { PyErr_Clear(); Py_RETURN_FALSE; } return NULL; } Py_BEGIN_ALLOW_THREADS success = getPosixFileType(&_path, &fileType, NULL, FALSE, FALSE); Py_END_ALLOW_THREADS path_cleanup(&_path); if (success && fileType == S_IFLNK) { Py_RETURN_TRUE; } Py_RETURN_FALSE; } static PyObject * os__path_isjunction_impl(PyObject *module, PyObject *path) { BOOL success; int fileType; DWORD reparseTag; path_t _path = PATH_T_INITIALIZE("islink", "path", 0, 1); if (!path_converter(path, &_path)) { path_cleanup(&_path); if (PyErr_ExceptionMatches(PyExc_ValueError)) { PyErr_Clear(); Py_RETURN_FALSE; } return NULL; } Py_BEGIN_ALLOW_THREADS success = getPosixFileType(&_path, &fileType, &reparseTag, FALSE, FALSE); Py_END_ALLOW_THREADS path_cleanup(&_path); if (success && fileType == S_IFDIR && reparseTag == IO_REPARSE_TAG_MOUNT_POINT) { Py_RETURN_TRUE; } Py_RETURN_FALSE; }