我有一个类似的数据流
stream = "carracecowtenhihellocohiwcar ......"
我必须从流中获得列表中所有单词的出现次数
words = ["car", "cow", "hi", ....]
所以,结果会像一样
result = {
"car": 2,
"cow": 1,
"hi": 2,
....
....
}
在我目前的实现中,我正在遍历单词列表,并将它们添加到dict中,如下所示,
我正在寻找更好的方法来做到这一点,因为单词列表不断增加,数据流不断。
这就是我目前拥有的,
import re
def word_count(stream_obj):
mydict = {}
words = ["car", "cow", "hi", "hello"]
max_word_len = len(max(words, key=len))
regex = re.compile("|".join(words))
last_chunk_remainder = ""
while(stream_obj.getchunk() is not None):
stream_data = last_chunk_remainder + stream_obj.getchunk()
for word in words:
mydict[word] = stream_data.count(word)
# to handle the corner case like if the stream chunk ends with
# “ca” and first letter of next is "r", so that make the word
# match words in the list, which is "car"
if not regex.findall(stream_data[-max_word_len:]):
last_chunk_remainder = stream_data[-max_word_len:]
感谢
所以我尝试了一种基于trie的方法来解决您的问题(现在我明白了您想要什么)。也许你可以从中找到一些有用的东西。有一个最初的想法,一个围绕这个想法的抽象接口,以帮助寻找更有效的解决方案,还有一些pytest测试,以帮助了解它是否/如何工作。有一些trie模块,但现在看起来更有趣。
from collections import defaultdict
# Faking an infinite stream of characters
from itertools import cycle
stream = cycle('carracecowtenhihellocohiwcar')
# Just exploring the idea of a trie. If it works, we can think about a
# more efficient implementation later.
def new_trie_branch():
return defaultdict(new_trie_branch)
# A symbol used to indicate leaves in the trie
END_OF_WORD = object()
# The trie is implemented as a dictionary mapping letters to
# sub-tries. The pseudo-letter END_OF_WORD marks the end of a path in
# the trie which corresponds to a valid whole word.
def make_trie(words):
trie = new_trie_branch()
for word in words:
branch = trie
for letter in word:
branch = branch[letter]
branch[END_OF_WORD] = True
return trie
# As each letter comes out of the stream, it is fed into a collection
# of 'listeners'. Each listener is a stateful function which
# corresponds to some location in the trie and is aware of the word
# prefix which describes the path from the trie's root to the current
# node. When such a listener is given a letter, it checks (in the trie)
# whether the prefix plus the new letter form a complete word: if so,
# it bumps the word count for that word. It also checks whether the
# prefix plus the new letter form a valid longer prefix: if so, it
# adds a new listener (corresponding to the next node in the trie)
# into the collection of listeners that will be applied to the next letter to
# come out of the stream.
def count_words_in_stream(words, stream, word_count = None):
word_count = defaultdict(int) if word_count is None else word_count
def make_listener(branch, prefix):
def listener(next_letter):
if next_letter in branch:
next_branch = branch[next_letter]
word = prefix + next_letter
if END_OF_WORD in next_branch:
word_count[word] += 1
next_listeners.append(make_listener(next_branch, word))
return listener
start_of_word_listener = make_listener(make_trie(words), '')
listeners = [start_of_word_listener]
for letter in stream:
next_listeners = [start_of_word_listener]
for listen in listeners:
listen(letter)
listeners = next_listeners
return word_count
# Now we try to come up with an implementation-independent interface
# for the trie to allow us to refactor more easily in search of a more
# efficient implementation, if necessary.
class Trie(object):
def __init__(self, words):
self._trie = make_trie(words)
# Checks whether the given WORD is present in the trie
def __contains__(self, word):
trie = self._trie
for letter in word:
if letter not in trie:
return False
trie = trie[letter]
else:
return END_OF_WORD in trie
# The 'in' operator (__contains__) checks for the presence of a
# whole word in the trie, so we need a different interface for
# checking whether a given branch exists at this node.
def has_branch(self, branch_id):
return branch_id in self._trie
# Picks one branch of the trie
def __getitem__(self, branch_id):
branch = Trie('')
branch._trie = self._trie[branch_id]
return branch
# Iterates over the branches of this trie
def __iter__(self):
return iter(self._trie)
# Same as count_words_in_stream above, but uses the abstract interface
# we just invented.
def abstract_count_words_in_stream(words, stream, word_count = None):
word_count = defaultdict(int) if word_count is None else word_count
def make_listener(branch, prefix):
def listener(next_letter):
if branch.has_branch(next_letter):
next_branch = branch[next_letter]
word = prefix + next_letter
if next_branch.has_branch(END_OF_WORD):
word_count[word] += 1
next_listeners.append(make_listener(next_branch, word))
return listener
start_of_word_listener = make_listener(Trie(words), '')
listeners = [start_of_word_listener]
for letter in stream:
next_listeners = [start_of_word_listener]
for listen in listeners:
listen(letter)
listeners = next_listeners
return word_count
################################################################################
# Some tests of the implementation. These are written in the pytest
# framework.
################################################################################
from pytest import mark
# Testing the specific implementation details. Just to get us going.
@mark.parametrize('words, trie', (
(['one'],
{'o': {'n': {'e': {END_OF_WORD: True}}}}),
('one two'.split(),
{'o': {'n': {'e': {END_OF_WORD: True}}},
't': {'w': {'o': {END_OF_WORD: True}}}}),
('abc abd'.split(),
{'a': {'b': {'c': {END_OF_WORD: True},
'd': {END_OF_WORD: True}}}})
))
def test_make_trie(words, trie):
assert make_trie(words) == trie
count_words_test_data = ('words, stream, expected', (
(['cow'] ,'abcdefg', {}),
(['cow'] ,'cowcowcow', {'cow':3}),
('cow car fish'.split(), 'cowcarfishcarcarfishcow',
{'cow':2, 'car':3, 'fish':2}),
('and hand handy'.split(), 'handyandhand',
{'and':3, 'hand':2, 'handy':1}),
))
@mark.parametrize(*count_words_test_data)
def test_count_words_in_stream(words, stream, expected):
assert count_words_in_stream(words, stream) == expected
################################################################################
# Testing the abstract Trie interface. This will help if we want to
# refactor the implementation in search of something more efficient.
################################################################################
@mark.parametrize('words, absents', (
('one'.split(), 'o on ono'.split()),
('o on one'.split(), []),
('abc abd'.split(), ['ab'])
))
def test_Trie_word_presence(words, absents):
trie = Trie(words)
for word in words:
assert word in trie
for absent in absents:
assert absent not in trie
@mark.parametrize(*count_words_test_data)
def test_abstract_count_words_in_stream(words, stream, expected):
assert abstract_count_words_in_stream(words, stream) == expected
stream = "carracecowtenhihellocohiwcar"
words = ["car", "cow", "hi"]
print { word:stream.count(word) for word in words }
我做到了,并尝试涵盖所有已知的角落案例,如果你能提出一些建议/改进,我将非常感谢,感谢您的帮助,并对最初不完整的问题表示抱歉。
import re
from collections import defaultdict
WORD_COUNTS = defaultdict(int)
WORDS = ["car", "cat", "cow", "hi", "hello"]
MAX_WORD_LEN = len(max(WORDS, key=len))
REGEX = ("|".join(WORDS))
RE_OBJ = re.compile(REGEX)
def count_words(stream):
last_stream_remainder = ""
while True:
data = stream.get_chunk()
# Breaking point
if data is None:
break
if not data:
continue
data = last_stream_remainder + data
for match in RE_OBJ.finditer(data):
WORD_COUNTS[match.group(0)] += 1
# to cover the corner case like remainder from last
# chunk can attach with new one and make a word
if match:
if match.end() >= len(data):
continue
else:
last_match = min((len(data) - match.end()), MAX_WORD_LEN)
last_stream_remainder = data[-last_match:]
else:
last_stream_remainder = data[-MAX_WORD_LEN:]
class StreamReader(object):
STREAM_DATA = ["car1cat1lftrysomecow1shi1iamgoinghello1pleasegoocar2sarehere",
"car3car4car5cat2cat3h", "i2thisishello2hello3he", "", "llo4", None]
def get_chunk(self):
return self.STREAM_DATA.pop(0)
stream = StreamReader()
count_words(stream)
print WORD_COUNTS.items()
# [('car', 5), ('hi', 3), ('hello', 4), ('cow', 1), ('cat', 3)]
这是我的看法。每个字符需要O(k)时间,或者整个流需要O(nk)时间,其中k是单词的长度,n是流的长度;和O(k)空间。
class Solution:
def __init__(self, s):
self.buff, self.count, self.s = '', 0, s
def process(self, a):
self.buff += a
if len(self.buff) > len(self.s):
self.buff = self.buff[1:]
if (self.buff) == self.s:
self.count += 1
以下是一些测试:
solution = Solution('cocoa')
solution.process('c')
solution.process('o')
solution.process('c')
solution.process('o')
assert solution.count == 0
solution.process('c')
solution.process('o')
solution.process('a')
assert solution.count == 1
print('First test passed')
solution.count = 0
solution = Solution('acbcc')
stream = 'acbcbcc'
for a in stream:
solution.process(a)
assert solution.count == 0
print('Second test passed')
我尝试了下面的代码,它对我来说效果很好。使用trie树来解决这个问题。
from collections import defaultdict
from itertools import cycle
def new_trie_branch():
return defaultdict(new_trie_branch)
END_OF_WORD = object()
def make_trie_tree(words):
trie = new_trie_branch()
for word in words:
branch = trie
for letter in word:
branch = branch[letter]
branch[END_OF_WORD] = True
return trie
def count_words_in_stream(words, stream, word_count = None):
word_count = defaultdict(int) if word_count is None else word_count
def make_listener(branch, prefix):
def listener(next_letter):
if next_letter in branch:
next_branch = branch[next_letter]
word = prefix + next_letter
if END_OF_WORD in next_branch:
word_count[word] += 1
next_listeners.append(make_listener(next_branch, word))
return listener
start_of_word_listener = make_listener(make_trie_tree(words), '')
listeners = [start_of_word_listener]
for letter in stream:
next_listeners = [start_of_word_listener]
for listen in listeners:
listen(letter)
listeners = next_listeners
return word_count
stream = "acacathellockword"
words = ['aca','cat','hell','hello','lock','world']
print(dict(count_words_in_stream(words,stream)))
输出:
{'aca': 2, 'cat': 1, 'hell': 1, 'hello': 1, 'lock': 1}