◐ Shell
clean mode source ↗

gh-102765: Updated isdir/isfile/islink/exists to use Py_GetFileInformationByName in ntpath when available by finnagin · Pull Request #103485 · python/cpython

Shame that we have a whole lot more duplicated code now, but we don't really have any good alternatives in C without losing the straight-line execution.

Here's a getPosixFileType() function that returns the POSIX file type (e.g. S_IFDIR) and optionally the reparse tag. The latter is needed for isjunction() and possibly other tests that could be added later. It takes separate reparseDirectory and reparseFile parameters for the by-name fast path. This allows the fast path to skip reparsing a file reparse point when checking isdir() and skip reparsing a directory reparse point when checking isfile(). I implemented two helper functions, fileTypeFromDeviceType() and posixFileTypeFromFileInfo(), which could also be used more generally in the os.stat() implementation.

/* Translate a FILE_DEVICE_* device type into one of the coarse FILE_TYPE_*
   values (DISK, CHAR, PIPE). Any device type that is not listed below maps
   to FILE_TYPE_UNKNOWN. */
static DWORD
fileTypeFromDeviceType(DWORD deviceType)
{
    DWORD coarseType = FILE_TYPE_UNKNOWN;

    switch (deviceType) {
    /* Pipe devices. */
    case FILE_DEVICE_NAMED_PIPE:
        coarseType = FILE_TYPE_PIPE;
        break;

    /* Character-style devices. */
    case FILE_DEVICE_CONSOLE:
    case FILE_DEVICE_NULL:
    case FILE_DEVICE_KEYBOARD:
    case FILE_DEVICE_MODEM:
    case FILE_DEVICE_MOUSE:
    case FILE_DEVICE_PARALLEL_PORT:
    case FILE_DEVICE_PRINTER:
    case FILE_DEVICE_SCREEN:
    case FILE_DEVICE_SERIAL_PORT:
    case FILE_DEVICE_SOUND:
        coarseType = FILE_TYPE_CHAR;
        break;

    /* Disk-style devices and file systems. */
    case FILE_DEVICE_DISK:
    case FILE_DEVICE_VIRTUAL_DISK:
    case FILE_DEVICE_DFS:
    case FILE_DEVICE_CD_ROM:
    case FILE_DEVICE_CONTROLLER:
    case FILE_DEVICE_DATALINK:
    case FILE_DEVICE_DISK_FILE_SYSTEM:
    case FILE_DEVICE_CD_ROM_FILE_SYSTEM:
        coarseType = FILE_TYPE_DISK;
        break;

    default:
        /* Keep FILE_TYPE_UNKNOWN. */
        break;
    }
    return coarseType;
}


/* Derive the POSIX file-type bits (S_IFLNK, S_IFDIR, S_IFREG, ...) from a
   FILE_TYPE_* value, the file attributes and the reparse tag.

   Precedence: a reparse point tagged as a symlink is always S_IFLNK; next,
   anything with the directory attribute is S_IFDIR; otherwise the result is
   chosen from fileType. Returns 0 when no type can be determined. */
static unsigned int
posixFileTypeFromFileInfo(DWORD fileType, DWORD fileAttributes,
                          DWORD reparseTag)
{
    const int hasReparsePoint =
        (fileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) != 0;
    const int hasDirectoryAttr =
        (fileAttributes & FILE_ATTRIBUTE_DIRECTORY) != 0;

    if (hasReparsePoint && reparseTag == IO_REPARSE_TAG_SYMLINK) {
        return S_IFLNK;
    }
    if (hasDirectoryAttr) {
        return S_IFDIR;
    }

    if (fileType == FILE_TYPE_DISK) {
        // A disk-type object with no attributes at all is reported as a
        // block device rather than a regular file.
        if (fileAttributes != 0) {
            return S_IFREG;
        }
        // BUGBUG: define S_IFBLK as 0x6000 in PC/pyconfig.h
        return 0x6000;
    }
    if (fileType == FILE_TYPE_CHAR) {
        return S_IFCHR;
    }
    if (fileType == FILE_TYPE_PIPE) {
        // BUGBUG: define S_IFIFO as _S_IFIFO in PC/pyconfig.h. UCRT neglects
        // to define it for _CRT_INTERNAL_NONSTDC_NAMES in sys/stat.h.
        return _S_IFIFO;
    }
    return 0;
}


/* Determine the POSIX file type (S_IFMT bits) of `path`.

   path              Path to examine; either path->wide (a wide-character
                     name) or path->fd (a file descriptor, != -1) is used.
   pPosixFileType    Receives the file type on success.
   pReparseTag       Optional (may be NULL); receives the reparse tag on
                     success, or 0 if none was recorded.
   reparseDirectory  If FALSE, a directory reparse point found by the
                     by-name query is reported as-is, without opening it.
   reparseFile       Same as reparseDirectory, for non-directory reparse
                     points.

   Returns TRUE if the type was determined. On FALSE the output parameters
   are left untouched, so callers must check the result before reading
   them. No Python exception is set by this function itself.

   Strategy: try a by-name query first, then fall back to opening a handle
   (or using the handle behind path->fd), and finally to STAT/LSTAT when the
   open fails with an access-style error. */
static BOOL
getPosixFileType(path_t *path, int *pPosixFileType, DWORD *pReparseTag,
                 BOOL reparseDirectory, BOOL reparseFile)
{
    HANDLE hfile;
    DWORD fileType = 0;
    DWORD fileAttributes = 0;
    DWORD reparseTag = 0;
    /* Only set by the STAT/LSTAT fallback; 0 means "derive the type from
       fileType/fileAttributes/reparseTag instead". */
    int posixFileType = 0;
    /* Whether the handle-based query opens the target of a reparse point
       (TRUE) or the reparse point itself (FALSE). */
    BOOL reparse = reparseDirectory || reparseFile;
    BOOL queryByHandle = TRUE;
    BOOL closeFile = TRUE;
    BOOL result = FALSE;

    /* Fast path: query by name, without opening a handle. */
    if (path->wide) {
        FILE_STAT_BASIC_INFORMATION statInfo;
        if (_Py_GetFileInformationByName(path->wide, FileStatBasicByNameInfo,
                &statInfo, sizeof(statInfo)))
        {
            /* The by-name answer is final unless the path is a reparse
               point of a kind the caller asked to have followed. */
            if (statInfo.FileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
                if (statInfo.FileAttributes & FILE_ATTRIBUTE_DIRECTORY) {
                    if (!reparseDirectory) {
                        queryByHandle = FALSE;
                    }
                }
                else if (!reparseFile) {
                    queryByHandle = FALSE;
                }
            }
            else {
                queryByHandle = FALSE;
            }
            if (!queryByHandle) {
                result = TRUE;
                fileType = fileTypeFromDeviceType(statInfo.DeviceType);
                fileAttributes = statInfo.FileAttributes;
                /* The tag is only recorded for reparse points. */
                if (fileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
                    reparseTag = statInfo.ReparseTag;
                }
            }
        }
        else {
            /* For these errors the handle-based query is skipped and the
               function returns FALSE (result is still FALSE here). Any
               other error falls through to the handle-based query. */
            switch(GetLastError()) {
            case ERROR_FILE_NOT_FOUND:
            case ERROR_PATH_NOT_FOUND:
            case ERROR_NOT_READY:
            case ERROR_BAD_NET_NAME:
            case ERROR_BAD_NETPATH:
            case ERROR_BAD_PATHNAME:
            case ERROR_INVALID_NAME:
            case ERROR_FILENAME_EXCED_RANGE:
                queryByHandle = FALSE;
                break;
            }
        }
    }
    /* Slow path: query through a file handle. */
    if (queryByHandle) {
        if (path->fd != -1) {
            /* The handle belongs to the descriptor passed by the caller,
               so it must not be closed here. */
            closeFile = FALSE;
            hfile = _Py_get_osfhandle_noraise(path->fd);
        }
        else {
            DWORD flags = FILE_FLAG_BACKUP_SEMANTICS;
            /* When no reparsing was requested, open the reparse point
               itself instead of its target. */
            if (!reparse) {
                flags |= FILE_FLAG_OPEN_REPARSE_POINT;
            }
            hfile = CreateFileW(path->wide, FILE_READ_ATTRIBUTES, 0, NULL,
                                OPEN_EXISTING, flags, NULL);
        }
        if (hfile != INVALID_HANDLE_VALUE) {
            fileType = GetFileType(hfile);
            /* FILE_TYPE_UNKNOWN counts as success only if no error was
               recorded alongside it. */
            if (fileType != FILE_TYPE_UNKNOWN || GetLastError() == NO_ERROR) {
                result = TRUE;
            }
            /* Attributes are queried for every handle opened here, but for
               a caller-supplied descriptor only if it is a disk file. */
            if (closeFile || fileType == FILE_TYPE_DISK) {
                FILE_ATTRIBUTE_TAG_INFO fati;
                FILE_BASIC_INFO fbi;
                if (GetFileInformationByHandleEx(hfile, FileAttributeTagInfo,
                        &fati, sizeof(fati)))
                {
                    fileAttributes = fati.FileAttributes;
                    if (fileAttributes & FILE_ATTRIBUTE_REPARSE_POINT) {
                        reparseTag = fati.ReparseTag;
                    }
                }
                /* Fallback query: yields attributes only, no reparse tag.
                   If both queries fail, fileAttributes stays 0. */
                else if (GetFileInformationByHandleEx(hfile, FileBasicInfo,
                            &fbi, sizeof(fbi)))
                {
                    fileAttributes = fbi.FileAttributes;
                }
            }
            if (closeFile) {
                CloseHandle(hfile);
            }
        }
        else if (path->wide) {
            /* No usable handle. For the errors listed below, fall back to
               STAT (when reparsing was requested) or LSTAT (otherwise);
               for any other error the function returns FALSE. */
            int status;
            STRUCT_STAT st;
            switch (GetLastError()) {
            case ERROR_ACCESS_DENIED:
            case ERROR_SHARING_VIOLATION:
            case ERROR_CANT_ACCESS_FILE:
            case ERROR_INVALID_PARAMETER:
                if (reparse) {
                    status = STAT(path->wide, &st);
                }
                else {
                    status = LSTAT(path->wide, &st);
                }
                if (status == 0) {
                    result = TRUE;
                    posixFileType = st.st_mode & S_IFMT;
                    reparseTag = st.st_reparse_tag;
                }
            }
        }
    }
    /* Publish the outputs only on success. */
    if (result) {
        if (posixFileType) {
            /* Type came directly from the STAT/LSTAT fallback. */
            *pPosixFileType = posixFileType;
        }
        else {
            *pPosixFileType = posixFileTypeFromFileInfo(
                                 fileType, fileAttributes, reparseTag);
        }
        if (pReparseTag) {
            *pReparseTag = reparseTag;
        }
    }
    return result;
}

Example usage:

/* isdir: return True if `path` resolves to a directory, otherwise False.
   Directory reparse points are followed; file reparse points are not.
   A ValueError raised while converting `path` is cleared and reported as
   False; any other conversion error propagates (returns NULL). */
static PyObject *
os__path_isdir_impl(PyObject *module, PyObject *path)
{
    path_t converted = PATH_T_INITIALIZE("isdir", "path", 0, 1);
    int posixType;
    BOOL queried;

    if (!path_converter(path, &converted)) {
        path_cleanup(&converted);
        if (!PyErr_ExceptionMatches(PyExc_ValueError)) {
            return NULL;
        }
        PyErr_Clear();
        Py_RETURN_FALSE;
    }

    /* The file-system query runs with the GIL released. */
    Py_BEGIN_ALLOW_THREADS
    queried = getPosixFileType(&converted, &posixType, NULL, TRUE, FALSE);
    Py_END_ALLOW_THREADS

    path_cleanup(&converted);
    if (!queried || posixType != S_IFDIR) {
        Py_RETURN_FALSE;
    }
    Py_RETURN_TRUE;
}


/* isfile: return True if `path` resolves to a regular file, otherwise
   False. File reparse points are followed; directory reparse points are
   not. A ValueError raised while converting `path` is cleared and reported
   as False; any other conversion error propagates (returns NULL). */
static PyObject *
os__path_isfile_impl(PyObject *module, PyObject *path)
{
    path_t converted = PATH_T_INITIALIZE("isfile", "path", 0, 1);
    int posixType;
    BOOL queried;

    if (!path_converter(path, &converted)) {
        path_cleanup(&converted);
        if (!PyErr_ExceptionMatches(PyExc_ValueError)) {
            return NULL;
        }
        PyErr_Clear();
        Py_RETURN_FALSE;
    }

    /* The file-system query runs with the GIL released. */
    Py_BEGIN_ALLOW_THREADS
    queried = getPosixFileType(&converted, &posixType, NULL, FALSE, TRUE);
    Py_END_ALLOW_THREADS

    path_cleanup(&converted);
    if (!queried || posixType != S_IFREG) {
        Py_RETURN_FALSE;
    }
    Py_RETURN_TRUE;
}


/* exists: return True if the type of `path` can be determined with both
   file and directory reparse points followed, otherwise False. The type
   itself is not inspected. A ValueError raised while converting `path` is
   cleared and reported as False; any other conversion error propagates
   (returns NULL). */
static PyObject *
os__path_exists_impl(PyObject *module, PyObject *path)
{
    path_t converted = PATH_T_INITIALIZE("exists", "path", 0, 1);
    int posixType;
    BOOL queried;

    if (!path_converter(path, &converted)) {
        path_cleanup(&converted);
        if (!PyErr_ExceptionMatches(PyExc_ValueError)) {
            return NULL;
        }
        PyErr_Clear();
        Py_RETURN_FALSE;
    }

    /* The file-system query runs with the GIL released. */
    Py_BEGIN_ALLOW_THREADS
    queried = getPosixFileType(&converted, &posixType, NULL, TRUE, TRUE);
    Py_END_ALLOW_THREADS

    path_cleanup(&converted);
    if (!queried) {
        Py_RETURN_FALSE;
    }
    Py_RETURN_TRUE;
}

/* islink: return True if `path` itself is a symbolic link, otherwise
   False. No reparse point is followed. A ValueError raised while
   converting `path` is cleared and reported as False; any other conversion
   error propagates (returns NULL). */
static PyObject *
os__path_islink_impl(PyObject *module, PyObject *path)
{
    path_t converted = PATH_T_INITIALIZE("islink", "path", 0, 1);
    int posixType;
    BOOL queried;

    if (!path_converter(path, &converted)) {
        path_cleanup(&converted);
        if (!PyErr_ExceptionMatches(PyExc_ValueError)) {
            return NULL;
        }
        PyErr_Clear();
        Py_RETURN_FALSE;
    }

    /* The file-system query runs with the GIL released. */
    Py_BEGIN_ALLOW_THREADS
    queried = getPosixFileType(&converted, &posixType, NULL, FALSE, FALSE);
    Py_END_ALLOW_THREADS

    path_cleanup(&converted);
    if (!queried || posixType != S_IFLNK) {
        Py_RETURN_FALSE;
    }
    Py_RETURN_TRUE;
}


/* isjunction: return True if `path` itself is a directory whose reparse
   tag is IO_REPARSE_TAG_MOUNT_POINT, otherwise False. No reparse point is
   followed, so the tag examined is that of `path`, not of its target.
   A ValueError raised while converting `path` is cleared and reported as
   False; any other conversion error propagates (returns NULL). */
static PyObject *
os__path_isjunction_impl(PyObject *module, PyObject *path)
{
    BOOL success;
    /* Initialised so they hold defined values even when the query fails
       and getPosixFileType() leaves its outputs untouched. */
    int fileType = 0;
    DWORD reparseTag = 0;
    /* The function name given here must be "isjunction" (it was
       copy-pasted as "islink"), so the converter is labelled with the
       right function. */
    path_t _path = PATH_T_INITIALIZE("isjunction", "path", 0, 1);

    if (!path_converter(path, &_path)) {
        path_cleanup(&_path);
        if (PyErr_ExceptionMatches(PyExc_ValueError)) {
            PyErr_Clear();
            Py_RETURN_FALSE;
        }
        return NULL;
    }

    /* The file-system query runs with the GIL released. */
    Py_BEGIN_ALLOW_THREADS
    success = getPosixFileType(&_path, &fileType, &reparseTag, FALSE, FALSE);
    Py_END_ALLOW_THREADS

    path_cleanup(&_path);
    if (success && fileType == S_IFDIR &&
        reparseTag == IO_REPARSE_TAG_MOUNT_POINT)
    {
        Py_RETURN_TRUE;
    }
    Py_RETURN_FALSE;
}