.. _building_pipeline_blocks:

Building your own PipelineBlocks
*********************************

To add your own functionality to the pipeline, build your own subclass of :class:`~core.pipeline_block.PipelineBlock`. There are two things you must do, plus one optional step:

1. Implement an __init__() function in your class which calls the PipelineBlock's init function and passes the (unique) names of the files the block requires as input as well as the (unique) names of the files the block will create. This function always runs on the node on which you start the pipeline (i.e. "locally").
2. Implement a run() function which takes a DataSample as input, computes the block operation and instructs the DataSample to write its outputs, if applicable. This function may run on remote cluster nodes, depending on how you configure the :ref:`parsl` backend. The run function must be decorated as a @python_app (running Python code, see the example below) or a @bash_app (defining a bash call to run, intended for starting non-Python executables).
3. Optional: Add a pre_run() function and a validate_sample() function, which run before and after the run() function, respectively. Unlike the run function, both of them must be @python_apps. They are intended for pre-processing and for validating the sample where necessary.

Example::

    # Import paths below follow the cross-references used on this page;
    # adjust them if your checkout is laid out differently.
    from parsl import python_app
    from core.pipeline_block import PipelineBlock
    from core.exceptions import SampleProcessingException, SampleValidationException

    class RandomFailBlock( PipelineBlock ):
        """ Block that shows how to fail/abort (pseudo-randomly) """

        def __init__( self, name="RandomFailBlock" ):
            super().__init__( name )

        @python_app( cache = False )
        def run( self, sample, walltime = 60*5 ):
            import time
            time.sleep(1)
            # Use the sample's own generator so the failure is reproducible:
            if sample.random.random() > 0.9:
                raise SampleProcessingException( self, sample, "Random test exception" )
            return sample

        @python_app( cache = False )
        def validate_sample( self, sample, walltime = 60*5 ):
            if sample.random.random() > 0.9:
                raise SampleValidationException( self, sample, "Random validation test exception" )
            return sample

Determinism
===============================

The pipeline is intended to create random, procedural simulations. However, the outputs should always stay reproducible. For this to work, avoid using random.random() and similar global generators. Instead, use the DataSample's own random number generator, which you can access via its :meth:`~core.data_sample.DataSample.random` property.

Adding PipelineBlocks to a Pipeline
====================================

The example block can then be added to your pipeline like this::

    from core.pipeline import Pipeline
    from blocks.sample.sample_blocks import RandomFailBlock

    # Initialize the pipeline:
    pipeline = Pipeline(verbose=True)

    # Initialize your block and add it to the pipeline:
    block1 = RandomFailBlock()
    pipeline.append_block( block1 )

Note:

* A PipelineBlock should access data/files only through the DataSample's read/write functions.
* Usually, a single instance of your PipelineBlock is created and used for all the DataSamples passed to its run function. However, this may happen across multiple threads/processes/computers. The block should therefore not store any information specific to a DataSample; instead, use the DataSample's functions for this purpose.

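
The determinism advice and the note about keeping blocks stateless can be illustrated with a small stand-alone sketch. It does not use the real DataSample class: ``FakeSample`` and ``draw_parameters`` are hypothetical names, and seeding the generator from the sample id is only an assumption made for illustration. The point is that all random state lives in the sample, so re-running the same sample yields the same values regardless of where or in which order it is processed.

```python
import random


class FakeSample:
    """Hypothetical stand-in for DataSample; owns its own random generator."""

    def __init__(self, sample_id):
        self.id = sample_id
        # Assumption for this sketch: the generator is seeded from the id.
        # How the real DataSample seeds its generator is not shown here.
        self.random = random.Random(sample_id)


def draw_parameters(sample, n=3):
    """Draw n values using only the sample's generator, never the global one."""
    return [sample.random.random() for _ in range(n)]


# Processing the same sample twice gives identical values ...
first_run = draw_parameters(FakeSample(7))
second_run = draw_parameters(FakeSample(7))

# ... while a different sample gets its own, independent sequence.
other_sample = draw_parameters(FakeSample(8))

print("reproducible:", first_run == second_run)
print("differs between samples:", first_run != other_sample)
```

Because ``draw_parameters`` keeps no state of its own, the same function (or block instance) can safely be reused for every sample.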
Raising issues
===============================

Whenever a PipelineBlock encounters a sample for which it cannot successfully complete its functionality, it should raise a :class:`~core.exceptions.SampleProcessingException`. This could happen, for example, when a simulation does not converge or when the sample fails to meet some criteria. Note that such an exception is not considered a fatal error: the pipeline continues to run for all other samples, and only the current sample's processing is stopped.

In the validate_sample function, you should raise :class:`~core.exceptions.SampleValidationException` instead. It works almost exactly like SampleProcessingException, but makes it possible to distinguish a processing issue (bad input, for example) from a validation issue (for example, output values that fall outside an allowed range).

The message will also be logged in the sample's folder so that users can analyze the sample further to determine what went wrong.

Once your pipeline blocks have been built, you can :ref:`configure your pipeline and run it `.
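
To make the non-fatal behaviour of these exceptions concrete, here is a minimal, self-contained sketch. It is not the pipeline's actual scheduling code: the exception class is a local stand-in that only mirrors the ``(block, sample, message)`` argument order used in the example block, and ``run_block`` and ``process_all`` are hypothetical helpers.

```python
class SampleProcessingException(Exception):
    """Local stand-in for core.exceptions.SampleProcessingException."""

    def __init__(self, block, sample, message):
        super().__init__(message)
        self.block = block
        self.sample = sample


def run_block(block_name, sample_id):
    """Hypothetical block operation that fails for every third sample."""
    if sample_id % 3 == 0:
        raise SampleProcessingException(block_name, sample_id, "did not converge")
    return sample_id


def process_all(sample_ids):
    """Process every sample; a failing sample is recorded, not fatal."""
    done, failed = [], {}
    for sample_id in sample_ids:
        try:
            done.append(run_block("DemoBlock", sample_id))
        except SampleProcessingException as err:
            # Only this sample stops; the loop continues with the next one.
            failed[sample_id] = str(err)
    return done, failed


done, failed = process_all(range(1, 7))
print("completed:", done)
print("failed:", failed)
```

Any other exception type (a ``ValueError``, say) is deliberately not caught in this sketch, which mirrors the distinction between a per-sample issue and a genuine bug in the block.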