# Source code for chemdataextractor.cli.evaluate

# -*- coding: utf-8 -*-
"""
Commands for running evaluations.

"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import copy
import json
import logging
import os

import click

from ..reader import RscHtmlReader, AcsHtmlReader, NlmXmlReader


log = logging.getLogger(__name__)


@click.group()
@click.pass_context
def evaluate(ctx):
    """Evaluation commands."""
    # This group does no work of its own; it only serves as the parent that
    # the subcommands in this module attach to via ``@evaluate.command()``.
    # ``ctx`` is injected by ``click.pass_context`` and is not used here.


@evaluate.command()
@click.argument('input', type=click.File('r'))
def run(input):
    """Extract records from INPUT and save them next to it as JSON.

    The publisher is read from the part of the INPUT file name that precedes
    the first '.', and must be one of 'rsc', 'acs' or 'springer'. The records
    are written to '<INPUT without extension>-out.json'.
    """
    # Map the file-name prefix to the reader class used for that publisher.
    readers = {
        'rsc': RscHtmlReader,
        'acs': AcsHtmlReader,
        'springer': NlmXmlReader,
    }
    pub = os.path.basename(input.name).split('.', 1)[0]
    if pub not in readers:
        raise click.ClickException('Invalid publisher')
    doc = readers[pub]().read(input)

    # Serialize all records apart from those that are just chemical names or
    # just labels. The key comparison goes through list() because on Python 3
    # ``dict.keys()`` is a view object that never compares equal to a list,
    # which made the original filter a no-op.
    serialized = (record.serialize(primitive=True) for record in doc.records)
    records = [
        record for record in serialized
        if list(record.keys()) not in (['names'], ['labels'])
    ]

    out_path = '%s-out.json' % os.path.splitext(input.name)[0]
    with open(out_path, 'w') as outf:
        json.dump(records, outf, indent=2)


def eval_document(gold, out, transform=None):
    """Count true positives, false positives and false negatives.

    ``gold`` and ``out`` are compared as multisets: every item can be matched
    at most once, so duplicates only count as matches when they are duplicated
    on both sides.

    :param gold: List of expected items.
    :param out: List of produced items.
    :param transform: Optional callable applied to both ``gold`` and ``out``
        before they are compared.
    :returns: A ``(tp, fp, fn)`` tuple of counts.
    """
    if transform:
        gold, out = transform(gold), transform(out)

    # Work on copies so that matched items can be struck off without touching
    # the lists being iterated.
    unmatched_out = copy.deepcopy(out)
    unmatched_gold = copy.deepcopy(gold)

    # A gold item with no remaining counterpart in the output is a miss.
    false_negatives = 0
    for expected in gold:
        if expected in unmatched_out:
            unmatched_out.remove(expected)
        else:
            false_negatives += 1

    # An output item is a hit if an unclaimed gold item equals it, otherwise
    # it is spurious.
    true_positives = 0
    false_positives = 0
    for produced in out:
        if produced in unmatched_gold:
            unmatched_gold.remove(produced)
            true_positives += 1
        else:
            false_positives += 1

    return true_positives, false_positives, false_negatives


def get_names(cs):
    """Return list of every name.

    The ``'names'`` lists of all records in ``cs`` are concatenated in order;
    records without a ``'names'`` key contribute nothing.
    """
    return [name for record in cs for name in record.get('names', [])]


def get_labels(cs):
    """Return list of every label.

    The ``'labels'`` lists of all records in ``cs`` are concatenated in order;
    records without a ``'labels'`` key contribute nothing.
    """
    return [label for record in cs for label in record.get('labels', [])]


def get_ids(cs):
    """Return chemical identifier records.

    Each record in ``cs`` is reduced to a new dict holding only its
    ``'names'`` and ``'labels'`` entries (whichever are present).
    """
    id_keys = {'names', 'labels'}
    return [
        {key: value for key, value in record.items() if key in id_keys}
        for record in cs
    ]


def get_spectra_type(cs):
    """Return one spectrum-type tag per spectrum found in ``cs``.

    For every record, a ``'nmr'`` is emitted for each entry of
    ``'nmr_spectra'``, then a ``'uvvis'`` for each entry of
    ``'uvvis_spectra'``, then an ``'ir'`` for each entry of ``'ir_spectra'``.
    """
    tagged_fields = (
        ('nmr_spectra', 'nmr'),
        ('uvvis_spectra', 'uvvis'),
        ('ir_spectra', 'ir'),
    )
    records = []
    for record in cs:
        for field, tag in tagged_fields:
            records.extend(tag for _ in record.get(field, []))
    return records


def get_spectra_subject(cs):
    """Return the identifiers of the owning record, once per spectrum.

    For each entry of a record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and
    ``'ir_spectra'`` lists (visited in that order), a new dict with the
    record's ``'names'`` and ``'labels'`` entries is appended.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    id_keys = {'names', 'labels'}
    records = []
    for record in cs:
        for field in spectra_fields:
            for _ in record.get(field, []):
                # Build a separate dict for every spectrum rather than
                # sharing one instance between them.
                records.append(
                    {key: value for key, value in record.items() if key in id_keys}
                )
    return records


def get_spectra_peaks(cs):
    """Return the ``'peaks'`` value of every spectrum that has one.

    Each record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and ``'ir_spectra'``
    lists are visited in that order; spectra without a ``'peaks'`` key are
    skipped.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    records = []
    for record in cs:
        for field in spectra_fields:
            records.extend(
                spectrum['peaks']
                for spectrum in record.get(field, [])
                if 'peaks' in spectrum
            )
    return records


def get_spectra_solvent(cs):
    """Return the ``'solvent'`` value of every spectrum that has one.

    Each record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and ``'ir_spectra'``
    lists are visited in that order; spectra without a ``'solvent'`` key are
    skipped.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    records = []
    for record in cs:
        for field in spectra_fields:
            records.extend(
                spectrum['solvent']
                for spectrum in record.get(field, [])
                if 'solvent' in spectrum
            )
    return records


def get_spectra_core(cs):
    """Return every spectrum reduced to its ``'peaks'`` and ``'solvent'``.

    Each record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and ``'ir_spectra'``
    lists are visited in that order. A new dict is built for every spectrum,
    so the input spectra are left untouched; a spectrum with neither key
    yields an empty dict.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    core_keys = {'peaks', 'solvent'}
    records = []
    for record in cs:
        for field in spectra_fields:
            for spectrum in record.get(field, []):
                records.append(
                    {key: value for key, value in spectrum.items() if key in core_keys}
                )
    return records


def get_spectra_temp(cs):
    """Return the ``'temperature'`` value of every spectrum that has one.

    Each record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and ``'ir_spectra'``
    lists are visited in that order; spectra without a ``'temperature'`` key
    are skipped.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    records = []
    for record in cs:
        for field in spectra_fields:
            records.extend(
                spectrum['temperature']
                for spectrum in record.get(field, [])
                if 'temperature' in spectrum
            )
    return records


def get_spectra_apparatus(cs):
    """Return the ``'apparatus'`` value of every spectrum that has one.

    Each record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and ``'ir_spectra'``
    lists are visited in that order; spectra without an ``'apparatus'`` key
    are skipped.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    records = []
    for record in cs:
        for field in spectra_fields:
            records.extend(
                spectrum['apparatus']
                for spectrum in record.get(field, [])
                if 'apparatus' in spectrum
            )
    return records


def get_spectra_full(cs):
    """Return every spectrum together with the identifiers of its owner.

    Each record's ``'nmr_spectra'``, ``'uvvis_spectra'`` and ``'ir_spectra'``
    lists are visited in that order. Every returned spectrum carries all of
    its own entries plus a ``'subject'`` entry holding the owning record's
    ``'names'`` and ``'labels'``.

    The returned dicts are shallow copies: the ``'subject'`` entry is added to
    the copy, so the records passed in are not modified.
    """
    spectra_fields = ('nmr_spectra', 'uvvis_spectra', 'ir_spectra')
    id_keys = {'names', 'labels'}
    records = []
    for record in cs:
        for field in spectra_fields:
            for spectrum in record.get(field, []):
                # Copy before annotating so the caller's data keeps its
                # original shape.
                annotated = dict(spectrum)
                annotated['subject'] = {
                    key: value for key, value in record.items() if key in id_keys
                }
                records.append(annotated)
    return records


def get_property_value(cs):
    """Return the ``'value'`` of every property entry that has one.

    Each record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    are visited in that order; entries without a ``'value'`` key are skipped.
    """
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    records = []
    for record in cs:
        for field in property_fields:
            records.extend(
                entry['value'] for entry in record.get(field, []) if 'value' in entry
            )
    return records


def get_property_units(cs):
    """Return the ``'units'`` of every property entry that has one.

    Each record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    are visited in that order; entries without a ``'units'`` key are skipped.
    """
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    records = []
    for record in cs:
        for field in property_fields:
            records.extend(
                entry['units'] for entry in record.get(field, []) if 'units' in entry
            )
    return records


def get_property_subject(cs):
    """Return the identifiers of the owning record, once per property entry.

    For each entry of a record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    (visited in that order), a new dict with the record's ``'names'`` and
    ``'labels'`` entries is appended.
    """
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    id_keys = {'names', 'labels'}
    records = []
    for record in cs:
        for field in property_fields:
            for _ in record.get(field, []):
                # Build a separate dict for every entry rather than sharing
                # one instance between them.
                records.append(
                    {key: value for key, value in record.items() if key in id_keys}
                )
    return records


def get_property_solvent(cs):
    """Return the ``'solvent'`` of every property entry that has one.

    Each record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    are visited in that order; entries without a ``'solvent'`` key are
    skipped.
    """
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    records = []
    for record in cs:
        for field in property_fields:
            records.extend(
                entry['solvent'] for entry in record.get(field, []) if 'solvent' in entry
            )
    return records


def get_property_temperature(cs):
    """Return the ``'temperature'`` of every property entry that has one.

    Each record's ``'quantum_yields'``, ``'fluorescence_lifetimes'`` and
    ``'electrochemical_potentials'`` lists are visited in that order; entries
    without a ``'temperature'`` key are skipped.
    """
    # NOTE(review): 'melting_points' is not consulted here, unlike in the
    # other property extractors -- presumably deliberate; confirm.
    property_fields = (
        'quantum_yields',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    records = []
    for record in cs:
        for field in property_fields:
            records.extend(
                entry['temperature']
                for entry in record.get(field, [])
                if 'temperature' in entry
            )
    return records


def get_property_apparatus(cs):
    """Return the ``'apparatus'`` of every property entry that has one.

    Each record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    are visited in that order; entries without an ``'apparatus'`` key are
    skipped.
    """
    # All four property kinds are treated the same way. The melting-point
    # branch used to test for 'solvent' before reading 'apparatus', which
    # raised KeyError for entries with a solvent but no apparatus and dropped
    # the apparatus of entries without a solvent.
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    records = []
    for record in cs:
        for field in property_fields:
            records.extend(
                entry['apparatus']
                for entry in record.get(field, [])
                if 'apparatus' in entry
            )
    return records


def get_property_core(cs):
    """Return every property entry reduced to value, units and solvent.

    Each record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    are visited in that order. A new dict holding only the ``'value'``,
    ``'units'`` and ``'solvent'`` entries is built for every property entry,
    so the input entries are left untouched.
    """
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    core_keys = {'value', 'units', 'solvent'}
    records = []
    for record in cs:
        for field in property_fields:
            for entry in record.get(field, []):
                records.append(
                    {key: value for key, value in entry.items() if key in core_keys}
                )
    return records


def get_property_full(cs):
    """Return every property entry together with the identifiers of its owner.

    Each record's ``'quantum_yields'``, ``'melting_points'``,
    ``'fluorescence_lifetimes'`` and ``'electrochemical_potentials'`` lists
    are visited in that order. Every returned entry carries all of its own
    items plus a ``'subject'`` item holding the owning record's ``'names'``
    and ``'labels'``.

    The returned dicts are shallow copies: the ``'subject'`` item is added to
    the copy, so the records passed in are not modified.
    """
    property_fields = (
        'quantum_yields',
        'melting_points',
        'fluorescence_lifetimes',
        'electrochemical_potentials',
    )
    id_keys = {'names', 'labels'}
    records = []
    for record in cs:
        for field in property_fields:
            for entry in record.get(field, []):
                # Copy before annotating so the caller's data keeps its
                # original shape.
                annotated = dict(entry)
                annotated['subject'] = {
                    key: value for key, value in record.items() if key in id_keys
                }
                records.append(annotated)
    return records


# Evaluations run by ``compare``: (display name, transform) pairs. The
# transform is handed to ``eval_document``; ``None`` compares the records
# as loaded.
# NOTE(review): get_spectra_core and get_property_core are defined in this
# module but not listed here -- confirm whether that is intentional.
EVALS = [
    ('full', None),
    ('names', get_names),
    ('labels', get_labels),
    ('ids', get_ids),
    ('spectra type', get_spectra_type),
    ('spectra subject', get_spectra_subject),
    ('spectra peaks', get_spectra_peaks),
    ('spectra solvent', get_spectra_solvent),
    ('spectra temperature', get_spectra_temp),
    ('spectra apparatus', get_spectra_apparatus),
    ('spectra full', get_spectra_full),
    ('property value', get_property_value),
    ('property units', get_property_units),
    ('property subject', get_property_subject),
    ('property solvent', get_property_solvent),
    ('property temperature', get_property_temperature),
    ('property apparatus', get_property_apparatus),
    ('property full', get_property_full),
]


@evaluate.command()
def compare():
    """Compare '-out.json' files against their '-gold.json' counterparts.

    For every evaluation in EVALS, the true positive, false positive and
    false negative counts are summed over all file pairs found in the
    'data/cde-evaluation' directory, and precision, recall and F-score are
    printed.
    """
    package_parent = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    edir = os.path.join(package_parent, 'data/cde-evaluation')
    if not os.path.isdir(edir):
        # Report a missing data directory as a CLI error instead of letting
        # os.listdir fail with a traceback.
        raise click.ClickException('Evaluation directory not found: %s' % edir)

    out_suffix = '-out.json'
    for eval_name, transform in EVALS:
        print('Evaluation: %s' % eval_name)
        doc_count = 0
        tp, fp, fn = 0, 0, 0
        for filename in os.listdir(edir):
            if not filename.endswith(out_suffix):
                continue
            out_path = os.path.join(edir, filename)
            gold_path = '%s-gold.json' % out_path[:-len(out_suffix)]
            # Skip unpaired output files before parsing anything.
            if not os.path.isfile(gold_path):
                continue
            # Both files are parsed again for every evaluation, so each
            # transform starts from freshly loaded records.
            with open(out_path) as outf:
                out = json.load(outf)
            with open(gold_path) as goldf:
                gold = json.load(goldf)
            doctp, docfp, docfn = eval_document(gold, out, transform)
            doc_count += 1
            tp += doctp
            fp += docfp
            fn += docfn
        print('TP: %s\tFP:%s\tFN:%s' % (tp, fp, fn))
        # Guard each ratio against a zero denominator; the metric is
        # reported as 0 in that case.
        p = 100 * float(tp) / (tp + fp) if tp > 0 or fp > 0 else 0
        r = 100 * float(tp) / (tp + fn) if tp > 0 or fn > 0 else 0
        f = 2 * p * r / (p + r) if p > 0 or r > 0 else 0
        print('P: %0.2f%%\tR: %0.2f%%\tF: %0.2f%%' % (p, r, f))
        print('%s documents' % doc_count)
        print('================================')