Source code for aim2dat.io.utils

"""Utils for read and write functions."""

# Standard library imports
import os
import re
import io
from functools import wraps
from contextlib import contextmanager


[docs] def read_structure(pattern, preset_kwargs={}): """Decorate functions that parse structure(s).""" def decorator(func): func._is_read_structure_method = True func._pattern = pattern func._preset_kwargs = preset_kwargs @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) return wrapper return decorator
[docs] def read_multiple(pattern, is_read_strct_method=False, preset_kwargs={}): """Add support for a list of multiple files or folder paths (decorator).""" # The following cases need to be covered: # Single file as file name/file content/file object # Multiple files as List of files or folder path def _check_file(file_like, file_dict, re_pattern, is_strict): if os.path.isfile(file_like): file_name = os.path.split(file_like)[1] elif hasattr(file_like, "filename"): # Support AiiDA single file: file_name = file_like.filename else: return None match = None if re_pattern is None else re_pattern.match(file_name) if match or not is_strict: file_dict["file"].append(file_like) file_dict["file_name"].append(file_name) for key, val in match.groupdict().items(): file_dict[key].append(val) def read_func_decorator(func): func._is_read_structure_method = is_read_strct_method func._pattern = pattern func._preset_kwargs = preset_kwargs @wraps(func) def wrapper(*args, **kwargs): # Get file(s) argument: files = None if len(args) > 0: files = args[0] args = args[1:] else: for file_arg in ["file", "files", "folder_path"]: # TODO adapt? if files in kwargs: files = kwargs.pop(file_arg) break # Find files to open: if not isinstance(files, (list, tuple)): files = [files] re_pattern = None file_dict = {"file": [], "file_name": []} if pattern: re_pattern = re.compile(pattern) for k0 in re_pattern.groupindex.keys(): file_dict[k0] = [] for file_like in files: if os.path.isdir(file_like): [ _check_file(os.path.join(file_like, f0), file_dict, re_pattern, True) for f0 in os.listdir(file_like) ] else: _check_file(file_like, file_dict, re_pattern, False) # TODO add regex to error message: if len(file_dict["file"]) == 0: raise ValueError("No files with the correct naming scheme found.") # TODO add check for number of files and policy to handle multiple files. return func(file_dict, *args, **kwargs) return wrapper return read_func_decorator
[docs] @contextmanager def custom_open(file, mode="r", **kwargs): """ Open files by distinguishing custom file classes (such as AiiDA's SingleFileData) with an open function. """ has_close = True if os.path.exists(file): resource = open(file, mode=mode, **kwargs) else: resource = io.StringIO(file) has_close = False try: yield resource finally: if has_close: resource.close()