Source code for esis.es

# -*- coding: utf-8 -*-
"""Elasticsearch related funcionality."""
import hashlib
import logging
import os
import time

from urlparse import urlparse

import elasticsearch.helpers

from elasticsearch import Elasticsearch

from sqlalchemy.types import (
    BIGINT,
    BOOLEAN,
    CHAR,
    CLOB,
    DATE,
    DATETIME,
    DECIMAL,
    FLOAT,
    INTEGER,
    NCHAR,
    NUMERIC,
    NVARCHAR,
    NullType,
    REAL,
    SMALLINT,
    TEXT,
    TIME,
    TIMESTAMP,
    VARCHAR,
)

from esis.fs import TreeExplorer
from esis.db import (
    DBReader,
    Database,
    TableReader,
)


logger = logging.getLogger(__name__)


class Client(object):

    """Elasticsearch client wrapper.

    :param host: Elasticsearch host
    :type host: str
    :param port: Elasticsearch port
    :type port: int

    """

    INDEX_NAME = 'sqlite'

    def __init__(self, host, port):
        """Create low level client."""
        self.es_client = Elasticsearch([{'host': host, 'port': port}])

    def index(self, directory):
        """Index all the information available in a directory.

        In elasticsearch there will be an index for each database and a
        document type for each table in the database.

        :param directory: Directory that should be indexed
        :type directory: str

        """
        logger.debug('Indexing %r...', directory)
        start = time.time()
        documents_indexed = self._index_directory(directory)
        end = time.time()
        logger.info(
            '%d documents indexed in %.2f seconds',
            documents_indexed, end - start)

    def _index_directory(self, directory):
        """Index all databases under a given directory.

        :param directory: Path to the directory to explore
        :type directory: str
        :return: Documents indexed for this directory
        :rtype: int

        """
        documents_indexed = 0
        self._recreate_index(self.INDEX_NAME)
        tree_explorer = TreeExplorer(directory)
        for db_path in tree_explorer.paths():
            with Database(db_path) as database:
                documents_indexed += self._index_database(database)
        return documents_indexed

    def _recreate_index(self, index_name):
        """Recreate elasticsearch index.

        It's checked that the index exists before trying to delete it to
        avoid failures.

        :param index_name: Elasticsearch index to delete
        :type index_name: str

        """
        logger.debug('Recreating index (%s)...', index_name)
        if self.es_client.indices.exists(index_name):
            self.es_client.indices.delete(index_name)
        self.es_client.indices.create(index_name)

    def _index_database(self, database):
        """Index all tables in a database file.

        :param database: Database to be indexed
        :type database: :class:`esis.db.Database`
        :return: Documents indexed for this database
        :rtype: int

        """
        documents_indexed = 0
        logger.debug('Populating index (%s)...', self.INDEX_NAME)
        db_reader = DBReader(database)

        # Index the content of every database table
        for table_name in db_reader.tables():
            table_reader = TableReader(database, table_name)
            documents_indexed += self._index_table(table_reader)
        return documents_indexed

    def _index_table(self, table_reader):
        """Index all rows in a database table.

        :param table_reader: Object to iterate through all rows in a table
        :type table_reader: :class:`esis.db.TableReader`
        :return: Documents indexed for this table
        :rtype: int

        """
        documents_indexed = 0

        # Workaround Elasticsearch document type limitations
        db_path = table_reader.database.db_filename
        table_name = table_reader.table.name
        document_type = hashlib.md5(
            '{}:{}'.format(db_path, table_name)
        ).hexdigest()

        # Translate database schema into an elasticsearch mapping
        table_schema = table_reader.get_schema()
        table_mapping = Mapping(document_type, table_schema)
        self.es_client.indices.put_mapping(
            index=self.INDEX_NAME,
            doc_type=document_type,
            body=table_mapping.mapping)

        db_filename = table_reader.database.db_filename
        documents = [
            get_document(db_filename, table_name, row)
            for row in table_reader.rows()
        ]
        actions = (
            get_index_action(self.INDEX_NAME, document_type, document)
            for document in documents
        )
        documents_indexed, errors = elasticsearch.helpers.bulk(
            self.es_client, actions)
        if errors:
            logger.warning('Indexing errors reported: %s', errors)

        return documents_indexed

    def search(self, query):
        """Yield all documents that match a given query.

        :param query: A simple query with data to search in elasticsearch
        :type query: str
        :return: Records that matched the query as returned by elasticsearch
        :rtype: list(dict)

        """
        logger.debug('Searching %r...', query)
        body = {
            'query': {
                'match': {
                    '_all': query,
                },
            },
        }

        response = self.es_client.search(
            body=body,
            scroll='5m',
            size=100,
        )
        hits_info = response['hits']
        hits_total = hits_info['total']
        logger.info('%d documents matched', hits_total)
        hits = hits_info['hits']
        yield hits

        if '_scroll_id' in response:
            scroll_id = response['_scroll_id']
            while True:
                response = self.es_client.scroll(
                    scroll_id=scroll_id,
                    scroll='5m',
                )
                hits = response['hits']['hits']
                if not hits:
                    break
                yield hits

    def count(self):
        """Return indexed documents information.

        :returns: Indexed documents information
        :rtype: dict

        """
        return self.es_client.count()

    def clean(self):
        """Remove all indexed documents."""
        self.es_client.indices.delete(index='_all')
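
# Example usage (illustrative sketch, not part of the original module).
# Assumes an Elasticsearch node reachable on localhost:9200 and a directory
# of SQLite files; the host, port and path below are hypothetical:
#
#     client = Client(host='localhost', port=9200)
#     client.index('/data/sqlite_files')
#     print client.count()
#     for batch in client.search('alice'):
#         for hit in batch:
#             print hit['_source']
#     client.clean()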

class Mapping(object):

    """ElasticSearch mapping.

    :param document_type: Document type used for the database table
    :type document_type: str
    :param table_schema: Database table schema from sqlalchemy
    :type table_schema: dict(str, sqlalchemy.types.*)

    """

    # Mapping from sqlalchemy types to elasticsearch ones
    # Note: The columns that have a type that maps to None will be removed
    # from the final mapping to let elasticsearch figure out the type by
    # itself. This is because SQLite works with storage classes and type
    # affinities and it's not always clear what datatype data will really
    # have. In particular, values with NUMERIC affinity might be stored using
    # any of the five available storage classes, so it's not possible to
    # predict for all cases what type of data will be stored without looking
    # at it as elasticsearch does.
    SQL_TYPE_MAPPING = {
        BIGINT: 'long',
        BOOLEAN: 'boolean',
        CHAR: 'string',
        CLOB: 'string',
        DATE: None,
        DATETIME: 'date',
        DECIMAL: None,
        FLOAT: 'float',
        # TODO: Use 'integer' when data is in range
        INTEGER: 'long',
        NCHAR: 'string',
        NVARCHAR: 'string',
        NullType: None,
        NUMERIC: None,
        # TODO: Use 'float' when data is in range
        REAL: 'double',
        SMALLINT: 'integer',
        TEXT: 'string',
        TIME: 'date',  # TODO: Map to something time specific?
        TIMESTAMP: 'date',
        VARCHAR: 'string',
    }

    def __init__(self, document_type, table_schema):
        """Map every column type to an elasticsearch mapping."""
        # Database filename and table will be added to a metadata field
        columns_mapping = {
            '_metadata': {
                'type': 'object',
                'index': 'no',
                'properties': {
                    'filename': {
                        'type': 'string',
                        'index': 'no',
                    },
                    'table': {
                        'type': 'string',
                        'index': 'no',
                    },
                }
            }
        }

        assert '_metadata' not in table_schema
        for column_name, column_sql_type in table_schema.iteritems():
            column_mapping = self._get_column_mapping(column_sql_type)
            # Skip columns that don't have a mapping defined and let
            # elasticsearch figure out the mapping itself
            if column_mapping is None:
                continue
            columns_mapping[column_name] = column_mapping

        self.mapping = {
            document_type: {
                'properties': columns_mapping,
            }
        }

    def _get_column_mapping(self, column_sql_type):
        """Return column mapping based on its type.

        :param column_sql_type: Database column type
        :type column_sql_type: sqlalchemy.types.*
        :return: Mapping for the given column type (if possible)
        :rtype: dict(str) | None

        """
        column_es_type = self.SQL_TYPE_MAPPING[type(column_sql_type)]
        if column_es_type is None:
            return None

        column_mapping = {'type': column_es_type}
        return column_mapping
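
# Example (illustrative sketch, not part of the original module): mapping a
# hypothetical two column schema. INTEGER maps to 'long' and TEXT to
# 'string', while the NUMERIC column is skipped so that elasticsearch can
# infer its type from the indexed data:
#
#     schema = {'user_id': INTEGER(), 'name': TEXT(), 'price': NUMERIC()}
#     mapping = Mapping('example_doc_type', schema)
#     # mapping.mapping['example_doc_type']['properties'] now contains
#     # '_metadata', 'user_id' ({'type': 'long'}) and 'name'
#     # ({'type': 'string'}), but no entry for 'price'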

def get_document(db_filename, table_name, row):
    """Get document to be indexed from row.

    :param db_filename: Path to the database file
    :type db_filename: str
    :param table_name: Database table name
    :type table_name: str
    :param row: Database row
    :type row: sqlalchemy.engine.result.RowProxy
    :return: Document to be indexed
    :rtype: dict

    """
    # Convert row to dictionary
    document = dict(row)

    # Add metadata to the document
    document.update(
        _metadata={
            'filename': db_filename,
            'table': table_name,
        }
    )

    # Filter out fields that should not be indexed
    for field_name, field_data in document.items():
        # Avoid indexing binary data
        if isinstance(field_data, buffer):
            logger.debug('%r field discarded before indexing', field_name)
            del document[field_name]
        # Avoid indexing local paths
        elif isinstance(field_data, basestring):
            url = urlparse(field_data)
            if (url.scheme == 'file' and os.path.exists(url.path)):
                logger.debug(
                    '%r field discarded before indexing', field_name)
                del document[field_name]

    return document
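
# Example (illustrative sketch, not part of the original module): for a
# hypothetical row {'_id': 1, 'name': 'alice', 'photo': buffer('...')} read
# from a 'users' table in '/data/users.db', get_document returns
# {'_id': 1, 'name': 'alice',
#  '_metadata': {'filename': '/data/users.db', 'table': 'users'}};
# the binary 'photo' field is discarded before indexing.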

def get_index_action(index_name, document_type, document):
    """Generate index action for a given document.

    :param index_name: Elasticsearch index to use
    :type index_name: str
    :param document_type: Elasticsearch document type to use
    :type document_type: str
    :param document: Document to be indexed
    :type document: dict
    :return: Action to be passed in bulk request
    :rtype: dict

    """
    action = {
        '_index': index_name,
        '_type': document_type,
        '_source': document,
    }

    # Use the same _id field in elasticsearch as in the database table
    if '_id' in document:
        action['_id'] = document['_id']

    return action
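
# Example (illustrative sketch, not part of the original module): the bulk
# action built for a document that carries an '_id' field, suitable for
# passing to elasticsearch.helpers.bulk():
#
#     get_index_action('sqlite', 'example_doc_type', document)
#     # -> {'_index': 'sqlite', '_type': 'example_doc_type',
#     #     '_source': document, '_id': 1}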