我有一个 Storm bolt,代码如下:
package storm.bolt;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.Text;
public class AccumuloBolt implements IRichBolt {
private static final long serialVersionUID = 1L;
private OutputCollector collector;
private ZooKeeperInstance instance;
private Connector connector;
private BatchWriter bw;
private Text colf;
private MultiTableBatchWriter mtbw;
private final String instanceName;
private final String zooServers;
private final String userName;
private final String password;
Map<String, Integer> counters;
/**
* @param zooServers The host on which Zookeeper is running.
* @param userName for which Accumula username.
* @param password The Acumula passowrd
* written to.
* String instanceName = "myistance";
* String zooServers = "192.168.1.81:2181";
* String userName = "root";
* String password = "aryadevi";
*/
public AccumuloBolt(String instanceName, String zooServers, String userName,
String password) {
this.instanceName = instanceName;
this.zooServers = zooServers;
this.userName = userName;
this.password = password;
}
public void prepare( Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
try {
//this.instance = new ZooKeeperInstance(instanceName, zooServers);
this.instance = new ZooKeeperInstance("myistance", "192.168.1.81:2181");
//this.connector= instance.getConnector(userName, password);
this.connector= instance.getConnector("root", "aryadevi");
this.mtbw=connector.createMultiTableBatchWriter(200000l, 300, 4);
this.bw=null;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void execute(Tuple input) {
if (shouldActOnInput(input)) {
try{
if (!this.connector.tableOperations().exists("new2"))
this.connector.tableOperations().create("new2");
this.bw = this.mtbw.getBatchWriter("new2");
this.colf=new Text("colfam");
System.out.println("writing ...");
String str = input.getString(0);
if(!counters.containsKey(str)){
counters.put(str, 1);
}else{
Integer c = counters.get(str) + 1;
counters.put(str, c);
}
}catch (Exception e) {
throw new RuntimeException(e);
}
//DBObject updateObj = getDBObjectForInput(input);
//this.bw.addMutation(m);
} else {
collector.ack(input);
}
}
public void cleanup() {
try{
for(Map.Entry<String, Integer> entry : counters.entrySet()){
Mutation m = new Mutation(new Text(String.format("row_%d",entry.getKey() )));
m.put(this.colf, new Text(String.format("colqual_%d", entry.getKey())), new Value((String.format("value_%d", entry.getValue())).getBytes()));
System.out.println(entry.getKey()+": "+entry.getValue());
bw.addMutation(m);
}
this.mtbw.close();
}catch (Exception e) {
throw new RuntimeException(e);
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
public boolean shouldActOnInput(Tuple input) {
return true;
}
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
我先用 "mvn compile" 编译这个项目,再用 "mvn package" 打包,然后用以下命令运行 Storm 拓扑:storm jar target/storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar TwitterStorm 。运行此命令后,出现如下错误:
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46217 [Thread-8-count] ERROR backtype.storm.util - Async loop died!
java.lang.ExceptionInInitializerError: null
at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.log4j.Logger.getLogger(Logger.java:43) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.accumulo.core.client.ZooKeeperInstance.<clinit>(ZooKeeperInstance.java:63) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
... 8 common frames omitted
46218 [Thread-10-count] ERROR backtype.storm.daemon.executor -
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46218 [Thread-8-count] ERROR backtype.storm.daemon.executor -
java.lang.ExceptionInInitializerError: null
at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.log4j.Logger.getLogger(Logger.java:43) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
at org.apache.accumulo.core.client.ZooKeeperInstance.<clinit>(ZooKeeperInstance.java:63) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[log4j-over-slf4j-1.6.6.jar:1.6.6]
... 8 common frames omitted
46218 [Thread-6] INFO backtype.storm.daemon.executor - Loading executor count:[4 4]
46219 [Thread-6] INFO backtype.storm.daemon.task - Emitting: count __system ["startup"]
46220 [Thread-6] INFO backtype.storm.daemon.executor - Loaded executor tasks count:[4 4]
46224 [Thread-6] INFO backtype.storm.daemon.executor - Finished loading executor count:[4 4]
46224 [Thread-12-count] INFO backtype.storm.daemon.executor - Preparing bolt count:(4)
46225 [Thread-12-count] ERROR backtype.storm.util - Async loop died!
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46226 [Thread-12-count] ERROR backtype.storm.daemon.executor -
java.lang.NoClassDefFoundError: Could not initialize class org.apache.accumulo.core.client.ZooKeeperInstance
at storm.bolt.AccumuloBolt.prepare(AccumuloBolt.java:60) ~[storm-twitter-0.0.1-SNAPSHOT-jar-with-dependencies.jar:na]
at backtype.storm.daemon.executor$fn__5641$fn__5653.invoke(executor.clj:690) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_55]
46321 [Thread-10-count] INFO backtype.storm.util - Halting process: ("Worker died")
46321 [Thread-8-count] INFO backtype.storm.util - Halting process: ("Worker died")
看起来这个 Storm 的 JIRA 工单中有相关的讨论:https://issues.apache.org/jira/browse/STORM-122
我认为Accumulo有一个slf4j-log4j12依赖,而Storm使用log4j-over-slf4j,这是不兼容的。讨论似乎建议从Accumulo依赖项中排除slf4j-log4j12和log4j等日志依赖项。我不知道这是否有效,但值得一试。