Class StepManagerImpl

java.lang.Object
weka.knowledgeflow.StepManagerImpl
All Implemented Interfaces:
StepManager

public class StepManagerImpl extends Object implements StepManager
Concrete implementation of the StepManager interface. In addition to the methods aimed at Step implementations, it provides a number of methods that are useful for applications that manipulate Steps and their connections.
Author:
Mark Hall (mhall{[at]}pentaho{[dot]}com)
  • Constructor Details

    • StepManagerImpl

      public StepManagerImpl(Step step)
      Constructs a manager for the supplied Step
      Parameters:
      step - the Step to manage
  • Method Details

    • getName

      public String getName()
      Get the name of the Step being managed
      Specified by:
      getName in interface StepManager
      Returns:
      the name of the Step being managed
    • getManagedStep

      public Step getManagedStep()
      Get the step managed by this manager
      Specified by:
      getManagedStep in interface StepManager
      Returns:
      the step managed by this manager
    • setManagedStep

      public void setManagedStep(Step step)
      Set the step managed by this manager
      Parameters:
      step - the step to manage
    • setStepIsResourceIntensive

      public void setStepIsResourceIntensive(boolean resourceIntensive)
      Set whether the managed step is resource (cpu/memory) intensive or not
      Specified by:
      setStepIsResourceIntensive in interface StepManager
      Parameters:
      resourceIntensive - true if the managed step is resource intensive
    • stepIsResourceIntensive

      public boolean stepIsResourceIntensive()
      Get whether the managed step is resource (cpu/memory) intensive or not
      Specified by:
      stepIsResourceIntensive in interface StepManager
      Returns:
      true if the step is resource intensive
    • setStepMustRunSingleThreaded

      public void setStepMustRunSingleThreaded(boolean mustRunSingleThreaded)
      Set whether the managed step must run single-threaded, i.e. in an executor service with one worker thread, which effectively prevents more than one copy of the step from executing at any one time
      Specified by:
      setStepMustRunSingleThreaded in interface StepManager
      Parameters:
      mustRunSingleThreaded - true if the managed step must run single-threaded
    • getStepMustRunSingleThreaded

      public boolean getStepMustRunSingleThreaded()
      Get whether the managed step must run single-threaded, i.e. in an executor service with one worker thread, which effectively prevents more than one copy of the step from executing at any one time
      Specified by:
      getStepMustRunSingleThreaded in interface StepManager
      Returns:
      true if the managed step must run single-threaded
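      The single-threaded guarantee described above comes from the executor rather than from the step itself. The following is a minimal sketch using only java.util.concurrent; the class SingleThreadSketch and its method are illustrative names and are not part of Weka. It shows that tasks submitted to an executor with one worker thread never overlap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: not part of Weka.
class SingleThreadSketch {

  /**
   * Runs numTasks short tasks on a one-worker executor and returns the
   * largest number of tasks observed running at the same moment.
   */
  static int maxConcurrent(int numTasks) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<Future<?>> futures = new ArrayList<>();
    try {
      for (int i = 0; i < numTasks; i++) {
        futures.add(executor.submit(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(5); // stand-in for real work
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          running.decrementAndGet();
        }));
      }
      for (Future<?> f : futures) {
        f.get(); // wait for every task to complete
      }
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      executor.shutdown();
    }
    return peak.get();
  }
}
```

      Because the executor owns a single worker, the peak concurrency stays at one however many tasks are queued.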
    • getStepVisual

      public StepVisual getStepVisual()
      Get the step visual in use (if running in a visual environment)
      Returns:
      the step visual in use
    • setStepVisual

      public void setStepVisual(StepVisual visual)
      Set the step visual to use when running in a graphical environment
      Parameters:
      visual - the step visual to use
    • setStepProperty

      public void setStepProperty(String name, Object value)
      Set a property for this step
      Parameters:
      name - the name of the property
      value - the value of the property
    • getStepProperty

      public Object getStepProperty(String name)
      Get a named property for this step.
      Parameters:
      name - the name of the property to get
      Returns:
      the value of the property or null if the property is not set
    • getExecutionEnvironment

      public ExecutionEnvironment getExecutionEnvironment()
      Get the execution environment the managed step is running in
      Specified by:
      getExecutionEnvironment in interface StepManager
      Returns:
      the execution environment
    • getSettings

      public Settings getSettings()
      Get the current knowledge flow settings
      Specified by:
      getSettings in interface StepManager
      Returns:
      the current knowledge flow settings
      Throws:
      IllegalStateException - if there is no execution environment available
    • getLoggingLevel

      public LoggingLevel getLoggingLevel()
      Get the logging level in use
      Specified by:
      getLoggingLevel in interface StepManager
      Returns:
      the logging level in use
    • setLoggingLevel

      public void setLoggingLevel(LoggingLevel newLevel)
      Set the logging level to use
      Parameters:
      newLevel - the level to use
    • getLog

      public Logger getLog()
      Get the log to use
      Specified by:
      getLog in interface StepManager
      Returns:
      the log in use or null if no log has been set
    • setLog

      public void setLog(Logger log)
      Set the log to use
      Parameters:
      log - the log to use
    • isStepBusy

      public boolean isStepBusy()
      Returns true if, at the current time, the managed step is busy with processing
      Specified by:
      isStepBusy in interface StepManager
      Returns:
      true if the managed step is busy with processing
    • isStopRequested

      public boolean isStopRequested()
      Return true if a stop has been requested by the runtime environment
      Specified by:
      isStopRequested in interface StepManager
      Returns:
      true if a stop has been requested
    • isStepFinished

      public boolean isStepFinished()
      Return true if the current step is finished.
      Specified by:
      isStepFinished in interface StepManager
      Returns:
      true if the current step is finished
    • setStopRequested

      public void setStopRequested(boolean stopRequested)
      Set the status of the stop requested flag
      Parameters:
      stopRequested - true if a stop has been requested
    • processing

      public void processing()
      Started processing. Sets the busy flag to true.
      Specified by:
      processing in interface StepManager
    • finished

      public void finished()
      Finished all processing. Sets the busy flag to false and prints a finished message to the status area of the log.
      Specified by:
      finished in interface StepManager
    • interrupted

      public void interrupted()
      Finished processing due to a stop being requested. Sets the busy flag to false.
      Specified by:
      interrupted in interface StepManager
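      The busy and stop-requested flags described in isStepBusy, isStopRequested, processing, finished and interrupted are typically used together in a step's processing loop. The sketch below models that lifecycle with a hypothetical stand-in class (LifecycleSketch is not a Weka class, and the status message printed by the real finished() is omitted); the stopAfter parameter only simulates the runtime requesting a stop.

```java
import java.util.List;

// Hypothetical stand-in for the flags maintained by a step manager.
class LifecycleSketch {
  private volatile boolean busy;
  private volatile boolean stopRequested;

  void processing() { busy = true; }    // started processing
  void finished() { busy = false; }     // finished all processing
  void interrupted() { busy = false; }  // finished because a stop was requested

  boolean isStepBusy() { return busy; }
  boolean isStopRequested() { return stopRequested; }
  void setStopRequested(boolean stop) { stopRequested = stop; }

  /**
   * Handles items one at a time, checking the stop flag before each one.
   * stopAfter simulates a stop request arriving once that many items have
   * been handled; pass a negative value for no stop. Returns the number
   * of items handled.
   */
  int run(List<String> items, int stopAfter) {
    processing();
    int handled = 0;
    for (String item : items) {
      if (handled == stopAfter) {
        setStopRequested(true); // simulated request from the runtime
      }
      if (isStopRequested()) {
        interrupted();
        return handled;
      }
      handled++; // stand-in for real work on the item
    }
    finished();
    return handled;
  }
}
```

      In both exits the busy flag ends up false; only the path taken (finished or interrupted) differs.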
    • isStreamFinished

      public boolean isStreamFinished(Data data)
      Returns true if this data object marks the end of an incremental stream. Note that this does not check that the data object is actually an incremental one; it only checks whether the CON_AUX_DATA_INCREMENTAL_STREAM_END flag is set to true.
      Specified by:
      isStreamFinished in interface StepManager
      Parameters:
      data - the data element to check
      Returns:
      true if the data element is flagged as end of stream
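      The check described for isStreamFinished is a plain flag lookup. A minimal sketch, assuming the payload can be modelled as a map and that a missing flag counts as false (both are assumptions; StreamEndSketch and STREAM_END_KEY are hypothetical names, not Weka identifiers):

```java
import java.util.Map;

// Hypothetical model of an end-of-stream flag carried in a data payload.
class StreamEndSketch {
  // Placeholder key; the real key is the value of
  // StepManager.CON_AUX_DATA_INCREMENTAL_STREAM_END.
  static final String STREAM_END_KEY = "stream_end_flag";

  /** True only when the flag is present and set to Boolean.TRUE. */
  static boolean isStreamFinished(Map<String, Object> payload) {
    return Boolean.TRUE.equals(payload.get(STREAM_END_KEY));
  }
}
```

      Nothing here inspects what kind of data the payload carries, which mirrors the caveat in the method description.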
    • throughputUpdateStart

      public void throughputUpdateStart()
      Clients can use this to record a start point for streaming throughput measuring
      Specified by:
      throughputUpdateStart in interface StepManager
    • throughputUpdateEnd

      public void throughputUpdateEnd()
      Clients can use this to record a stop point for streaming throughput measuring
      Specified by:
      throughputUpdateEnd in interface StepManager
    • throughputFinished

      public void throughputFinished(Data... data) throws WekaException
      Clients can use this to indicate that throughput measuring is finished (i.e. the stream being processed has ended). Final throughput information is printed to the log and status area.
      Specified by:
      throughputFinished in interface StepManager
      Parameters:
      data - one or more Data events (with appropriate connection type set) to pass on to downstream connected steps. These are used to carry any final data and to inform the downstream step(s) that the stream has ended
      Throws:
      WekaException - if a problem occurs
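      The start/end pairing used by throughputUpdateStart and throughputUpdateEnd can be pictured as accumulating the time spent between each pair of calls. The sketch below is one way such a measurement could work and is not taken from the Weka source; ThroughputSketch and its methods are hypothetical, and the clock is injected so the arithmetic can be checked deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical throughput meter; not the Weka implementation.
class ThroughputSketch {
  private final LongSupplier nanoClock;
  private long startNanos;
  private long totalNanos;
  private long count;

  ThroughputSketch(LongSupplier nanoClock) {
    this.nanoClock = nanoClock;
  }

  /** Records the start point of one unit of streaming work. */
  void updateStart() {
    startNanos = nanoClock.getAsLong();
  }

  /** Records the stop point and adds the elapsed time to the running total. */
  void updateEnd() {
    totalNanos += nanoClock.getAsLong() - startNanos;
    count++;
  }

  /** Average items per second over all timed intervals, or 0 if none. */
  double itemsPerSecond() {
    return totalNanos == 0 ? 0.0 : count * 1_000_000_000.0 / totalNanos;
  }
}
```

      In normal use the clock would be System::nanoTime; time spent outside a start/end pair is deliberately not counted.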
    • disconnectStepWithConnection

      public void disconnectStepWithConnection(Step toDisconnect, String connType)
      Disconnect the supplied step under the associated connection type from both the incoming and outgoing connections for the step managed by this manager. Does nothing if this step does not have any connections to the supplied step, or does not have connections to the supplied step of the required type.
      Parameters:
      toDisconnect - the step to disconnect
      connType - the connection type to disconnect
    • disconnectStep

      public void disconnectStep(Step toDisconnect)
      Remove the supplied step from connections (both incoming and outgoing, of all types) for the step managed by this manager. Does nothing if this step does not have any connections to the supplied step
      Parameters:
      toDisconnect - the step to disconnect
    • clearAllConnections

      public void clearAllConnections()
      Clear all connections to/from the step managed by this manager. Also makes sure that all directly connected upstream and downstream steps remove their respective outgoing and incoming connections to this step
    • addIncomingConnection

      public void addIncomingConnection(String connectionName, StepManagerImpl step)
      Add an incoming connection of the specified type (comprising the connection type and the associated source step) to this step
      Parameters:
      connectionName - the name of the type of connection to add
      step - the source step component that is connecting with given connection type
    • removeIncomingConnection

      public void removeIncomingConnection(String connectionName, StepManagerImpl step)
      Remove an incoming connection to this step of the specified type
      Parameters:
      connectionName - the name of the type of connection to remove
      step - the source step component associated with the given connection type
    • addOutgoingConnection

      public boolean addOutgoingConnection(String connectionName, StepManagerImpl step)
      Add an outgoing connection of the specified type (comprising the connection type and the associated target step) to this step. The connection is only made if the target step will accept the connection type at this time
      Parameters:
      connectionName - the name of the type of connection to add
      step - the target step component that is receiving the given connection type
      Returns:
      true if the connection was successful
    • addOutgoingConnection

      public boolean addOutgoingConnection(String connectionName, StepManagerImpl step, boolean force)
      Add an outgoing connection of the specified type (comprising the connection type and the associated target step) to this step. Unless force is true, the connection is only made if the target step will accept the connection type at this time
      Parameters:
      connectionName - the name of the type of connection to add
      step - the target step component that is receiving the given connection type
      force - whether to force the connection, even if the target step says it can't accept the connection at the present time
      Returns:
      true if the connection was successful
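      The difference between the two addOutgoingConnection overloads is the force flag. The sketch below models connections as a map from connection type to connected nodes, in the spirit of the Map<String,List<StepManager>> returned by getOutgoingConnections(). ConnectionSketch is a hypothetical stand-in; the accepted set replaces whatever acceptance check the real target step performs, and recording the reverse link on the target is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a managed step and its connection maps.
class ConnectionSketch {
  final String name;
  final Set<String> accepted; // connection types this node will accept
  final Map<String, List<ConnectionSketch>> outgoing = new LinkedHashMap<>();
  final Map<String, List<ConnectionSketch>> incoming = new LinkedHashMap<>();

  ConnectionSketch(String name, Set<String> accepted) {
    this.name = name;
    this.accepted = accepted;
  }

  /** Connects only if the target accepts the type. */
  boolean addOutgoingConnection(String type, ConnectionSketch target) {
    return addOutgoingConnection(type, target, false);
  }

  /** Connects if the target accepts the type, or unconditionally when forced. */
  boolean addOutgoingConnection(String type, ConnectionSketch target, boolean force) {
    if (!force && !target.accepted.contains(type)) {
      return false; // target refuses this type right now
    }
    outgoing.computeIfAbsent(type, k -> new ArrayList<>()).add(target);
    // Assumption: the target also records the reverse (incoming) link.
    target.incoming.computeIfAbsent(type, k -> new ArrayList<>()).add(this);
    return true;
  }
}
```

      Forcing a connection is mainly useful when a flow is being rebuilt and the target cannot yet decide what it accepts.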
    • removeOutgoingConnection

      public void removeOutgoingConnection(String connectionName, StepManagerImpl step)
      Remove an outgoing connection from this step of the specified type
      Parameters:
      connectionName - the name of the type of connection to remove
      step - the target step component associated with the given connection type
    • getIncomingConnectedStepsOfConnectionType

      public List<StepManager> getIncomingConnectedStepsOfConnectionType(String connectionName)
      Get a list of steps providing incoming connections of the specified type
      Specified by:
      getIncomingConnectedStepsOfConnectionType in interface StepManager
      Parameters:
      connectionName - the type of connection being received by this step
      Returns:
      a list of connected steps
    • getOutgoingConnectedStepsOfConnectionType

      public List<StepManager> getOutgoingConnectedStepsOfConnectionType(String connectionName)
      Description copied from interface: StepManager
      Get a list of downstream steps connected to this step with the given connection type.
      Specified by:
      getOutgoingConnectedStepsOfConnectionType in interface StepManager
      Parameters:
      connectionName - the name of the outgoing connection
      Returns:
      a list of downstream steps connected to this one with the named connection type
    • getIncomingConnectedStepWithName

      public StepManager getIncomingConnectedStepWithName(String stepName)
      Get a named step connected to this step with an incoming connection
      Specified by:
      getIncomingConnectedStepWithName in interface StepManager
      Parameters:
      stepName - the name of the step to look for
      Returns:
      the connected step
    • getOutgoingConnectedStepWithName

      public StepManager getOutgoingConnectedStepWithName(String stepName)
      Get a named step connected to this step with an outgoing connection
      Specified by:
      getOutgoingConnectedStepWithName in interface StepManager
      Parameters:
      stepName - the name of the step to look for
      Returns:
      the connected step
    • getOutgoingConnections

      public Map<String,List<StepManager>> getOutgoingConnections()
      Get the map of downstream (outgoing connections) connected steps
      Specified by:
      getOutgoingConnections in interface StepManager
      Returns:
      the map of downstream connected steps
    • getIncomingConnections

      public Map<String,List<StepManager>> getIncomingConnections()
      Get the map of upstream (incoming connections) connected steps
      Specified by:
      getIncomingConnections in interface StepManager
      Returns:
      the map of upstream connected steps
    • addStepOutputListener

      public void addStepOutputListener(StepOutputListener listener, String outputConnectionName)
      Register non-step third party to receive data from the managed step for the specified outgoing connection type. Output listeners are not serialized into the JSON flow when flows are saved.
      Parameters:
      listener - the output listener to register
      outputConnectionName - the name of the connection type
    • removeStepOutputListener

      public void removeStepOutputListener(StepOutputListener listener, String outputConnectionName)
      De-register non-step third party from receiving data from the managed step
      Parameters:
      listener - the output listener to de-register
      outputConnectionName - the name of the connection type the listener is registered against
    • clearAllStepOutputListeners

      public void clearAllStepOutputListeners()
      Clear all registered StepOutputListeners
    • clearStepOutputListeners

      public void clearStepOutputListeners(String outputConnectionName)
      Clear all the StepOutputListeners that are registered to receive the supplied connection type.
      Parameters:
      outputConnectionName - type of the connection to clear the listeners for
    • outputData

      public void outputData(String outgoingConnectionName, Data data) throws WekaException
      Output a Data object to all downstream connected Steps that are connected with the supplied connection name. Sets the connection type on the supplied Data object to the supplied connection name. Also notifies any registered StepOutputListeners.
      Specified by:
      outputData in interface StepManager
      Parameters:
      outgoingConnectionName - the type of the outgoing connection to send data to
      data - a single Data object to send
      Throws:
      WekaException - if a problem occurs
    • outputData

      public void outputData(Data... data) throws WekaException
      Output one or more Data objects to all relevant steps. Populates the source in each Data object for the client, HOWEVER, the client must have populated the connection type in each Data object to be output so that the StepManager knows which connected steps to send the data to. Also notifies any registered StepOutputListeners. Note that the downstream step(s)' processIncoming() method is called in a separate thread for batch connections. Furthermore, if multiple Data objects are supplied via the varargs argument, and a target step will receive more than one of the Data objects, then they will be passed on to the step in question sequentially within the same thread of execution.
      Specified by:
      outputData in interface StepManager
      Parameters:
      data - one or more Data objects to be sent
      Throws:
      WekaException - if a problem occurs
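      The varargs form of outputData relies on each Data object carrying its own connection type. The sketch below illustrates only that routing rule, with hypothetical stand-ins: RoutingSketch and Packet are not Weka classes, threading and StepOutputListeners are left out, and rejecting a packet without a connection type is an assumption about error handling.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of routing output by connection type.
class RoutingSketch {

  /** Stand-in for a Data object: a connection type plus a payload. */
  static final class Packet {
    final String connectionName;
    final Object payload;

    Packet(String connectionName, Object payload) {
      this.connectionName = connectionName;
      this.payload = payload;
    }
  }

  // connection type -> names of downstream steps
  private final Map<String, List<String>> outgoing = new HashMap<>();

  void connect(String connectionName, String stepName) {
    outgoing.computeIfAbsent(connectionName, k -> new ArrayList<>()).add(stepName);
  }

  /** Delivers each packet to the steps connected under its connection type. */
  List<String> outputData(Packet... packets) {
    List<String> delivered = new ArrayList<>();
    for (Packet p : packets) {
      if (p.connectionName == null) {
        // Assumption: the caller must set the connection type first.
        throw new IllegalArgumentException("connection type not set");
      }
      List<String> targets =
        outgoing.getOrDefault(p.connectionName, Collections.<String>emptyList());
      for (String step : targets) {
        delivered.add(step + "<-" + p.payload); // record the delivery in order
      }
    }
    return delivered;
  }
}
```

      Packets are handled in the order supplied, so a target that receives several of them sees them sequentially.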
    • outputData

      public void outputData(String outgoingConnectionName, String stepName, Data data) throws WekaException
      Outputs the supplied Data object to the named Step. Does nothing if the named step is not connected immediately downstream of this Step. Sets the supplied connection name on the Data object. Also notifies any StepOutputListeners.
      Specified by:
      outputData in interface StepManager
      Parameters:
      outgoingConnectionName - the name of the outgoing connection
      stepName - the name of the step to send the data to
      data - the data to send
      Throws:
      WekaException - if a problem occurs
    • getStepOutgoingConnectionTypes

      public List<String> getStepOutgoingConnectionTypes()
      Get the outgoing connection types that the managed step can currently produce. Used by the rendering routine in LayoutPanel to ensure that connections downstream from a deleted connection get rendered in grey rather than red.
      Returns:
      a list of outgoing connection types that the managed step can produce (adjusted to take into account any upstream broken connections)
    • numIncomingConnections

      public int numIncomingConnections()
      Get the number of incoming connections to the managed step
      Specified by:
      numIncomingConnections in interface StepManager
      Returns:
      the number of incoming connections
    • numIncomingConnectionsOfType

      public int numIncomingConnectionsOfType(String connectionName)
      Get the number of incoming connections to the managed step of a given type
      Specified by:
      numIncomingConnectionsOfType in interface StepManager
      Parameters:
      connectionName - the name of the connection type
      Returns:
      the number of incoming connections of this type
    • numOutgoingConnections

      public int numOutgoingConnections()
      Get the number of outgoing connections from the managed step
      Specified by:
      numOutgoingConnections in interface StepManager
      Returns:
      the number of outgoing connections
    • numOutgoingConnectionsOfType

      public int numOutgoingConnectionsOfType(String connectionName)
      Get the number of outgoing connections from the managed step of a given type
      Specified by:
      numOutgoingConnectionsOfType in interface StepManager
      Parameters:
      connectionName - the name of the connection type
      Returns:
      the number of outgoing connections of this type
    • getIncomingStructureForConnectionType

      public Instances getIncomingStructureForConnectionType(String connectionName) throws WekaException
      Attempt to get the incoming structure (as a header-only set of instances) for the named incoming connection type. Assumes that there is only one incoming connection of the named type. If there are zero, or more than one, then null is returned
      Specified by:
      getIncomingStructureForConnectionType in interface StepManager
      Parameters:
      connectionName - the name of the incoming connection to get the structure for
      Returns:
      the structure as a header-only set of instances or null if there are zero or more than one upstream connected steps producing the named connection, or if the upstream step can't tell us the structure, or if the upstream step can't represent the structure of the connection type as a set of instances.
      Throws:
      WekaException - if a problem occurs
    • getIncomingStructureForConnectionType

      public Instances getIncomingStructureForConnectionType(String connectionName, Environment env) throws WekaException
      Attempt to retrieve the structure (as a header-only set of instances) for the named incoming connection type. Assumes that there is only one step connected with the supplied incoming connection type.
      Specified by:
      getIncomingStructureForConnectionType in interface StepManager
      Parameters:
      connectionName - the type of the incoming connection to get the structure for
      env - the Environment to use
      Returns:
      the structure of the data for the specified incoming connection, or null if the structure can't be determined (or represented as an Instances object)
      Throws:
      WekaException - if a problem occurs
    • getIncomingStructureFromStep

      public Instances getIncomingStructureFromStep(StepManager sourceStep, String connectionName) throws WekaException
      Attempt to get the incoming structure (as a header-only set of instances) from the given managed step for the given connection type.
      Specified by:
      getIncomingStructureFromStep in interface StepManager
      Parameters:
      sourceStep - the step manager managing the source step
      connectionName - the name of the connection to attempt to get the structure for
      Returns:
      the structure as a header-only set of instances, or null if the source step can't determine this at present or if it can't be represented as a set of instances.
      Throws:
      WekaException - if a problem occurs
    • logLow

      public void logLow(String message)
      Log a message at the low logging level
      Specified by:
      logLow in interface StepManager
      Parameters:
      message - the message to log
    • logBasic

      public void logBasic(String message)
      Log a message at the basic logging level
      Specified by:
      logBasic in interface StepManager
      Parameters:
      message - the message to log
    • logDetailed

      public void logDetailed(String message)
      Log a message at the detailed logging level
      Specified by:
      logDetailed in interface StepManager
      Parameters:
      message - the message to log
    • logDebug

      public void logDebug(String message)
      Log a message at the debugging logging level
      Specified by:
      logDebug in interface StepManager
      Parameters:
      message - the message to log
    • logWarning

      public void logWarning(String message)
      Log a warning message
      Specified by:
      logWarning in interface StepManager
      Parameters:
      message - the message to log
    • logError

      public void logError(String message, Throwable cause)
      Log an error
      Specified by:
      logError in interface StepManager
      Parameters:
      message - the message to log
      cause - the optional Throwable to log
    • statusMessage

      public void statusMessage(String message)
      Output a status message to the status area of the log
      Specified by:
      statusMessage in interface StepManager
      Parameters:
      message - the message to output
    • log

      public void log(String message, LoggingLevel level)
      Log a message at the supplied logging level
      Specified by:
      log in interface StepManager
      Parameters:
      message - the message to write
      level - the level for the message
    • environmentSubstitute

      public String environmentSubstitute(String source)
      Substitute the values of environment variables in the given string
      Specified by:
      environmentSubstitute in interface StepManager
      Parameters:
      source - the source string to substitute in
      Returns:
      the source string with all known environment variables resolved
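      Substitution of this kind replaces each variable reference whose name is known and leaves the rest of the string alone. The sketch below assumes references are written as ${name} and that unknown variables are left untouched; both points are assumptions for illustration, and SubstituteSketch is a hypothetical class rather than the Weka implementation.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of ${name}-style variable substitution.
class SubstituteSketch {
  private static final Pattern VAR = Pattern.compile("\\$\\{([^}]+)\\}");

  /** Replaces every ${name} whose name is a key in vars; others are kept. */
  static String substitute(String source, Map<String, String> vars) {
    Matcher m = VAR.matcher(source);
    StringBuffer out = new StringBuffer();
    while (m.find()) {
      String value = vars.get(m.group(1));
      // Unknown variables are copied through unchanged.
      String replacement = value != null ? value : m.group(0);
      // quoteReplacement stops '$' and '\' in values being treated specially.
      m.appendReplacement(out, Matcher.quoteReplacement(replacement));
    }
    m.appendTail(out);
    return out.toString();
  }
}
```

      For example, with data.dir mapped to /tmp, the string ${data.dir}/iris.arff becomes /tmp/iris.arff.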
    • getInfoStep

      public Step getInfoStep(Class stepClass) throws WekaException
      Returns a reference to the step being managed if it has one or more outgoing CON_INFO connections and the managed step is of the supplied class
      Specified by:
      getInfoStep in interface StepManager
      Parameters:
      stepClass - the expected class of the step
      Returns:
      the step being managed if outgoing CON_INFO connections are present and the step is of the supplied class
      Throws:
      WekaException - if there are no outgoing CON_INFO connections or the managed step is the wrong type
    • getInfoStep

      public Step getInfoStep() throws WekaException
      Returns a reference to the step being managed if it has one or more outgoing CON_INFO connections.
      Specified by:
      getInfoStep in interface StepManager
      Returns:
      the step being managed if outgoing CON_INFO connections are present
      Throws:
      WekaException - if there are no outgoing CON_INFO connections
    • findStepInFlow

      public StepManager findStepInFlow(String stepNameToFind)
      Finds a named step in the current flow. Returns null if the named step is not present in the flow
      Specified by:
      findStepInFlow in interface StepManager
      Parameters:
      stepNameToFind - the name of the step to find
      Returns:
      the StepManager of the named step, or null if the step does not exist in the current flow.
    • stepStatusMessagePrefix

      public String stepStatusMessagePrefix()
      Gets a prefix for the step managed by this manager. Used to uniquely identify steps in the status area of the log
      Returns:
      a unique prefix for the step managed by this manager