统计数据流中给定字符串列表的出现次数



我有一个类似的数据流

stream = "carracecowtenhihellocohiwcar ......"

我必须从流中获得列表中所有单词的出现次数

words = ["car", "cow", "hi", ....]

所以,结果应该类似这样:

result = {
  "car": 2,
  "cow": 1,
  "hi": 2,
  ....
  ....
}

在我目前的实现中,我遍历单词列表,并把每个单词的出现次数存入一个dict中。

我正在寻找更好的方法来做到这一点,因为单词列表会不断增长,而数据流也是持续不断的。

这就是我目前拥有的,

import re
def word_count(stream_obj):
    """Count how often each target word occurs in a chunked character stream.

    Chunks are pulled from ``stream_obj.getchunk()`` until it returns
    ``None``.  Counts accumulate across chunks, occurrences that straddle a
    chunk boundary are found, and every occurrence is counted exactly once
    (at the moment its last character arrives).

    Args:
        stream_obj: object with a ``getchunk()`` method that returns the
            next chunk as a string, or ``None`` when the stream has ended.

    Returns:
        dict mapping every target word to its total number of occurrences
        (``0`` for words that never appear).
    """
    words = ["car", "cow", "hi", "hello"]
    mydict = dict.fromkeys(words, 0)
    # A word that finishes in the next chunk can start at most this many
    # characters before the chunk boundary, so that is all we need to keep.
    carry_len = len(max(words, key=len)) - 1
    last_chunk_remainder = ""
    # iter() with a sentinel fetches each chunk exactly once and stops at
    # None; calling getchunk() in the loop condition AND in the body would
    # silently throw away every other chunk.
    for chunk in iter(stream_obj.getchunk, None):
        stream_data = last_chunk_remainder + chunk
        carried = len(last_chunk_remainder)
        for word in words:
            # Occurrences that end inside the carried-over remainder were
            # already counted with the previous chunk; start the search at
            # the first position whose match would end in the new data.
            position = stream_data.find(word, max(0, carried - len(word) + 1))
            while position != -1:
                mydict[word] += 1
                position = stream_data.find(word, position + 1)
        last_chunk_remainder = stream_data[-carry_len:] if carry_len else ""
    return mydict

感谢

既然我现在明白了您的需求,我尝试用一种基于trie(前缀树)的方法来解决您的问题,也许您能从中找到一些有用的东西。下面的代码包含三部分:一个最初的实现思路;一个围绕该思路的抽象接口,方便日后换用更高效的实现;以及一些pytest测试,用来说明它是否有效、如何工作。虽然已经有现成的trie模块可用,但目前自己动手实现似乎更有趣。

from collections import defaultdict
# Fake an endless stream of characters by repeating the sample text forever.
# NOTE: a loop that iterates over `stream` to exhaustion never terminates.
from itertools import cycle
stream = cycle('carracecowtenhihellocohiwcar')
# Just exploring the idea of a trie. If it works, we can think about a
# more efficient implementation later.
def new_trie_branch():
    """Return an empty trie node.

    The node is a defaultdict whose factory is this very function, so
    looking up a missing child creates (and stores) a fresh empty node.
    """
    node = defaultdict(new_trie_branch)
    return node
# A symbol used to indicate leaves in the trie
END_OF_WORD = object()
# The trie is implemented as a dictionary mapping letters to
# sub-tries. The pseudo-letter END_OF_WORD marks the end of a path in
# the trie which corresponds to a valid whole word.
def make_trie(words):
    """Build a trie containing every word in WORDS.

    Each node maps a letter to the sub-trie reached by that letter; a node
    that completes a word additionally maps END_OF_WORD to True.
    """
    root = new_trie_branch()
    for entry in words:
        node = root
        # Walk down one level per letter; missing levels are created by
        # the node's default factory as they are first touched.
        for char in entry:
            node = node[char]
        node[END_OF_WORD] = True
    return root
# As each letter comes out of the stream, it is fed into a collection
# of 'listeners'. Each listener is a stateful function which
# corresponds to some location in the trie and is aware of the word
# prefix which describes the path from the trie's root to the current
# node. When such a listener is given a letter, it checks (in the trie)
# whether the prefix plus the new letter form a complete word: if so,
# it bumps the word count for that word. It also checks whether the
# prefix plus the new letter form a valid longer prefix: if so, it
# adds a new listener (corresponding to the next node in the trie)
# into the collection of listeners that will be applied to the next letter to
# come out of the stream.
def count_words_in_stream(words, stream, word_count = None):
    """Count every occurrence of each of WORDS in STREAM, letter by letter.

    Overlapping occurrences are all counted.  STREAM is any iterable of
    single letters.  WORD_COUNT, when given, is the mapping to accumulate
    into; otherwise a fresh defaultdict(int) is created.  The mapping is
    returned once STREAM is exhausted.
    """
    if word_count is None:
        word_count = defaultdict(int)
    root = make_trie(words)
    # Every entry is a partial match in progress: the trie node reached so
    # far and the letters consumed to reach it.  The root entry is re-added
    # for each letter because a new word may start at any position.
    partial_matches = [(root, '')]
    for letter in stream:
        survivors = [(root, '')]
        for node, prefix in partial_matches:
            if letter not in node:
                continue
            child = node[letter]
            candidate = prefix + letter
            if END_OF_WORD in child:
                word_count[candidate] += 1
            survivors.append((child, candidate))
        partial_matches = survivors
    return word_count
# Now we try to come up with an implementation-independent interface
# for the trie to allow us to refactor more easily in search of a more
# efficient implementation, if necessary.
class Trie(object):
    """Thin wrapper hiding the dictionary layout produced by make_trie."""

    def __init__(self, words):
        self._trie = make_trie(words)

    def __contains__(self, word):
        """Return True only if WORD is stored as a complete word."""
        node = self._trie
        for letter in word:
            if letter in node:
                node = node[letter]
            else:
                return False
        # Reaching the end of WORD is not enough: the node must also be
        # marked as the end of a stored word, not merely a prefix of one.
        return END_OF_WORD in node

    def has_branch(self, branch_id):
        """Return True if this node has a child keyed by BRANCH_ID.

        Needed because the 'in' operator is reserved for whole-word lookup.
        """
        return branch_id in self._trie

    def __getitem__(self, branch_id):
        """Return the child reached via BRANCH_ID, wrapped as a Trie.

        NOTE(review): the underlying mapping comes from make_trie; if it
        auto-creates missing keys, indexing an absent branch inserts it,
        so callers should check has_branch() first.
        """
        child = Trie('')
        child._trie = self._trie[branch_id]
        return child

    def __iter__(self):
        """Iterate over the keys of the branches leaving this node."""
        return iter(self._trie)
# Same as count_words_in_stream above, but uses the abstract interface
# we just invented.
def abstract_count_words_in_stream(words, stream, word_count = None):
    """Count occurrences of WORDS in STREAM using only the Trie interface.

    Same contract as count_words_in_stream: overlapping occurrences are
    counted, WORD_COUNT (or a fresh defaultdict(int)) is updated and
    returned once STREAM is exhausted.
    """
    if word_count is None:
        word_count = defaultdict(int)
    root = Trie(words)
    # Partial matches in progress as (trie node, letters consumed) pairs;
    # the root pair is re-added per letter so a word can start anywhere.
    partial_matches = [(root, '')]
    for letter in stream:
        survivors = [(root, '')]
        for node, prefix in partial_matches:
            if not node.has_branch(letter):
                continue
            child = node[letter]
            candidate = prefix + letter
            if child.has_branch(END_OF_WORD):
                word_count[candidate] += 1
            survivors.append((child, candidate))
        partial_matches = survivors
    return word_count
################################################################################
# Some tests of the implementation. These are written in the pytest
# framework.
################################################################################
from pytest import mark
# Testing the specific implementation details. Just to get us going.
# Each case pairs a word list with the exact nested-dict layout expected
# from make_trie: a single word, two words with disjoint paths, and two
# words sharing the prefix 'ab'.
@mark.parametrize('words, trie', (
    (['one'],
     {'o': {'n': {'e': {END_OF_WORD: True}}}}),
    ('one two'.split(),
     {'o': {'n': {'e': {END_OF_WORD: True}}},
      't': {'w': {'o': {END_OF_WORD: True}}}}),
    ('abc abd'.split(),
     {'a': {'b': {'c': {END_OF_WORD: True},
                  'd': {END_OF_WORD: True}}}})
))
def test_make_trie(words, trie):
    """make_trie(words) must compare equal to the hand-written layout."""
    assert make_trie(words) == trie
# Shared (argnames, cases) pair, unpacked into mark.parametrize below.
# Cases: no match at all, one repeated word, several distinct words, and
# words nested inside one another ('and' in 'hand' in 'handy'), where
# every overlapping occurrence is expected to be counted.
count_words_test_data = ('words, stream, expected', (
    (['cow'] ,'abcdefg', {}),
    (['cow'] ,'cowcowcow', {'cow':3}),
    ('cow car fish'.split(), 'cowcarfishcarcarfishcow',
     {'cow':2, 'car':3, 'fish':2}),
    ('and hand handy'.split(), 'handyandhand',
     {'and':3, 'hand':2, 'handy':1}),
))
@mark.parametrize(*count_words_test_data)
def test_count_words_in_stream(words, stream, expected):
    """The counts returned for a finite stream must equal EXPECTED."""
    assert count_words_in_stream(words, stream) == expected

################################################################################
# Testing the abstract Trie interface. This will help if we want to
# refactor the implementation in search of something more efficient.
################################################################################
# Each case lists the words stored in the trie and some strings that must
# NOT be reported as present (proper prefixes or extensions of stored
# words).
@mark.parametrize('words, absents', (
    ('one'.split(), 'o on ono'.split()),
    ('o on one'.split(), []),
    ('abc abd'.split(), ['ab'])
))
def test_Trie_word_presence(words, absents):
    """'in' must be True for stored words and False for the absent ones."""
    trie = Trie(words)
    for word in words:
        assert word in trie
    for absent in absents:
        assert absent not in trie
# Runs the count_words_test_data cases against the interface-based
# implementation.
@mark.parametrize(*count_words_test_data)
def test_abstract_count_words_in_stream(words, stream, expected):
    """The Trie-interface implementation must produce the EXPECTED counts."""
    assert abstract_count_words_in_stream(words, stream) == expected
# Simplest approach when the whole text is already in memory: ask str.count
# for each word.  str.count reports non-overlapping occurrences and cannot
# see a word that is split across separately received chunks.
stream = "carracecowtenhihellocohiwcar"
words = ["car", "cow", "hi"]
# The parenthesised call form of print works on both Python 2 and Python 3;
# the bare Python 2 print statement is a SyntaxError on Python 3.
print({word: stream.count(word) for word in words})

我已经实现了一个版本,并尽量覆盖了所有已知的边界情况。如果您能提出一些建议或改进,我将非常感激。感谢您的帮助,也为最初问题描述得不完整表示歉意。

import re
from collections import defaultdict
# Running totals; shared by (and accumulated across) all count_words() calls.
WORD_COUNTS = defaultdict(int)
WORDS = ["car", "cat", "cow", "hi", "hello"]
MAX_WORD_LEN = len(max(WORDS, key=len))
REGEX = ("|".join(WORDS))
RE_OBJ = re.compile(REGEX)


def count_words(stream):
    """Add the occurrences of WORDS found in a chunked stream to WORD_COUNTS.

    Chunks are read with ``stream.get_chunk()``: ``None`` ends the stream
    and an empty chunk is skipped.  Words split across a chunk boundary are
    found, and each occurrence is counted exactly once, when its last
    character arrives.

    Matching uses RE_OBJ, i.e. leftmost non-overlapping alternation, which
    is exact for word lists (like WORDS) in which no word overlaps another.

    Args:
        stream: object with a ``get_chunk()`` method returning a string
            chunk, ``""`` for "nothing yet", or ``None`` at end of stream.

    Returns:
        The module-level WORD_COUNTS mapping, for convenience.
    """
    # A word finishing in the next chunk can begin at most this many
    # characters before the boundary; always carrying exactly that much
    # avoids any dependence on where the last match happened to end.
    carry_len = MAX_WORD_LEN - 1
    last_stream_remainder = ""
    while True:
        data = stream.get_chunk()
        # Breaking point
        if data is None:
            break
        if not data:
            continue
        carried = len(last_stream_remainder)
        data = last_stream_remainder + data
        for match in RE_OBJ.finditer(data):
            # A match ending inside the carried-over text was already
            # counted with the previous chunk.
            if match.end() > carried:
                WORD_COUNTS[match.group(0)] += 1
        last_stream_remainder = data[-carry_len:] if carry_len else ""
    return WORD_COUNTS
class StreamReader(object):
    """In-memory stand-in for a chunked stream, used to try out count_words.

    Every instance replays STREAM_DATA from the beginning.
    """

    # Sample chunks, deliberately splitting "hi" and "hello" across chunk
    # boundaries and including an empty chunk; None marks end of stream.
    STREAM_DATA = ["car1cat1lftrysomecow1shi1iamgoinghello1pleasegoocar2sarehere",
                   "car3car4car5cat2cat3h", "i2thisishello2hello3he", "", "llo4", None]

    def __init__(self):
        # Iterate over the shared sample data instead of popping from it:
        # the class-level list is never mutated, so a second reader does
        # not start from wherever the first one stopped.
        self._chunks = iter(self.STREAM_DATA)

    def get_chunk(self):
        """Return the next chunk, or None once the data is exhausted."""
        return next(self._chunks, None)
stream = StreamReader()
count_words(stream)
# Call form of print plus list(...) gives the same list output on Python 2
# and Python 3 (the bare print statement is a SyntaxError on Python 3).
print(list(WORD_COUNTS.items()))
# Expected items (order may vary):
# [('car', 5), ('hi', 3), ('hello', 4), ('cow', 1), ('cat', 3)]

这是我的思路。处理每个字符需要O(k)时间,因此处理整个流需要O(nk)时间,其中k是单词的长度,n是流的长度;空间复杂度为O(k)。

class Solution:
    """Count occurrences of one fixed word in a stream of characters.

    Feed the stream one character at a time through process(); ``count``
    then holds how many times the word has been completed so far,
    overlapping occurrences included.
    """

    def __init__(self, s):
        # buff is a sliding window over the most recent len(s) characters.
        self.buff, self.count, self.s = '', 0, s

    def process(self, a):
        """Consume the next stream character ``a`` and update ``count``."""
        width = len(self.s)
        self.buff = (self.buff + a)[-width:] if width else ''
        # Compare after every character, not only once the window has
        # overflowed; otherwise an occurrence at the very start of the
        # stream (before anything had to be dropped) is never counted.
        if self.buff == self.s:
            self.count += 1

以下是一些测试:

# First test: 'cocoa' must not be reported while only a repeated prefix
# ("coco") has been seen, and must be reported once "coa" completes it.
solution = Solution('cocoa')
for a in 'coco':
  solution.process(a)
assert solution.count == 0
for a in 'coa':
  solution.process(a)
assert solution.count == 1
print('First test passed')
solution.count = 0
# Second test: a near miss ('acbcbcc' versus 'acbcc') must not be counted.
solution = Solution('acbcc')
stream = 'acbcbcc'
for a in stream:
  solution.process(a)
assert solution.count == 0
print('Second test passed')

我尝试了下面的代码,它对我来说效果很好。使用trie树来解决这个问题。

from collections import defaultdict
from itertools import cycle
def new_trie_branch():
    """Return an empty trie node that creates missing children on access.

    The node is a defaultdict using this function as its factory, so the
    nesting can grow to any depth.
    """
    branch = defaultdict(new_trie_branch)
    return branch
# Unique sentinel key marking trie nodes at which a complete word ends; a
# fresh object() can never be equal to a real letter.
END_OF_WORD = object()

def make_trie_tree(words):
    """Build and return a trie holding every word in WORDS.

    Nodes map letters to child nodes; the node reached by the last letter
    of a word also maps END_OF_WORD to True.
    """
    root = new_trie_branch()
    for entry in words:
        cursor = root
        # Descend one level per letter, creating levels on first use.
        for char in entry:
            cursor = cursor[char]
        cursor[END_OF_WORD] = True
    return root

def count_words_in_stream(words, stream, word_count = None):
    """Count every occurrence of each of WORDS in STREAM, letter by letter.

    Overlapping occurrences are all counted.  WORD_COUNT, when supplied,
    is the mapping to accumulate into; otherwise a new defaultdict(int)
    is used.  The mapping is returned once STREAM is exhausted.
    """
    if word_count is None:
        word_count = defaultdict(int)
    root = make_trie_tree(words)
    # In-progress matches as (trie node, letters consumed so far) pairs.
    # The root pair is re-added for every letter because a word may begin
    # at any position in the stream.
    in_progress = [(root, '')]
    for letter in stream:
        still_alive = [(root, '')]
        for node, prefix in in_progress:
            if letter not in node:
                continue
            child = node[letter]
            candidate = prefix + letter
            if END_OF_WORD in child:
                word_count[candidate] += 1
            still_alive.append((child, candidate))
        in_progress = still_alive
    return word_count

# Demo: overlapping hits are all counted (the two 'aca' inside "acaca"),
# and 'world' never occurs in this stream, so it is absent from the result.
words = ['aca', 'cat', 'hell', 'hello', 'lock', 'world']
stream = "acacathellockword"
print(dict(count_words_in_stream(words, stream)))

输出:

    {'aca': 2, 'cat': 1, 'hell': 1, 'hello': 1, 'lock': 1}

相关内容

  • 没有找到相关文章

最新更新