3 回答
TA贡献1859条经验 获得超6个赞
下面提供的解决方案在运行时的复杂度约为O(n),其中n是每个句子中的标记数。
对于 500 万个句子,concepts.txt它在大约 30 秒内执行所需的操作,请参阅第三部分中的基本测试。
当涉及到空间复杂性时,您将不得不保留一个嵌套的字典结构(让我们暂时像这样简化它),假设它是O(c*u),其中u是特定长度概念的唯一标记(标记方式) ,而 c 是概念的长度。
很难确定确切的复杂性,但它与此非常相似(对于您的示例数据和您提供的concepts.txt数据[ ]这些数据非常准确,但我们将在执行过程中了解血腥细节)。
我假设您可以在空格上拆分您的概念和句子,如果不是这样,我建议您查看spaCy,它提供了更智能的方法来标记您的数据。
一、简介
让我们以你的例子为例:
concepts = [
["natural language processing", "text mining", "texts", "nlp"],
["advanced data mining", "data mining", "data"],
["discourse analysis", "learning analytics", "mooc"],
]
正如您所说,概念中的每个元素都必须映射到第一个元素,因此,在 Pythonish 中,它会大致遵循以下几点:
for concept in concepts:
concept[1:] = concept[0]
如果所有概念的标记长度都等于 1(这里不是这种情况),并且是唯一的,那么任务将很容易。让我们专注于第二种情况和一个特定的(稍作修改的)示例concept以了解我的观点:
["advanced data mining", "data something", "data"]
这里data将映射到advanced data mining, BUT data something,它由 组成data应该在它之前映射。如果我理解正确,你会想要这句话:
"Here is data something and another data"
要映射到:
"Here is advanced data mapping and another advanced data mining"
而不是天真的方法:
"Here is advanced data mapping something and another advanced data mining"
请注意,对于第二个示例,我们只映射了data,而不是data something。
为了优先考虑data something(和其他符合这种模式的),我使用了一个充满字典的数组结构,其中数组中较早的概念是那些在标记方面更长的概念。
继续我们的例子,这样的数组看起来像这样:
structure = [
{"data": {"something": "advanced data mining"}},
{"data": "advanced data mining"},
]
请注意,如果我们按此顺序遍历标记(例如,首先遍历具有连续标记的第一个字典,如果未找到匹配项,则转到第二个字典,依此类推),我们将首先获得最长的概念。
2. 代码
好的,我希望你明白基本的想法(如果没有,请在下面发表评论,我会尝试更详细地解释不清楚的部分)。
免责声明:我对这个代码方面并不是特别自豪,但它完成了工作,我想可能会更糟。
2.1 分层字典
首先,让我们获得最长的概念标记(不包括第一个元素,因为它是我们的目标,我们永远不必更改它):
def get_longest(concepts: List[List[str]]):
return max(len(text.split()) for concept in concepts for text in concept[1:])
使用这些信息,我们可以通过创建与不同长度的概念一样多的字典来初始化我们的结构(在上面的示例中,它将是 2,所以它适用于您的所有数据。任何长度的概念都可以):
def init_hierarchical_dictionaries(longest: int):
return [(length, {}) for length in reversed(range(longest))]
请注意,我将每个概念的长度添加到数组中,IMO 这样在遍历时更容易,尽管在对实现进行了一些更改后,您可以不使用它。
现在,当我们有了这些辅助函数时,我们可以从概念列表中创建结构:
def create_hierarchical_dictionaries(concepts: List[List[str]]):
# Initialization
longest = get_longest(concepts)
hierarchical_dictionaries = init_hierarchical_dictionaries(longest)
for concept in concepts:
for text in concept[1:]:
tokens = text.split()
# Initialize dictionary; get the one with corresponding length.
# The longer, the earlier it is in the hierarchy
current_dictionary = hierarchical_dictionaries[longest - len(tokens)][1]
# All of the tokens except the last one are another dictionary mapping to
# the next token in concept.
for token in tokens[:-1]:
current_dictionary[token] = {}
current_dictionary = current_dictionary[token]
# Last token is mapped to the first concept
current_dictionary[tokens[-1]] = concept[0].split()
return hierarchical_dictionaries
此函数将创建我们的分层字典,请参阅源代码中的注释以获得一些解释。您可能想创建一个自定义类来保留这个东西,这样使用起来应该更容易。
这与1. 介绍中描述的对象完全相同
2.2 遍历字典
这部分要困难得多,但这次让我们使用自上而下的方法。我们将从简单开始:
def embed_sentences(sentences: List[str], hierarchical_dictionaries):
return (traverse(sentence, hierarchical_dictionaries) for sentence in sentences)
提供分层字典,它创建一个生成器,根据概念映射转换每个句子。
现在traverse功能:
def traverse(sentence: str, hierarchical_dictionaries):
# Get all tokens in the sentence
tokens = sentence.split()
output_sentence = []
# Initialize index to the first token
index = 0
# Until any tokens left to check for concepts
while index < len(tokens):
# Iterate over hierarchical dictionaries (elements of the array)
for hierarchical_dictionary_tuple in hierarchical_dictionaries:
# New index is returned based on match and token-wise length of concept
index, concept = traverse_through_dictionary(
index, tokens, hierarchical_dictionary_tuple
)
# Concept was found in current hierarchical_dictionary_tuple, let's add it
# to output
if concept is not None:
output_sentence.extend(concept)
# No need to check other hierarchical dictionaries for matching concept
break
# Token (and it's next tokens) do not match with any concept, return original
else:
output_sentence.append(tokens[index])
# Increment index in order to move to the next token
index += 1
# Join list of tokens into a sentence
return " ".join(output_sentence)
再一次,如果您不确定发生了什么,请发表评论。
使用这种方法,悲观地,我们将执行O(n*c!)检查,其中 n 是句子中的标记数,c 是最长概念的标记长度,它是阶乘。这种情况是极不可能在实践中发生,在句子中每个令牌将不得不几乎完全适合最长的概念,再加上所有短的概念必须是最短的一个前缀(如super data mining,super data和data)。
对于任何实际问题,它会更接近 O(n),正如我之前所说,使用您在 .txt 文件中提供的数据,最坏的情况是 O(3 * n),通常是 O(2 * n) .
遍历每个字典:
def traverse_through_dictionary(index, tokens, hierarchical_dictionary_tuple):
# Get the level of nested dictionaries and initial dictionary
length, current_dictionary = hierarchical_dictionary_tuple
# inner_index will loop through tokens until match or no match was found
inner_index = index
for _ in range(length):
# Get next nested dictionary and move inner_index to the next token
current_dictionary = current_dictionary.get(tokens[inner_index])
inner_index += 1
# If no match was found in any level of dictionary
# Return current index in sentence and None representing lack of concept.
if current_dictionary is None or inner_index >= len(tokens):
return index, None
# If everything went fine through all nested dictionaries, check whether
# last token corresponds to concept
concept = current_dictionary.get(tokens[inner_index])
if concept is None:
return index, None
# If so, return inner_index (we have moved length tokens, so we have to update it)
return inner_index, concept
这构成了我的解决方案的“肉”。
3. 结果
现在,为简洁起见,下面提供了整个源代码(concepts.txt是您提供的):
import ast
import time
from typing import List
def get_longest(concepts: List[List[str]]):
return max(len(text.split()) for concept in concepts for text in concept[1:])
def init_hierarchical_dictionaries(longest: int):
return [(length, {}) for length in reversed(range(longest))]
def create_hierarchical_dictionaries(concepts: List[List[str]]):
# Initialization
longest = get_longest(concepts)
hierarchical_dictionaries = init_hierarchical_dictionaries(longest)
for concept in concepts:
for text in concept[1:]:
tokens = text.split()
# Initialize dictionary; get the one with corresponding length.
# The longer, the earlier it is in the hierarchy
current_dictionary = hierarchical_dictionaries[longest - len(tokens)][1]
# All of the tokens except the last one are another dictionary mapping to
# the next token in concept.
for token in tokens[:-1]:
current_dictionary[token] = {}
current_dictionary = current_dictionary[token]
# Last token is mapped to the first concept
current_dictionary[tokens[-1]] = concept[0].split()
return hierarchical_dictionaries
def traverse_through_dictionary(index, tokens, hierarchical_dictionary_tuple):
# Get the level of nested dictionaries and initial dictionary
length, current_dictionary = hierarchical_dictionary_tuple
# inner_index will loop through tokens until match or no match was found
inner_index = index
for _ in range(length):
# Get next nested dictionary and move inner_index to the next token
current_dictionary = current_dictionary.get(tokens[inner_index])
inner_index += 1
# If no match was found in any level of dictionary
# Return current index in sentence and None representing lack of concept.
if current_dictionary is None or inner_index >= len(tokens):
return index, None
# If everything went fine through all nested dictionaries, check whether
# last token corresponds to concept
concept = current_dictionary.get(tokens[inner_index])
if concept is None:
return index, None
# If so, return inner_index (we have moved length tokens, so we have to update it)
return inner_index, concept
def traverse(sentence: str, hierarchical_dictionaries):
# Get all tokens in the sentence
tokens = sentence.split()
output_sentence = []
# Initialize index to the first token
index = 0
# Until any tokens left to check for concepts
while index < len(tokens):
# Iterate over hierarchical dictionaries (elements of the array)
for hierarchical_dictionary_tuple in hierarchical_dictionaries:
# New index is returned based on match and token-wise length of concept
index, concept = traverse_through_dictionary(
index, tokens, hierarchical_dictionary_tuple
)
# Concept was found in current hierarchical_dictionary_tuple, let's add it
# to output
if concept is not None:
output_sentence.extend(concept)
# No need to check other hierarchical dictionaries for matching concept
break
# Token (and it's next tokens) do not match with any concept, return original
else:
output_sentence.append(tokens[index])
# Increment index in order to move to the next token
index += 1
# Join list of tokens into a sentence
return " ".join(output_sentence)
def embed_sentences(sentences: List[str], hierarchical_dictionaries):
return (traverse(sentence, hierarchical_dictionaries) for sentence in sentences)
def sanity_check():
concepts = [
["natural language processing", "text mining", "texts", "nlp"],
["advanced data mining", "data mining", "data"],
["discourse analysis", "learning analytics", "mooc"],
]
sentences = [
"data mining and text mining",
"nlp is mainly used by discourse analysis community",
"data mining in python is fun",
"mooc data analysis involves texts",
"data and data mining are both very interesting",
]
targets = [
"advanced data mining and natural language processing",
"natural language processing is mainly used by discourse analysis community",
"advanced data mining in python is fun",
"discourse analysis advanced data mining analysis involves natural language processing",
"advanced data mining and advanced data mining are both very interesting",
]
hierarchical_dictionaries = create_hierarchical_dictionaries(concepts)
results = list(embed_sentences(sentences, hierarchical_dictionaries))
if results == targets:
print("Correct results")
else:
print("Incorrect results")
def speed_check():
with open("./concepts.txt") as f:
concepts = ast.literal_eval(f.read())
initial_sentences = [
"data mining and text mining",
"nlp is mainly used by discourse analysis community",
"data mining in python is fun",
"mooc data analysis involves texts",
"data and data mining are both very interesting",
]
sentences = initial_sentences.copy()
for i in range(1_000_000):
sentences += initial_sentences
start = time.time()
hierarchical_dictionaries = create_hierarchical_dictionaries(concepts)
middle = time.time()
letters = []
for result in embed_sentences(sentences, hierarchical_dictionaries):
letters.append(result[0].capitalize())
end = time.time()
print(f"Time for hierarchical creation {(middle-start) * 1000.0} ms")
print(f"Time for embedding {(end-middle) * 1000.0} ms")
print(f"Overall time elapsed {(end-start) * 1000.0} ms")
def main():
sanity_check()
speed_check()
if __name__ == "__main__":
main()
速度检查结果如下:
Time for hierarchical creation 107.71822929382324 ms
Time for embedding 30460.427284240723 ms
Overall time elapsed 30568.145513534546 ms
因此,对于 500 万个句子(您提供的 5 个句子连接了 100 万次)和您提供的概念文件(1.1 mb),执行概念映射大约需要 30 秒,我想这还不错。
在最坏的情况下,字典应该占用与输入文件一样多的内存(concepts.txt在这种情况下),但通常会更低/更低,因为它取决于概念长度和这些单词的唯一单词的组合。
TA贡献1862条经验 获得超7个赞
TA贡献1824条经验 获得超5个赞
import re
concepts = [['natural language processing', 'text mining', 'texts', 'nlp'], ['advanced data mining', 'data mining', 'data'], ['discourse analysis', 'learning analytics', 'mooc']]
sentences = ['data mining and text mining', 'nlp is mainly used by discourse analysis community', 'data mining in python is fun', 'mooc data analysis involves texts', 'data and data mining are both very interesting']
replacementDict = {concept[0] : concept[1:] for concept in concepts}
finderAndReplacements = [(re.compile('(' + '|'.join(replacees) + ')'), replacement)
for replacement, replacees in replacementDict.items()]
def sentenceReplaced(findRegEx, replacement, sentence):
return findRegEx.sub(replacement, sentence, count=0)
def sentencesAllReplaced(sentences, finderAndReplacements=finderAndReplacements):
for regex, replacement in finderAndReplacements:
sentences = [sentenceReplaced(regex, replacement, sentence) for sentence in sentences]
return sentences
print(sentencesAllReplaced(sentences))
设置:我更喜欢concepts用字典表示,其中键、值是替换、替换。将此存储在replacementDict
为每个预期的替换组编译匹配的正则表达式。将其与其预期的替代品一起存储在finderAndReplacements列表中。
sentenceReplaced执行替换后,函数返回输入语句。(此处的应用顺序无关紧要,因此如果我们注意避免竞争条件,并行化应该是可能的。)
最后,我们循环查找/替换每个句子。(大量并行结构将提供改进的性能。)
我很想看到一些彻底的基准测试/测试/报告,因为我确信根据此任务输入 ( concepts, sentences) 和运行它的硬件的性质,有很多微妙之处。
在这种情况下,sentences与concepts替代品相比,是占主导地位的输入组件,我相信编译正则表达式将是有利的。当句子很少而概念很多时,尤其是大多数概念都不在任何句子中时,编译这些匹配器将是一种浪费。如果每次替换都有很多替换,则使用的编译方法可能性能不佳甚至出错。. . (关于输入参数的不同假设提供了许多权衡考虑,这是经常发生的情况。)
添加回答
举报