Source code for esis.fs

# -*- coding: utf-8 -*-
"""Filesystem functionality."""
import logging
import os

import magic

from sqlalchemy.exc import OperationalError

from esis.db import Database


# Module-level logger; records are emitted under this module's dotted name.
logger = logging.getLogger(__name__)


class TreeExplorer(object):
    """Look for sqlite files in a tree and return the valid ones.

    :param directory: Base directory for the tree to be explored.
    :type directory: str
    :param blacklist: List of directories to skip, given as paths relative
        to ``directory``. Entries are normalized before comparison, so
        ``'cache'``, ``'cache/'`` and ``'./cache'`` are equivalent.
    :type blacklist: list(str)

    """

    def __init__(self, directory, blacklist=None):
        """Initialize tree explorer."""
        self.directory = directory
        self.blacklist = blacklist if blacklist is not None else []

    def paths(self):
        """Return paths to valid databases found under directory.

        Every candidate returned by the tree walk is opened and submitted
        to the database quick check; candidates that fail the check or
        cannot be opened are dropped.

        :return: Paths to valid databases
        :rtype: list(str)

        """
        db_paths = self._explore()
        # Building the listing costs one relpath call per file, so only do
        # it when the record will actually be emitted.
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                '%d database paths found under %s:\n%s',
                len(db_paths),
                self.directory,
                self._relative_listing(db_paths))

        # Filter out files that don't pass sqlite's quick check
        # or that just can't be opened
        valid_paths = []
        for db_path in db_paths:
            try:
                with Database(db_path) as database:
                    if database.run_quick_check():
                        valid_paths.append(db_path)
            except OperationalError:
                logger.warning('Unable to open: %s', db_path)

        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(
                '%d database paths passed the integrity check:\n%s',
                len(valid_paths),
                self._relative_listing(valid_paths))

        return valid_paths

    def _relative_listing(self, file_paths):
        """Return paths relative to the base directory, one per line.

        :param file_paths: Paths located under the base directory
        :type file_paths: list(str)
        :returns: Newline separated relative paths (empty if no paths)
        :rtype: str

        """
        return '\n'.join(
            os.path.relpath(file_path, self.directory)
            for file_path in file_paths)

    def _normalized_blacklist(self):
        """Return the blacklist as a set of normalized relative paths.

        ``os.path.relpath`` always yields normalized paths, so entries
        written with a trailing separator or a leading ``./`` would never
        match unless they are normalized as well.

        :returns: Normalized blacklist entries
        :rtype: set(str)

        """
        return {os.path.normpath(entry) for entry in self.blacklist}

    def _explore(self):
        """Walk from base directory and return files that look like SQLite.

        Blacklisted subdirectories are pruned from the walk. Files that are
        missing (e.g. broken symbolic links) or cannot be inspected are
        logged and skipped.

        :returns: SQLite files found under directory
        :rtype: list(str)

        """
        blacklist = self._normalized_blacklist()
        db_paths = []

        for dirpath, dirnames, filenames in os.walk(self.directory):
            logger.debug('Exploring %s...', dirpath)

            # Check if any subdirectory is blacklisted
            blacklisted_dirnames = [
                dirname
                for dirname in dirnames
                if os.path.relpath(
                    os.path.join(dirpath, dirname),
                    self.directory,
                ) in blacklist
            ]

            if blacklisted_dirnames:
                logger.debug(
                    'Subdirectories blacklisted: %s', blacklisted_dirnames)
                # Note: dirnames must be updated in place (slice assignment)
                # so that os.walk recurses only in the remaining directories
                dirnames[:] = [
                    dirname
                    for dirname in dirnames
                    if dirname not in blacklisted_dirnames
                ]

            # Check if any filename is a sqlite database
            for filename in filenames:
                db_path = os.path.join(dirpath, filename)

                # Skip missing files like broken symbolic links
                if not os.path.isfile(db_path):
                    logger.warning('Unable to access file: %r', db_path)
                    continue

                # The file may be unreadable or may have vanished since the
                # check above; presumably the magic library reports that as
                # IOError/OSError (TODO confirm for the installed version).
                # One bad file must not abort the whole walk.
                try:
                    file_type = magic.from_file(db_path)
                except (IOError, OSError):
                    logger.warning('Unable to access file: %r', db_path)
                    continue

                if 'SQLite' in file_type:
                    db_paths.append(db_path)

        return db_paths