import numpy as np
import pandas as pd
from collections import defaultdict
from collections.abc import Iterable
from itertools import combinations
class SentDict(object):
    """Sentiment dictionary learned from tokenized documents via SO-PMI.

    Each document is an iterable of word tokens.  Words occurring at least
    ``min_times`` times receive a score equal to the sum of their PMI with
    the positive seed words minus the sum of their PMI with the negative
    seed words.  Scores are available through ``instance[word]``.
    """

    def __init__(self, docs=None, method="PMI", min_times=5, scale="None",
                 pos_seeds=None, neg_seeds=None):
        """Create the dictionary and, if ``docs`` is non-empty, build it.

        Args:
            docs: sized collection of tokenized documents, or None for none.
            method: scoring algorithm; only ``"PMI"`` is supported.
            min_times: minimum occurrence count for a word to be scored.
            scale: ``"None"`` (raw scores), ``"+-1"`` or ``"0-1"``.
            pos_seeds: iterable of positive seed words, or None.
            neg_seeds: iterable of negative seed words, or None.

        Raises:
            TypeError: if a seed argument is neither None nor iterable.
            Exception: propagated from ``build_sent_dict``.
        """
        super(SentDict, self).__init__()
        self.sent_dict = {}
        self.words = set()
        # Start with empty seed sets so that building without any seeds
        # reaches the explicit "no seed words" error instead of failing
        # with AttributeError on a missing attribute.
        self.pos_seeds = set()
        self.neg_seeds = set()
        self.build_sent_dict(docs, method, min_times, scale, pos_seeds, neg_seeds)

    def __getitem__(self, key):
        """Return the sentiment score of ``key`` (KeyError if unscored)."""
        return self.sent_dict[key]

    @staticmethod
    def _as_seed_set(seeds, name):
        """Normalize a seed argument to a set; None means "no seeds"."""
        if seeds is None:
            return set()
        if not isinstance(seeds, Iterable):
            raise TypeError("{} must be an iterable of words or None".format(name))
        return set(seeds)

    def set_pos_seeds(self, pos_seeds):
        """Set the positive seeds, keeping only words in the vocabulary."""
        self.pos_seeds = set(pos_seeds) & set(self.words)

    def set_neg_seed(self, neg_seeds):
        """Set the negative seeds, keeping only words in the vocabulary."""
        self.neg_seeds = set(neg_seeds) & set(self.words)

    # Plural alias so the setter names are symmetric with ``set_pos_seeds``;
    # the original singular name is kept for existing callers.
    set_neg_seeds = set_neg_seed

    def build_sent_dict(self, docs=None, method="PMI", min_times=5, scale="None",
                        pos_seeds=None, neg_seeds=None):
        """Collect corpus statistics and compute the sentiment dictionary.

        Nothing is built when ``docs`` is empty.  Arguments have the same
        meaning as in ``__init__``.

        Raises:
            TypeError: if a seed argument is neither None nor iterable.
            Exception: if ``method`` is unsupported, or if none of the
                effective seed words occurs in the vocabulary.
        """
        docs = [] if docs is None else docs
        self.doc_count = len(docs)
        self.method = method
        pos_seeds = self._as_seed_set(pos_seeds, "pos_seeds")
        neg_seeds = self._as_seed_set(neg_seeds, "neg_seeds")
        if self.doc_count > 0:
            if method == "PMI":
                self.co_occur, self.one_occur = self.get_word_stat(docs)
                self.words = set(word for word in self.one_occur
                                 if self.one_occur[word] >= min_times)
                # Only replace the seeds when new ones are supplied;
                # otherwise keep the existing ones (e.g. set via the setters).
                if len(pos_seeds) > 0 or len(neg_seeds) > 0:
                    self.pos_seeds = (pos_seeds & self.words)
                    self.neg_seeds = (neg_seeds & self.words)
                if len(self.pos_seeds) > 0 or len(self.neg_seeds) > 0:
                    self.sent_dict = self.SO_PMI(self.words, scale)
                else:
                    raise Exception("你的文章中不包含种子词,SO-PMI算法无法执行")
            else:
                raise Exception("不支持的情感分析算法")

    def analyse_sent(self, words, avg=True):
        """Score a collection of words with the learned dictionary.

        ``words`` is de-duplicated and restricted to scored words first.

        Args:
            words: iterable of word tokens.
            avg: if true, return the mean score (0 when no word is scored);
                otherwise return the list of individual scores, whose order
                follows set iteration order rather than input order.

        Raises:
            Exception: if the configured method is not ``"PMI"``.
        """
        if self.method == "PMI":
            words = (set(words) & set(self.sent_dict))
            if avg:
                return sum(self.sent_dict[word] for word in words) / len(words) if len(words) > 0 else 0
            else:
                return [self.sent_dict[word] for word in words]
        else:
            raise Exception("不支持的情感分析算法")

    def get_word_stat(self, docs, co=True):
        """Count word occurrences and pairwise co-occurrences.

        Args:
            docs: iterable of tokenized documents.
            co: if true, also count co-occurrence of every token pair
                within each document (symmetrically, under both key orders).

        Returns:
            ``(co_occur, one_occur)``: a dict keyed by ``(w1, w2)`` tuples
            and a dict keyed by word, both mapping to integer counts.
        """
        # Plain dicts are used on purpose: defaultdict took too much memory.
        co_occur = dict()
        one_occur = dict()
        for doc in docs:
            for word in doc:
                one_occur[word] = one_occur.get(word, 0) + 1
                # Count self co-occurrence too.  Otherwise a negative seed
                # that never appears next to another negative seed would get
                # no PMI with the negative set at all, which is unreasonable.
                co_occur[(word, word)] = co_occur.get((word, word), 0) + 1
            if co:
                for a, b in combinations(doc, 2):
                    co_occur[(a, b)] = co_occur.get((a, b), 0) + 1
                    co_occur[(b, a)] = co_occur.get((b, a), 0) + 1
        return co_occur, one_occur

    def PMI(self, w1, w2):
        """Return the pointwise mutual information (base 2) of two words.

        Returns 0 when the words never co-occur.

        Raises:
            Exception: if either word is absent from the corpus statistics.
        """
        if not ((w1 in self.one_occur) and (w2 in self.one_occur)):
            raise Exception("PMI requires both words to occur in the corpus: "
                            "{!r}, {!r}".format(w1, w2))
        if not (w1, w2) in self.co_occur:
            return 0
        c1, c2 = self.one_occur[w1], self.one_occur[w2]
        c3 = self.co_occur[(w1, w2)]
        return np.log2((c3 * self.doc_count) / (c1 * c2))

    def SO_PMI(self, words, scale="None"):
        """Compute the SO-PMI score of every word in ``words``.

        Args:
            words: iterable of words present in the corpus statistics.
            scale: ``"+-1"`` rescales positive and negative scores separately
                into [-1, 1]; ``"0-1"`` rescales linearly into [0, 1]; any
                other value leaves the raw scores untouched.

        Returns:
            dict mapping each word to its (possibly rescaled) score.
        """
        ret = {}
        # Both bounds start at 0, so max0 >= 0 >= min0 always holds.
        max0, min0 = 0, 0
        for word in words:
            tmp = (sum(self.PMI(word, seed) for seed in self.pos_seeds)
                   - sum(self.PMI(word, seed) for seed in self.neg_seeds))
            max0, min0 = max(tmp, max0), min(tmp, min0)
            ret[word] = tmp
        if scale == "+-1":
            # Rescale the positive and negative halves separately instead of
            # using one linear map 2*(x-mid)/(max-min): 0 must keep meaning
            # "neutral", whereas a single map would send a minimum of 0 to -1.
            for word, senti in ret.items():
                if senti > 0:  # here max0 >= senti > 0, so the divisor is non-zero
                    ret[word] /= max0
                elif senti < 0:  # likewise min0 <= senti < 0
                    ret[word] /= (-min0)
        elif scale == "0-1":
            # A single linear map is fine here.
            span = max0 - min0
            if span == 0:
                # Every score is 0: there is nothing to rescale, and dividing
                # would produce NaN / ZeroDivisionError.
                ret = {word: 0.0 for word in ret}
            else:
                ret = {word: (senti - min0) / span for word, senti in ret.items()}
        return ret
if __name__ == "__main__":
    # Small demo: four tokenized documents, one positive and one negative seed.
    docs = [["武磊","威武",",","中超","第一","射手","太","棒","了","!"],
            ["武磊","强",",","中超","最","棒","球员"],
            ["郜林","不行",",","只会","抱怨","的","球员","注定","上限","了"],
            ["郜林","看来","不行",",","已经","到","上限","了"]]
    sent_dict = SentDict(docs, method="PMI", min_times=1, pos_seeds=["棒"], neg_seeds=["不行"])
    print("威武", sent_dict["威武"])
    print("球员", sent_dict["球员"])
    print("上限", sent_dict["上限"])
    # `avg` is passed explicitly: omitting it raised TypeError because the
    # parameter had no default value.
    print(sent_dict.analyse_sent(docs[0], avg=True))