public abstract class SimpleStreamableFunctionNodeModel extends NodeModel implements StreamableFunctionProducer
A node model whose output table is computed by a ColumnRearranger. Each input row is mapped to an output row.
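|Constructor and Description|
|SimpleStreamableFunctionNodeModel() - Default constructor, defining one data input and one data output port.|
|SimpleStreamableFunctionNodeModel(PortType[] inPortTypes, PortType[] outPortTypes, int streamableInPortIdx, int streamableOutPortIdx) - Constructor for a node with multiple in or out ports.|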
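|Modifier and Type|Method and Description|
|protected DataTableSpec[]|configure(DataTableSpec[] inSpecs) - This function is called whenever the derived model should re-configure and generate the expected output specs.|
|protected abstract ColumnRearranger|createColumnRearranger(DataTableSpec spec) - Creates a column rearranger that describes the changes to the input table.|
|StreamableFunction|createStreamableOperator(PartitionInfo partitionInfo, PortObjectSpec[] inSpecs) - Streaming API (pending): Factory method for a streamable operator that is used to execute this node.|
|protected BufferedDataTable[]|execute(BufferedDataTable[] inData, ExecutionContext exec) - This function is invoked by the Node#executeNode() method of the node (through the #executeModel(BufferedDataTable[],ExecutionMonitor) method) only after all predecessor nodes have been successfully executed and all data is therefore available at the input ports.|
|InputPortRole[]|getInputPortRoles() - Streaming API (pending): Defines properties on the input ports when used in a streamed and/or distributed fashion.|
|OutputPortRole[]|getOutputPortRoles() - Streaming API (pending): Similar to NodeModel.getInputPortRoles(), but describes the role of the output.|
|protected boolean|isDistributable() - Can the computation of the individual nodes run in parallel? Default is true.|
|protected void|loadInternals(File nodeInternDir, ExecutionMonitor exec) - Load internals into the derived NodeModel.|
|protected void|reset() - Override this function in the derived model and reset your NodeModel.|
|protected void|saveInternals(File nodeInternDir, ExecutionMonitor exec) - Save internals of the derived NodeModel.|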
Methods inherited from class NodeModel: addWarningListener, computeFinalOutputSpecs, configure, continueLoop, createInitialStreamableOperatorInternals, createMergeOperator, execute, finishStreamableExecution, getAvailableFlowVariables, getAvailableInputFlowVariables, getCredentialsProvider, getInHiLiteHandler, getInPortType, getInteractiveNodeView, getLogger, getLoopEndNode, getLoopStartNode, getNrInPorts, getNrOutPorts, getOutHiLiteHandler, getOutPortType, getWarningMessage, iterate, loadValidatedSettingsFrom, notifyViews, notifyWarningListeners, onDispose, peekFlowVariableDouble, peekFlowVariableInt, peekFlowVariableString, pushFlowVariableDouble, pushFlowVariableInt, pushFlowVariableString, removeWarningListener, resetAndConfigureLoopBody, saveSettingsTo, setInHiLiteHandler, setWarningMessage, stateChanged, validateSettings
public SimpleStreamableFunctionNodeModel(PortType[] inPortTypes, PortType[] outPortTypes, int streamableInPortIdx, int streamableOutPortIdx)
Constructor for a node with multiple in or out ports.
inPortTypes - in-port types. The port at index streamableInPortIdx MUST be a non-optional data table port.
outPortTypes - out-port types. The port at index streamableOutPortIdx MUST be a non-optional data table port.
streamableInPortIdx - the index of the in-port that is streamable. All other in-ports are assumed to be neither streamable nor distributable.
streamableOutPortIdx - the index of the out-port that is streamable. All other out-ports are assumed to be neither streamable nor distributable.
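The constructor contract above can be expressed as a small check. This is a plain-Java sketch under stated assumptions: PortKind, PortDef and StreamablePortCheck are hypothetical stand-ins, not KNIME's PortType API, and nothing here claims that the real constructor performs this exact validation. It only encodes the documented rule that the streamable index must point at an existing, non-optional data port.

```java
import java.util.List;

// Hypothetical stand-ins for port descriptions; not KNIME's PortType API.
enum PortKind { DATA_TABLE, OTHER }

final class PortDef {
    final PortKind kind;
    final boolean optional;

    PortDef(PortKind kind, boolean optional) {
        this.kind = kind;
        this.optional = optional;
    }
}

final class StreamablePortCheck {
    // Rejects an index that does not point at an existing, non-optional data table port.
    static void requireStreamable(List<PortDef> ports, int idx) {
        if (idx < 0 || idx >= ports.size()) {
            throw new IllegalArgumentException("Port index out of range: " + idx);
        }
        PortDef port = ports.get(idx);
        if (port.kind != PortKind.DATA_TABLE || port.optional) {
            throw new IllegalArgumentException(
                    "Port " + idx + " must be a non-optional data table port");
        }
    }
}
```

For example, with ports [model, data, optional data], only index 1 passes the check.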
protected BufferedDataTable[] execute(BufferedDataTable[] inData, ExecutionContext exec) throws Exception
This function is invoked by the Node#executeNode() method of the node (through the #executeModel(BufferedDataTable[],ExecutionMonitor) method) only after all predecessor nodes have been successfully executed and all data is therefore available at the input ports. Implement this function with your task in the derived model.
The input data is available in the given array argument inData and is guaranteed to be neither null nor to contain null elements (with a few non-standard exceptions, which are described in more detail in …).
In order to create output data, you need to create objects of class BufferedDataTable. Use the execution context argument to create them.
inData - An array holding DataTable elements, one for each input.
exec - The execution monitor for this execute method. It provides the means to create new BufferedDataTable objects. Additionally, it should be queried frequently whether the execution should be interrupted, in which case it throws an exception. This exception may be caught and, after all data streams have been closed, rethrown. If you can track the progress of your task, report it through this monitor.
Returns: An array of non-null DataTable elements with the size of the number of outputs; the result of this execution.
Exception - If you must fail the execution. Try to provide a meaningful error message in the exception, as it will be displayed to the user. Check the canceled status frequently by invoking ExecutionMonitor#checkCanceled, which throws a CanceledExecutionException and aborts the execution.
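The cancellation advice above amounts to a loop that checks for cancellation before each unit of work, reports progress, and closes its output even when the exception propagates. The following plain-Java sketch uses hypothetical stand-ins (CanceledException, SimpleMonitor, Sink, CancellableLoop) in place of KNIME's CanceledExecutionException and ExecutionMonitor; in a real node you would call exec.checkCanceled() and report progress through the monitor instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins that mimic the documented contract; not KNIME classes.
final class CanceledException extends Exception {
    private static final long serialVersionUID = 1L;

    CanceledException() {
        super("Execution canceled");
    }
}

final class SimpleMonitor {
    final AtomicBoolean canceled = new AtomicBoolean(false);
    volatile double progress = 0.0;

    // Throws once cancellation has been requested, like checkCanceled().
    void checkCanceled() throws CanceledException {
        if (canceled.get()) {
            throw new CanceledException();
        }
    }

    void setProgress(double fraction) {
        progress = fraction;
    }
}

// Stands in for an output stream that must be closed on every exit path.
final class Sink implements AutoCloseable {
    final List<Integer> rows = new ArrayList<>();
    boolean closed = false;

    void add(int value) {
        rows.add(value);
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class CancellableLoop {
    // Doubles each input value; try-with-resources closes the sink even if
    // the cancellation exception propagates, then the exception is rethrown.
    static void run(List<Integer> input, SimpleMonitor monitor, Sink sink) throws CanceledException {
        try (Sink out = sink) {
            for (int i = 0; i < input.size(); i++) {
                monitor.checkCanceled();
                out.add(input.get(i) * 2);
                monitor.setProgress((i + 1) / (double) input.size());
            }
        }
    }
}
```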
protected DataTableSpec[] configure(DataTableSpec[] inSpecs) throws InvalidSettingsException
This function is called whenever the derived model should re-configure and generate the expected output specs. For ordinary(*) nodes the passed DataTableSpec elements are never null but can be empty. The model may return null data table spec(s) for the outputs.
Note that after the model has been executed this function is not called anymore, as the output DataTableSpecs are then pulled from the output DataTables. A derived NodeModel that cannot provide any DataTableSpecs at its outputs before execution (because the table structure is unknown at this point) can return an array containing just null elements.
As an example, consider a "Transpose" node that flips columns to rows: there is no way to determine the table spec at configuration time, as the number of rows (which becomes the number of new columns at the output) is unknown, although the node is still executable.
(*) For nodes that support optional inputs or may have inactive outputs it's better to override …
Implementation note: This method is called from the NodeModel.configure(PortObjectSpec[]) method unless that method is overridden.
inSpecs - The input data table specs (as many as this model has inputs). Do NOT modify the contents of this array. If no spec is available for a given port (because the port is not connected or the previous node does not produce a spec), the framework passes an empty DataTableSpec (no columns) unless the port is marked as optional, in which case the array element is null.
Returns: The output data table specs; null DataTableSpec elements are changed to empty ones.
InvalidSettingsException - If the configuration fails, that is, if the settings are inconsistent with the given DataTableSpec elements.
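To make the configure contract concrete, the sketch below contrasts a row-wise node, whose output columns follow from the input columns and the settings alone, with a transpose-like node, whose output columns depend on the number of input rows and therefore cannot be known before execution. Column specs are modelled as plain lists of column names and SpecSketch is a hypothetical name; this illustrates the idea and is not KNIME's DataTableSpec API.

```java
import java.util.ArrayList;
import java.util.List;

final class SpecSketch {
    // Row-wise node: output columns follow from the input columns and the
    // settings alone, so they can be computed before execution.
    static List<String> configureAppend(List<String> inColumns, String newColumn) {
        if (inColumns.contains(newColumn)) {
            // Settings inconsistent with the input spec; a KNIME node would
            // throw InvalidSettingsException here.
            throw new IllegalArgumentException("Column already exists: " + newColumn);
        }
        List<String> out = new ArrayList<>(inColumns);
        out.add(newColumn);
        return out;
    }

    // Transpose-like node: output columns correspond to input rows, whose
    // number is unknown before execution, so no spec can be provided.
    static List<String> configureTranspose(List<String> inColumns) {
        return null;
    }
}
```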
protected boolean isDistributable()
Can the computation of the individual nodes run in parallel? Default is true, but subclasses can enforce sequential access by overriding this method and returning false.
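public InputPortRole[] getInputPortRoles()
Streaming API (pending): Defines properties on the input ports when used in a streamed and/or distributed fashion. A data input is streamed when the node implementation only needs to see each data record once (no iterative access); otherwise it is non-streamed.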
If a port is streamed, the StreamableOperator.runFinal(PortInput[], PortOutput[], ExecutionContext) method provides the input as a RowInput object, to which the client implementation can safely type-cast. For non-streamed ports, the input is represented by an instance of …
Non-data ports (anything other than BufferedDataTable) are always non-streamed.
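The streamed/non-streamed distinction is about access patterns. In the hypothetical plain-Java sketch below, scaling every value needs only a single pass over an Iterator, whereas min-max normalization (chosen here purely as an example) must read the data twice, once to find the minimum and maximum and once to rescale, so it needs the complete table rather than a row stream. AccessPatterns is an illustrative name, not KNIME API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class AccessPatterns {
    // Streamed-style access: each value is consumed exactly once from an Iterator.
    static List<Double> scale(Iterator<Double> rows, double factor) {
        List<Double> out = new ArrayList<>();
        while (rows.hasNext()) {
            out.add(rows.next() * factor);
        }
        return out;
    }

    // Non-streamed-style access: min-max normalization needs two passes
    // (find min/max, then rescale), so it requires the complete table.
    static List<Double> minMaxNormalize(List<Double> table) {
        double min = Double.POSITIVE_INFINITY;
        double max = Double.NEGATIVE_INFINITY;
        for (double v : table) {
            min = Math.min(min, v);
            max = Math.max(max, v);
        }
        double range = max - min;
        List<Double> out = new ArrayList<>();
        for (double v : table) {
            out.add(range == 0.0 ? 0.0 : (v - min) / range);
        }
        return out;
    }
}
```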
A data input may be distributable (= parallelizable), in which case the data is processed in parallel (possibly scattered across a cloud). Non-data ports are always non-distributable (but the execution may still take place in a distributed fashion if another port is distributed; any non-distributable port is then simply duplicated as required).
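Because a row-wise function treats every row independently, a distributable input can be split into chunks that are processed in parallel and concatenated in order, yielding the same rows as a sequential pass. The sketch below models this with Java parallel streams over contiguous partitions; PartitionedMap is a hypothetical helper and says nothing about how KNIME actually schedules or distributes partitions. The mapping function is assumed to be stateless, since it may run on several threads at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

final class PartitionedMap {
    // Splits rows into numPartitions contiguous chunks of near-equal size.
    static <T> List<List<T>> partition(List<T> rows, int numPartitions) {
        List<List<T>> parts = new ArrayList<>();
        int n = rows.size();
        for (int p = 0; p < numPartitions; p++) {
            int from = (int) ((long) n * p / numPartitions);
            int to = (int) ((long) n * (p + 1) / numPartitions);
            parts.add(rows.subList(from, to));
        }
        return parts;
    }

    // Maps the partitions in parallel; the stream preserves encounter order,
    // so the concatenated result equals a sequential row-by-row pass.
    static <I, O> List<O> mapDistributed(List<I> rows, int numPartitions, Function<I, O> fn) {
        return partition(rows, numPartitions).parallelStream()
                .map(part -> part.stream().map(fn).collect(Collectors.toList()))
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }
}
```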
public OutputPortRole[] getOutputPortRoles()
Streaming API (pending): Similar to NodeModel.getInputPortRoles(), but describes the role of the output. An output is distributable when the (distributed!) input directly maps to the output without any further merge or reduction step (which is otherwise described by NodeModel.createMergeOperator()). Only data outputs can be distributable; any other (model) output is always non-distributable. The input and output roles define where the output data is generated:
- Input and output non-distributable: the output is generated in the StreamableOperator.runFinal(PortInput[], PortOutput[], ExecutionContext) method. Only one instance of the operator is used.
- Input and output distributable: the output is generated in the StreamableOperator.runFinal(PortInput[], PortOutput[], ExecutionContext) method, too. Note that in this case there are several instances of a StreamableOperator (either representing different threads in the same JVM or distributed in a compute cluster).
- Input distributable, output non-distributable: the output is generated in the NodeModel.finishStreamableExecution(StreamableOperatorInternals, ExecutionContext, PortOutput[]) implementation. The client implementation must also override the NodeModel.createMergeOperator() method. The implementation of StreamableOperator.runFinal(PortInput[], PortOutput[], ExecutionContext) must not return or push any result into the … StreamableOperator instances and the model after the merge in the NodeModel.
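The case of a distributable input with a non-distributable output follows a familiar partial-aggregate-then-merge pattern. The plain-Java sketch below computes a mean: each partition produces partial internals (count and sum) without emitting rows, a merge step combines them, and a final step derives the single output. PartialStats and its methods are hypothetical stand-ins loosely mirroring StreamableOperatorInternals, the merge operator and finishStreamableExecution; they do not reproduce those signatures.

```java
import java.util.List;

// Hypothetical stand-in for per-partition operator internals; not KNIME API.
final class PartialStats {
    final long count;
    final double sum;

    PartialStats(long count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    // Per-partition step: summarizes one chunk without producing output rows.
    static PartialStats ofPartition(List<Double> partition) {
        double sum = 0.0;
        for (double v : partition) {
            sum += v;
        }
        return new PartialStats(partition.size(), sum);
    }

    // Merge step: combines the internals reported by all partitions.
    static PartialStats merge(List<PartialStats> parts) {
        long count = 0;
        double sum = 0.0;
        for (PartialStats p : parts) {
            count += p.count;
            sum += p.sum;
        }
        return new PartialStats(count, sum);
    }

    // Finish step: derives the single, non-distributed output.
    double mean() {
        return count == 0 ? Double.NaN : sum / count;
    }
}
```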
protected abstract ColumnRearranger createColumnRearranger(DataTableSpec spec) throws InvalidSettingsException
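Creates a column rearranger that describes the changes to the input table. Implementations check their settings against the input spec (throwing an InvalidSettingsException if necessary) and then return a customized ColumnRearranger.
spec - The spec of the input table.
InvalidSettingsException - If the settings or the input are invalid.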
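The sketch below illustrates the idea behind createColumnRearranger with a deliberately tiny, hypothetical stand-in (MiniRearranger, UpperCaseNodeSketch); it is not the KNIME ColumnRearranger API. It shows how one object can describe both the output columns, known before execution, and the per-row computation that maps each input row to exactly one output row. Validation failures are signalled with IllegalArgumentException where a KNIME node would throw InvalidSettingsException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;

// Hypothetical, heavily simplified stand-in; NOT the KNIME ColumnRearranger API.
final class MiniRearranger {
    private final List<String> inColumns;
    private final List<String> newColumns = new ArrayList<>();
    private final List<Function<List<String>, String>> cellFactories = new ArrayList<>();

    MiniRearranger(List<String> inColumns) {
        this.inColumns = new ArrayList<>(inColumns);
    }

    // Declares one appended output column and how to compute its cell from a row.
    MiniRearranger append(String name, Function<List<String>, String> cellFactory) {
        newColumns.add(name);
        cellFactories.add(cellFactory);
        return this;
    }

    // Output column names, available at configure time without any data.
    List<String> createSpec() {
        List<String> spec = new ArrayList<>(inColumns);
        spec.addAll(newColumns);
        return spec;
    }

    // Maps one input row to one output row.
    List<String> apply(List<String> row) {
        List<String> out = new ArrayList<>(row);
        for (Function<List<String>, String> factory : cellFactories) {
            out.add(factory.apply(row));
        }
        return out;
    }
}

final class UpperCaseNodeSketch {
    // Plays the role of createColumnRearranger(spec): validate, then describe the change.
    static MiniRearranger createRearranger(List<String> spec, String column) {
        int idx = spec.indexOf(column);
        if (idx < 0) {
            // A KNIME node would throw InvalidSettingsException here.
            throw new IllegalArgumentException("Column not found: " + column);
        }
        return new MiniRearranger(spec)
                .append(column + " (upper)", row -> row.get(idx).toUpperCase(Locale.ROOT));
    }
}
```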
protected int getStreamableInPortIdx()
public int getStreamableOutPortIdx()
public StreamableFunction createStreamableOperator(PartitionInfo partitionInfo, PortObjectSpec[] inSpecs) throws InvalidSettingsException
Streaming API (pending): Factory method for a streamable operator that is used to execute this node. … the NodeModel.execute(PortObject[], ExecutionContext) method. Subclasses may override it to return a new operator that follows the data handling described by the …
This method is called by the node executor once or multiple times, depending on the input roles. If any input is distributable, the method is called multiple times (once for each partition), possibly on different (remote) clones of this NodeModel.
partitionInfo - The partition info describing the chunk (if distributable).
inSpecs - The port object specs of the input ports. These are identical to the specs that NodeModel.configure(PortObjectSpec[]) was last called with (also on the remote side).
InvalidSettingsException - Usually not thrown in the client but still part of the method signature, as implementations often run the same methods as during configure. (This method is not called when configure fails.)
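The call pattern described above can be modelled as follows: a factory, playing the role of createStreamableOperator, is invoked once per partition when the input is distributable and exactly once otherwise, and every call yields an independent operator instance. OperatorFactoryDemo and its nested Operator class are hypothetical; the partition index and count only stand in for what the partition info describes. Since operators may be created on remote clones, this sketch keeps all state inside each operator rather than in shared model fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical model of the call pattern described above; not KNIME code.
final class OperatorFactoryDemo {
    // One operator per partition; each instance holds only its own state.
    static final class Operator {
        final int partitionIndex;
        final int partitionCount;

        Operator(int partitionIndex, int partitionCount) {
            this.partitionIndex = partitionIndex;
            this.partitionCount = partitionCount;
        }
    }

    // Invokes the factory once per partition if the input is distributable,
    // and exactly once otherwise.
    static List<Operator> createOperators(boolean distributable, int partitions,
            BiFunction<Integer, Integer, Operator> factory) {
        int n = distributable ? partitions : 1;
        List<Operator> operators = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            operators.add(factory.apply(i, n));
        }
        return operators;
    }
}
```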
protected void reset()
Override this function in the derived model and reset your NodeModel. All components should unregister themselves from any observables (at least from the hilite handler right now). All internally stored data structures should be released. User settings should not be deleted or reset, though.
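A minimal sketch of the reset() contract, using hypothetical field names: internal structures and observer registrations are released, while user settings survive. The boolean flag only stands in for a real hilite-handler registration, and ResettableModel is not a KNIME class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the reset() contract; field names are illustrative.
final class ResettableModel {
    // User setting: must survive reset().
    String selectedColumn = "city";
    // Internal data structure built during execution: released on reset().
    final Map<String, String> hiliteMapping = new HashMap<>();
    // Stands in for a registration with an observable such as a hilite handler.
    boolean registeredWithHiliteHandler = false;

    void execute() {
        hiliteMapping.put("Row0", "Row0");
        registeredWithHiliteHandler = true;
    }

    void reset() {
        registeredWithHiliteHandler = false; // unregister from observables
        hiliteMapping.clear();               // release internal data
        // selectedColumn is deliberately left untouched
    }
}
```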
protected void loadInternals(File nodeInternDir, ExecutionMonitor exec) throws IOException, CanceledExecutionException
Load internals into the derived NodeModel. This method is only called if the Node was executed. Read all your internal structures from the given file directory to create the internal data structures that are necessary to provide all node functionality after the workflow is loaded, e.g. view content and/or hilite mapping.
nodeInternDir - The directory to read from.
exec - Used to report progress and to cancel the load process.
IOException - If an error occurs while reading from this directory.
CanceledExecutionException - If loading has been canceled.
protected void saveInternals(File nodeInternDir, ExecutionMonitor exec) throws IOException, CanceledExecutionException
Save internals of the derived NodeModel. This method is only called if the Node is executed. Write all internal structures into the given file directory that are necessary to recreate this model when the workflow is loaded, e.g. view content and/or hilite mapping.
nodeInternDir - The directory to write into.
exec - Used to report progress and to cancel the save process.
IOException - If an error occurs while writing to this directory.
CanceledExecutionException - If saving has been canceled.
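Saving and loading internals are mirror images: whatever saveInternals writes into nodeInternDir, loadInternals must be able to read back. The sketch below uses java.nio to round-trip a list of text entries (for instance, a serialized hilite mapping). The file name internals.txt and the InternalsIO class are hypothetical; a real node would typically also report progress and honor cancellation through the ExecutionMonitor.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class InternalsIO {
    // Hypothetical file name inside the node's internals directory.
    private static final String FILE_NAME = "internals.txt";

    // Writes one entry per line, as a saveInternals implementation might.
    static void save(File nodeInternDir, List<String> entries) throws IOException {
        Path file = new File(nodeInternDir, FILE_NAME).toPath();
        Files.write(file, entries, StandardCharsets.UTF_8);
    }

    // Reads the entries back, as the matching loadInternals implementation might.
    static List<String> load(File nodeInternDir) throws IOException {
        Path file = new File(nodeInternDir, FILE_NAME).toPath();
        return Files.readAllLines(file, StandardCharsets.UTF_8);
    }
}
```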
KNIME GmbH, Konstanz, Germany
You may not modify, publish, transmit, transfer or sell, reproduce, create derivative works from, distribute, perform, display, or in any way exploit any of the content, in whole or in part, except as otherwise expressly permitted in writing by the copyright owner or as specified in the license file distributed with this product.