Class ProcessingContainer_Impl

All Implemented Interfaces:
Runnable, CasProcessorController, RunnableContainer, ConfigurableResource, Resource

public class ProcessingContainer_Impl extends ProcessingContainer implements RunnableContainer
Manages a pool of CasProcessor instances and gives processing threads access to them. A processing thread checks out a CasProcessor instance, invokes its process() method, and returns the instance to the pool when done. The container aggregates counts and totals on behalf of all CasProcessor instances. It also manages error and restart thresholds for the CasProcessors as a group: errors are aggregated across all instances, NOT per instance. When a threshold is exceeded, the container takes the action given by the declarative specification in the CPE descriptor.
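The check-out/check-in cycle described above can be sketched with a bounded blocking queue. This is a minimal illustration under stated assumptions, not the UIMA implementation: `PooledProcessor`, `PoolingContainer` and `handle()` are hypothetical names, and error handling is reduced to a single group-wide counter.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for a CasProcessor instance (not the UIMA interface).
interface PooledProcessor {
    void process(Object cas) throws Exception;
}

// Sketch of a container that pools processor instances and keeps
// group-level counters shared by all instances.
class PoolingContainer {
    private final BlockingQueue<PooledProcessor> pool;
    private final AtomicLong processed = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();

    PoolingContainer(int size, Supplier<PooledProcessor> factory) {
        pool = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            pool.add(factory.get());
        }
    }

    // Called by a processing thread: check out an instance, run it,
    // and always return it to the pool.
    void handle(Object cas) throws InterruptedException {
        PooledProcessor p = pool.take(); // waits until an instance is free
        try {
            p.process(cas);
            processed.incrementAndGet();
        } catch (Exception e) {
            errors.incrementAndGet(); // counted for the group, not per instance
        } finally {
            pool.put(p); // check the instance back in
        }
    }

    long getProcessed() { return processed.get(); }
    long getErrors() { return errors.get(); }
    int available() { return pool.size(); }
}
```

The `finally` block is the important design choice: an instance is returned to the pool even when process() fails, so a run of errors cannot drain the pool.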
  • Field Details

    • CONTAINER_SLEEP_TIME

      private static final int CONTAINER_SLEEP_TIME
    • casProcessorStatus

      private int casProcessorStatus
    • configParams

      private ConfigurationParameterSettings configParams
    • isLocal

      private boolean isLocal
    • isRemote

      private boolean isRemote
    • isIntegrated

      private boolean isIntegrated
    • batchCounter

      private long batchCounter
    • errorCounter

      private int errorCounter
    • sampleCounter

      private long sampleCounter
    • failureThresholdSample

      private long failureThresholdSample
    • configuredErrorRate

      private int configuredErrorRate
    • batchSize

      private int batchSize
    • processed

      private long processed
    • restartCount

      private int restartCount
    • casProcessorCPEConfiguration

      private CasProcessorConfiguration casProcessorCPEConfiguration
    • bytesIn

      private long bytesIn
    • bytesOut

      private long bytesOut
    • retryCount

      private int retryCount
    • abortCount

      private int abortCount
    • filteredCount

      private int filteredCount
    • remaining

      private long remaining
    • processedEntityIds

      private Stack processedEntityIds
    • totalTime

      private long totalTime
    • filterList

      private LinkedList filterList
    • logPath

      private String logPath
    • logger

      private Logger logger
    • initialized

      private boolean initialized
    • lastCas

      private Object lastCas
    • casProcessorPool

      public ServiceProxyPool casProcessorPool
    • casPDeployer

      private CasProcessorDeployer casPDeployer
    • metadata

      private ProcessingResourceMetaData metadata
    • statMap

      private HashMap statMap
    • isPaused

      private boolean isPaused
    • singleFencedInstance

      private boolean singleFencedInstance
    • lockForIsPaused

      private final Object lockForIsPaused
    • processorName

      private String processorName
    • fetchTime

      private long fetchTime
    • failedCasProcessorList

      public LinkedList failedCasProcessorList
  • Constructor Details

  • Method Details

    • getMetadata

      public ProcessingResourceMetaData getMetadata()
      Returns component's input/output capabilities
    • setMetadata

      public void setMetadata(ProcessingResourceMetaData aMetadata)
      Sets component's input/output capabilities
      Specified by:
      setMetadata in class ProcessingContainer
      Parameters:
      aMetadata - component capabilities
    • setCasProcessorDeployer

      public void setCasProcessorDeployer(CasProcessorDeployer aDeployer)
      Plug in deployer object used to launch/deploy the CasProcessor instance. Used for restarts.
      Specified by:
      setCasProcessorDeployer in class ProcessingContainer
      Parameters:
      aDeployer - - object responsible for deploying/launching CasProcessor
    • getDeployer

      public CasProcessorDeployer getDeployer()
      Returns deployer object used to launch the CasProcessor
      Specified by:
      getDeployer in class ProcessingContainer
      Returns:
      - CasProcessorDeployer - deployer object
    • deployLogger

      private void deployLogger()
      Deploys a container-specific logger to capture any exceptions occurring during the lifespan of the CasProcessor(s). Each CasProcessor may have its own log. The log is optional and is available when there is a parameter called 'containerLogPath' in the section of the cpe descriptor. If
    • logAbortedCases

      public void logAbortedCases(Object[] abortedCasList)
      Logs Cas'es that could not be processed.
      Specified by:
      logAbortedCases in class ProcessingContainer
      Parameters:
      abortedCasList - - an array of Cas'es that could not be processed by this CasProcessor
    • resetSampleCounter

      private void resetSampleCounter()
      Resets the sample counter. This counter is used to determine acceptable error rates for the hosted CasProcessor. An error rate is measured as the number of errors within a given sample, for example 3 per 1000, where 3 is the number of errors and 1000 is the sample size.
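A minimal sketch of the sample-based check, assuming the threshold is exceeded once the number of errors in the current sample goes above the configured count (for example, a 4th error within 1000 CASes for a 3/1000 setting) and that both counters reset when the sample is complete. `ErrorRateWindow` and `record()` are hypothetical names.

```java
// Hypothetical sketch: tolerate at most maxErrors failures within each
// window of sampleSize CASes (e.g. 3 per 1000).
class ErrorRateWindow {
    private final int maxErrors;
    private final long sampleSize;
    private int errorCounter;
    private long sampleCounter;

    ErrorRateWindow(int maxErrors, long sampleSize) {
        this.maxErrors = maxErrors;
        this.sampleSize = sampleSize;
    }

    // Records one CAS outcome; returns true if the error count in the
    // current sample has exceeded maxErrors.
    boolean record(boolean failed) {
        sampleCounter++;
        if (failed) {
            errorCounter++;
        }
        boolean exceeded = errorCounter > maxErrors;
        if (sampleCounter >= sampleSize) {
            // Sample complete: start counting a fresh one.
            sampleCounter = 0;
            errorCounter = 0;
        }
        return exceeded;
    }
}
```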
    • resetBatchCounter

      private void resetBatchCounter()
      Resets the batch counter. This counter is used to determine if the hosted CasProcessor should do special processing. A CasProcessor may buffer all Cas'es in memory and only when its batch size is reached does it do something useful with them, such as saving them to a file or indexing them.
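The batch counter can be sketched as follows. `BatchTracker` is a hypothetical class; it assumes end-of-batch is signalled when the running count reaches the configured batch size, at which point a CasProcessor would flush whatever it has buffered.

```java
// Hypothetical sketch of a batch counter: report end-of-batch once the
// number of CASes seen since the last boundary reaches batchSize.
class BatchTracker {
    private final int batchSize;
    private int batchCounter;

    BatchTracker(int batchSize) {
        this.batchSize = batchSize;
    }

    // Adds processedSize CASes; returns true (and resets) at a batch boundary.
    // Any overshoot past batchSize is discarded in this simplified version.
    boolean isEndOfBatch(int processedSize) {
        batchCounter += processedSize;
        if (batchSize > 0 && batchCounter >= batchSize) {
            batchCounter = 0;
            return true;
        }
        return false;
    }
}
```

A batch size of 0 or less disables batching in this sketch, so isEndOfBatch() never fires.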
    • getBytesIn

      public long getBytesIn()
      Returns total number of bytes ingested so far by all CasProcessor instances managed by this container.
      Specified by:
      getBytesIn in class ProcessingContainer
      Returns:
      - bytes ingested
    • addBytesIn

      public void addBytesIn(long aBytesIn)
      Aggregate total bytes ingested by the CasProcessor.
      Specified by:
      addBytesIn in class ProcessingContainer
      Parameters:
      aBytesIn - - number of ingested bytes
    • getBytesOut

      public long getBytesOut()
      Returns total number of bytes processed so far by all CasProcessor instances managed by this container.
      Specified by:
      getBytesOut in class ProcessingContainer
      Returns:
      - bytes processed
    • addBytesOut

      public void addBytesOut(long aBytesOut)
      Aggregate total bytes processed by this CasProcessor
      Specified by:
      addBytesOut in class ProcessingContainer
    • incrementRestartCount

      public void incrementRestartCount(int aCount)
      Increment number of times the casProcessor was restarted due to failures
      Specified by:
      incrementRestartCount in class ProcessingContainer
      Parameters:
      aCount - - restart count
    • getRestartCount

      public int getRestartCount()
      Returns total number of all CasProcessor restarts.
      Specified by:
      getRestartCount in class ProcessingContainer
      Returns:
      number of restarts
    • incrementRetryCount

      public void incrementRetryCount(int aCount)
      Increments the number of times the CasProcessor failed to analyze Cas'es due to a timeout or some other problem
      Specified by:
      incrementRetryCount in class ProcessingContainer
      Parameters:
      aCount - - failure count
    • getRetryCount

      public int getRetryCount()
      Returns the up-to-date number of retries recorded by the container.
      Specified by:
      getRetryCount in class ProcessingContainer
      Returns:
      - retry count
    • incrementAbortCount

      public void incrementAbortCount(int aCount)
      Increment number of aborted Cas'es due to inability to process the Cas
      Specified by:
      incrementAbortCount in class ProcessingContainer
      Parameters:
      aCount - - number of aborts while processing Cas'es
    • getAbortCount

      public int getAbortCount()
      Returns the up-to-date number of aborts recorded by the container
      Specified by:
      getAbortCount in class ProcessingContainer
      Returns:
      - number of failed attempts to analyze CAS'es
    • incrementFilteredCount

      public void incrementFilteredCount(int aCount)
      Increments the number of CAS'es filtered by the CasProcessor. Filtered CAS'es don't contain the features required by the CasProcessor to perform its analysis. Dependent features are defined in the filter expression in the CPE descriptor
      Specified by:
      incrementFilteredCount in class ProcessingContainer
      Parameters:
      aCount - - number of filtered Cas'es
    • getFilteredCount

      public int getFilteredCount()
      Returns number of filtered Cas'es
      Specified by:
      getFilteredCount in class ProcessingContainer
      Returns:
      # of filtered Cas'es
    • getRemaining

      public long getRemaining()
      Returns the number of entities still to be processed by the CasProcessor. It is the difference between the total number of entities to be processed by the CPE and the number of entities processed so far.
      Specified by:
      getRemaining in class ProcessingContainer
      Returns:
      Number of entities yet to be processed
    • setRemaining

      public void setRemaining(long aRemainingCount)
      Copies number of entities the CasProcessor has yet to process.
      Specified by:
      setRemaining in class ProcessingContainer
      Parameters:
      aRemainingCount - - number of entities to process
    • setLastProcessedEntityId

      public void setLastProcessedEntityId(String aEntityId)
      Copies id of the last entity processed by the CasProcessor
      Specified by:
      setLastProcessedEntityId in class ProcessingContainer
      Parameters:
      aEntityId - - id of the entity
    • getLastProcessedEntityId

      public String getLastProcessedEntityId()
      Returns id of the last entity processed by the CasProcessor
      Specified by:
      getLastProcessedEntityId in class ProcessingContainer
      Returns:
      - id of entity
    • setLastCas

      @Deprecated public void setLastCas(Object aCasObject)
      Deprecated.
      Copies the last Cas processed
      Specified by:
      setLastCas in class ProcessingContainer
    • getLastCas

      @Deprecated public Object getLastCas()
      Deprecated.
      Returns the last Cas processed
      Specified by:
      getLastCas in class ProcessingContainer
    • incrementProcessed

      public void incrementProcessed(int aIncrement)
    • setProcessed

      public void setProcessed(long aProcessedCount)
      Used when recovering from a checkpoint; sets the total number of entities processed before the CPE stopped.
      Specified by:
      setProcessed in class ProcessingContainer
      Parameters:
      aProcessedCount - - number of entities processed before CPE stopped
    • getProcessed

      public long getProcessed()
      Returns number of entities processed so far.
      Specified by:
      getProcessed in class ProcessingContainer
      Returns:
      - processed - number of entities processed
    • resetErrorCounter

      private void resetErrorCounter()
      Re-initializes the error counter
    • resetRestartCount

      public void resetRestartCount()
      Specified by:
      resetRestartCount in class ProcessingContainer
    • incrementTotalTime

      public void incrementTotalTime(long aTime)
      Increments the total time spent in the process() method of the CasProcessor
      Specified by:
      incrementTotalTime in class ProcessingContainer
      Parameters:
      aTime - - total time in process()
    • getTotalTime

      public long getTotalTime()
      Returns total time spent in process()
      Specified by:
      getTotalTime in class ProcessingContainer
      Returns:
      - number of millis spent in process()
    • abortCPMOnError

      public boolean abortCPMOnError()
      Returns true if maximum threshold for errors has been exceeded and the CasProcessor is configured to force CPE shutdown. It looks at the value of the action attribute of the element in the cpe descriptor.
      Specified by:
      abortCPMOnError in class ProcessingContainer
      Returns:
      - true if the CPE should stop processing, false otherwise
    • isTimeout

      private boolean isTimeout(Throwable aThrowable)
      Returns true if the Exception cause is SocketTimeoutException
      Parameters:
      aThrowable - - Exception to check for SocketTimeoutException
      Returns:
      - true if Socket Timeout, false otherwise
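One way to implement this check is to walk the whole cause chain rather than only the immediate cause; that is an assumption of this sketch, and `TimeoutCheck` is a hypothetical helper class.

```java
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

class TimeoutCheck {
    // Returns true if aThrowable or any exception in its cause chain
    // is a SocketTimeoutException.
    static boolean isTimeout(Throwable aThrowable) {
        // Track visited throwables by identity so a cyclic cause chain
        // cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        for (Throwable t = aThrowable; t != null && seen.add(t); t = t.getCause()) {
            if (t instanceof SocketTimeoutException) {
                return true;
            }
        }
        return false;
    }
}
```

Walking the chain matters when the timeout is wrapped by one or more layers of service or resource exceptions before it reaches the container.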
    • incrementCasProcessorErrors

      public void incrementCasProcessorErrors(Throwable aThrowable) throws Exception
      This routine determines what to do with an exception thrown during the CasProcessor processing. It interprets given exception and throws a new one according to configuration specified in the CPE descriptor. It examines provided thresholds and determines if the CPE should continue to run, if it should disable the CasProcessor (and all its instances), or disregard the error and continue.
      Specified by:
      incrementCasProcessorErrors in class ProcessingContainer
      Parameters:
      aThrowable - - exception to examine
      Throws:
      AbortCPMException - - force the CPE to stop processing
      AbortCasProcessorException - - disables all instances of CasProcessor in this container
      ServiceConnectionException - - forces the restart/relaunch of the failed CasProcessor
      SkipCasException - - disregard error, skip bad Cas'es and continue with the next Cas bundle
      Exception
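The decision logic can be pictured as a mapping from (threshold exceeded?, failure type, configured action) to one of the four outcomes listed above. The sketch is hypothetical: the enums stand in for the exception types, the action names are assumed, and the rule that a connection failure below the threshold triggers a restart is an assumption, not something stated here.

```java
// Hypothetical action values mirroring the 'action' attribute choices
// (terminate the CPE, disable the processor, or continue).
enum ThresholdAction { TERMINATE, DISABLE, CONTINUE }

// Hypothetical outcomes standing in for AbortCPMException,
// AbortCasProcessorException, ServiceConnectionException and SkipCasException.
enum ErrorOutcome { ABORT_CPM, DISABLE_PROCESSOR, RESTART_PROCESSOR, SKIP_CAS }

class ErrorPolicy {
    static ErrorOutcome decide(boolean thresholdExceeded,
                               boolean connectionFailure,
                               ThresholdAction action) {
        if (thresholdExceeded) {
            switch (action) {
                case TERMINATE:
                    return ErrorOutcome.ABORT_CPM;
                case DISABLE:
                    return ErrorOutcome.DISABLE_PROCESSOR;
                default:
                    return ErrorOutcome.SKIP_CAS; // configured to continue
            }
        }
        // Below the threshold (assumption): a lost connection warrants a
        // restart, any other failure just skips the bad CAS bundle.
        return connectionFailure ? ErrorOutcome.RESTART_PROCESSOR : ErrorOutcome.SKIP_CAS;
    }
}
```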
    • isEndOfBatch

      public boolean isEndOfBatch(CasProcessor aCasProcessor, int aProcessedSize) throws ResourceProcessException, IOException
      Specified by:
      isEndOfBatch in class ProcessingContainer
      Throws:
      ResourceProcessException
      IOException
    • processCas

      public boolean processCas(Object[] aCasList)
      Returns true if the Cas bundle should be processed by the CasProcessor. This routine checks for the existence of dependent features defined in the filter expression for the CasProcessor in the cpe descriptor. Currently this is done on a per-bundle basis, meaning that all Cas'es must contain the required features. If even one Cas does not have them, the entire bundle is skipped.
      Specified by:
      processCas in class ProcessingContainer
      Parameters:
      aCasList - - bundle containing instances of CAS
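The all-or-nothing bundle filter amounts to the following sketch. It assumes each CAS can be reduced to the set of feature names it contains, which is a simplification; `BundleFilter` is a hypothetical class.

```java
import java.util.List;
import java.util.Set;

class BundleFilter {
    // Returns true only if every CAS in the bundle carries all required
    // features; a single non-matching CAS causes the whole bundle to be skipped.
    // Each CAS is modeled here as the set of feature names it contains.
    static boolean processCas(List<Set<String>> bundle, Set<String> requiredFeatures) {
        if (requiredFeatures.isEmpty()) {
            return true; // no filter configured: process everything
        }
        for (Set<String> cas : bundle) {
            if (!cas.containsAll(requiredFeatures)) {
                return false;
            }
        }
        return true;
    }
}
```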
    • hasFeature

      private boolean hasFeature(CasData aCas)
      Used during filtering; determines if a given Cas has a required feature. Required features are defined in the filter. Filtering is optional, and if it is not present in the cpe descriptor this routine always returns true.
      Parameters:
      aCas - - Cas instance to check
      Returns:
      - true if feature is in the Cas, false otherwise
    • processCas

      private boolean processCas(Object aCas)
      Checks if a given Cas has required features.
      Parameters:
      aCas - - Cas instance to check
      Returns:
      - true if feature is in the Cas, false otherwise
    • getCasProcessorConfiguration

      public CasProcessorConfiguration getCasProcessorConfiguration()
      Returns CasProcessor configuration object. This object represents xml configuration defined in the section of the cpe descriptor.
      Specified by:
      getCasProcessorConfiguration in class ProcessingContainer
      Returns:
      CasProcessorConfiguration instance
    • start

      @Deprecated public void start()
      Deprecated.
      Description copied from interface: RunnableContainer
      Starts the container
      Specified by:
      start in interface RunnableContainer
    • stop

      @Deprecated public void stop()
      Deprecated.
      Description copied from interface: RunnableContainer
      Stops the container
      Specified by:
      stop in interface RunnableContainer
    • getCasProcessor

      public CasProcessor getCasProcessor()
      Returns available instance of the CasProcessor from the instance pool. It will wait indefinitely until an instance is available.
      Specified by:
      getCasProcessor in interface CasProcessorController
      Returns:
      CasProcessor
    • releaseCasProcessor

      public void releaseCasProcessor(CasProcessor aCasProcessor)
      Returns a given casProcessor instance back to the pool.
      Specified by:
      releaseCasProcessor in class ProcessingContainer
      Parameters:
      aCasProcessor - - an instance of CasProcessor to return back to the pool
    • getStatus

      public int getStatus()
      Returns the current status of the CasProcessor
      Specified by:
      getStatus in interface CasProcessorController
      Returns:
      int status
    • setStatus

      public void setStatus(int aStatus)
      Changes the status of the CasProcessor as a group
      Specified by:
      setStatus in interface CasProcessorController
      Parameters:
      aStatus - - new status
    • isLocal

      @Deprecated public boolean isLocal()
      Deprecated.
      Description copied from interface: CasProcessorController
      Returns true if this is a Locally Deployed CasProcessor (same machine, different JVM)
      Specified by:
      isLocal in interface CasProcessorController
      Returns:
      true if Local, false otherwise
    • isRemote

      @Deprecated public boolean isRemote()
      Deprecated.
      Description copied from interface: CasProcessorController
      Returns true if this is a Remotely Deployed CasProcessor
      Specified by:
      isRemote in interface CasProcessorController
      Returns:
      true if Remote, false otherwise
    • isIntegrated

      @Deprecated public boolean isIntegrated()
      Deprecated.
      Description copied from interface: CasProcessorController
      Returns true if this is an Integrated CasProcessor
      Specified by:
      isIntegrated in interface CasProcessorController
      Returns:
      true if Integrated, false otherwise
    • continueOnError

      private boolean continueOnError()
      Returns true if the CasProcessor has been configured to continue despite error
      Returns:
      - true if ignoring errors, false otherwise
    • isAbortable

      public boolean isAbortable()
      Determines if instances of CasProcessor managed by this container are abortable. Abortable CasProcessor's action attribute in the element has a value of 'disable'.
      Specified by:
      isAbortable in interface CasProcessorController
      Returns:
      true if CasProcessor can be disabled
    • initialize

      public boolean initialize(ResourceSpecifier aSpecifier, Map aAdditionalParams) throws ResourceInitializationException
      Description copied from interface: Resource
      Initializes this Resource from a ResourceSpecifier. Applications do not need to call this method. It is called automatically by the ResourceFactory and cannot be called a second time.
      Specified by:
      initialize in interface Resource
      Overrides:
      initialize in class Resource_ImplBase
      Parameters:
      aSpecifier - specifies how to create a resource or locate an existing resource service.
      aAdditionalParams - a Map containing additional parameters. May be null if there are no parameters. Each class that implements this interface can decide what additional parameters it supports.
      Returns:
      true if and only if initialization completed successfully. Returns false if the given ResourceSpecifier is not of an appropriate type for this Resource. If the ResourceSpecifier is of an appropriate type but is invalid or if some other failure occurs, an exception should be thrown.
      Throws:
      ResourceInitializationException - if a failure occurs during initialization.
    • destroy

      public void destroy()
      Destroy instances of CasProcessors managed by this container. Before destroying the instance, this method notifies it with CollectionProcessComplete so that the component finalizes its logic and does appropriate cleanup before shutdown.
      Specified by:
      destroy in interface Resource
      Overrides:
      destroy in class Resource_ImplBase
    • run

      public void run()
      Specified by:
      run in interface Runnable
    • getConfigParameterValue

      public Object getConfigParameterValue(String aParamName)
      Description copied from interface: ConfigurableResource
      Looks up the value of a configuration parameter. This method will only return the value of a parameter that is not defined in any group.

      This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

      Specified by:
      getConfigParameterValue in interface ConfigurableResource
      Parameters:
      aParamName - the name of a parameter that is not in any group
      Returns:
      the value of the parameter with name aParamName, or null if either the parameter does not exist or it has not been assigned a value.
    • getConfigParameterValue

      public Object getConfigParameterValue(String aGroupName, String aParamName)
      Description copied from interface: ConfigurableResource
      Looks up the value of a configuration parameter in a group. If the parameter has no value assigned within the group, fallback strategies will be followed.

      This method returns null if the parameter is optional and has not been assigned a value. (For mandatory parameters, an exception is thrown during initialization if no value has been assigned.) This method also returns null if there is no declared configuration parameter with the specified name.

      Specified by:
      getConfigParameterValue in interface ConfigurableResource
      Parameters:
      aGroupName - the name of a configuration group. If the group name is null, this method will return the same value as getParameterValue(String).
      aParamName - the name of a parameter in the group
      Returns:
      the value of the parameter in group aGroupName with name aParamName, or null if either the parameter does not exist or it has not been assigned a value.
    • setConfigParameterValue

      public void setConfigParameterValue(String aParamName, Object aValue)
      Description copied from interface: ConfigurableResource
      Sets the value of a configuration parameter. This only works for a parameter that is not defined in any group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
      Specified by:
      setConfigParameterValue in interface ConfigurableResource
      Parameters:
      aParamName - the name of a parameter that is not in any group
      aValue - the value to assign to the parameter
    • setConfigParameterValue

      public void setConfigParameterValue(String aGroupName, String aParamName, Object aValue)
      Description copied from interface: ConfigurableResource
      Sets the value of a configuration parameter in a group. Note that there is no guarantee that the change will take effect until ConfigurableResource.reconfigure() is called.
      Specified by:
      setConfigParameterValue in interface ConfigurableResource
      Parameters:
      aGroupName - the name of a configuration group. If this parameter is null, this method will have the same effect as setParameterValue(String,Object).
      aParamName - the name of a parameter in the group
      aValue - the value to assign to the parameter.
    • reconfigure

      public void reconfigure() throws ResourceConfigurationException
      Description copied from interface: ConfigurableResource
      Instructs this Resource to re-read its configuration parameter settings.
      Specified by:
      reconfigure in interface ConfigurableResource
      Throws:
      ResourceConfigurationException - if the configuration is not valid
    • getName

      public String getName()
      Returns the name of this container. It is the name of the Cas Processor.
      Specified by:
      getName in class ProcessingContainer
    • getMetaData

      public ResourceMetaData getMetaData()
      Description copied from interface: Resource
      Gets the metadata that describes this Resource.
      Specified by:
      getMetaData in interface Resource
      Overrides:
      getMetaData in class Resource_ImplBase
      Returns:
      an object containing all metadata for this resource.
    • incrementStat

      public void incrementStat(String aStatName, Integer aStat)
      Increment a value of a given stat
      Specified by:
      incrementStat in class ProcessingContainer
    • addStat

      public void addStat(String aStatName, Object aStat)
      Add an arbitrary object and bind it to a given name
      Specified by:
      addStat in class ProcessingContainer
    • getStat

      public Object getStat(String aStatName)
      Returns an object identified by a given name
      Specified by:
      getStat in class ProcessingContainer
    • getAllStats

      public HashMap getAllStats()
      Returns all stats aggregated during the CPM run
      Specified by:
      getAllStats in class ProcessingContainer
      Returns:
      a map of all stats aggregated during the CPM run
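A minimal sketch of the stat operations above, assuming a single map shared by all processing threads. `StatMap` is a hypothetical class; its incrementStat() treats a missing or non-Integer entry as 0, which is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a container-wide stats map shared by all
// processing threads, hence the synchronized accessors.
class StatMap {
    private final Map<String, Object> stats = new HashMap<>();

    // Adds aStat to the current Integer value (treating a missing or
    // non-Integer entry as 0).
    synchronized void incrementStat(String aStatName, Integer aStat) {
        Object current = stats.get(aStatName);
        int base = (current instanceof Integer) ? (Integer) current : 0;
        stats.put(aStatName, base + aStat);
    }

    // Binds an arbitrary object to a name, replacing any previous value.
    synchronized void addStat(String aStatName, Object aStat) {
        stats.put(aStatName, aStat);
    }

    synchronized Object getStat(String aStatName) {
        return stats.get(aStatName);
    }

    // Returns a snapshot copy so callers cannot mutate the live map.
    synchronized Map<String, Object> getAllStats() {
        return new HashMap<>(stats);
    }
}
```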
    • pause

      public void pause()
      Pauses the container until resumed. The CPM pauses the Container while it is trying to reconnect to a shared remote service. While the Container is paused, getCasProcessor() will not return a new CasProcessor. All other methods remain accessible and function normally.
      Specified by:
      pause in class ProcessingContainer
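The pause gate can be sketched with a monitor object, similar in spirit to the lockForIsPaused field listed above. `PauseGate` and `awaitNotPaused()` are hypothetical names; a getCasProcessor() implementation would call awaitNotPaused() before handing out an instance.

```java
// Hypothetical sketch of a pause gate guarded by a private monitor.
class PauseGate {
    private final Object lock = new Object();
    private boolean paused;

    void pause() {
        synchronized (lock) {
            paused = true;
        }
    }

    // Clears the flag and wakes every thread blocked in awaitNotPaused().
    void resume() {
        synchronized (lock) {
            paused = false;
            lock.notifyAll();
        }
    }

    boolean isPaused() {
        synchronized (lock) {
            return paused;
        }
    }

    // Blocks the caller while the gate is paused; the loop guards
    // against spurious wakeups.
    void awaitNotPaused() throws InterruptedException {
        synchronized (lock) {
            while (paused) {
                lock.wait();
            }
        }
    }
}
```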
    • resume

      public void resume()
      Specified by:
      resume in class ProcessingContainer
    • isPaused

      public boolean isPaused()
      Specified by:
      isPaused in class ProcessingContainer
    • getPool

      public ServiceProxyPool getPool()
      Specified by:
      getPool in class ProcessingContainer
    • setSingleFencedService

      public void setSingleFencedService(boolean aSingleFencedInstance)
      Specified by:
      setSingleFencedService in class ProcessingContainer
    • isSingleFencedService

      public boolean isSingleFencedService()
      Specified by:
      isSingleFencedService in class ProcessingContainer
    • getFetchTime

      public long getFetchTime()