NiFi 1.1.1
我正在尝试使用状态管理器保留一个字节 []。
private byte[] lsnUsedDuringLastLoad;
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
...
...
...
final StateManager stateManager = context.getStateManager();
try {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Map<String, String> newStateMapProperties = new HashMap<>();
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
new String(lsnUsedDuringLastLoad));
logger.debug("Persisting stateMap : "
+ newStateMapProperties);
stateManager.replace(stateMap, newStateMapProperties,
Scope.CLUSTER);
} catch (IOException ioException) {
logger.error("Error while persisting the state to NiFi",
ioException);
throw new ProcessException(
"The state(LSN) couldn't be persisted", ioException);
}
...
...
...
}
我没有收到任何异常,甚至没有日志错误条目,处理器继续运行。以下加载代码始终为持久字段返回一个空值(检索状态图:{}(:
try {
stateMap = stateManager.getState(Scope.CLUSTER);
stateMapProperties = new HashMap<>(stateMap.toMap());
logger.debug("Retrieved the statemap : "+stateMapProperties);
lastMaxLSN = (stateMapProperties
.get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties
.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
: stateMapProperties.get(
ProcessorConstants.LAST_MAX_LSN).getBytes();
logger.debug("Attempted to load the previous lsn from NiFi state : "
+ lastMaxLSN);
} catch (IOException ioe) {
logger.error("Couldn't load the state map", ioe);
throw new ProcessException(ioe);
}
我想知道ZK是否有错,或者我在使用州地图时错过了什么!
替换的文档说:
"将组件状态的值更新为新值,当且仅当该值当前与给定的 oldValue 相同时。">
https://github.com/apache/nifi/blob/master/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java#L79-L92
我会建议这样的事情:
if (stateMap.getVersion() == -1) {
stateManager.setState(stateMapProperties, Scope.CLUSTER);
} else {
stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
}
第一次检索状态时,版本应该是 -1,因为以前从未存储过任何内容,在这种情况下,您使用 setState,但之后的所有时间都可以使用 replace。
replace()
和返回值背后的想法是,能够对冲突做出反应。同一节点或另一个节点(在群集中(上的另一个任务可能在此期间更改了状态。当replace()
返回false时,你可以对冲突做出反应,整理,可以自动整理的内容,并在无法整理时通知用户。
这是我使用的代码:
/**
* Set or replace key-value pair in status cluster wide. In case of a conflict, it will retry to set the state, when the given
* key does not yet exist in the map. If the key exists and the value is equal to the given value, it does nothing. Otherwise
* it fails and returns false.
*
* @param stateManager that controls state cluster wide.
* @param key of key-value pair to be put in state map.
* @param value of key-value pair to be put in state map.
* @return true, if state map contains the key with a value equal to the given value, probably set by this function.
* False, if a conflict occurred and key-value pair is different.
* @throws IOException if the underlying state mechanism throws exception.
*/
private boolean setState(StateManager stateManager, String key, String value) throws IOException {
boolean somebodyElseUpdatedWithoutConflict = false;
do {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
// While the next two lines run, another thread might change the state.
Map<String,String> map = new HashMap<String, String>(stateMap.toMap()); // Make mutable
String oldValue = map.put(key, value);
if(!stateManager.replace(stateMap, map, Scope.CLUSTER)) {
// Conflict happened. Sort out action to take
if(oldValue == null)
somebodyElseUpdatedWithoutConflict = true; // Different key was changed. Retry
else if(oldValue.equals(value))
break; // Lazy case. Value already set
else
return false; // Unsolvable conflict
}
} while(somebodyElseUpdatedWithoutConflict);
return true;
}
您可以在// Conflict happened...
后用您需要的任何冲突解决方案替换部件。