# -*- coding: utf-8 -*-
"""
Data model for extracted information.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import copy
from abc import ABCMeta
from collections import MutableSequence
import json
import logging
from pprint import pprint
import six
from ..utils import python_2_unicode_compatible
from ..parse.elements import Any, W, I
from ..parse.auto import AutoSentenceParser, AutoTableParser
log = logging.getLogger(__name__)
[docs]class BaseType(six.with_metaclass(ABCMeta)):
# This is assigned by ModelMeta to match the attribute on the Model
name = None
[docs] def __init__(self, default=None, null=False, required=False, contextual=False, parse_expression=None, updatable=False, binding=False, ignore_when_merging=False):
"""
:param default: (Optional) The default value for this field if none is set.
:param bool null: (Optional) Include in serialized output even if value is None. Default False.
:param bool required: (Optional) Whether a value is required. Default False.
:param bool contextual: (Optional) Whether this value is contextual. Default False.
:param BaseParserElement parse_expression: (Optional) Expression for parsing, instance of a subclass of BaseParserElement. Default None.
:param bool updatable: (Optional) Whether the parse_expression can be changed by the document as parsing occurs. Default False.
:param bool binding: (Optional) If this option is set to True, any submodels that have an attribute with the same name must have the same value for this attribute. Default False/
:param bool ignore_when_merging: (Optional) If this option is set to True, any records with a different value for this field is treated as corresponding to the same physical record.
"""
self.default = copy.deepcopy(default)
self.null = null
self.required = required
self.contextual = contextual
self.parse_expression = parse_expression
self.updatable = updatable
self.binding = binding
self.ignore_when_merging = ignore_when_merging
if self.parse_expression is None and self.updatable:
print('No parse_expression supplied but updatable set as True for ', type(self))
print('updatable refers to whether parse_expression can be changed by the document as parsing occurs. Setting updatable to False.')
self.updatable = False
self.parse_expression = copy.copy(parse_expression)
self._default_parse_expression = parse_expression
# when a record is created from the table, this will be filled with the row/col header cateogry strings
# which helps merging based on same row/column category
self.table_row_categories = None
self.table_col_categories = None
[docs] def reset(self):
"""
Reset the parse expression to the initial value.
"""
if self.updatable:
self.parse_expression = copy.copy(self._default_parse_expression)
def __get__(self, instance, owner):
"""Descriptor for retrieving a value from a field in a Model."""
# Check if Model class is being called, rather than Model instance
if instance is None:
return self
# Get value from Model instance if available
value = instance._values.get(self.name)
# If value is None or empty string then return the default value, if set
# if value in [None, ''] and self.default is not None:
# return self.default
return value
def __set__(self, instance, value):
"""Descriptor for assigning a value to a field in a Model."""
instance._values[self.name] = self.process(value)
[docs] def process(self, value):
"""Convert an assigned value into the desired data format for this field."""
return value
[docs] def serialize(self, value, primitive=False):
"""Serialize this field."""
if hasattr(value, 'serialize'):
# i.e. value is a nested model
return value.serialize(primitive=primitive)
else:
return value
[docs] def is_empty(self, value):
"""Return whether a value is considered empty for the case of this field."""
return False
[docs]class StringType(BaseType):
""""""
[docs] def process(self, value):
"""Convert value to a unicode string. Useful in case lxml _ElementUnicodeResult are passed from parser."""
return six.text_type(value) if value is not None else None
[docs] def is_empty(self, value):
if value is not None and isinstance(value, str) and value:
return False
return True
[docs]class FloatType(BaseType):
"""An floating point number field."""
[docs] def process(self, value):
"""Convert value to a float."""
if value is not None:
return float(value)
return None
[docs] def is_empty(self, value):
if value is not None:
return False
return True
[docs]class ModelType(BaseType):
[docs] def __init__(self, model, **kwargs):
self.model_class = model
self.model_name = self.model_class.__name__
super(ModelType, self).__init__(**kwargs)
[docs] def serialize(self, value, primitive=False):
"""Serialize this field."""
return value.serialize(primitive=primitive)
[docs] def is_empty(self, value):
if isinstance(value, self.model_class):
return value.is_empty
return True
[docs]class ListType(BaseType):
[docs] def __init__(self, field, default=None, sorted=False, **kwargs):
super(ListType, self).__init__(**kwargs)
self.field = field
self.default = default if default is not None else []
self.sorted = sorted
def __set__(self, instance, value):
"""Descriptor for assigning a value to a ListField in a Model."""
# Run process for the nested field type for each value in list
if value is None:
instance._values[self.name] = None
else:
processed = [self.field.process(v) for v in value]
if self.sorted:
processed = sorted(processed)
instance._values[self.name] = processed
[docs] def serialize(self, value, primitive=False):
"""Serialize this field."""
return [self.field.serialize(v, primitive=primitive) for v in value]
[docs] def is_empty(self, value):
if isinstance(value, list) and len(value) != 0:
return False
return True
[docs]class SetType(BaseType):
[docs] def __init__(self, field, default=None, **kwargs):
super(SetType, self).__init__(**kwargs)
self.field = field
self.default = default if default is not None else set()
def __set__(self, instance, value):
"""Descriptor for assigning a value to a SetField in a Model."""
# Run process for the nested field type for each value in list
if value is None:
instance._values[self.name] = None
else:
instance._values[self.name] = set(self.field.process(v) for v in value)
[docs] def serialize(self, value, primitive=False):
"""Serialize this field."""
if value is None or len(value) == 0:
return None
# a list, instead of a set is needed for easy compatibility with JSON output formats
# a new sorted list instance ensures the same order for different runs
# sorting in place results in an empty list in this case
rec_list = list(self.field.serialize(v, primitive=primitive) for v in value)
return sorted(rec_list)
[docs] def is_empty(self, value):
if isinstance(value, set) and len(value) != 0:
return False
return True
[docs]@python_2_unicode_compatible
class BaseModel(six.with_metaclass(ModelMeta)):
""""""
fields = {}
parsers = [AutoSentenceParser(), AutoTableParser()]
specifier = None
_updated = False
[docs] def __init__(self, **raw_data):
""""""
self._values = {}
for key, value in six.iteritems(raw_data):
setattr(self, key, value)
# Set defaults
for key, field in six.iteritems(self.fields):
if key not in raw_data:
setattr(self, key, copy.copy(field.default))
self._record_method = None
self.was_updated = self._updated
@property
def is_unidentified(self):
"""
If there is no 'compound' field associated with the model but the compound is contextual
"""
try:
if 'compound' not in self.fields.keys():
return False
if not self.compound.contextual_fulfilled:
return self.compound.is_unidentified
except AttributeError:
return True
def __repr__(self):
return '<%s>' % (self.__class__.__name__,)
def __str__(self):
return '<%s>' % (self.__class__.__name__,)
def __eq__(self, other):
# TODO: Check this actually works as expected (what about default values?)
if isinstance(other, self.__class__):
log.debug(self._values, other._values)
return self._values == other._values
return False
def __iter__(self):
return iter(self.fields)
def __delattr__(self, attr):
"""Handle deletion of field values by setting to default if specified."""
# Set to default value
if attr in self.fields:
setattr(self, attr, self.fields[attr].default)
else:
super(BaseModel, self).__delattr__(attr)
def __getitem__(self, key):
"""Redirect dictionary-style field access to attribute-style."""
try:
if key in self.fields:
return getattr(self, key)
except AttributeError:
pass
raise KeyError(key)
def __setitem__(self, key, value):
"""Redirect dictionary-style field setting to attribute-style."""
if key not in self.fields:
raise KeyError(key)
return setattr(self, key, value)
def __contains__(self, name):
try:
val = getattr(self, name)
return val is not None
except AttributeError:
return False
def __hash__(self):
return str(self.serialize()).__hash__()
[docs] @classmethod
def reset_updatables(cls):
"""
Reset all updatable parse_expressions of properties associated with the class.
"""
for key, field in six.iteritems(cls.fields):
if cls.fields[key].updatable:
cls.fields[key].reset()
cls._updated = False
[docs] @classmethod
def update(cls, definitions, strict=True):
"""Update this Element's updatable attributes with new information from definitions
Arguments:
definitions {list} -- list of definitions found in this element
"""
log.debug("Updating model")
for definition in definitions:
for field in cls.fields:
if cls.fields[field].updatable:
matches = [i for i in cls.fields[field].parse_expression.scan(definition['tokens'])]
# print(matches)
if any(matches):
cls._updated = True
if strict:
cls.fields[field].parse_expression = cls.fields[field].parse_expression | W(str(definition['specifier']))
else:
cls.fields[field].parse_expression = cls.fields[field].parse_expression | I(str(definition['specifier']))
return
@property
def updated(self):
"""
True/False dependent on if a specifier within the model was updated.
"""
for field_name, field in six.iteritems(self.fields):
if hasattr(field, 'model_class'):
if hasattr(self[field_name], 'updated') and self[field_name].was_updated:
return True
return self.was_updated
[docs] def keys(self):
return list(iter(self))
[docs] def items(self):
return [(k, getattr(self, k)) for k in self]
[docs] def values(self):
return [getattr(self, k) for k in self]
[docs] def get(self, key, default=None):
return getattr(self, key, default)
@property
def contextual_fulfilled(self):
"""
Whether all the contextual fields have been extracted.
:return: True if all fields have been found, False if not.
:rtype: bool
"""
for field_name, field in six.iteritems(self.fields):
if hasattr(field, 'model_class'):
if self[field_name] == field.default and field.contextual:
return False
if hasattr(self[field_name], 'contextual_fulfilled') and \
not self[field_name].contextual_fulfilled:
log.debug('Is contextual')
return False
elif field.contextual and self[field_name] == field.default:
log.debug('Is contextual')
return False
log.debug('Not contextual')
return True
@property
def required_fulfilled(self):
"""
Whether all the required fields have been extracted.
:return: True if all fields have been found, False if not.
:rtype: bool
"""
return self._required_fulfilled(strict=True)
@property
def noncontextual_required_fulfilled(self):
"""
Whether all the non-contextual required fields have been extracted.
:return: True if all fields have been found, False if not.
:rtype: bool
"""
return self._required_fulfilled(strict=False)
def _required_fulfilled(self, strict):
for field_name, field in six.iteritems(self.fields):
if hasattr(field, 'model_class'):
if self[field_name] == field.default \
and field.required:
if not strict and field.contextual:
pass
else:
return False
if field.required and hasattr(self[field_name], 'required_fulfilled') and \
not self[field_name].required_fulfilled:
if not strict and field.contextual:
pass
else:
log.debug('Required unfulfilled')
return False
elif field.required and self[field_name] == field.default:
# print(self.serialize(), field_name, "did not exist")
if not strict and field.contextual:
pass
else:
return False
return True
[docs] def serialize(self, primitive=False):
"""Convert Model to python dictionary."""
# Serialize fields to a dict
data = {}
for field_name in self:
value = getattr(self, field_name)
field = self.fields.get(field_name)
if value is not None:
value = field.serialize(value, primitive=primitive)
# Skip empty fields unless field.null
if not field.null and value in [None, '', []]:
continue
data[field.name] = value
record = {self.__class__.__name__: data}
return record
[docs] def to_json(self, *args, **kwargs):
"""Convert Model to JSON."""
return json.dumps(self.serialize(primitive=True), *args, **kwargs)
[docs] def is_superset(self, other):
"""
Whether this model instance is a 'superset' of the other model instance.
A model instance is a 'superset' of another if it satisfies the following conditions:
- The model instances are of the same type
- For each of the attributes of the model instances, either:
- This instance has more information, or
- Both instances have the same information
:param other: The other model instance to compare with this model instance
:type other: BaseModel
:return: Whether this model instance is a superset of the other model instance
:rtype: bool
"""
if type(self) != type(other):
return False
for field_name, field in six.iteritems(self.fields):
# Method works recursively so it works with nested models
if hasattr(field, 'model_class'):
if self[field_name] is None:
if other[field_name] is not None:
return False
elif other[field_name] is None:
pass
elif not self[field_name].is_superset(other[field_name]):
return False
else:
if other[field_name] is not None and self[field_name] != other[field_name]:
return False
return True
[docs] def is_subset(self, other):
"""
Whether this model instance is a 'subset' of the other model instance.
A model instance is a 'subset' of another if it satisfies the following conditions:
- The model instances are of the same type
- For each of the attributes of the model instances, either:
- The other instance has more information, or
- Both instances have the same information
:param other: The other model instance to compare with this model instance
:type other: BaseModel
:return: Whether this model instance is a subset of the other model instance
:rtype: bool
"""
return other.is_superset(self)
[docs] def merge_contextual(self, other):
"""
Merges any fields marked contextual with additional information from other provided that:
- other is of the same type and they don't have any conflicting fields
or
- other is a model type that is part of this model and that field is currently set to be the default value or the field can be merged with the other.
.. note::
This method mutates the model it's called on **and** returns it.
:param other: The other model to merge into this model
:type other: BaseModel
:return: A merged model
:rtype: BaseModel
"""
log.debug(self.serialize())
log.debug(other.serialize())
did_merge = False
should_keep_both_records = self._should_keep_both_records(other)
if self.contextual_fulfilled:
return self
if self._binding_compatible(other):
if type(self) != type(other):
for field_name, field in six.iteritems(self.fields):
if hasattr(field, 'model_class') and isinstance(other, field.model_class):
# print('model class case activated')
log.debug('model class case')
if self[field_name] is not None and field.contextual and not self[field_name].contextual_fulfilled:
if self[field_name].merge_contextual(other):
did_merge = True
elif (field.contextual and self[field_name] is None
and other is not None):
log.debug(field_name)
self[field_name] = copy.copy(other)
did_merge = True
elif self._compatible(other):
for field_name, field in six.iteritems(self.fields):
if (field.contextual
and self[field_name] is None
and other.get(field_name, None) is not None):
self[field_name] = other[field_name]
did_merge = True
self._consolidate_binding()
if did_merge:
if should_keep_both_records:
did_merge = False
return did_merge
[docs] def merge_all(self, other):
"""
Merges any properties between other and self, regardless of whether that field is contextual.
Checks to make sure that there are no conflicts between the values contained in self and those in other.
.. note::
This method mutates the model it's called on **and** returns it.
:param other: The other model to merge into this model
:type other: BaseModel
:return: A merged model
:rtype: BaseModel
"""
log.debug(self.serialize())
log.debug(other.serialize())
pre_merge = [self.serialize(), other.serialize()]
did_merge = False
should_keep_both_records = self._should_keep_both_records(other)
if self._binding_compatible(other):
if type(self) != type(other):
for field_name, field in six.iteritems(self.fields):
if hasattr(field, 'model_class') and isinstance(other, field.model_class):
log.debug('model class case')
if self[field_name] is not None:
if self[field_name].merge_all(other):
did_merge = True
elif (self[field_name] is None
and other is not None):
log.debug(field_name)
self[field_name] = copy.copy(other)
did_merge = True
elif self._compatible(other):
for field_name, field in six.iteritems(self.fields):
if (self[field_name] is None
and other.get(field_name, None) is not None):
did_merge = True
self[field_name] = other[field_name]
self._consolidate_binding()
if did_merge:
if should_keep_both_records:
did_merge = False
return did_merge
def _compatible(self, other):
# return self._compatible_legacy(other)
match = False
if type(other) == type(self):
# Check if the other seems to be describing the same thing as self.
match = True
for field_name, field in six.iteritems(self.fields):
if isinstance(field, ModelType):
if (not field.ignore_when_merging
and self[field_name] is not None
and other[field_name] is not None
and not self[field_name]._compatible(other[field_name])):
match = False
break
else:
if (not field.ignore_when_merging
and self[field_name] is not None
and other[field_name] is not None
and self[field_name] != other[field_name]):
match = False
break
# legacy_match = self._compatible_legacy(other)
# if legacy_match != match:
# print(legacy_match, match)
# pprint(self.serialize())
# pprint(other.serialize())
return match
def _compatible_legacy(self, other):
match = False
if type(other) == type(self):
# Check if the other seems to be describing the same thing as self.
match = True
for field_name, field in six.iteritems(self.fields):
if (not field.ignore_when_merging
and self[field_name] is not None
and other[field_name] is not None
and self[field_name] != other[field_name]):
match = False
break
return match
def _should_keep_both_records(self, other):
should_keep_both = False
if type(other) == type(self):
# Check if the other seems to be describing the same thing as self.
for field_name, field in six.iteritems(self.fields):
if isinstance(field, ModelType):
if (field.ignore_when_merging
and self[field_name] is not None
and other[field_name] is not None
and not self[field_name]._compatible(other[field_name])):
should_keep_both = True
break
else:
if (field.ignore_when_merging
and self[field_name] is not None
and other[field_name] is not None
and self[field_name] != other[field_name]):
should_keep_both = True
break
return should_keep_both
[docs] @classmethod
def flatten(cls):
"""
A set of all models that are associated with this model.
For example, if we have a model like the following with multiple submodels:
.. code-block:: python
class A(BaseModel):
pass
class B(BaseModel):
a = ModelType(A)
class C(BaseModel):
b = ModelType(B)
then `C.flatten()` would give the result::
set(C, B, A)
:return: The set of all models associated with this model.
:rtype: set(BaseModel)
"""
model_set = {cls}
for field_name, field in six.iteritems(cls.fields):
if hasattr(field, 'model_class'):
model_set.update(field.model_class.flatten())
log.debug(model_set)
return model_set
@property
def binding_properties(self):
"""
A dictionary of all binding properties in this model, and their values.
.. note::
This function only returns those properties that are immediately binding for this
model, and not for any submodels.
:returns: A dictionary with the names of all binding fields as the keys and their values as the values.
:rtype: {str: Any}
"""
binding_properties = {}
for field_name, field in six.iteritems(self.fields):
if field.binding and self[field_name] is not None:
binding_properties[field_name] = self[field_name]
return binding_properties
def _binding_compatible(self, other, binding_properties=None):
"""
Whether two models are compatible in terms of their binding properties.
For example, if this model had a compound associated with it and the field was binding,
a model that is associated with another compound will not be merged in.
:param BaseModel other: The other model that will be checked for compatibility with the binding properties in this model
:param {str: Any} binding_properties: Any binding properties from a model that contains this model
:returns: Whether the two models are compatible in terms of their binding properties.
:rtype: bool
"""
if binding_properties is None:
binding_properties = self.binding_properties
if not binding_properties:
return True
if type(other) == type(self):
for field_name, field in six.iteritems(binding_properties):
if other[field_name] != binding_properties[field_name]:
return False
elif other is None:
pass
else:
for field_name, field in six.iteritems(other.fields):
if field_name in binding_properties.keys():
if other[field_name] is not None:
if not (binding_properties[field_name].is_superset(other[field_name]) or
binding_properties[field_name].is_subset(other[field_name])):
return False
elif hasattr(field, 'model_class'):
if not self._binding_compatible(other[field_name]):
return False
return True
def _consolidate_binding(self, binding_properties=None):
if binding_properties is None:
binding_properties = self.binding_properties
if binding_properties == {}:
return
for field_name, field in six.iteritems(self.fields):
if field_name in binding_properties.keys():
self[field_name] = binding_properties[field_name]
elif hasattr(field, 'model_class') and self[field_name] is not None:
self[field_name]._consolidate_binding(binding_properties)
@property
def record_method(self):
"""
Description (string) of which method was used to create this record.
"""
return self._record_method
@record_method.setter
def record_method(self, text):
if not isinstance(text, str):
raise TypeError("Record method description is not string.")
self._record_method = text
def _clean(self):
"""
Removes any subrecords where the required properties have not been fulfilled.
"""
for field_name, field in six.iteritems(self.fields):
if hasattr(field, 'model_class') and self[field_name] is not None:
self[field_name]._clean()
if not self[field_name].required_fulfilled:
self[field_name] = field.default
@property
def is_empty(self):
for field_name, field_type in six.iteritems(self.fields):
if not field_type.is_empty(self[field_name]):
return False
return True
[docs]@python_2_unicode_compatible
class ModelList(MutableSequence):
"""Wrapper around a list of Models objects to facilitate operations on all at once."""
[docs] def __init__(self, *models):
self.models = list(models)
def __getitem__(self, index):
return self.models[index]
def __setitem__(self, index, value):
self.models[index] = value
def __delitem__(self, index):
del self.models[index]
def __len__(self):
return len(self.models)
def __repr__(self):
return self.models.__repr__()
def __str__(self):
return self.models.__str__()
def __contains__(self, element):
log.debug(element.serialize())
log.debug(self.serialize())
log.debug(self.models.__contains__(element))
return self.models.__contains__(element)
[docs] def insert(self, index, value):
self.models.insert(index, value)
[docs] def serialize(self):
"""Serialize to a list of python dictionaries."""
return [e.serialize() for e in self.models]
[docs] def to_json(self, *args, **kwargs):
"""Convert ModelList to JSON."""
return json.dumps(self.serialize(), *args, **kwargs)
[docs] def remove_subsets(self, strict=False):
"""
Remove any subsets contained within the ModelList.
:param bool strict: Default True. Whether only strict subsets are removed. When this is False, duplicates are removed too.
"""
# A dictionary with the type of each element as the key, and the element itself as the value
typed_list = {}
for element in self.models:
if type(element) in typed_list.keys():
typed_list[type(element)].append(element)
else:
typed_list[type(element)] = [element]
new_models = []
for _, elements in six.iteritems(typed_list):
i = 0
length = len(elements)
to_remove = []
# Iterate through the list of elements and if any subsets are found, add the
# indices to a list of values to remove
while i < length:
j = 0
while j < length:
if i != j and elements[i].is_subset(elements[j]) and j not in to_remove:
if strict and elements[i] == elements[j]:
# Do not remove the element if it is not a strict subset depending on the value of strict
pass
else:
to_remove.append(i)
j += 1
i += 1
# Append any values that are not in the list of objects to remove
i = 0
while i < length:
if i not in to_remove:
new_models.append(elements[i])
i += 1
self.models = new_models