# -*- coding: utf-8 -*-
"""
Document model.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from abc import ABCMeta, abstractproperty
from pprint import pprint
import collections
import io
import json
import logging
import six
from ..utils import python_2_unicode_compatible
from .text import Paragraph, Citation, Footnote, Heading, Title, Caption
from .element import CaptionedElement
from .table import Table
from .figure import Figure
from .meta import MetaData
from ..errors import ReaderError
from ..model.base import ModelList
from ..model.model import Compound
from ..text import get_encoding
from ..config import Config
from ..parse.cem import chemical_name
log = logging.getLogger(__name__)
[docs]@python_2_unicode_compatible
class BaseDocument(six.with_metaclass(ABCMeta, collections.Sequence)):
"""Abstract base class for a Document."""
def __repr__(self):
return '<%s: %s elements>' % (self.__class__.__name__, len(self))
def __str__(self):
return '<%s: %s elements>' % (self.__class__.__name__, len(self))
def __getitem__(self, index):
return self.elements[index]
def __len__(self):
return len(self.elements)
@abstractproperty
def elements(self):
"""Return a list of document elements."""
return []
@abstractproperty
def records(self):
"""Chemical records that have been parsed from this Document."""
return []
[docs]class Document(BaseDocument):
"""A document to extract data from. Contains a list of document elements."""
# TODO: Add a usage example here in the documentation.
[docs] def __init__(self, *elements, **kwargs):
"""Initialize a Document manually by passing one or more Document elements (Paragraph, Heading, Table, etc.)
Strings that are passed to this constructor are automatically wrapped into Paragraph elements.
:param list[chemdataextractor.doc.element.BaseElement|string] elements: Elements in this Document.
:keyword Config config: (Optional) Config file for the Document.
:keyword list[BaseModel] models: (Optional) Models that the Document should extract data for.
"""
self._elements = []
for element in elements:
# Convert raw text to Paragraph elements
if isinstance(element, six.text_type):
element = Paragraph(element)
elif isinstance(element, six.binary_type):
# Try guess encoding if byte string
encoding = get_encoding(element)
log.warning('Guessed bytestring encoding as %s. Use unicode strings to avoid this warning.', encoding)
element = Paragraph(element.decode(encoding))
# print(type(element))
element.document = self
self._elements.append(element)
if 'config' in kwargs.keys():
self.config = kwargs['config']
else:
self.config = Config()
if 'models' in kwargs.keys():
self.models = kwargs['models']
else:
self._models = []
# Sets parameters from configuration file
for element in elements:
if callable(getattr(element, 'set_config', None)):
element.set_config()
log.debug('%s: Initializing with %s elements' % (self.__class__.__name__, len(self.elements)))
[docs] def add_models(self, models):
"""
Add models to all elements.
Usage::
d = Document.from_file(f)
d.set_models([myModelClass1, myModelClass2,..])
Arguments::
models -- List of model classes
"""
log.debug("Setting models")
self._models.extend(models)
for element in self.elements:
if callable(getattr(element, 'add_models', None)):
element.add_models(models)
# print(element.models)
return
@property
def models(self):
return self._models
@models.setter
def models(self, value):
self._models = value
for element in self.elements:
element.models = value
[docs] @classmethod
def from_file(cls, f, fname=None, readers=None):
"""Create a Document from a file.
Usage::
with open('paper.html', 'rb') as f:
doc = Document.from_file(f)
.. note::
Always open files in binary mode by using the 'rb' parameter.
:param f: A file-like object or path to a file.
:type f: file or str
:param str fname: (Optional) The filename. Used to help determine file format.
:param list[chemdataextractor.reader.base.BaseReader] readers: (Optional) List of readers to use. If not set, Document will try all default readers,
which are :class:`~chemdataextractor.reader.acs.AcsHtmlReader`, :class:`~chemdataextractor.reader.rsc.RscHtmlReader`,
:class:`~chemdataextractor.reader.nlm.NlmXmlReader`, :class:`~chemdataextractor.reader.uspto.UsptoXmlReader`,
:class:`~chemdataextractor.reader.cssp.CsspHtmlReader`, :class:`~chemdataextractor.elsevier.ElsevierXmlReader`,
:class:`~chemdataextractor.reader.markup.XmlReader`, :class:`~chemdataextractor.reader.markup.HtmlReader`,
:class:`~chemdataextractor.reader.pdf.PdfReader`, and :class:`~chemdataextractor.reader.plaintext.PlainTextReader`.
"""
if isinstance(f, six.string_types):
f = io.open(f, 'rb')
if not fname and hasattr(f, 'name'):
fname = f.name
return cls.from_string(f.read(), fname=fname, readers=readers)
[docs] @classmethod
def from_string(cls, fstring, fname=None, readers=None):
"""Create a Document from a byte string containing the contents of a file.
Usage::
contents = open('paper.html', 'rb').read()
doc = Document.from_string(contents)
.. note::
This method expects a byte string, not a unicode string (in contrast to most methods in ChemDataExtractor).
:param bytes fstring: A byte string containing the contents of a file.
:param str fname: (Optional) The filename. Used to help determine file format.
:param list[chemdataextractor.reader.base.BaseReader] readers: (Optional) List of readers to use. If not set, Document will try all default readers,
which are :class:`~chemdataextractor.reader.acs.AcsHtmlReader`, :class:`~chemdataextractor.reader.rsc.RscHtmlReader`,
:class:`~chemdataextractor.reader.nlm.NlmXmlReader`, :class:`~chemdataextractor.reader.uspto.UsptoXmlReader`,
:class:`~chemdataextractor.reader.cssp.CsspHtmlReader`, :class:`~chemdataextractor.elsevier.ElsevierXmlReader`,
:class:`~chemdataextractor.reader.markup.XmlReader`, :class:`~chemdataextractor.reader.markup.HtmlReader`,
:class:`~chemdataextractor.reader.pdf.PdfReader`, and :class:`~chemdataextractor.reader.plaintext.PlainTextReader`.
"""
if readers is None:
from ..reader import DEFAULT_READERS
readers = DEFAULT_READERS
if isinstance(fstring, six.text_type):
raise ReaderError('from_string expects a byte string, not a unicode string')
for reader in readers:
# Skip reader if we don't think it can read file
if not reader.detect(fstring, fname=fname):
continue
try:
d = reader.readstring(fstring)
log.debug('Parsed document with %s' % reader.__class__.__name__)
return d
except ReaderError:
pass
raise ReaderError('Unable to read document')
@property
def elements(self):
"""
A list of all the elements in this document. All elements subclass from :class:`~chemdataextractor.doc.element.BaseElement`,
and represent things such as paragraphs or tables, and can be found in :mod:`chemdataextractor.doc.figure`,
:mod:`chemdataextractor.doc.table`, and :mod:`chemdataextractor.doc.text`.
"""
return self._elements
# TODO: memoized_property?
@property
def records(self):
"""
All records found in this Document, as a list of :class:`~chemdataextractor.model.base.BaseModel`.
"""
log.debug("Getting chemical records")
records = ModelList() # Final list of records -- output
head_def_record = None # Most recent record from a heading, title or short paragraph
head_def_record_i = None # Element index of head_def_record
last_product_record = None
title_record = None # Records found in the title
# Main loop, over all elements in the document
for i, el in enumerate(self.elements):
log.debug("Element %d, type %s" %(i, str(type(el))))
last_id_record = None
# FORWARD INTERDEPENDENCY RESOLUTION -- Updated model parsers to reflect defined entities
# 1. Find any defined entities in the element e.g. "Curie Temperature, Tc"
# 2. Update the relevant models
element_definitions = el.definitions
chemical_defs = el.chemical_definitions
for model in el._streamlined_models:
if hasattr(model, 'is_id_only'):
model.update(chemical_defs)
else:
model.update(element_definitions)
el_records = el.records
# Save the title compound
if isinstance(el, Title):
if len(el_records) == 1 and isinstance(el_records[0], Compound) and el_records[0].is_id_only:
title_record = el_records[0] # TODO: why the first only?
# Reset head_def_record unless consecutive heading with no records
if isinstance(el, Heading) and head_def_record is not None:
if not (i == head_def_record_i + 1 and len(el.records) == 0):
head_def_record = None
head_def_record_i = None
# Paragraph with single sentence with single ID record considered a head_def_record
if isinstance(el, Paragraph) and len(el.sentences) == 1:
if len(el_records) == 1 and isinstance(el_records[0], Compound) and el_records[0].is_id_only:
head_def_record = el_records[0]
head_def_record_i = i
# Paragraph with multiple sentences
# We assume that if the first sentence of a paragraph contains only 1 ID Record, we can treat it as a header definition record, unless directly proceeding a header def record
elif isinstance(el, Paragraph) and len(el.sentences) > 0:
if not (isinstance(self.elements[i - 1], Heading) and head_def_record_i == i - 1):
first_sent_records = el.sentences[0].records
if len(first_sent_records) == 1 and isinstance(first_sent_records[0], Compound) and first_sent_records[0].is_id_only:
sent_record = first_sent_records[0]
if sent_record.names:
longest_name = sorted(sent_record.names, key=len)[0]
if sent_record.labels or (sent_record.names and len(longest_name) > len(el.sentences[0].text) / 2): # TODO: Why do the length check? Maybe to make sure that the sentence mostly refers to a compound?
head_def_record = sent_record
head_def_record_i = i
#: BACKWARD INTERDEPENDENCY RESOLUTION BEGINS HERE
for record in el_records:
if isinstance(record, MetaData):
continue
if isinstance(record, Compound):
# Keep track of the most recent compound record with labels
if isinstance(el, Paragraph) and record.labels:
last_id_record = record
# # Keep track of the most recent compound 'product' record
if 'product' in record.roles:
last_product_record = record
# Heading records with compound ID's
if isinstance(el, Heading) and (record.labels or record.names):
head_def_record = record
head_def_record_i = i
# If 2 consecutive headings with compound ID, merge in from previous
if i > 0 and isinstance(self.elements[i - 1], Heading):
prev = self.elements[i - 1]
if (len(el.records) == 1 and record.is_id_only and len(prev.records) == 1 and
isinstance(prev.records[0], Compound) and prev.records[0].is_id_only and not (record.labels and prev.records[0].labels) and
not (record.names and prev.records[0].names)):
record.names.update(prev.records[0].names)
record.labels.update(prev.records[0].labels)
record.roles.update(prev.records[0].roles)
# Unidentified records -- those without compound names or labels
if record.is_unidentified:
if hasattr(record, 'compound'):
# We have property values but no names or labels... try merge those from previous records
if isinstance(el, Paragraph) and (head_def_record or last_product_record or last_id_record or title_record):
# head_def_record from heading takes priority if the heading directly precedes the paragraph ( NOPE: or the last_id_record has no name)
if head_def_record_i and head_def_record_i + 1 == i: # or (last_id_record and not last_id_record.names)):
if head_def_record:
record.compound = head_def_record
elif last_id_record:
record.compound = last_id_record
elif last_product_record:
record.compound = last_product_record
elif title_record:
record.compound = title_record
else:
if last_id_record:
record.compound = last_id_record
elif head_def_record:
record.compound = head_def_record
elif last_product_record:
record.compound = last_product_record
elif title_record:
record.compound = title_record
else:
# Consider continue here to filter records missing name/label...
pass
if record not in records:
log.debug(record.serialize())
records.append(record)
# for record in records:
# for contextual_record in contextual_records:
# # record.merge_contextual(contextual_record)
# contextual_record.merge_contextual(record)
# if not contextual_record.is_contextual:
# print("No longer contextual:", contextual_record)
# records.append(contextual_record)
# contextual_records.remove(contextual_record)
# log.debug(records.serialize())
# Merge abbreviation definitions
for record in records:
compound = None
if hasattr(record, 'compound'):
compound = record.compound
elif isinstance(record, Compound):
compound = record
if compound is not None:
for short, long_, entity in self.abbreviation_definitions:
if entity == 'CM':
name = ' '.join(long_)
abbrev = ' '.join(short)
if name in compound.names and abbrev not in compound.names:
compound.names.add(abbrev)
if abbrev in compound.names and name not in compound.names:
compound.names.add(name)
# Merge Compound records with any shared name/label
len_l = len(records)
log.debug(records)
i = 0
while i < (len_l - 1):
j = i + 1
while j < len_l:
r = records[i]
other_r = records[j]
r_compound = None
if isinstance(r, Compound):
r_compound = r
elif hasattr(r, 'compound') and isinstance(r.compound, Compound):
r_compound = r.compound
other_r_compound = None
if isinstance(other_r, Compound):
other_r_compound = other_r
elif hasattr(other_r, 'compound') and isinstance(other_r.compound, Compound):
other_r_compound = other_r.compound
if r_compound and other_r_compound:
# Strip whitespace and lowercase to compare names
rnames_std = {''.join(n.split()).lower() for n in r_compound.names}
onames_std = {''.join(n.split()).lower() for n in other_r_compound.names}
# Clashing labels, don't merge
if len(r_compound.labels - other_r_compound.labels) > 0 and len(other_r_compound.labels - r_compound.labels) > 0:
j += 1
continue
if any(n in rnames_std for n in onames_std) or any(l in r_compound.labels for l in other_r_compound.labels):
r_compound.merge(other_r_compound)
other_r_compound.merge(r_compound)
if isinstance(r, Compound) and isinstance(other_r, Compound):
records.pop(j)
records.pop(i)
records.append(r_compound)
len_l -= 1
i -= 1
break
j += 1
i += 1
i = 0
length = len(records)
while i < length:
j = 0
while j < length:
if i != j:
records[j].merge_contextual(records[i])
j += 1
i += 1
# print("\n\n\nAFTER:")
# pprint(records.serialize())
# clean up records
cleaned_records = ModelList()
for record in records:
if (self.models and type(record) in self.models) or not self.models:
record._clean()
# print("\nCLEANEDRECORD:", record.required_fulfilled, record not in cleaned_records)
# pprint(record.serialize())
if record.required_fulfilled and record not in cleaned_records:
cleaned_records.append(record)
cleaned_records.remove_subsets()
# Reset updatables
for el in self.elements:
for model in el._streamlined_models:
model.reset_updatables()
# Append contextual records if they've filled required fields
# for record in contextual_records:
# if record.required_fulfilled:
# records.append(record)
return cleaned_records
[docs] def get_element_with_id(self, id):
"""
Get element with the specified ID. If one is not found, None is returned.
:param id: Identifier to search for.
:returns: Element with specified ID
:rtype: BaseElement or None
"""
"""Return the element with the specified ID."""
# Should we maintain a hashmap of ids to make this more efficient? Probably overkill.
# TODO: Elements can contain nested elements (captions, footnotes, table cells, etc.)
return next((el for el in self.elements if el.id == id), None)
@property
def figures(self):
"""
A list of all :class:`~chemdataextractor.doc.figure.Figure` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Figure)]
@property
def tables(self):
"""
A list of all :class:`~chemdataextractor.doc.table.Table` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Table)]
@property
def citations(self):
"""
A list of all :class:`~chemdataextractor.doc.text.Citation` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Citation)]
@property
def footnotes(self):
"""
A list of all :class:`~chemdataextractor.doc.text.Footnote` elements in this Document.
.. note::
Elements (e.g. Tables) can contain nested Footnotes which are not taken into account.
"""
# TODO: Elements (e.g. Tables) can contain nested Footnotes
return [el for el in self.elements if isinstance(el, Footnote)]
@property
def titles(self):
"""
A list of all :class:`~chemdataextractor.doc.text.Title` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Title)]
@property
def headings(self):
"""
A list of all :class:`~chemdataextractor.doc.text.Heading` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Heading)]
@property
def paragraphs(self):
"""
A list of all :class:`~chemdataextractor.doc.text.Paragraph` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Paragraph)]
@property
def captions(self):
"""
A list of all :class:`~chemdataextractor.doc.text.Caption` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, Caption)]
@property
def captioned_elements(self):
"""
A list of all :class:`~chemdataextractor.doc.element.CaptionedElement` elements in this Document.
"""
return [el for el in self.elements if isinstance(el, CaptionedElement)]
@property
def metadata(self):
"""Return metadata information
"""
return [el for el in self.elements if isinstance(el, MetaData)][0]
@property
def abbreviation_definitions(self):
"""
A list of all abbreviation definitions in this Document. Each abbreviation is in the form
(:class:`str` abbreviation, :class:`str` long form of abbreviation, :class:`str` ner_tag)
"""
return [ab for el in self.elements for ab in el.abbreviation_definitions]
@property
def ner_tags(self):
"""
A list of all Named Entity Recognition tags in this Document.
If a word was found not to be a named entity, the named entity tag is None,
and if it was found to be a named entity, it can have either a tag of 'B-CM' for a beginning of a
mention of a chemical or 'I-CM' for the continuation of a mention.
"""
return [n for el in self.elements for n in el.ner_tags]
@property
def cems(self):
"""
A list of all Chemical Entity Mentions in this document as :class:`~chemdataextractor.doc.text.Span`
"""
return list(set([n for el in self.elements for n in el.cems]))
@property
def definitions(self):
"""
Return a list of all recognised definitions within this Document
"""
# TODO: What's the type of this?
return list([defn for el in self.elements for defn in el.definitions])
[docs] def serialize(self):
"""
Convert Document to Python dictionary. The dictionary will always contain the key 'type', which will be 'document',
and the key 'elements', which contains a dictionary representation of each of the elements of the document.
"""
# Serialize fields to a dict
elements = []
for element in self.elements:
elements.append(element.serialize())
data = {'type': 'document', 'elements': elements}
return data
[docs] def to_json(self, *args, **kwargs):
"""Convert Document to JSON string. The content of the JSON will be equivalent
to that of :meth:`serialize`.
The document itself will be under the key 'elements',
and there will also be the key 'type', which will always be 'document'.
Any arguments for :func:`json.dumps` can be passed into this function."""
return json.dumps(self.serialize(), *args, **kwargs)
def _repr_html_(self):
html_lines = ['<div class="cde-document">']
for element in self.elements:
html_lines.append(element._repr_html_())
html_lines.append('</div>')
return '\n'.join(html_lines)