pipe.core.exceptions

pipe.core.base

Step Objects

class Step()

Base class providing basic functionality for all step-related classes.

There are three types of steps:

Extractor, Loader, Transformer.

How to understand which one you need:

  1. If you need to get data (extract) from an external source, you need an extractor
  2. If you need to send data (load) to an external source, you need a loader
  3. If you need to interact with data (transform), you need a transformer

__and__

 | __and__(other: 'Step') -> 'Step'

Overrides the & operator (bitwise AND in Python) to merge steps:

Example:

EUser(pk=1) & EBook(where=('id', 1))

If any of the steps throws an exception, nothing happens.

__or__

 | __or__(other: 'Step') -> 'Step'

Overrides the | operator (bitwise OR in Python) to merge steps:

Example:

EUser(pk=1) | LError()

If the first step throws an exception, the store is passed to the second step together with information about the exception.

Arguments:

  • other: Step to merge with

Returns:

Step which runs both of the steps according to an operator
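The behavior of the two operators can be illustrated with a minimal, self-contained sketch. It does not use the library: ToyStep, the "exception" key, and the choice to merge both results into the store are assumptions made for illustration, and a plain dict stands in for frozendict.

```python
from typing import Callable


class ToyStep:
    """Hypothetical stand-in for Step; not the library's implementation."""

    def __init__(self, fn: Callable[[dict], dict]):
        self.fn = fn

    def run(self, store: dict) -> dict:
        return self.fn(store)

    def __and__(self, other: "ToyStep") -> "ToyStep":
        # Assumed reading of "nothing happens": if either step raises,
        # the store is returned unchanged; otherwise both results are merged.
        def both(store: dict) -> dict:
            try:
                first, second = self.run(store), other.run(store)
            except Exception:
                return store
            return {**store, **first, **second}

        return ToyStep(both)

    def __or__(self, other: "ToyStep") -> "ToyStep":
        # If the first step raises, the second step receives the store
        # with the exception attached under a hypothetical "exception" key.
        def fallback(store: dict) -> dict:
            try:
                return self.run(store)
            except Exception as exc:
                return other.run({**store, "exception": exc})

        return ToyStep(fallback)


def failing(store: dict) -> dict:
    raise ValueError("boom")


user = ToyStep(lambda s: {"user": "alice"})
book = ToyStep(lambda s: {"book": "dune"})
broken = ToyStep(failing)
report = ToyStep(lambda s: {"handled": type(s["exception"]).__name__})

merged = (user & book).run({})
unchanged = (user & broken).run({"a": 1})
recovered = (broken | report).run({})
```

Returning a new step from each operator keeps the combined steps composable, so an expression such as (a & b) | c is itself a step.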

validate

 | validate(store: frozendict) -> frozendict

Validates store according to Step.required_fields field

Arguments:

  • store: Current pipe state

Returns:

Store with adapted data

factory

 | @classmethod
 | factory(cls, run_method: t.Callable, name: str = '', **arguments) -> type

Step factory, creates step with run_method provided

Arguments:

  • run_method: Method which will be run by the pipe
  • name: Name for a step
  • arguments: Arguments for a step constructor

Returns:

New Step
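A factory with this signature could be built on the three-argument form of type(). The sketch below is an assumption about the general technique rather than the library's code: ToyStep is a hypothetical stand-in, and attaching run_method as the run attribute and pre-filling constructor arguments are illustrative choices.

```python
import typing as t


class ToyStep:
    """Hypothetical stand-in for Step, reduced to what the factory needs."""

    def __init__(self, **arguments):
        self.arguments = arguments

    @classmethod
    def factory(cls, run_method: t.Callable, name: str = "", **arguments) -> type:
        # Constructor of the generated class: defaults come from the factory
        # call and can be overridden when the step is instantiated.
        def __init__(self, **overrides):
            cls.__init__(self, **{**arguments, **overrides})

        # type(name, bases, namespace) creates the new subclass dynamically.
        return type(
            name or run_method.__name__,
            (cls,),
            {"__init__": __init__, "run": run_method},
        )


def multiply(self, store: dict) -> dict:
    # Receives the step instance as self because it becomes a method.
    return {**store, "value": store["value"] * self.arguments["factor"]}


Double = ToyStep.factory(multiply, "Double", factor=2)
result = Double().run({"value": 21})
tripled = Double(factor=3).run({"value": 1})
```

Note that the factory returns a class, not an instance, which matches the -> type annotation in the signature above.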

run

 | run(store: frozendict) -> frozendict

Method which provides the ability to run any step.

The pipe shouldn't know exactly which step is running; that's why the run method is needed. Developers are limited to three options, which are listed in _available_methods.

You can extend this class and change _available_methods field, if you want to customize this behavior

Arguments:

  • store: Current pipe state

Returns:

New frozendict object with updated pipe state
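The dispatch described above can be sketched as follows. This is a hedged illustration, not the library's implementation: ToyStep is hypothetical, the method names in _available_methods are assumed from the three step types, and a plain dict replaces frozendict.

```python
class ToyStep:
    """Hypothetical stand-in for Step showing dispatch via _available_methods."""

    # Assumed names, derived from the three step types listed earlier.
    _available_methods = ("extract", "transform", "load")

    def run(self, store: dict) -> dict:
        # Call the first method from _available_methods the subclass defines,
        # so the caller never needs to know which kind of step this is.
        for method_name in self._available_methods:
            method = getattr(self, method_name, None)
            if callable(method):
                return method(store)
        raise NotImplementedError(
            f"{type(self).__name__} must define one of {self._available_methods}"
        )


class EUser(ToyStep):
    def extract(self, store: dict) -> dict:
        return {**store, "user": {"pk": 1}}


class TUpper(ToyStep):
    def transform(self, store: dict) -> dict:
        return {**store, "name": store["name"].upper()}


extracted = EUser().run({})
transformed = TUpper().run({"name": "alice"})
```

A subclass that overrides _available_methods in this sketch would change which method names run looks for, which mirrors the customization point mentioned in the text.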

BasePipe Objects

class BasePipe()

Base class for all pipes, implements running logic and inspection of pipe state on every step

__init__

 | __init__(initial: t.Mapping, inspection: bool = False)

Arguments:

  • initial: Initial store state
  • inspection: Inspection mode on/off

set_inspection

 | set_inspection(enable: bool = True) -> bool

Sets inspection mode

Examples:

Toggle inspection on:

MyPipe({}).set_inspection()

Toggle inspection off:

MyPipe({}).set_inspection(False)

before_pipe

 | before_pipe(store: frozendict) -> frozendict

Hook for running a custom pipe (or anything else) before every pipe execution

Arguments:

  • store: Current pipe state

Returns:

Store

after_pipe

 | after_pipe(store: frozendict) -> frozendict

Hook for running a custom pipe (or anything else) after every pipe execution

Arguments:

  • store: Current pipe state

Returns:

Store

interrupt

 | interrupt(store: frozendict) -> bool

Interruption hook which can be overridden. It lets a subclassed pipe define one condition that is checked after every step. If the method returns True, the pipe does not run the remaining steps and immediately returns the value returned by the last step (the after_pipe hook is still respected).
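How the hooks and the interruption check might fit together is sketched below. ToyPipe is a hypothetical stand-in for BasePipe, steps are plain callables, the "error" key is invented for the example, and a plain dict replaces frozendict; the library's actual run loop may differ.

```python
class ToyPipe:
    """Hypothetical stand-in for BasePipe; steps are plain callables here."""

    def __init__(self, initial: dict, steps=()):
        self.store = dict(initial)
        self.steps = list(steps)

    def before_pipe(self, store: dict) -> dict:
        return store

    def after_pipe(self, store: dict) -> dict:
        return store

    def interrupt(self, store: dict) -> bool:
        return False

    def run(self) -> dict:
        store = self.before_pipe(self.store)
        for step in self.steps:
            store = step(store)
            # Checked after every step; remaining steps are skipped on True.
            if self.interrupt(store):
                break
        # after_pipe runs whether or not the pipe was interrupted.
        return self.after_pipe(store)


class StopOnError(ToyPipe):
    def interrupt(self, store: dict) -> bool:
        return "error" in store

    def after_pipe(self, store: dict) -> dict:
        return {**store, "finished": True}


steps = [
    lambda s: {**s, "a": 1},
    lambda s: {**s, "error": "bad input"},
    lambda s: {**s, "b": 2},  # never reached: the previous step triggers interrupt
]
result = StopOnError({}, steps).run()
```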

Arguments:

  • store: Current pipe state

Returns:

True if the pipe should be interrupted, False otherwise

NamedPipe Objects

class NamedPipe(BasePipe)

Simple pipe structure to interact with named pipes.

Example:

class MyPipe(NamedPipe):
    pipe_schema = {
        'crop_image': (EImage('<path>'), TCrop(width=230, height=140), LSave('<path>'))
    }

image_path = MyPipe(<initial_store>).run_pipe('crop_image')

pipe.core.decorators

configure

configure(config: dict) -> t.Callable

Configures Step class with values from config variable.

TODO: candidate for deprecation?

Arguments:

  • config:

Returns: