#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Taggers that wrap AllenNLP functionality. Used for named entity recognition.
"""
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
import math
import logging
import re
import copy
import datetime
from .tag import BaseTagger
from ..data import find_data
import torch
import json
from yaspin import yaspin
from allennlp.data.tokenizers.token import Token as AllenNLPToken
from allennlp.models.archival import load_archive
from allennlp.predictors import SentenceTaggerPredictor
from allennlp.data.instance import Instance
from allennlp.data.fields.text_field import TextField
log = logging.getLogger(__name__)
class ProcessedTextTagger(BaseTagger):
"""
Class to process text before the text is fed into any other taggers.
This class is designed to be used with AllenNlpWrapperTagger and replaces any
single-number tokens with <nUm> in accordance with the training data.
"""
tag_type = "processed_text"
    number_pattern = re.compile(r'([\+\-–−]?\d+(([\.・,\d])+)?)')
number_string = "<nUm>"
    def tag(self, tokens):
tags = []
for token in tokens:
processed_text = token.text
if re.fullmatch(self.number_pattern, processed_text):
processed_text = self.number_string
tags.append((token, processed_text))
return tags
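
# An illustrative sketch (comments only, not executed) of which tokens the
# pattern above rewrites; the token values here are made up:
#
#     tagger = ProcessedTextTagger()
#     tagger.number_pattern.fullmatch("-3.25")   # matches -> tagged as "<nUm>"
#     tagger.number_pattern.fullmatch("3.25mm")  # no match -> text kept as-is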
class _AllenNlpTokenTagger(BaseTagger):
"""
Class to get the AllenNLP token corresponding to a CDE token.
Intended for internal use with AllenNlpWrapperTagger.
"""
tag_type = "_allennlptoken"
def tag(self, tokens):
tags = []
for token in tokens:
allennlptoken = AllenNLPToken(text=token.processed_text)
tags.append((token, allennlptoken))
return tags
class AllenNlpWrapperTagger(BaseTagger):
"""
A wrapper for an AllenNLP model. Tested with a CRF Tagger but should work with any sequence labeller trained
in allennlp.
"""
model = None
tag_type = None
indexers = None
overrides = None
    def __init__(self, indexers=None,
weights_location=None,
gpu_id=None,
archive_location=None,
tag_type=None,
min_batch_size=None,
max_batch_size=None,
max_allowed_length=None):
"""
:param indexers (dict(str, ~allennlp.data.token_indexers.TokenIndexer), optional): A dictionary of all the AllenNLP indexers to be used with the taggers.
Please refer to their documentation for more detail.
:param weights_location (str, optional): Location for weights.
Corresponds to weights_file parameter for the load_archive function from AllenNLP.
:param gpu_id (int, optional): The ID for the GPU to be used. If None is passed in, ChemDataExtractor will
automatically detect if a GPU is available and use that. To explicitly use the CPU, pass in a value of -1.
:param archive_location (str, optional): The location where the model is archived. Corresponds to the archive_file
parameter in the load_archive function from AllenNLP. Alternatively, you can set this parameter to None and set
the class property ``model``, which will then search for the model inside of ChemDataExtractor's default model directory.
:param tag_type (obj, optional): Override the class's tag type. Refer to the documentation for
:class:`~chemdataextractor.nlp.tag.BaseTagger` for more information on how to use tag types.
:param min_batch_size (int, optional): The minimum batch size to use when predicting. Default 100.
:param max_batch_size (int, optional): The maximum batch size to use when predicting. Default 200.
:param max_allowed_length (int, optional): The maximum allowed length of a sentence when predicting.
Default 220. Any sentences longer than this will be split into multiple smaller sentences via a sliding window approach and the
results will be collected. Needs to be a multiple of 4 for correct predictions.
"""
if tag_type is not None:
self.tag_type = tag_type
if indexers is not None:
self.indexers = indexers
if self.indexers is None:
self.indexers = {}
self._gpu_id = gpu_id
if archive_location is None:
archive_location = find_data(self.model)
self._weights_location = weights_location
self._archive_location = archive_location
self._predictor = None
if self.overrides is None:
self.overrides = {}
self.min_batch_size = min_batch_size
if min_batch_size is None:
self.min_batch_size = 100
self.max_batch_size = max_batch_size
if max_batch_size is None:
self.max_batch_size = 200
self.max_allowed_length = max_allowed_length
if max_allowed_length is None:
self.max_allowed_length = 220
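    # Construction sketch for the hypothetical subclass above: gpu_id=-1 forces
    # CPU inference, and the batch size bounds control how many sentences are
    # sent to the predictor at once.
    #
    #     tagger = MyNerTagger(gpu_id=-1, min_batch_size=50, max_batch_size=100)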
    def process(self, tag):
"""
Process the given tag. This can be used for example if the names of tags in training are different
from what ChemDataExtractor expects.
        :param tag (str): The raw string output from the predictor.
:returns: A processed version of the tag
:rtype: str
"""
return tag
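    # Sketch of a process override, e.g. when the label names used in training
    # differ from the ones ChemDataExtractor expects (the mapping is made up):
    #
    #     def process(self, tag):
    #         return {"CHEM": "CM"}.get(tag, tag)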
@property
def predictor(self):
"""
The AllenNLP predictor for this tagger.
"""
if self._predictor is None:
with yaspin(text="Initialising AllenNLP model", side="right").simpleDots as sp:
gpu_id = self._gpu_id
if gpu_id is None and torch.cuda.is_available():
print("Automatically activating GPU support")
gpu_id = torch.cuda.current_device()
loaded_archive = load_archive(archive_file=self._archive_location, weights_file=self._weights_location,
overrides=json.dumps(self.overrides))
model = loaded_archive.model
if gpu_id is not None and gpu_id >= 0:
model = model.cuda(gpu_id)
model = model.eval()
self._predictor = copy.deepcopy(SentenceTaggerPredictor(model=model, dataset_reader=None))
sp.ok("✔")
return self._predictor
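    # Usage note: the archive is only loaded on first access to this property,
    # so constructing a tagger is cheap. The ``overrides`` class attribute is
    # serialised to JSON and handed straight to AllenNLP's load_archive, so it
    # can carry any configuration overrides that load_archive accepts.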
    def tag(self, tokens):
tags = list(self.batch_tag([tokens])[0])
return tags
    def batch_tag(self, sents):
"""
:param chemdataextractor.doc.text.RichToken sents:
:returns: list(list(~chemdataextractor.doc.text.RichToken, obj))
Take a list of lists of all the tokens from all the elements in a document, and return a list of lists of (token, tag) pairs.
One thing to note is that the resulting list of lists of (token, tag) pairs need not be in the same order as the incoming list
of lists of tokens, as sorting is done so that we can bucket sentences by their lengths.
More information can be found in the :class:`~chemdataextractor.nlp.tag.BaseTagger` documentation, and :ref:`in this guide<creating_taggers>`.
"""
        log.debug("Tagging %s sentences", len(sents))
start_time = datetime.datetime.now()
# Divide up the sentence so that we don't get sentences longer than BERT can handle
all_allennlptokens, sentence_subsentence_map = self._get_subsentences(sents)
# Create batches
all_allennlptokens = sorted(all_allennlptokens, key=len)
instances = self._create_batches(all_allennlptokens)
instance_time = datetime.datetime.now()
log.debug("".join(["Created instances:", str(instance_time - start_time)]))
log.debug("Num Batches: ", len(instances))
predictions = []
for instance in instances:
prediction_start_time = datetime.datetime.now()
log.debug("".join(["Batch size:", str(len(instance))]))
with torch.no_grad():
batch_predictions = self.predictor.predict_batch_instance(instance)
predictions.extend(batch_predictions)
prediction_end_time = datetime.datetime.now()
log.debug("".join(["Batch time:", str(prediction_end_time - prediction_start_time)]))
id_predictions_map = {}
for allensentence, prediction in zip(all_allennlptokens, predictions):
id_predictions_map[id(allensentence)] = prediction["tags"]
# Assign tags to each sentence
tags = self._assign_tags(sents, sentence_subsentence_map, id_predictions_map)
end_time = datetime.datetime.now()
log.debug("".join(["Total time for batch_tag:", str(end_time - start_time)]))
return tags
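    # Shape sketch: given lists of tokens for two sentences, batch_tag returns
    # one iterable of (token, tag) pairs per sentence, not necessarily in input
    # order. The tagger and token lists here are placeholders:
    #
    #     for sent_result in tagger.batch_tag([tokens_a, tokens_b]):
    #         for token, tag in sent_result:
    #             print(token.text, tag)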
def _get_subsentences(self, sents):
"""
ChemDataExtractor may encounter sentences that are longer than what some of the
taggers in AllenNLP may support. (e.g. a BERT based tagger only supports sequences
up to 512 tokens long). This method gets around this limitation by splitting such
long sentences into multiple overlapping subsentences using a sliding window,
and returning a map between these subsentences and their parent sentence.
"""
sentence_subsentence_map = {}
all_allennlptokens = []
max_allowed_length = self.max_allowed_length
for sent in sents:
subsentences = [sent]
if len(sent) > max_allowed_length:
                # Round up so that long sentences are split into evenly sized,
                # overlapping subsentences
                num_sent_divisions = math.ceil(len(sent) / max_allowed_length)
num_tokens_per_subsentence = math.ceil(math.ceil(len(sent) / num_sent_divisions) / 4) * 4
increment = math.ceil(num_tokens_per_subsentence / 2)
subsentences = [sent[: num_tokens_per_subsentence]]
i = increment
while i + increment < len(sent):
subsentences.append(sent[i: i + num_tokens_per_subsentence])
i += increment
allennlpsents_for_sent = []
for subsent in subsentences:
allennlptokens = []
for token in subsent:
allennlptokens.append(token._allennlptoken)
allennlpsents_for_sent.append(id(allennlptokens))
all_allennlptokens.append(allennlptokens)
sentence_subsentence_map[id(sent)] = allennlpsents_for_sent
return all_allennlptokens, sentence_subsentence_map
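    # Worked example of the sliding window above for a 300-token sentence with
    # the default max_allowed_length of 220:
    #     num_sent_divisions = ceil(300 / 220) = 2
    #     num_tokens_per_subsentence = ceil(ceil(300 / 2) / 4) * 4 = 152
    #     increment = 76
    # yielding the overlapping subsentences sent[0:152], sent[76:228] and
    # sent[152:300], all registered against the parent sentence in
    # sentence_subsentence_map.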
def _create_batches(self, all_allennlptokens):
"""
Create batches to feed into the predictor within the given batch size range.
To try to be more efficient, these batches are sorted by the length of the sentences.
"""
min_batch_size = self.min_batch_size
max_batch_size = self.max_batch_size
new_list_sequence_delta = 5
instances = []
if len(all_allennlptokens) > min_batch_size:
current_list_min_sequence_length = len(all_allennlptokens[0])
divided_sents = []
sents_current = []
for sent in all_allennlptokens:
if (len(sent) > current_list_min_sequence_length + new_list_sequence_delta and len(sents_current) > min_batch_size) or len(sents_current) > max_batch_size:
divided_sents.append(sents_current)
sents_current = [sent]
current_list_min_sequence_length = len(sent)
else:
sents_current.append(sent)
divided_sents.append(sents_current)
for div_sents in divided_sents:
division_instances = []
for sent in div_sents:
division_instances.append(Instance({"tokens": TextField(tokens=sent, token_indexers=self.indexers)}))
instances.append(division_instances)
else:
for allennlptokens in all_allennlptokens:
instances.append(Instance({"tokens": TextField(tokens=allennlptokens, token_indexers=self.indexers)}))
instances = [instances]
return instances
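    # Bucketing sketch: with min_batch_size=2, max_batch_size=4 and sorted
    # sentence lengths [3, 4, 5, 12, 13], the batches come out as [3, 4, 5] and
    # [12, 13]: once a batch holds more than min_batch_size sentences, a
    # sentence more than new_list_sequence_delta tokens longer than the batch's
    # shortest sentence starts a new batch, so each batch is only padded up to
    # the length of similarly sized neighbours.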
def _assign_tags(self, sents, sentence_subsentence_map, id_predictions_map):
"""
Assign the tags to the correct sentences based on the map between the sentences
and subsentences as created in the get_subsentences method.
See the paper on new NER (citation to be added) for more detail on how the tags
are allocated from each subsentence.
"""
tags = []
for sent in sents:
sent_tags = []
allen_ids = sentence_subsentence_map[id(sent)]
for allen_id in allen_ids:
sent_tags.append(id_predictions_map[allen_id])
if len(sent_tags) == 1:
consolidated_tags = sent_tags[0]
else:
consolidated_tags = []
_ranges_used = []
num_tokens_per_subsentence = len(sent_tags[0])
quarter_loc = int(num_tokens_per_subsentence / 4)
for index, subsent_tags in enumerate(sent_tags):
if index == 0:
consolidated_tags.extend(subsent_tags[: -quarter_loc])
_ranges_used.append(len(subsent_tags[: -quarter_loc]))
elif index == len(sent_tags) - 1:
consolidated_tags.extend(subsent_tags[quarter_loc:])
_ranges_used.append(len(subsent_tags[quarter_loc:]))
else:
consolidated_tags.extend(subsent_tags[quarter_loc: -quarter_loc])
                        _ranges_used.append(len(subsent_tags[quarter_loc: -quarter_loc]))
if len(sent) != len(consolidated_tags):
raise TypeError(f"The length of the sentence {len(sent)} and the length of the consolidated tags {len(consolidated_tags)} are different for the tagger for {self.tag_type}.")
tags.append(zip(sent, [self.process(tag) for tag in consolidated_tags]))
return tags
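    # Consolidation sketch, continuing the 300-token example from
    # _get_subsentences (three windows of nominal length 152, quarter_loc = 38):
    # the first window contributes tags[0:114] of the sentence, the middle
    # window its central half (tags[114:190]), and the last window
    # tags[190:300], so each token takes its tag from the window in which it
    # lies furthest from an edge.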