Class PairedDataHelper<P>

java.lang.Object
weka.knowledgeflow.steps.PairedDataHelper<P>
All Implemented Interfaces:
Serializable

public class PairedDataHelper<P> extends Object implements Serializable

A helper class that Step implementations can use when processing paired data (e.g. train and test sets). It has the concept of a primary and a secondary connection/data type, where the secondary connection/data for a given set number typically needs to be processed using a result generated from the corresponding primary connection/data. The helper ensures that the secondary connection/data is processed only once the corresponding primary has completed.

Users of this helper must provide an implementation of the PairedProcessor inner interface. Its processPrimary() method is called to process the primary data/connection and return a result; its processSecondary() method is called to deal with the secondary connection/data. The result of execution on a particular primary data set number can be retrieved by calling getIndexedPrimaryResult() with the set number of the primary result to retrieve.

This class also provides an arbitrary storage mechanism for additional results beyond the primary type of result. It also takes care of invoking processing() and finished() on the client step's StepManager.

     public class MyFunkyStep extends BaseStep
       implements PairedDataHelper.PairedProcessor<MyFunkyMainResult> {
       ...
       protected PairedDataHelper<MyFunkyMainResult> m_helper;
       ...
       public void stepInit() {
         m_helper = new PairedDataHelper<MyFunkyMainResult>(this, this,
         StepManager.[CON_WHATEVER_YOUR_PRIMARY_CONNECTION_IS],
         StepManager.[CON_WHATEVER_YOUR_SECONDARY_CONNECTION_IS]);
 
         ...
       }
 
       public void processIncoming(Data data) throws WekaException {
         // delegate to our helper to handle primary/secondary synchronization
         // issues
         m_helper.process(data);
       }
 
       public MyFunkyMainResult processPrimary(Integer setNum, Integer maxSetNum,
         Data data, PairedDataHelper<MyFunkyMainResult> helper) throws WekaException {
           SomeDataTypeToProcess someData = data.getPrimaryPayload();
 
           MyFunkyMainResult processor = new MyFunkyMainResult();
           // do some processing using MyFunkyMainResult and SomeDataTypeToProcess
           ...
           // output some data to downstream steps if necessary
           ...
 
           return processor;
       }
 
       public void processSecondary(Integer setNum, Integer maxSetNum, Data data,
         PairedDataHelper<MyFunkyMainResult> helper) throws WekaException {
         SomeDataTypeToProcess someData = data.getPrimaryPayload();
 
         // get the MyFunkyMainResult for this set number
         MyFunkyMainResult result = helper.getIndexedPrimaryResult(setNum);
 
         // do some stuff with the result and the secondary data
         ...
         // output some data to downstream steps if necessary
       }
     }
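
The ordering guarantee described above (secondary data for a set is handled only after the primary for the same set has produced a result) can be illustrated with a small stand-alone sketch. This is not Weka code and makes no claim about how PairedDataHelper is implemented internally: the class PairedOrderingSketch, its methods, and the string "results" are all hypothetical, and the sketch is single-threaded and simply defers an early secondary rather than blocking.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-threaded model of "primary before secondary" ordering.
// It only illustrates the contract; it is not the Weka implementation.
class PairedOrderingSketch {
  // primary results keyed by set number
  private final Map<Integer, String> primaryResults = new HashMap<>();
  // secondary data that arrived before its primary result existed
  private final Map<Integer, String> pendingSecondary = new HashMap<>();
  // record of the order in which work was actually performed
  final List<String> log = new ArrayList<>();

  // Process primary data for a set, then release any deferred secondary.
  void onPrimary(int setNum, String trainData) {
    primaryResults.put(setNum, "model(" + trainData + ")");
    log.add("primary " + setNum);
    String waiting = pendingSecondary.remove(setNum);
    if (waiting != null) {
      runSecondary(setNum, waiting);
    }
  }

  // Process secondary data now if the primary result exists, else defer it.
  void onSecondary(int setNum, String testData) {
    if (primaryResults.containsKey(setNum)) {
      runSecondary(setNum, testData);
    } else {
      pendingSecondary.put(setNum, testData);
    }
  }

  private void runSecondary(int setNum, String testData) {
    log.add("secondary " + setNum + " on " + testData + " using "
      + primaryResults.get(setNum));
  }

  public static void main(String[] args) {
    PairedOrderingSketch sketch = new PairedOrderingSketch();
    sketch.onSecondary(1, "test-1"); // arrives early, so it is deferred
    sketch.onPrimary(1, "train-1");  // primary runs, then the deferred secondary
    for (String entry : sketch.log) {
      System.out.println(entry);
    }
  }
}
```

In this sketch the secondary for set 1 arrives first, yet the log records the primary entry before the secondary one. A step that uses the real helper gets the equivalent guarantee by delegating to process(Data) as in the example above.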
 
Author:
Mark Hall (mhall{[at]}pentaho{[dot]}com)
  • Constructor Details

    • PairedDataHelper

      public PairedDataHelper(Step owner, PairedDataHelper.PairedProcessor processor, String primaryConType, String secondaryConType)
      Constructs a helper for the given owner step, processor, and connection types.
      Parameters:
      owner - the owner step
      processor - the PairedProcessor implementation
      primaryConType - the primary connection type
      secondaryConType - the secondary connection type
  • Method Details

    • process

      public void process(Data data) throws WekaException
      Initiate routing and processing for a particular data object
      Parameters:
      data - the data object to process
      Throws:
      WekaException - if a problem occurs
    • getIndexedPrimaryResult

      public P getIndexedPrimaryResult(int index)
      Retrieve the primary result corresponding to a given set number
      Parameters:
      index - the set number of the result to get
      Returns:
      the primary result
    • reset

      public void reset()
      Reset the helper. The helper must be reset between runs if it is being re-used (as opposed to a new helper instance being created).
    • isFinished

      public boolean isFinished()
      Return true if there is no further processing to be done
      Returns:
      true if processing is done
    • createNamedIndexedStore

      public void createNamedIndexedStore(String name)
      Create an indexed store with a given name
      Parameters:
      name - the name of the store to create
    • getIndexedValueFromNamedStore

      public <T> T getIndexedValueFromNamedStore(String storeName, Integer index)
      Gets an indexed value from a named store
      Type Parameters:
      T - the type of the value
      Parameters:
      storeName - the name of the store to retrieve from
      index - the index of the value to get
      Returns:
      the requested value or null if either the store does not exist or the value does not exist in the store.
    • addIndexedValueToNamedStore

      public void addIndexedValueToNamedStore(String storeName, Integer index, Object value)
      Adds a value to a named store with the given index. Creates the named store if it doesn't already exist.
      Parameters:
      storeName - the name of the store to add to
      index - the index to associate with the value
      value - the value to store
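
The documented behaviour of the three named-store methods (add creates the store on demand; get returns null when either the store or the value is missing) can be modelled as a map of maps. The following is a minimal sketch under that assumption, not the Weka source: NamedIndexedStoreSketch is a hypothetical class that only mirrors the method names above, it is not thread-safe, and whether the real createNamedIndexedStore replaces an existing store of the same name is not stated in this documentation (the sketch leaves an existing store untouched).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that mirrors the documented named-store semantics.
// Not thread-safe, and not the actual PairedDataHelper implementation.
class NamedIndexedStoreSketch {
  // store name -> (index -> value)
  private final Map<String, Map<Integer, Object>> stores = new HashMap<>();

  // Create an empty store; assumption: an existing store is left as it is.
  void createNamedIndexedStore(String name) {
    stores.putIfAbsent(name, new HashMap<>());
  }

  // Add a value, creating the named store if it does not exist yet.
  void addIndexedValueToNamedStore(String storeName, Integer index, Object value) {
    stores.computeIfAbsent(storeName, k -> new HashMap<>()).put(index, value);
  }

  // Return the value, or null if the store or the value does not exist.
  @SuppressWarnings("unchecked")
  <T> T getIndexedValueFromNamedStore(String storeName, Integer index) {
    Map<Integer, Object> store = stores.get(storeName);
    return store == null ? null : (T) store.get(index);
  }

  public static void main(String[] args) {
    NamedIndexedStoreSketch sketch = new NamedIndexedStoreSketch();
    sketch.addIndexedValueToNamedStore("headers", 1, "header for set 1");
    String found = sketch.getIndexedValueFromNamedStore("headers", 1);
    String missingValue = sketch.getIndexedValueFromNamedStore("headers", 2);
    String missingStore = sketch.getIndexedValueFromNamedStore("unknown", 1);
    System.out.println(found);
    System.out.println(missingValue == null);
    System.out.println(missingStore == null);
  }
}
```

Because the getter is generic in its return type, the caller chooses T at the call site; asking for a type that does not match the stored value fails with a ClassCastException at the point of use, so callers need to keep track of what each store holds.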