#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Tagger implementations. Used for part-of-speech tagging and named entity recognition.
"""
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from deprecation import deprecated
import io
import logging
import pickle
import random
import re
import dawg
import pycrfsuite
import six
from ..data import load_model, find_data
from .lexicon import Lexicon
log = logging.getLogger(__name__)
POS_TAG_TYPE = "pos_tag"
NER_TAG_TYPE = "ner_tag"
[docs]class BaseTagger(six.with_metaclass(ABCMeta)):
"""
Abstract tagger class from which all taggers inherit.
Subclasses must implement at least one of the following sets of methods for tagging:
- ``legacy_tag()``
- ``tag()``
- ``batch_tag()``
- ``can_tag()`` and ``tag_for_type()``
- ``can_tag()`` and ``can_batch_tag()`` and ``batch_tag_for_type()``
The above interface is called when required by classes including :class:`~chemdataextractor.doc.text.Sentence` or
:class:`~chemdataextractor.doc.document.Document`, depending on whether only the tag for a sentence is required or
for the whole document.
If the user has implemented more than one of the combinations above, the order of presedence for the
tagging methods is as follows:
- ``batch_tag_for_type()``
- ``tag_for_type()``
- ``batch_tag()``
- ``tag()``
- ``legacy_tag()``
Most users should not have to implement the top two options, and the default impelementations are discussed in the
documentation for :class:`~chemdataextractor.nlp.tag.EnsembleTagger` instead of here.
An implementation of the other tagging methods should have the following signatures and
should be implemented in the following cases:
- **tag(self, list(** :class:`~chemdataextractor.doc.text.RichToken` **) tokens) -> list(** :class:`~chemdataextractor.doc.text.RichToken` **, obj)**
Take a list of all the tokens from an element, and return a list of (token, tag) pairs.
This should be the default implementation for any new tagger. More information on how to create a new tagger can be found at
:ref:`in this guide<creating_taggers>`.
- **batch_tag(self, list(list(** :class:`~chemdataextractor.doc.text.RichToken` **)) sents) -> list(list(** :class:`~chemdataextractor.doc.text.RichToken` **, obj))**
Take a list of lists of all the tokens from all the elements in a document, and return a list of lists of (token, tag) pairs.
One thing to note is that the resulting list of lists of (token, tag) pairs need not be in the same order as the incoming list
of lists of tokens, so some sorting can be done if, for example, bucketing of sentences by their lengths is desired.
In addition to ``tag``, the ``batch_tag`` method should be implemented instead of the ``tag`` method in cases where the taggers rely on
backends that are more performant when tagging multiple sentences, and the tagger will be called for every element.
More information can be found in :ref:`in this guide<creating_taggers>`.
.. note::
If a tagger only has ``batch_tag`` implemented, the tagger will fail when applied to an element that does not belong to a document.
- **legacy_tag(self, list(obj tokens) -> (list(obj), obj)**
``legacy_tag`` corresponds to the ``tag`` method in ChemDataExtractor 2.0 and earlier. This has been renamed
``legacy_tag`` due to its complexity in that it could be called with either a list of strings or a list of (token, PoS tag) pairs.
This made it incompatible with the new taggers in their current form. ChemDataExtractor 2.1 will call this method with a list of strings
instead of a list of (token, PoS tag) pairs. This should only be used for converting previously written taggers with as few
code changes as possible, as shown in the :ref:`migration guide<migration_guide_2_1>`.
To express intent to the ChemDataExtractor framework that the tagger can tag for a certain tag type, you should implement the
``can_tag`` method, which takes a tag type and returns a boolean. The default implementation, provided by this class,
looks at the ``tag_type`` attribute of the tagger and returns True if it matches the tag type provided.
.. warning::
While the :class:`~chemdataextractor.doc.text.RichToken` class maintains backwards compatibility in most cases, e.g. parsers by assigning
the ``1`` key in dictionary-style lookup with the combined PoS and NER tag, calling this key in an NER or PoS tagger will cause
your script to crash. To avoid this, please change any previous bits of code such as ``token[1]`` to ``token["ner_tag"]`` or ``token.ner_tag``.
"""
tag_type = ""
"""
The tag type for this tagger. When this tag type is asked for from the token, as described in :class:`~chemdataextractor.doc.text.RichToken`, this
tagger will be called.
"""
[docs] @deprecated(deprecated_in="2.1", details="Deprecated in conjunction with the deprecation of the legacy_tag function. Please write equivalent functionality to use RichTokens.")
def tag_sents(self, sentences):
"""Apply the ``tag`` method to each sentence in ``sentences``."""
return [self.legacy_tag(s) for s in sentences]
[docs] def evaluate(self, gold):
"""Evaluate the accuracy of this tagger using a gold standard corpus.
:param list(list(tuple(str, str))) gold: The list of tagged sentences to score the tagger on.
:returns: Tagger accuracy value.
:rtype: float
"""
tagged_sents = self.tag_sents([w for (w, t) in sent] for sent in gold)
gold_tokens = sum(gold, [])
test_tokens = sum(tagged_sents, [])
accuracy = float(sum(x == y for x, y in six.moves.zip(gold_tokens, test_tokens))) / len(test_tokens)
return accuracy
[docs] def can_tag(self, tag_type):
"""
Whether this tagger can tag the given tag type.
:param obj tag_type: The tag type which the system wants to tag. Usually a string.
:returns: True if this parser can tag the given tag type
:rtype: bool
"""
return tag_type == self.tag_type
[docs] def can_batch_tag(self, tag_type):
"""
Whether this tagger can batch tag the given tag type.
:param obj tag_type: The tag type which the system wants to batch tag. Usually a string.
:returns: True if this parser can tag the given tag type
:rtype: bool
"""
return False
[docs]class EnsembleTagger(BaseTagger):
"""
A class for taggers which act on the results of multiple other taggers.
This could also be done by simply adding each tagger to the sentence and having
the taggers each act on the results from the other taggers by accessing RichToken attributes,
but an EnsembleTagger allows for the user to add one tagger instead,
cleaning up the interface.
The EnsembleTagger is also useful in collating the results from multiple taggers of the same type,
as can be seen in the case of :class:`~chemdataextractor.nlp.cem.CemTagger` which collects
multiple types of NER labellers (a CRF and multiple dictionary taggers), to create a single
coherent NER label.
"""
tag_type = ""
taggers = []
[docs] def __init__(self, *args, **kwargs):
super(EnsembleTagger, self).__init__(*args, **kwargs)
taggers_dict = {}
for i, tagger in enumerate(self.taggers):
if tagger.tag_type == self.tag_type:
tag_type = "_" + self.tag_type + "_" + str(i)
tagger.tag_type = tag_type
taggers_dict[tag_type] = tagger
else:
taggers_dict[tagger.tag_type] = tagger
self.taggers_dict = taggers_dict
self.taggers_dict[self.tag_type] = self
[docs] def tag_for_type(self, tokens, tag_type):
"""
This method will be called if the EnsembleTagger has previously
claimed that it can tag the given tag type via the :meth:`~chemdataextractor.nlp.tag.EnsembleTagger.can_tag` method. The appropriate
tagger within EnsembleTagger is called and the results returned.
.. note::
This method can handle having legacy taggers mixed in with
newer taggers.
:param list(chemdataextractor.doc.text.RichToken) tokens: The tokens which should be tagged
:param obj tag_type: The tag type for which EnsembleTagger should tag the tokens.
:return: A list of tuples of the given tokens and the corresponding tags.
:rtype: list(tuple(~chemdataextractor.doc.text.RichToken, obj))
"""
tagger = self.taggers_dict[tag_type]
if hasattr(tagger, "tag"):
return tagger.tag(tokens)
else:
return tagger.legacy_tag([token.text for token in tokens])
[docs] def batch_tag_for_type(self, sents, tag_type):
"""
This method will be called if the EnsembleTagger has previously
claimed that it can batch tag the given tag type via the
:meth:`~chemdataextractor.nlp.tag.EnsembleTagger.can_batch_tag` method. The appropriate
tagger within EnsembleTagger is called and the results returned.
:param list(~chemdataextractor.doc.text.RichToken) tokens: The tokens which should be tagged
:param obj tag_type: The tag type for which EnsembleTagger should tag the tokens.
:return: A list of tuples of the given tokens and the corresponding tags.
:rtype: list(tuple(~chemdataextractor.doc.text.RichToken, obj))
"""
tagger = self.taggers_dict[tag_type]
return tagger.batch_tag(sents)
[docs] def can_batch_tag(self, tag_type):
return hasattr(self.taggers_dict[tag_type], "batch_tag")
[docs] def can_tag(self, tag_type):
return tag_type in self.taggers_dict.keys()
[docs]class NoneTagger(BaseTagger):
"""Tag every token with None."""
[docs] def __init__(self, tag_type=None):
if tag_type is not None:
self.tag_type = tag_type
else:
self.tag_type = None
[docs] def tag(self, tokens):
return [(token, None) for token in tokens]
[docs]class RegexTagger(BaseTagger):
"""Regular Expression Tagger."""
# TODO: I think NLTK RegexTagger has recently been improved to be more like this, so maybe we can just remove this?
# We aren't actually using this anywhere because the regex ability in parsers is more flexible...
# But may be useful for users that want an easy way to override some other tagger?
#: Regular expression patterns in (regex, tag) tuples.
patterns = [
(r'^-?[0-9]+(.[0-9]+)?$', 'CD'), # cardinal numbers
(r'(The|the|A|a|An|an)$', 'AT'), # articles
(r'.*able$', 'JJ'), # adjectives
(r'.*ness$', 'NN'), # nouns formed from adjectives
(r'.*ly$', 'RB'), # adverbs
(r'.*s$', 'NNS'), # plural nouns
(r'.*ing$', 'VBG'), # gerunds
(r'.*ed$', 'VBD'), # past tense verbs
(r'.*', 'NN') # nouns (default)
]
#: The lexicon to use
lexicon = Lexicon()
[docs] def __init__(self, patterns=None, lexicon=None):
"""
:param list(tuple(string, string)) patterns: List of (regex, tag) pairs.
"""
self.patterns = patterns if patterns is not None else self.patterns
self.regexes = [(re.compile(pattern, re.I | re.U), tag) for pattern, tag in self.patterns]
self.lexicon = lexicon if lexicon is not None else self.lexicon
log.debug('%s: Initializing with %s patterns' % (self.__class__.__name__, len(self.patterns)))
[docs] def tag(self, tokens):
"""Return a list of (token, tag) tuples for a given list of tokens."""
tags = []
for token in tokens:
normalized = self.lexicon[token].normalized
for regex, tag in self.regexes:
if regex.match(normalized):
tags.append((token, tag))
break
else:
tags.append((token, None))
return tags
[docs]class AveragedPerceptron(object):
"""Averaged Perceptron implementation.
Based on implementation by Matthew Honnibal, released under the MIT license.
See more:
http://spacy.io/blog/part-of-speech-POS-tagger-in-python/
https://github.com/sloria/textblob-aptagger
"""
[docs] def __init__(self):
# Each feature gets its own weight vector, so weights is a dict-of-dicts
self.weights = {}
self.classes = set()
# The accumulated values, for the averaging. Keyed by feature/class tuples
self._totals = defaultdict(int)
# The last time the feature was changed, for the averaging. Keyed by feature/class tuples
self._tstamps = defaultdict(int)
# Number of instances seen
self.i = 0
[docs] def predict(self, features):
"""Dot-product the features and current weights and return the best label."""
scores = defaultdict(float)
for feat in features:
if feat not in self.weights:
continue
weights = self.weights[feat]
for label, weight in weights.items():
scores[label] += weight
# Do a secondary alphabetic sort, for stability
return max(self.classes, key=lambda label: (scores[label], label))
[docs] def update(self, truth, guess, features):
"""Update the feature weights."""
def upd_feat(c, f, w, v):
param = (f, c)
self._totals[param] += (self.i - self._tstamps[param]) * w
self._tstamps[param] = self.i
self.weights[f][c] = w + v
self.i += 1
if truth == guess:
return None
for f in features:
weights = self.weights.setdefault(f, {})
upd_feat(truth, f, weights.get(truth, 0.0), 1.0)
upd_feat(guess, f, weights.get(guess, 0.0), -1.0)
return None
[docs] def average_weights(self):
"""Average weights from all iterations."""
for feat, weights in self.weights.items():
new_feat_weights = {}
for clas, weight in weights.items():
param = (feat, clas)
total = self._totals[param]
total += (self.i - self._tstamps[param]) * weight
averaged = round(total / float(self.i), 3)
if averaged:
new_feat_weights[clas] = averaged
self.weights[feat] = new_feat_weights
return None
[docs] def save(self, path):
"""Save the pickled model weights."""
with io.open(path, 'wb') as fout:
return pickle.dump(dict(self.weights), fout)
[docs] def load(self, path):
"""Load the pickled model weights."""
with io.open(path, 'rb') as fin:
self.weights = pickle.load(fin)
[docs]class ApTagger(six.with_metaclass(ABCMeta, BaseTagger)):
"""Greedy Averaged Perceptron tagger, based on implementation by Matthew Honnibal, released under the MIT license.
See more:
http://spacy.io/blog/part-of-speech-POS-tagger-in-python/
https://github.com/sloria/textblob-aptagger
"""
START = ['-START-', '-START2-']
lexicon = Lexicon()
clusters = False
[docs] def __init__(self, model=None, lexicon=None, clusters=None):
""""""
self.perceptron = AveragedPerceptron()
self.tagdict = {}
self.classes = set()
self.model = model if model is not None else self.model
self.lexicon = lexicon if lexicon is not None else self.lexicon
self.clusters = clusters if clusters is not None else self.clusters
log.debug('%s: Initializing with %s' % (self.__class__.__name__, self.model))
[docs] def legacy_tag(self, tokens):
"""Return a list of (token, tag) tuples for a given list of tokens."""
# Lazy load model first time we tag
if not self.classes:
self.load(self.model)
prev, prev2 = self.START
tags = []
for i, token in enumerate(tokens):
tag = self.tagdict.get(token)
if not tag:
features = self._get_features(i, tokens, prev, prev2)
tag = self.perceptron.predict(features)
tags.append((token, tag))
prev2 = prev
prev = tag
return tags
[docs] def train(self, sentences, nr_iter=5):
"""Train a model from sentences.
:param sentences: A list of sentences, each of which is a list of (token, tag) tuples.
:param nr_iter: Number of training iterations.
"""
self._make_tagdict(sentences)
self.perceptron.classes = self.classes
for iter_ in range(nr_iter):
c = 0
n = 0
for sentence in sentences:
prev, prev2 = self.START
context = [t[0] for t in sentence]
for i, (token, tag) in enumerate(sentence):
guess = self.tagdict.get(token)
if not guess:
feats = self._get_features(i, context, prev, prev2)
guess = self.perceptron.predict(feats)
self.perceptron.update(tag, guess, feats)
prev2 = prev
prev = guess
c += guess == tag
n += 1
random.shuffle(sentences)
log.debug('Iter %s: %s/%s=%s' % (iter_, c, n, (float(c) / n) * 100))
self.perceptron.average_weights()
[docs] def save(self, f):
"""Save pickled model to file."""
return pickle.dump((self.perceptron.weights, self.tagdict, self.classes, self.clusters), f, protocol=pickle.HIGHEST_PROTOCOL)
[docs] def load(self, model):
"""Load pickled model."""
self.perceptron.weights, self.tagdict, self.classes, self.clusters = load_model(model)
self.perceptron.classes = self.classes
@abstractmethod
def _get_features(self, i, context, prev, prev2):
"""Map tokens into a feature representation."""
pass
def _make_tagdict(self, sentences):
"""Make a tag dictionary for single-tag words."""
counts = defaultdict(lambda: defaultdict(int))
for sent in sentences:
for word, tag in sent:
counts[word][tag] += 1
self.classes.add(tag)
freq_thresh = 20
ambiguity_thresh = 0.97
for word, tag_freqs in counts.items():
tag, mode = max(tag_freqs.items(), key=lambda item: item[1])
n = sum(tag_freqs.values())
# Don't add rare words to the tag dictionary, only add quite unambiguous words
if n >= freq_thresh and (float(mode) / n) >= ambiguity_thresh:
self.tagdict[word] = tag
[docs]class CrfTagger(BaseTagger):
"""Tagger that uses Conditional Random Fields (CRF)."""
lexicon = Lexicon()
clusters = False
#: Parameters to pass to training algorithm. See http://www.chokkan.org/software/crfsuite/manual.html
params = {
# These parameters are valid for the default LBFGS training algorithm. Change if using another.
'c1': 1.0, # Coefficient for L1 regularization (OWL-QN). Default 0.
'c2': 0.001, # Coefficient for L2 regularization. Default 1.
'max_iterations': 50, # The maximum number of iterations for L-BFGS optimization. Default INT_MAX.
'feature.possible_transitions': False, # Force to generate all possible transition features. Default False.
'feature.possible_states': False, # Force to generate all possible state features. Default False.
# 'feature.minfreq' : 2, # The minimum frequency of features. Default 0.
# 'epsilon' : # Epsilon for testing the convergence of the objective. Default 0.00001.
}
[docs] def __init__(self, model=None, lexicon=None, clusters=None, params=None):
""""""
self.model = model if model is not None else self.model
self.lexicon = lexicon if lexicon is not None else self.lexicon
self.clusters = clusters if clusters is not None else self.clusters
self.params = params if params is not None else self.params
self._tagger = pycrfsuite.Tagger()
self._loaded_model = False
[docs] def load(self, model):
log.debug('Loading %s' % model)
self._tagger.open(find_data(model))
self._loaded_model = True
[docs] def legacy_tag(self, tokens):
"""Return a list of ((token, tag), label) tuples for a given list of (token, tag) tuples."""
# Lazy load model first time we tag
if not self._loaded_model:
self.load(self.model)
features = [self._get_features(tokens, i) for i in range(len(tokens))]
labels = self._tagger.tag(features)
tagged_sent = list(zip(tokens, labels))
return tagged_sent
[docs] def train(self, sentences, model):
"""Train the CRF tagger using CRFSuite.
:params sentences: Annotated sentences.
:params model: Path to save pickled model.
"""
trainer = pycrfsuite.Trainer(verbose=True)
trainer.set_params(self.params)
for sentence in sentences:
tokens, labels = zip(*sentence)
features = [self._get_features(tokens, i) for i in range(len(tokens))]
trainer.append(features, labels)
trainer.train(model)
self.load(model)
[docs]class DictionaryTagger(BaseTagger):
"""Dictionary Tagger. Tag tokens based on inclusion in a DAWG."""
#: The lexicon to use.
lexicon = Lexicon()
#: DAWG model file path.
model = None
#: Entity tag. Matches will be tagged like 'B-CM' and 'I-CM' according to IOB scheme. TODO: Optional no B/I?
entity = 'CM'
#: Delimiters that define where matches are allowed to start or end.
delimiters = re.compile(r'(^.|\b|\s|\W|.$)')
#: Whether dictionary matches are case sensitive.
case_sensitive = False
[docs] def __init__(self, words=None, model=None, entity=None, case_sensitive=None, lexicon=None):
"""
:param list(list(string)) words: list of words, each of which is a list of tokens.
"""
self._dawg = dawg.CompletionDAWG()
self.model = model if model is not None else self.model
self.entity = entity if entity is not None else self.entity
self.case_sensitive = case_sensitive if case_sensitive is not None else self.case_sensitive
self.lexicon = lexicon if lexicon is not None else self.lexicon
self._loaded_model = False
if words is not None:
self.build(words)
[docs] def load(self, model):
"""Load pickled DAWG from disk."""
self._dawg.load(find_data(model))
self._loaded_model = True
[docs] def save(self, path):
"""Save pickled DAWG to disk."""
self._dawg.save(path)
[docs] def build(self, words):
"""Construct dictionary DAWG from tokenized words."""
words = [self._normalize(tokens) for tokens in words]
self._dawg = dawg.CompletionDAWG(words)
self._loaded_model = True
def _normalize(self, tokens):
"""Normalization transform to apply to both dictionary words and input tokens."""
if self.case_sensitive:
return ' '.join(self.lexicon[t].normalized for t in tokens)
else:
return ' '.join(self.lexicon[t].lower for t in tokens)
[docs] def legacy_tag(self, tokens):
"""Return a list of (token, tag) tuples for a given list of tokens."""
if len(tokens) > 0 and isinstance(tokens[0], tuple):
tokens = [token[0] for token in tokens]
if not self._loaded_model:
self.load(self.model)
tags = [None] * len(tokens)
norm = self._normalize(tokens)
length = len(norm)
# A set of allowed indexes for matches to start or end at
delims = [0] + [i for span in [m.span() for m in self.delimiters.finditer(norm)] for i in span] + [length]
# Token indices
token_at_index = []
for i, t in enumerate(tokens):
token_at_index.extend([i] * (len(self.lexicon[t].normalized) + 1))
start_i = 0
end_i = 1
matches = {}
next_start = end_i
# TODO: This could be a little more efficient by skipping indexes forward to next delim points.
while True:
current = norm[start_i:end_i]
if self._dawg.has_keys_with_prefix(current):
# print('%s:%s:%s' % (start_i, end_i, current))
# If the current span is in the dawg, and isn't followed by an alphanumeric character
if current in self._dawg and start_i in delims and end_i in delims:
# print(current)
# Subsequent longer matches with same start_i will overwrite values in matches dict
matches[start_i] = (start_i, end_i, current)
# We can skip forward to after this match next time we increment start_i
next_start = end_i
# Increment end_i provided we aren't already at the end of the input
if end_i < length:
end_i += 1
continue
# Increment start_i provided we aren't already at the end of the input
start_i = next_start
if start_i >= length - 1:
break
end_i = start_i + 1
next_start = end_i
# Apply matches as tags to the relevant tokens
for start_i, end_i, current in matches.values():
start_token = token_at_index[start_i]
end_token = token_at_index[end_i]
# Possible for match to start in 'I' token from prev match. Merge matches by not overwriting to 'B'.
if not tags[start_token] == 'I-%s' % self.entity:
tags[start_token] = 'B-%s' % self.entity
tags[start_token+1:end_token+1] = ['I-%s' % self.entity] * (end_token - start_token)
tokentags = list(zip(tokens, tags))
return tokentags