挖掘给定序列中的重复子序列

人工智能 Python 模式识别 序列建模 数据挖掘
2021-10-31 10:14:51

给定一个字母I={i1,i2,,in}和一个序列S=[e1,e2,,em], 其中项目ejI,我有兴趣找到每一个模式(的子序列S) 出现在S多于N次,并且有一个长度L两个限制之间:m<L<M. 在图案之间重叠的情况下,只应考虑最长的图案。

例如,给定N=3,m=3M=6, 一个字母I={A,B,C,D,E,F}, 和下面的序列S(星号只是在本例中标记模式的位置):

S=[A,A,F,F,A,A,B,D,E,F,E,D,F,F,A,A,F,C,C,A,B,D,D,C,C,C,A,B,D,D,C,A,C,B,E,A,B,C,F,F,A,A,E,A,B,C,A,D,E,F,F,A,A,B,C,D,A,E,A,B,C,A,B,D,D,]
所寻求的算法应该能够返回以下模式:
[C,A,B,D,D],[F,F,A,A]
连同他们各自的职位S.

一个可用的 Python 实现将是非常可取的(并行实现,更是如此)。

我正在阅读有关BIDE 算法的信息,但我认为这不是正确的方法。有任何想法吗?提前致谢!

1个回答

您可以像 BIDE 方法一样执行此操作。可以这样做:

class TreeNode:
    def __init__(self, element, depth, count=0, parent=None):
        self.count= count
        self.element= element
        self.depth= depth
        self.subnodes= dict()
        self.parent= parent
        
    def __repr__(self):
        return f'{self.__class__.__name__}({self.element}, {self.depth}, {self.count})'
        
    def get_subnode(self, element, create=True):
        if create is True:
            return self.subnodes.setdefault(element, TreeNode(
                element, 
                self.depth+1, 
                count=0, 
                parent=self
            )
        )
        else:
            return self.subnodes.get(element)

    def get_subnode_increasing_count(self, element):
        subnode= self.get_subnode(element)
        subnode.count+= 1
        return subnode
    
    def harvest(self, N, m, M, prefix=[], append_result_to=None):
        # get all sequences which occurs more than N times
        # and have a length (depth) of m < depth < M
        prefix= prefix.copy()
        prefix_len= len(prefix)
        if append_result_to is None:
            result= list()
        else:
            result= append_result_to
        if self.element is not None:
            prefix.append(self.element)
            prefix_len+= 1
        if N >= self.count or prefix_len >= M:
            return result
        if N < self.count and m < prefix_len and prefix_len < M:
            # the subsequence matches the constraints
            result.append(prefix)
        for subnode in self.subnodes.values():
            subnode.harvest(
                N, 
                m, 
                M, 
                prefix=prefix, 
                append_result_to=result
            )
        return result
        
    def get_prefix(self):
        if self.parent is None:
            prefix= []
        else:
            prefix= self.parent.get_prefix()
        if self.element is not None:
            prefix.append(self.element)
        return prefix
        
    def print_tree_counts(self, leaves_only=False, min_count=0):
        if leaves_only is False or not self.subnodes:
            if self.count >= min_count:
                print(self.get_prefix(), self.count)
        for subnode in self.subnodes.values():
            subnode.print_tree_counts(leaves_only=leaves_only, min_count=min_count)

顺序方法

def find_patterns(S, N, m, M):
    root= TreeNode(None, 0, 0)
    active_nodes= []
    for el in S:
        root.count+=1
        # append the root node
        active_nodes.append(root)
        # now replace all nodes in active nodes by the node
        # that is reached one level deeper following el
        active_nodes= [node.get_subnode_increasing_count(el) for node in active_nodes if node.depth < M]

    # now harvest the tree after applying the restrictions
    return root, root.harvest(N, m, M)

S='AAFFAABDEFEDFFAAFCCABDDCCCABDDCACBEABCFFAAEABCADEFFAABCDAEABCABDD'
root, patterns= find_patterns(S, 3, 3, 6)
patterns

结果是:

[['F', 'F', 'A', 'A']]

您的第二个模式恰好出现 3 次,因此不能满足 > 3 次出现的要求([C,A,B,D,D])。

并行处理的修改

要使其可并行处理,您可以进行幻灯片修改。只需在 TreeNode 中创建另一个允许合并节点的方法。像这样:

def merge_nodes(self, other_nodes):
    # merge other_nodes into this node
    # including all subnodes
    if len(other_nodes) > 0:
        elements= set()
        for other_node in other_nodes:
            self.count+= other_node.count
            elements.update(other_node.subnodes.keys())
        # elements now contains the set of the next elements 
        # with which the sequence continues across the 
        # other nodes
        for element in elements:
            # get the node of the resulting tree that represents
            # the sequnce continuing with element, if there is
            # no such subnode, create one, since there is at least
            # one other node that counted sequence seq + element
            my_subnode= self.get_subnode(element, create=True)
            other_subnodes= list()
            for other_node in other_nodes:
                # now get the subnode for each other node, that
                # represents the same sequence (seq + element)
                other_subnode= other_node.get_subnode(element, create=False)
                if other_subnode is not None:
                    other_subnodes.append(other_subnode)
            # merge the subnode the same way
            my_subnode.merge_nodes(other_subnodes)

现在您可以在子树上调用 find_patterns,但您需要考虑到,您可能无法正常拆分输入序列。您需要定义一些重叠序列部分,以便可以完成从一个序列片段末尾开始的模式,但需要以不同方式计算此重叠序列(否则您会得到双重计数,这会导致错误输入) . 因此,您必须确保只有在序列片段中开始的模式与重叠部分继续,但在重叠部分中没有新模式开始,因为您将它们计算在内:

def find_patterns(S, N, m, M, overlap=[]):
    root= TreeNode(None, 0, count=0, parent=None)
    active_nodes= []
    def process_element(active_nodes, element, M):
        return [
            node.get_subnode_increasing_count(element) 
                for node in active_nodes 
                    if node.depth < M
        ]
    for el in S:
        root.count+=1
        # append the root node
        active_nodes.append(root)
        # now replace all nodes in active nodes by the node
        # that is reached one level deeper following el
        active_nodes= process_element(active_nodes, el, M)
    # complete the already started sequences with the
    # overlapping sequence (the sequence that may be
    # processed in another process)
    for el in overlap:
        active_nodes= process_element(active_nodes, el, M)
    # now harvest the tree after applying the restrictions
    return root, root.harvest(N, m, M)

现在你可以这样做:

split_point= len(S) // 2
S1= S[split_point:]
S2= S[:split_point]
S1_overlap= S2[:10] 
# in fact you could have just used :5 above, 
# but that doesn't affect the result
r1, _= find_patterns(S1, 3, 3, 6, overlap=S1_overlap)
# Note: you could safely start the comand in the next 
# line in parallel to the previous call of find_patterns
# and of course, you could split your sequence in as
# many fragments, as you like, as long as you maintain
# the overlap correctly and call merge_nodes afterwards
r2, _= find_patterns(S2, 3, 3, 6) 
r1.merge_nodes([r2])
patterns= r1.harvest(3, 3, 6)

这产生与上述相同的结果。为了更清楚一点,我对重叠部分的意思是:

S1 is 'CBEABCFFAAEABCADEFFAABCDAEABCABDD'
S2 is 'AAFFAABDEFEDFFAAFCCABDDCCCABDDCA'
S1_overlap is 'AAFFAABDEF'

你只会在find_patterns(S1, 3, 3, 6)S1 中搜索重复的模式,因此它不会考虑以 BDD 部分开始(从 S1 的末尾开始)并在 S2 内继续的模式。我也考虑find_patterns(S1, 3, 3, 6, overlap=S1_overlap)这些模式。