Apache Beam

Apache Beam is an open-source, unified programming model for defining and executing data-processing pipelines, including ETL, batch, and stream (continuous) processing. Beam pipelines are defined using one of the provided SDKs and executed by one of Beam's supported runners (distributed processing back-ends), including Apache Apex, Apache Flink, Apache Gearpump (incubating), Apache Samza, Apache Spark, and Google Cloud Dataflow. (Wikipedia)
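
As a minimal sketch, a word-count style pipeline built with the Python SDK and run locally on the default DirectRunner (input.txt and counts are placeholder paths):

import apache_beam as beam

# Without explicit options, the pipeline runs on the local DirectRunner.
with beam.Pipeline() as p:
    (
        p
        | 'read' >> beam.io.ReadFromText('input.txt')
        | 'split' >> beam.FlatMap(lambda line: line.split())
        | 'pair' >> beam.Map(lambda word: (word, 1))
        | 'count' >> beam.CombinePerKey(sum)
        | 'write' >> beam.io.WriteToText('counts')
    )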

Installation (Python SDK)

Further information: Apache Beam Python SDK Quickstart

Use pip to install the package

pip install apache-beam

To run pipelines on Google Cloud Dataflow (part of the Google Cloud Platform), install it with the gcp extras

pip install apache-beam[gcp]
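
Note that some shells (e.g. zsh) expand the square brackets, so the package spec may need to be quoted:

pip install 'apache-beam[gcp]'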

Run an example on Cloud Dataflow

Clone the Beam repository and cd into the examples directory

git clone https://github.com/apache/beam.git
cd beam
cd sdks/python/apache_beam/examples

Create a virtualenv and install the requirements

virtualenv .venv --python python2.7
source ./.venv/bin/activate 
pip install future
pip install apache-beam[gcp]
python wordcount.py \
    --runner DataflowRunner \
    --project ${GCP_PROJECT} \
    --region ${GCP_REGION} \
    --temp_location gs://some-bucket/tmp/ \
    --staging_location gs://some-bucket/staging/ \
    --output gs://some-bucket/out/
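
Once the job has finished, the output shards can be listed with gsutil (assuming the bucket names from the example above):

gsutil ls gs://some-bucket/out/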

Usage (Python SDK)

Tagged output

Yield two tagged outputs from a beam.DoFn (pvalue is imported from apache_beam; the elements are assumed to be dicts with red and blue keys)

class ExampleFunction(beam.DoFn):

    def process(self, element):
        yield pvalue.TaggedOutput('red', element.get('red'))
        yield pvalue.TaggedOutput('blue', element.get('blue'))

Use the tagged outputs

data = (
    input
    | u'example function' >> beam.ParDo(ExampleFunction()).with_outputs('red', 'blue')
)

data.red
data.blue
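
The tagged outputs can also be accessed by index, and with_outputs accepts a main keyword that names the untagged main output (the tag name rest here is illustrative):

data = input | beam.ParDo(ExampleFunction()).with_outputs('red', 'blue', main='rest')
reds = data['red']
rest = data.rest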

Snippets (Python SDK)

Read ZIP compressed file

import logging
import zipfile

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class ReadAllFromZipFile(beam.DoFn):
    """Expects each element to be the path of a ZIP archive and emits the lines of every file in it."""

    def process(self, element, *args, **kwargs):
        try:
            with FileSystems.open(element) as archive_channel:
                with zipfile.ZipFile(archive_channel) as archive:
                    for file_info in archive.infolist():
                        with archive.open(file_info) as f:
                            # Yield (instead of return) so that all files in
                            # the archive are emitted, not just the first one.
                            for line in f:
                                yield line
        except Exception as e:
            logging.error(u'Error processing file %s: %s', element, e)
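
A usage sketch (the archive path is a placeholder):

with beam.Pipeline() as p:
    lines = (
        p
        | beam.Create(['/path/to/archive.zip'])
        | beam.ParDo(ReadAllFromZipFile())
    )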

Check PCollection is not empty

import apache_beam as beam
from apache_beam import pvalue

is_not_empty = (
    your_pcollection
    | "count" >> beam.combiners.Count.Globally()
    | "is not empty" >> beam.Map(lambda n: n > 0)
)

pvalue.AsSingleton passes the single-element count PCollection as a side input, so the function receives a plain boolean at runtime

def some_action(element, is_not_empty):
    if is_not_empty:
        pass  # some action here

result = (
    your_pcollection
    | beam.Map(some_action, is_not_empty=pvalue.AsSingleton(is_not_empty))
)
