这是这个问题的后续问题:用于大列表的Spark FlatMap函数
总结:我想在Java8中编写一个Spark FlatMap函数,它生成与一组dna序列匹配的所有可能的正则表达式。对于巨大的字符串,这是有问题的,因为regex集合不适合内存(一个映射器很容易生成千兆字节的数据)。我明白我必须诉诸于像惰性序列之类的东西,我假设我必须为此使用Stream<String>
。我现在的问题是如何构建这个流。Java Streams - Stream.Builder.
如果我的算法开始生成模式,他们可以"推"到流与accept(String)
方法,但当我尝试了链接中的代码(替换为字符串生成器函数)与一些日志语句之间,我注意到随机字符串生成函数得到执行之前build()
被调用。我不明白所有的随机字符串将如何存储,如果他们不能装进内存。
我必须以不同的方式构建流吗?基本上我想在MapReduce Mapper.map
函数中得到等价的context.write(substring)
。
UPDATE1:不能使用range函数,实际上我正在使用一个结构它在一个后缀树上迭代。
UPDATE2:在请求更完整的实现时,我没有替换接口与实际实现因为实现是非常大的,对于理解这个思想不是必需的。
更完整的问题草图:
我的算法试图发现DNA序列的模式。该算法采用与同一基因相对应的不同生物体序列。假设我在大麦中有基因a,在水稻和其他物种中有相同的基因a然后我比较它们上游序列。我正在寻找的模式类似于正则表达式,例如TGA..GA..GA。探索所有可能的模式,我建立一个广义后缀树从序列。这个树提供了关于不同序列的信息模式发生在。为了将树与搜索算法解耦,我实现了某种迭代器结构:TreeNavigator。它有以下接口:
interface TreeNavigator {
public void jumpTo(char c); //go from pattern p to p+c (c can be a dot from a regex or [AC] for example)
public void backtrack(); //pop the last character
public List<Position> getMatches();
public Pattern trail(); //current pattern p
}
interface SearchSpace {
//degrees of freedom in regex, min and maxlength,...
public boolean inSearchSpace(Pattern p);
public Alphabet getPatternAlphabet();
}
interface ScoreCalculator {
//calculate a score, approximately equal to the number of occurrences of the pattern
public Score calcConservationScore(TreeNavigator t);
}
//Motif algorithm code which is run in the MapReduce Mapper function:
public class DiscoveryAlgorithm {
private Context context; //MapReduce context object to write to disk
private Score minScore;
public void runDiscovery(){
//depth first traveral of pattern space A, AA, AAA,... AAC, ACA, ACC and so fort
exploreSubTree(new TreeNavigator());
}
//branch and bound for pattern space, if pattern occurs too little, stop searching
public boolean survivesBnB(Score s){
return s.compareTo(minScore)>=0;
}
public void exploreSubTree(Navigator nav){
Pattern current = nav.trail();
Score currentScore = ScoreCalculator.calc(nav);
if (!survivesBnB(currentScore)}{
return;
}
if (motif in searchspace)
context.write(pattern);
//iterate over all possible extensions: A,C,G,T, [AC], [AG],... [ACGT]
for (Character c in SearchSpace.getPatternAlphabet()){
nav.jumpTo(c);
exploreSubTree(nav);
nav.backtrack();
}
}
}
完整的MapReduce源代码@ https://github.com/drdwitte/CloudSpeller/相关研究论文:http://www.ncbi.nlm.nih.gov/pubmed/26254488
UPDATE3:我继续阅读关于创建流的方法。从我读到目前为止,我想我必须重写我的runDiscovery()转换为供应商。然后可以将此供应商转换为a流通过StreamSupport类。
下面是对您的需求的一个简单的、懒惰的评估:
public static void main(String[] args) {
String string = "test";
IntStream.range(0, string.length())
.boxed()
.flatMap(start -> IntStream
.rangeClosed(start + 1, string.length())
.mapToObj(stop -> new AbstractMap.SimpleEntry<>(start, stop))
)
.map(e -> string.substring(e.getKey(), e.getValue()))
.forEach(System.out::println);
}
收益率:
t
te
tes
test
e
es
est
s
st
t
解释:
// Generate "start" values between 0 and the length of your string
IntStream.range(0, string.length())
.boxed()
// Combine each "start" value with a "stop" value that is between start + 1 and the length
// of your string
.flatMap(start -> IntStream
.rangeClosed(start + 1, string.length())
.mapToObj(stop -> new AbstractMap.SimpleEntry<>(start, stop))
)
// Convert the "start" / "stop" value tuple to a corresponding substring
.map(e -> string.substring(e.getKey(), e.getValue()))
.forEach(System.out::println);
@LukasEder解决方案的替代方案,我相信,更有效:
IntStream.range(0, string.length())
.mapToObj(start -> IntStream.rangeClosed(start+1, string.length())
.mapToObj(end -> string.substring(start, end)))
.flatMap(Function.identity())
.forEach(System.out::println);
更新作为基准被请求,这里是(Java 8u45, x64,字符串长度10,100,1000):
Benchmark (len) Mode Cnt Score Error Units
SubstringTest.LukasEder 10 avgt 30 1.947 ± 0.012 us/op
SubstringTest.LukasEder 100 avgt 30 151.660 ± 0.524 us/op
SubstringTest.LukasEder 1000 avgt 30 52405.761 ± 183.921 us/op
SubstringTest.TagirValeev 10 avgt 30 1.712 ± 0.018 us/op
SubstringTest.TagirValeev 100 avgt 30 138.179 ± 5.063 us/op
SubstringTest.TagirValeev 1000 avgt 30 48188.499 ± 107.321 us/op
好吧,@LukasEder解决方案只慢了8-13%,这可能并不多。