443 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			443 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""
 | 
						|
Utility functions for

- building and importing modules at test time, using a temporary location
- detecting if compilers are present
- determining paths to tests
 | 
						|
 | 
						|
"""
 | 
						|
import atexit
 | 
						|
import concurrent.futures
 | 
						|
import contextlib
 | 
						|
import glob
 | 
						|
import os
 | 
						|
import shutil
 | 
						|
import subprocess
 | 
						|
import sys
 | 
						|
import tempfile
 | 
						|
from importlib import import_module
 | 
						|
from pathlib import Path
 | 
						|
 | 
						|
import pytest
 | 
						|
 | 
						|
import numpy
 | 
						|
from numpy._utils import asunicode
 | 
						|
from numpy.f2py._backends._meson import MesonBackend
 | 
						|
from numpy.testing import IS_WASM, temppath
 | 
						|
 | 
						|
#
 | 
						|
# Check if compilers are available at all...
 | 
						|
#
 | 
						|
 | 
						|
def check_language(lang, code_snippet=None):
    """Report whether ``meson setup`` succeeds for a project using ``lang``.

    A throwaway ``meson.build`` is written into a fresh temporary directory
    and ``meson setup btmp`` is run there.  When ``code_snippet`` is given,
    the generated project also asks the ``lang`` compiler whether the snippet
    compiles; that answer is only stored in a meson variable, the return
    value of this function depends solely on the exit status of meson.

    Parameters
    ----------
    lang : str
        Language name handed to meson's ``add_languages``.
    code_snippet : str, optional
        Source text embedded in the meson file for a ``compiles()`` check.

    Returns
    -------
    bool
        True when ``meson setup`` exits with status 0.

    Notes
    -----
    Calls ``pytest.skip`` (module level allowed) on win32 and when the
    ``meson`` executable cannot be launched.  The temporary directory is
    always removed.
    """
    if sys.platform == "win32":
        pytest.skip("No Fortran tests on Windows (Issue #25134)", allow_module_level=True)
    tmpdir = tempfile.mkdtemp()
    try:
        meson_file = os.path.join(tmpdir, "meson.build")
        with open(meson_file, "w") as f:
            f.write("project('check_compilers')\n")
            f.write(f"add_languages('{lang}')\n")
            if code_snippet:
                f.write(f"{lang}_compiler = meson.get_compiler('{lang}')\n")
                f.write(f"{lang}_code = '''{code_snippet}'''\n")
                f.write(
                    f"_have_{lang}_feature ="
                    f"{lang}_compiler.compiles({lang}_code,"
                    f" name: '{lang} feature check')\n"
                )
        try:
            runmeson = subprocess.run(
                ["meson", "setup", "btmp"],
                check=False,
                cwd=tmpdir,
                capture_output=True,
            )
        except (FileNotFoundError, subprocess.CalledProcessError):
            # With check=False a failing meson never raises CalledProcessError;
            # a missing executable surfaces as FileNotFoundError instead, so
            # that is the case that has to trigger the skip.
            pytest.skip("meson not present, skipping compiler dependent test", allow_module_level=True)
        return runmeson.returncode == 0
    finally:
        shutil.rmtree(tmpdir)
 | 
						|
 | 
						|
 | 
						|
# Fixed-form program used to probe for a working Fortran 77 compiler.
# The leading "C" comment marker and the six-space statement indent are
# part of the source text and must be kept as they are.
fortran77_code = '''
C Example Fortran 77 code
      PROGRAM HELLO
      PRINT *, 'Hello, Fortran 77!'
      END
'''

# Free-form program (derived type, "%" component access) used to probe for
# a working Fortran 90 compiler.
fortran90_code = '''
! Example Fortran 90 code
program hello90
  type :: greeting
    character(len=20) :: text
  end type greeting

  type(greeting) :: greet
  greet%text = 'hello, fortran 90!'
  print *, greet%text
end program hello90
'''
 | 
						|
 | 
						|
# Dummy class for caching relevant checks
 | 
						|
class CompilerChecker:
    """Holds the cached outcome of the C / Fortran 77 / Fortran 90 probes.

    Every flag starts out False; ``check_compilers`` fills them in.
    """

    def __init__(self):
        self.compilers_checked = False
        self.has_c = self.has_f77 = self.has_f90 = False

    def check_compilers(self):
        """Run the three ``check_language`` probes concurrently, once.

        Nothing happens when the probes already ran or on cygwin; in the
        cygwin case ``compilers_checked`` is left untouched as well.
        """
        if self.compilers_checked or sys.platform == "cygwin":
            return

        # Argument tuples for check_language, in the order c / f77 / f90
        probes = (
            ("c",),
            ("fortran", fortran77_code),
            ("fortran", fortran90_code),
        )
        with concurrent.futures.ThreadPoolExecutor() as pool:
            c_job, f77_job, f90_job = [
                pool.submit(check_language, *args) for args in probes
            ]
            self.has_c = c_job.result()
            self.has_f77 = f77_job.result()
            self.has_f90 = f90_job.result()

        self.compilers_checked = True
 | 
						|
 | 
						|
 | 
						|
# Module-wide cache of the compiler probes.  The instance is created
# unconditionally so that the ``has_*_compiler`` helpers never hit an
# undefined name; on WASM the probes are not run and every flag simply keeps
# its False default.
checker = CompilerChecker()
if not IS_WASM:
    checker.check_compilers()
 | 
						|
 | 
						|
def has_c_compiler():
    """Return the cached result of the C compiler probe."""
    return checker.has_c
 | 
						|
 | 
						|
def has_f77_compiler():
    """Return the cached result of the Fortran 77 compiler probe."""
    return checker.has_f77
 | 
						|
 | 
						|
def has_f90_compiler():
    """Return the cached result of the Fortran 90 compiler probe."""
    return checker.has_f90
 | 
						|
 | 
						|
def has_fortran_compiler():
    """Return whether both the Fortran 90 and the Fortran 77 probes passed."""
    return checker.has_f90 and checker.has_f77
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Maintaining a temporary module directory
 | 
						|
#
 | 
						|
 | 
						|
# Temporary directory holding built modules; None until get_module_dir()
# creates it.
_module_dir = None
# Running number appended to auto-generated temporary module names.
_module_num = 5403

if sys.platform == "cygwin":
    # Three directory levels above this file.
    NUMPY_INSTALL_ROOT = Path(__file__).parents[2]
    # DLLs found below the install root; build_module() extends this list
    # and passes it to /usr/bin/rebase.
    _module_list = list(NUMPY_INSTALL_ROOT.glob("**/*.dll"))
 | 
						|
 | 
						|
 | 
						|
def _cleanup():
    """Take the temporary module directory off ``sys.path`` and delete it.

    Does nothing when no directory is recorded.  A directory that is not on
    ``sys.path`` (``ValueError``) or cannot be removed (``OSError``) is
    ignored; ``_module_dir`` is reset to None either way.
    """
    global _module_dir
    if _module_dir is None:
        return
    with contextlib.suppress(ValueError):
        sys.path.remove(_module_dir)
    with contextlib.suppress(OSError):
        shutil.rmtree(_module_dir)
    _module_dir = None
 | 
						|
 | 
						|
 | 
						|
def get_module_dir():
    """Return the temporary module directory, creating it on first use.

    On creation the directory is put at the front of ``sys.path`` and
    ``_cleanup`` is registered with ``atexit``.
    """
    global _module_dir
    if _module_dir is not None:
        return _module_dir

    _module_dir = tempfile.mkdtemp()
    atexit.register(_cleanup)
    if _module_dir not in sys.path:
        sys.path.insert(0, _module_dir)
    return _module_dir
 | 
						|
 | 
						|
 | 
						|
def get_temp_module_name():
    """Return a fresh ``_test_ext_module_<n>`` name and advance the counter.

    Makes sure the temporary module directory exists first.

    Raises
    ------
    RuntimeError
        If the generated name is already present in ``sys.modules``.
    """
    # Assume single-threaded, and the module dir usable only by this thread
    global _module_num
    get_module_dir()
    candidate = f"_test_ext_module_{_module_num:d}"
    _module_num += 1
    # this should not be possible, but check anyway
    if candidate in sys.modules:
        raise RuntimeError("Temporary module name already in use.")
    return candidate
 | 
						|
 | 
						|
 | 
						|
def _memoize(func):
    """Decorator caching results and raised exceptions of ``func``.

    The cache key is the ``repr`` of the positional arguments together with
    the keyword arguments sorted by name, so unhashable arguments such as
    lists are accepted and the order in which keywords are written does not
    matter.

    An ``Exception`` raised by ``func`` is stored and raised again on every
    later call with the same arguments, without calling ``func`` again.
    Exceptions that do not derive from ``Exception`` propagate uncached.
    Note that a cached value which is itself an ``Exception`` instance is
    raised rather than returned.
    """
    # Imported locally so the helper does not rely on a module-level import.
    import functools

    memo = {}

    @functools.wraps(func)  # keeps __name__, __doc__, __wrapped__, ...
    def wrapper(*a, **kw):
        key = repr((a, sorted(kw.items())))
        if key not in memo:
            try:
                memo[key] = func(*a, **kw)
            except Exception as e:
                memo[key] = e
                raise
        ret = memo[key]
        if isinstance(ret, Exception):
            raise ret
        return ret

    return wrapper
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Building modules
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
@_memoize
def build_module(source_files, options=[], skip=[], only=[], module_name=None):
    """
    Compile and import a f2py module, built from the given files.

    Parameters
    ----------
    source_files : iterable of path-like
        Files copied into the temporary module directory.  Those ending in
        ``.f90``, ``.f95``, ``.f``, ``.c`` or ``.pyf`` are handed to f2py;
        the others are only copied next to them.
    options : list of str
        Extra f2py command-line options.  ``--freethreading-compatible`` is
        added unless it or ``--no-freethreading-compatible`` is present.
    skip, only : list of str
        Names placed after f2py's ``skip:`` / ``only:`` markers.
    module_name : str, optional
        Name of the module to build; a temporary name is generated if None.

    Returns
    -------
    module
        The imported extension module.

    Raises
    ------
    RuntimeError
        If a source path is not a file or f2py exits with a non-zero status.

    Notes
    -----
    Calls ``pytest.skip`` when no Fortran compiler is available.  The list
    defaults are only read, never mutated.
    """

    # The child interpreter is given this process's sys.path verbatim.
    code = f"import sys; sys.path = {sys.path!r}; import numpy.f2py; numpy.f2py.main()"

    d = get_module_dir()
    # gh-27045 : Skip if no compilers are found
    if not has_fortran_compiler():
        pytest.skip("No Fortran compiler available")

    # Copy files
    dst_sources = []
    f2py_sources = []
    for fn in source_files:
        if not os.path.isfile(fn):
            raise RuntimeError(f"{fn} is not a file")
        dst = os.path.join(d, os.path.basename(fn))
        shutil.copyfile(fn, dst)
        dst_sources.append(dst)

        ext = os.path.splitext(dst)[1]
        if ext in (".f90", ".f95", ".f", ".c", ".pyf"):
            f2py_sources.append(dst)

    assert f2py_sources

    # Prepare options
    if module_name is None:
        module_name = get_temp_module_name()
    gil_options = []
    if '--freethreading-compatible' not in options and '--no-freethreading-compatible' not in options:
        # default to disabling the GIL if unset in options
        gil_options = ['--freethreading-compatible']
    f2py_opts = ["-c", "-m", module_name] + options + gil_options + f2py_sources
    f2py_opts += ["--backend", "meson"]
    if skip:
        f2py_opts += ["skip:"] + skip
    if only:
        f2py_opts += ["only:"] + only

    # Build.  The child process is started inside the module directory via
    # ``cwd=`` so the working directory of this process is never changed.
    cmd = [sys.executable, "-c", code] + f2py_opts
    try:
        p = subprocess.run(cmd,
                           cwd=d,
                           stdout=subprocess.PIPE,
                           stderr=subprocess.STDOUT)
        if p.returncode != 0:
            raise RuntimeError(f"Running f2py failed: {cmd[4:]}\n{asunicode(p.stdout)}")
    finally:
        # Partial cleanup
        for fn in dst_sources:
            os.unlink(fn)

    # Rebase (Cygwin-only)
    if sys.platform == "cygwin":
        # If someone starts deleting modules after import, this will
        # need to change to record how big each module is, rather than
        # relying on rebase being able to find that from the files.
        _module_list.extend(
            glob.glob(os.path.join(d, f"{module_name:s}*"))
        )
        subprocess.check_call(
            ["/usr/bin/rebase", "--database", "--oblivious", "--verbose"]
            + _module_list
        )

    # Import
    return import_module(module_name)
 | 
						|
 | 
						|
 | 
						|
@_memoize
def build_code(source_code,
               options=[],
               skip=[],
               only=[],
               suffix=None,
               module_name=None):
    """
    Compile and import Fortran code using f2py.

    ``source_code`` is written to a temporary file whose name ends in
    ``suffix`` (``".f"`` when None) and that file is passed to
    ``build_module`` together with the remaining arguments.
    """
    file_suffix = ".f" if suffix is None else suffix
    with temppath(suffix=file_suffix) as src_path:
        with open(src_path, "w") as src:
            src.write(source_code)
        return build_module(
            [src_path],
            options=options,
            skip=skip,
            only=only,
            module_name=module_name,
        )
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Building with meson
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
class SimplifiedMesonBackend(MesonBackend):
    """``MesonBackend`` with a ``compile`` limited to two steps."""

    def __init__(self, *args, **kwargs):
        # Nothing extra to set up: everything is forwarded to MesonBackend.
        super().__init__(*args, **kwargs)

    def compile(self):
        """Call ``write_meson_build`` and then ``run_meson`` on ``build_dir``."""
        for step in (self.write_meson_build, self.run_meson):
            step(self.build_dir)
 | 
						|
 | 
						|
 | 
						|
def build_meson(source_files, module_name=None, **kwargs):
    """
    Build a module via Meson and import it.

    ``source_files`` and ``module_name`` (a temporary name when None) are
    passed to ``SimplifiedMesonBackend`` along with the backend options
    listed below; options missing from ``kwargs`` get the shown fallback and
    other keys in ``kwargs`` are not used.  Calls ``pytest.skip`` when no
    Fortran compiler is available.
    """

    # gh-27045 : Skip if no compilers are found
    if not has_fortran_compiler():
        pytest.skip("No Fortran compiler available")

    build_dir = get_module_dir()
    if module_name is None:
        module_name = get_temp_module_name()

    # Backend options a caller may override, with their fallback values
    fallbacks = {
        "extra_objects": [],
        "include_dirs": [],
        "library_dirs": [],
        "libraries": [],
        "define_macros": [],
        "undef_macros": [],
        "f2py_flags": [],
        "sysinfo_flags": [],
        "fc_flags": [],
        "flib_flags": [],
        "setup_flags": [],
        "remove_build_dir": False,
        "extra_dat": {},
    }
    backend_opts = {
        opt: kwargs.get(opt, fallback) for opt, fallback in fallbacks.items()
    }

    # Initialize the MesonBackend
    backend = SimplifiedMesonBackend(
        modulename=module_name,
        sources=source_files,
        build_dir=build_dir,
        **backend_opts,
    )

    backend.compile()

    # Import the compiled module
    sys.path.insert(0, f"{build_dir}/{backend.meson_build_dir}")
    return import_module(module_name)
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Unittest convenience
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
class F2PyTest:
    """Base class for tests that need an extension module built with f2py.

    Subclasses set ``code`` (source text, written to a file ending in
    ``suffix``) and/or ``sources`` (paths of source files); ``setup_method``
    builds the module and stores it in ``self.module``.  ``options``,
    ``skip`` and ``only`` are forwarded to the build helpers.
    """

    code = None
    sources = None
    options = []
    skip = []
    only = []
    suffix = ".f"
    module = None
    # Compiler availability, filled in by setup_class
    _has_c_compiler = None
    _has_f77_compiler = None
    _has_f90_compiler = None
    _has_fortran_compiler = None

    @property
    def module_name(self):
        """Module name derived from the test module and test class names."""
        cls = type(self)
        return f'_{cls.__module__.rsplit(".", 1)[-1]}_{cls.__name__}_ext_module'

    @classmethod
    def setup_class(cls):
        """Skip on win32, otherwise record which compilers are available.

        The flags are stored on ``F2PyTest`` itself, not on ``cls``.
        """
        if sys.platform == "win32":
            pytest.skip("Fails with MinGW64 Gfortran (Issue #9673)")
        F2PyTest._has_c_compiler = has_c_compiler()
        F2PyTest._has_f77_compiler = has_f77_compiler()
        F2PyTest._has_f90_compiler = has_f90_compiler()
        F2PyTest._has_fortran_compiler = has_fortran_compiler()

    def setup_method(self):
        """Build ``self.module`` unless it is already set.

        The test is skipped when a required Fortran compiler is missing;
        the requirement is derived from the file extensions involved.
        """
        if self.module is not None:
            return

        # Work on a copy: ``sources`` is normally a class-level list and
        # appending the suffix to it would corrupt it for the build below
        # and for every later test.
        codes = list(self.sources or [])
        if self.code:
            codes.append(self.suffix)

        needs_f77 = any(str(fn).endswith(".f") for fn in codes)
        needs_f90 = any(str(fn).endswith(".f90") for fn in codes)
        needs_pyf = any(str(fn).endswith(".pyf") for fn in codes)

        if needs_f77 and not self._has_f77_compiler:
            pytest.skip("No Fortran 77 compiler available")
        if needs_f90 and not self._has_f90_compiler:
            pytest.skip("No Fortran 90 compiler available")
        if needs_pyf and not self._has_fortran_compiler:
            pytest.skip("No Fortran compiler available")

        # Build the module
        if self.code is not None:
            self.module = build_code(
                self.code,
                options=self.options,
                skip=self.skip,
                only=self.only,
                suffix=self.suffix,
                module_name=self.module_name,
            )

        if self.sources is not None:
            self.module = build_module(
                self.sources,
                options=self.options,
                skip=self.skip,
                only=self.only,
                module_name=self.module_name,
            )
 | 
						|
 | 
						|
 | 
						|
#
 | 
						|
# Helper functions
 | 
						|
#
 | 
						|
 | 
						|
 | 
						|
def getpath(*a):
    """Join ``a`` onto the resolved directory of the ``numpy.f2py`` package."""
    # Package root
    package_file = Path(numpy.f2py.__file__)
    return package_file.parent.resolve().joinpath(*a)
 | 
						|
 | 
						|
 | 
						|
@contextlib.contextmanager
def switchdir(path):
    """Context manager: change into ``path``, then return to the previous cwd.

    The previous working directory is restored even when the body raises.
    """
    origin = Path.cwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(origin)
 |