Source code for harvesttext.parsing
import re
from .resources import get_baidu_stopwords
from collections import defaultdict
from .algorithms.texttile import TextTile


class ParsingMixin:
    """
    Text parsing module:

    - dependency parsing
    - triple extraction based on dependency parsing
    - automatic paragraph segmentation based on the TextTile algorithm
    """
    def dependency_parse(self, sent, standard_name=False, stopwords=None):
        '''Dependency parsing, using the pyhanlp interface and integrating harvesttext's
        entity recognition mechanism. High accuracy is not guaranteed.

        :param sent: the sentence to parse
        :param standard_name: whether to use the linked standard entity names instead of the original mention text
        :param stopwords: an optional collection of stopwords to skip
        :return: arcs: the dependency arcs, a list of lists:
            [[word id, word surface form or entity name (controlled by standard_name), POS tag, dependency relation, head word id] for each word]
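
        Example (an illustrative sketch, not part of the original docs): assuming pyhanlp
        is installed and ``ht`` is a HarvestText instance with entities already registered
        (e.g. via ``ht.add_entities``)::

            arcs = ht.dependency_parse("武球王是中国最好的前锋", standard_name=True)
            for word_id, word, pos, rel, head_id in arcs:
                print(word_id, word, pos, rel, head_id)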
        '''
        from pyhanlp import HanLP, JClass
        if not self.hanlp_prepared:
            self.hanlp_prepare()
        self.standard_name = standard_name
        entities_info = self.entity_linking(sent)
        sent2 = self.decoref(sent, entities_info)
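        # decoref replaces each linked entity mention in the sentence with its entity-type
        # tag (e.g. "#人名#"), so HanLP parses every entity as a single token; the loop
        # below maps the tags back to entity names (or to the original surface text)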
        # [word.ID-1, word.LEMMA, word.POSTAG, word.DEPREL, word.HEAD.ID-1]
        arcs = []
        i = 0
        sentence = HanLP.parseDependency(sent2)
        for word in sentence.iterator():
            word0, tag0 = word.LEMMA, word.POSTAG
            if stopwords and word0 in stopwords:
                continue
            if word0 in self.entity_types:
                if self.standard_name:
                    word0 = entities_info[i][1][0]  # use the linked entity name
                else:
                    l, r = entities_info[i][0]  # or use the original surface text
                    word0 = sent[l:r]
                tag0 = entities_info[i][1][1][1:-1]
                i += 1
            arcs.append([word.ID-1, word0, tag0, word.DEPREL, word.HEAD.ID-1])
        return arcs

    def triple_extraction(self, sent, standard_name=False, stopwords=None, expand="all"):
        '''Find meaningful triples in a sentence from subject-verb-object and other
        dependency relations. Much of the code is adapted from
        https://github.com/liuhuanyong/EventTriplesExtraction. High accuracy is not guaranteed.

        :param sent: the sentence to extract triples from
        :param standard_name: whether to use the linked standard entity names instead of the original mention text
        :param stopwords: an optional collection of stopwords to skip
        :param expand: "all" (default): expand every subject/object word with its modifiers;
            "exclude_entity": do not expand known entities, keeping the standard entity names for linking;
            "None": no expansion
        :return: a list of [subject, predicate, object] triples
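
        Example (an illustrative sketch, not part of the original docs): assuming pyhanlp
        is installed and ``ht`` is a HarvestText instance::

            svos = ht.triple_extraction("武球王是中国最好的前锋")
            for subj, pred, obj in svos:
                print(subj, pred, obj)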
        '''
        arcs = self.dependency_parse(sent, standard_name, stopwords)

        '''Expand an extracted subject or object word with its modifiers'''
        def complete_e(words, postags, child_dict_list, word_index):
            if expand == "all" or (expand == "exclude_entity" and "#"+postags[word_index]+"#" not in self.entity_types):
                child_dict = child_dict_list[word_index]
                prefix = ''
                if '定中关系' in child_dict:
                    for i in range(len(child_dict['定中关系'])):
                        prefix += complete_e(words, postags, child_dict_list, child_dict['定中关系'][i])
                postfix = ''
                if postags[word_index] == 'v':
                    if '动宾关系' in child_dict:
                        postfix += complete_e(words, postags, child_dict_list, child_dict['动宾关系'][0])
                    if '主谓关系' in child_dict:
                        prefix = complete_e(words, postags, child_dict_list, child_dict['主谓关系'][0]) + prefix
                return prefix + words[word_index] + postfix
            elif expand == "None":
                return words[word_index]
            else:  # expand == "exclude_entity" and "#"+postags[word_index]+"#" in self.entity_types
                return words[word_index]
        words, postags = ["" for i in range(len(arcs))], ["" for i in range(len(arcs))]
        child_dict_list = [defaultdict(list) for i in range(len(arcs))]
        for i, format_parse in enumerate(arcs):
            id0, words[i], postags[i], rel, headID = format_parse
            child_dict_list[headID][rel].append(i)
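        # child_dict_list[h][rel] now holds the indices of all words that depend on
        # word h through the relation rel, e.g. child_dict_list[h]['动宾关系'] lists
        # the direct objects of word h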
        svos = []
        for index in range(len(postags)):
            # extract triples from the dependency parse
            if postags[index]:
                # extract fact triples centered on the predicate
                child_dict = child_dict_list[index]
                # subject-verb-object
                if '主谓关系' in child_dict and '动宾关系' in child_dict:
                    r = words[index]
                    e1 = complete_e(words, postags, child_dict_list, child_dict['主谓关系'][0])
                    e2 = complete_e(words, postags, child_dict_list, child_dict['动宾关系'][0])
                    svos.append([e1, r, e2])

                # postposed attributive with a verb-object relation
                relation = arcs[index][-2]
                head = arcs[index][-1]
                if relation == '定中关系':
                    if '动宾关系' in child_dict:
                        e1 = complete_e(words, postags, child_dict_list, head)
                        r = words[index]
                        e2 = complete_e(words, postags, child_dict_list, child_dict['动宾关系'][0])
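                        # if the expanded head noun e1 itself starts with "predicate + object",
                        # strip that prefix so the clause is not duplicated in the triple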
                        temp_string = r + e2
                        if temp_string == e1[:len(temp_string)]:
                            e1 = e1[len(temp_string):]
                        if temp_string not in e1:
                            svos.append([e1, r, e2])

                # subject-verb-complement pattern that contains a preposition-object relation
                if '主谓关系' in child_dict and '动补结构' in child_dict:
                    e1 = complete_e(words, postags, child_dict_list, child_dict['主谓关系'][0])
                    CMP_index = child_dict['动补结构'][0]
                    r = words[index] + words[CMP_index]
                    if '介宾关系' in child_dict_list[CMP_index]:
                        e2 = complete_e(words, postags, child_dict_list, child_dict_list[CMP_index]['介宾关系'][0])
                        svos.append([e1, r, e2])
        return svos

    def cut_paragraphs(self, text, num_paras=None, block_sents=3, std_weight=0.5,
                       align_boundary=True, stopwords='baidu', remove_puncts=True,
                       seq_chars=-1, **kwargs):
        '''Automatically segment a text into paragraphs with the TextTile algorithm.

        :param text: the text to segment
        :param num_paras: (default None) the desired number of paragraphs; keep the default None to let the algorithm decide automatically
        :param block_sents: algorithm parameter: the number of sentences grouped into one block; larger values generally produce fewer paragraphs
        :param std_weight: algorithm parameter; larger values generally produce more paragraphs
        :param align_boundary: whether the new paragraph boundaries must align with the original line breaks
        :param stopwords: a list/tuple/set of strings, or 'baidu' for the default Baidu stopword list; stopwords used by the algorithm, which usually improve accuracy
        :param remove_puncts: (default True) whether to remove punctuation inside the algorithm, which usually improves accuracy
        :param seq_chars: (default -1) if set to a value >= 1, use chunks of this many characters as the basic unit instead of sentences
        :param **kwargs: passed to ht.cut_sentences, e.g. deduplicate
        :return: the list of predicted paragraphs
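
        Example (an illustrative sketch, not part of the original docs): assuming ``ht``
        is a HarvestText instance and ``text`` is a multi-paragraph Chinese string::

            paras = ht.cut_paragraphs(text, num_paras=3)
            for para in paras:
                print(para)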
        '''
        if num_paras is not None:
            assert num_paras > 0, "num_paras should be a positive number"
        assert stopwords == 'baidu' or (hasattr(stopwords, '__iter__') and type(stopwords) != str)
        stopwords = get_baidu_stopwords() if stopwords == 'baidu' else set(stopwords)
        if seq_chars < 1:
            cut_seqs = lambda x: self.cut_sentences(x, **kwargs)
        else:
            seq_chars = int(seq_chars)

            def _cut_seqs(text, len0, strip=True, deduplicate=False):
                if deduplicate:
                    text = re.sub(r"([。!?\!\?])\1+", r"\1", text)
                if strip:
                    text = text.strip()
                seqs = [text[i:i + len0] for i in range(0, len(text), len0)]
                return seqs

            cut_seqs = lambda x: _cut_seqs(x, seq_chars, **kwargs)
        if align_boundary:
            paras = [para.strip() for para in text.split("\n") if len(para.strip()) > 0]
            if num_paras is not None:
                # assert num_paras <= len(paras), "The new segmented paragraphs must be no less than the original ones"
                if num_paras >= len(paras):
                    return paras
            original_boundary_ids = []
            sentences = []
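            # record the running sentence count at the end of each original paragraph,
            # so TextTile can align the new boundaries with the existing line breaks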
            for para in paras:
                sentences.extend(cut_seqs(para))
                original_boundary_ids.append(len(sentences))
        else:
            original_boundary_ids = None
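            # cut_seqs already closes over **kwargs, so only the text is passed here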
            sentences = cut_seqs(text)
        # with entity resolution, similarity can be decided better
        if remove_puncts:
            allpuncs = re.compile(
                r"[,\_《。》、?;:‘’\"“”【「】」、·!@¥…()—\,\<\.\>\/\?\;\:\'\"\[\]\{\}\~\`\!\@\#\$\%\^\&\*\(\)\-\=\+]")
            sent_words = [re.sub(allpuncs, "",
                                 self.seg(sent, standard_name=True, stopwords=stopwords, return_sent=True)
                                 ).split()
                          for sent in sentences]
        else:
            sent_words = [self.seg(sent, standard_name=True, stopwords=stopwords)
                          for sent in sentences]
        texttiler = TextTile()
        predicted_boundary_ids = texttiler.cut_paragraphs(sent_words, num_paras, block_sents, std_weight,
                                                          align_boundary, original_boundary_ids)
        jointer = " " if (self.language == 'en' and seq_chars > 1) else ""
        predicted_paras = [jointer.join(sentences[l:r]) for l, r in
                           zip([0] + predicted_boundary_ids[:-1], predicted_boundary_ids)]
        return predicted_paras