状态管理器未保留/检索数据



NiFi 1.1.1

我正在尝试使用状态管理器保留一个字节 []。

private byte[] lsnUsedDuringLastLoad;

@Override
    public void onTrigger(final ProcessContext context,
            final ProcessSession session) throws ProcessException {
...
...
...
​final StateManager stateManager = context.getStateManager();
try {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Map<String, String> newStateMapProperties = new HashMap<>();
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
new String(lsnUsedDuringLastLoad));
logger.debug("Persisting stateMap : "
+ newStateMapProperties);
stateManager.replace(stateMap, newStateMapProperties,
Scope.CLUSTER);
} catch (IOException ioException) {
logger.error("Error while persisting the state to NiFi",
ioException);
throw new ProcessException(
"The state(LSN) couldn't be persisted", ioException);
}
...
...
...
}

我没有收到任何异常,甚至没有日志错误条目,处理器继续运行。以下加载代码始终为持久字段返回一个空值(检索状态图:{}(:

try {
                    stateMap = stateManager.getState(Scope.CLUSTER);
                    stateMapProperties = new HashMap<>(stateMap.toMap());
                    logger.debug("Retrieved the statemap : "+stateMapProperties);

                    lastMaxLSN = (stateMapProperties
                            .get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties
                            .get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
                            : stateMapProperties.get(
                                    ProcessorConstants.LAST_MAX_LSN).getBytes();

                    logger.debug("Attempted to load the previous lsn from NiFi state : "
                            + lastMaxLSN);
                } catch (IOException ioe) {
                    logger.error("Couldn't load the state map", ioe);
                    throw new ProcessException(ioe);
                }

我想知道ZK是否有错,或者我在使用州地图时错过了什么!

替换的文档说:

"将组件状态的值更新为新值,当且仅当该值当前与给定的 oldValue 相同时。">

https://github.com/apache/nifi/blob/master/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java#L79-L92

我会建议这样的事情:

if (stateMap.getVersion() == -1) {
  stateManager.setState(stateMapProperties, Scope.CLUSTER);
} else {
  stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER);
}

第一次检索状态时,版本应该是 -1,因为以前从未存储过任何内容,在这种情况下,您使用 setState,但之后的所有时间都可以使用 replace。

replace()和返回值背后的想法是,能够对冲突做出反应。同一节点或另一个节点(在群集中(上的另一个任务可能在此期间更改了状态。当replace()返回false时,你可以对冲突做出反应,整理,可以自动整理的内容,并在无法整理时通知用户。

这是我使用的代码:

/**
 * Set or replace key-value pair in status cluster wide. In case of a conflict, it will retry to set the state, when the given
 * key does not yet exist in the map. If the key exists and the value is equal to the given value, it does nothing. Otherwise
 * it fails and returns false.
 * 
 * @param stateManager that controls state cluster wide.
 * @param key of key-value pair to be put in state map.
 * @param value of key-value pair to be put in state map.
 * @return true, if state map contains the key with a value equal to the given value, probably set by this function.
 *      False, if a conflict occurred and key-value pair is different. 
 * @throws IOException if the underlying state mechanism throws exception.
 */
private boolean setState(StateManager stateManager, String key, String value) throws IOException {
        boolean somebodyElseUpdatedWithoutConflict = false;
        do {
            StateMap stateMap = stateManager.getState(Scope.CLUSTER);
            // While the next two lines run, another thread might change the state.
            Map<String,String> map = new HashMap<String, String>(stateMap.toMap()); // Make mutable
            String oldValue = map.put(key, value);
            if(!stateManager.replace(stateMap, map, Scope.CLUSTER)) {
                // Conflict happened. Sort out action to take
                if(oldValue == null)
                    somebodyElseUpdatedWithoutConflict = true; // Different key was changed. Retry
                else if(oldValue.equals(value))
                    break; // Lazy case. Value already set
                else
                    return false; // Unsolvable conflict
            }
        } while(somebodyElseUpdatedWithoutConflict);
        return true;
}

您可以在// Conflict happened...后用您需要的任何冲突解决方案替换部件。

最新更新