Data Pipeline code examples


Parse.ly’s Data Pipeline is accessed using two core AWS services, S3 and Kinesis, as described in Getting Access.

This page contains code examples showing how to access this data using common open source programming tools.

Quick Start with awscli and S3

AWS maintains a command-line client called awscli that includes a fully-featured S3 interface, along with full documentation. If you have a configured Python interpreter and the pip installer tool, installing the client is as simple as:

$ pip install awscli

Once it is installed, you can get help on using the client for S3 access via:

$ aws s3 help

Upon being granted access to Parse.ly’s Data Pipeline, you will receive AWS credentials for your S3 bucket in the form of an AWS Access Key ID and Secret Access Key. For example, your account administrator will send you a self-expiring email with credentials that look like the following:

parsely-dw-mashable
AKIAIOSFODNN7EXAMPLE
wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

The first line is your bucket name, which always starts with the prefix parsely-dw. The next line is the Access Key ID, which is always shorter. The final line is the Secret Access Key.

To configure these credentials with the CLI, use:

$ aws configure --profile=parsely
AWS Access Key ID [****************CORQ]: AKIAIOSFODNN7EXAMPLE
AWS Secret Access Key [****************TYt+]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
Default region name [us-east-1]: us-east-1
Default output format [None]: json

We recommend the json output format since that is commonly expected by other tools.

This will write a $HOME/.aws/credentials file containing these credentials under the [parsely] profile header.

Once this is set up, you should be able to use:

$ aws --profile=parsely s3 ls s3://parsely-dw-XXXXXXXX

Here, XXXXXXXX is replaced with your site or network’s identifier. In the examples below, we’ll assume this is mashable.com, so the bucket name will be parsely-dw-mashable.

A freshly-configured S3 bucket will likely only have a single prefix, named events/, so your output will look like this:

$ aws --profile=parsely s3 ls s3://parsely-dw-mashable
        PRE events/

Once you see that folder, you can start downloading files from S3 as follows:

$ aws --profile=parsely s3 sync s3://parsely-dw-mashable/events/2016/06/ ./06

This will download all the data from the month of June 2016 to a single folder, ./06, on your local machine.

The page on S3 access describes some of the caveats here — like how frequently these files will be generated, what their typical sizes might be, and what to expect about file naming conventions.

On Transferring Large Amounts of Data

S3 is designed to allow for the storage and transfer of terabytes of data with ease. But there are good and bad ways of working with S3. For example: if you already use AWS, you will get a huge benefit from downloading S3 data to an EC2 instance already inside the us-east-1 region. This is because data transfer within Amazon’s network can happen at gigabit speeds. Further, though the awscli described above is convenient, it isn’t particularly fast. Tools like Spark speed up S3 loading via parallelization.

Code Integration Overview

The open source ecosystem around data analytics can be overwhelming, with a “paradox of choice”. In this section, we try to summarize the most popular and production-ready tools, as well as how they would satisfy various use cases around Parse.ly’s Data Pipeline.

Summary of Preferred Tool Options

| Tool              | Access Mechanism          | Languages     | Simple vs Advanced | Fault Tolerant? | Data Latency |
|-------------------|---------------------------|---------------|--------------------|-----------------|--------------|
| aws s3            | S3                        | Shell         | Simple             | No              | 15m-60m      |
| aws kinesis       | Kinesis                   | Shell         | Simple             | No              | 0s-5s        |
| boto3.s3          | S3                        | Python        | Simple             | No              | 15m-60m      |
| Pandas or R       | S3                        | Python, R     | Simple             | No              | 15m-60m      |
| boto3.kinesis     | Kinesis                   | Python        | Advanced           | No              | 0s-5s        |
| Spark (Bulk)      | S3                        | Python, Scala | Advanced           | Yes             | 15m-60m      |
| Spark (Stream)    | Kinesis                   | Python, Scala | Advanced           | Yes             | 0s-5s        |
| Storm             | Kinesis                   | Python, Java  | Advanced           | Yes             | 0s-5s        |
| Redshift          | S3 => COPY command        | SQL           | Advanced           | Yes             | 30m-60m      |
| BigQuery (Bulk)   | S3 => Google Storage Xfer | SQL           | Advanced           | Yes             | 30m-60m      |
| BigQuery (Stream) | Kinesis => Insert API     | SQL           | Advanced           | Yes             | 5s-10s       |

Guide to Common Use Cases

Synchronize data to my own nodes or S3 bucket. Use the aws s3 tool from the command-line. Use boto3 with your S3 bucket from Python. Other languages have libraries similar to boto3.
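
For the boto3 route, a minimal sketch of mirroring one hour of data locally might look like the following; the parsely profile name and the local mirror directory are illustrative assumptions:

import os
import boto3

# Use the "parsely" profile configured earlier with aws configure.
session = boto3.Session(profile_name="parsely")
bucket = session.resource("s3").Bucket("parsely-dw-mashable")

# Mirror one hour of data into a local directory.
os.makedirs("mirror", exist_ok=True)
for obj in bucket.objects.filter(Prefix="events/2016/06/01/00"):
    local_path = os.path.join("mirror", obj.key.replace("/", "_"))
    bucket.download_file(obj.key, local_path)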

Download some data locally for doing in-memory analysis using Pandas, Spark, R, or similar tools. Use aws s3 from the command-line. Pandas and Spark have built-in support for S3 URIs (e.g. s3://parsely-dw-mashable) via their file loaders. R has a module called aws.s3 that will access S3 buckets easily.
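
For example, here is a rough sketch of the Pandas route, assuming the s3fs package is installed and your credentials are visible to Pandas (for instance via the AWS_PROFILE environment variable); the column selection is just for illustration:

import pandas as pd

# Read one gzip-compressed, newline-delimited JSON file from S3
# directly into a DataFrame (requires the s3fs package).
df = pd.read_json(
    "s3://parsely-dw-mashable/events/2016/06/01/00/parsely-dw-mashable-001.gz",
    lines=True,
    compression="gzip",
)
print(df[["action", "url", "ts_action"]].head())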

Build real-time alerting or real-time analytics in the simplest way possible. The boto3 library can be easily connected to your Kinesis stream. A single process can consume all shards of your Kinesis stream and respond to events as they come in.

Use Amazon EMR or Databricks Cloud to bulk-process gigabytes (or terabytes) of raw analytics data for historical analyses, machine learning models, or the like. For this, you’ll need to use Spark in batch mode via Scala or Python (pyspark). Alternatively, you can use traditional Hadoop or tools built atop it like Apache Pig.

Build real-time alerting or real-time analytics, but support scale and fault tolerance that goes beyond a single data consumer node. If you have no need for one-at-a-time processing and micro-batching is good enough, use Spark Streaming with its built-in Kinesis connector. If you need one-at-a-time processing, use Apache Storm.

Synchronize my S3 bucket with an Amazon Redshift instance every few hours. Redshift knows how to bulk load data from S3 via its COPY command. If you are using Python, you can use sqlalchemy and the Redshift dialect to execute this command regularly.
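
A minimal sketch of that approach, assuming the sqlalchemy-redshift dialect is installed and using a placeholder connection URL, might look like this; the COPY statement itself is the one shown in the Redshift section below:

import sqlalchemy as sa

# Placeholder Redshift connection URL: substitute your own endpoint,
# database, user, and password.
engine = sa.create_engine(
    "redshift+psycopg2://user:password@example-cluster.us-east-1.redshift.amazonaws.com:5439/analytics")

COPY_SQL = """
COPY parsely.rawdata
FROM 's3://parsely-dw-mashable/events/2016/06/01/12'
CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXX;aws_secret_access_key=XXXXXXXXXXXXXXXX'
REGION AS 'us-east-1'
GZIP
FORMAT AS json 'auto'
DATEFORMAT 'auto';
"""

# Run this on a schedule (e.g. from cron) to load each new batch of data.
with engine.begin() as conn:
    conn.execute(sa.text(COPY_SQL))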

Synchronize my S3 bucket with a Google BigQuery instance every few hours. Google provides a command-line tool called gsutil that works with both S3 and Google Cloud Storage (GCS). The bq command-line tool allows you to create and modify BigQuery Datasets (tables) with their JSON schema language.

Stream real-time events into my Google BigQuery instance. BigQuery Datasets (tables) can be managed using bq, and the Python google-api-python-client can be used to do streaming inserts using tabledata().insertAll(...). If you wire a process from Kinesis to this API, you can have streaming events in BigQuery. Spark Streaming can also be used for this use case.

Basic Integrations

Python Code for S3 with boto3

Accessing S3 data programmatically is relatively easy with the boto3 Python library. The code snippet below lists the first three objects under a specific hour’s prefix.

from pprint import pprint
import boto3
BUCKET = "parsely-dw-mashable"
# s3 client
s3 = boto3.resource('s3')
# s3 bucket
bucket = s3.Bucket(BUCKET)
# all events in hour 2016-06-01T00:00Z
prefix = "events/2016/06/01/00"
# pretty-print the first 3 objects
files = bucket.objects.filter(Prefix=prefix)
pprint(list(files)[:3])

This will produce output like this:

[
  s3.ObjectSummary(bucket_name='parsely-dw-mashable', key=u'events/2016/06/01/00/parsely-dw-mashable-001.gz'),
  s3.ObjectSummary(bucket_name='parsely-dw-mashable', key=u'events/2016/06/01/00/parsely-dw-mashable-002.gz'),
  s3.ObjectSummary(bucket_name='parsely-dw-mashable', key=u'events/2016/06/01/00/parsely-dw-mashable-003.gz')
]

The full documentation has more details.
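
To go one step further and read one of those files, a sketch like the following would work; it assumes the parsely profile configured earlier and simply prints a couple of fields from the first few events:

import gzip
import json
import boto3

# Use the "parsely" profile configured earlier with aws configure.
session = boto3.Session(profile_name="parsely")
bucket = session.resource("s3").Bucket("parsely-dw-mashable")

# Fetch the first object for the hour and parse its gzip JSON lines.
first = list(bucket.objects.filter(Prefix="events/2016/06/01/00"))[0]
body = first.get()["Body"].read()
for line in gzip.decompress(body).splitlines()[:3]:
    event = json.loads(line)
    print(event["action"], event["url"])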

Getting Started with Kinesis and awscli

Kinesis provides real-time streaming data by offering two primary primitives:

  • Streams: 24-hour logs of real-time data.
  • Shards: partitions of real-time data to allow for scale-out consumers.

To consume a stream, you must first get a list of its shards, then acquire a “shard iterator”, and finally ask that shard iterator for its (most recent) records.

To acquire a shard iterator at the command line, you can use awscli as follows:

$ SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
    --shard-id shardId-000000000000 \
    --shard-iterator-type LATEST \
    --query 'ShardIterator' \
    --stream-name 'parsely-dw-mashable')

You will then have a shard iterator that looks something like this:

$ echo $SHARD_ITERATOR
"AAAAAAAAAAGryM+pw4kCuLiEUfOIJsf...shnLTfHtriA=="

The shard iterator identifier is now stored in the $SHARD_ITERATOR shell variable; it expires after a few minutes if unused.

You can now use this with aws kinesis get-records to fetch some data. We’ll filter the data to only show the PartitionKey, since the actual data itself is gzip-compressed JSON and thus not readable from the shell.

$ aws kinesis get-records --shard-iterator $SHARD_ITERATOR | grep 'PartitionKey'
"PartitionKey": "http://mashable.com/2016/05/03/game-thrones-heir/",
"PartitionKey": "http://mashable.com/tech/?utm_cid=mash-prod-nav-ch",
"PartitionKey": "http://mashable.com/2016/05/03/game-thrones-heir/",
...

There you have it. Streaming real-time records from the command-line!

Python Code for Kinesis with boto3

This boto3 code snippet will connect to your Kinesis Stream — set below by STREAM — and then find all its shards and dump one JSON record from each of them.

This does programmatically what the above command-line shell example did. But now, you can fully customize the processing. Also, boto3 automatically decompresses your data records, though you are still responsible for parsing the JSON messages therein.

import json
import boto3
STREAM = "parsely-dw-mashable"
# kinesis client
kinesis = boto3.client('kinesis')
# gets all shard iterators in a stream,
# and fetches most recent data
def get_kinesis_shards(stream):
  """Return list of shard iterators, one for each shard of stream."""
  descriptor = kinesis.describe_stream(StreamName=stream)
  shards = descriptor[u"StreamDescription"][u"Shards"]
  shard_ids = [shard[u"ShardId"]
        for shard in shards]
  shard_iters = [kinesis.get_shard_iterator(
          StreamName=stream,
          ShardId=shard_id,
          ShardIteratorType="LATEST")
        for shard_id in shard_ids]
  return shard_iters
# acquire all iterators
shard_iters = get_kinesis_shards(STREAM)
# essentially tail -n1 for the Kinesis stream on each shard
for shard in shard_iters:
  records = kinesis.get_records(
    ShardIterator=shard[u"ShardIterator"],
    Limit=1)[u"Records"]
  for record in records:
    datum = json.loads(record[u"Data"])
    print(json.dumps(datum, indent=4, sort_keys=True))

This will produce output like this:

{
  "action": "pageview",
  "apikey": "mashable.com",
  "ts_action": "2016-06-17 01:21:24",
  "ua": "Mozilla/5.0 (iPad; CPU OS 9_3_2 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13F69 Safari/601.1",
  "url": "http://mashable.com/2016/06/16/game-of-thrones-season-6-finale-predicitons/#kEZPFgyuygqJ",
  "visitor_site_id": "ecbcb5ea-5403-4e6f-8648-3fbe71900746"
  // ... other fields elided ...
}

You can imagine that wiring this sort of code to any real-time streaming process — such as streaming writes to Google BigQuery — would be a straightforward next step.
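
As a sketch of that next step, here is a minimal (and deliberately non-fault-tolerant) polling loop that follows a single shard using the NextShardIterator returned by each get_records call; a production consumer would handle every shard, checkpointing, and resharding:

import json
import time
import boto3

STREAM = "parsely-dw-mashable"
kinesis = boto3.client("kinesis")

# Start from the most recent data on the first shard only.
shard_id = kinesis.describe_stream(StreamName=STREAM)[
    u"StreamDescription"][u"Shards"][0][u"ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM,
    ShardId=shard_id,
    ShardIteratorType="LATEST")[u"ShardIterator"]

while True:
    response = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in response[u"Records"]:
        event = json.loads(record[u"Data"])
        # Replace this print with your alerting or streaming-insert logic.
        print(event["action"], event["url"])
    # Keep following the shard with the iterator Kinesis hands back.
    iterator = response[u"NextShardIterator"]
    time.sleep(1)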

Other Kinesis Clients Available

Beyond boto3, Kinesis has a number of clients for other programming languages, and even a “high-level” client that depends upon Amazon DynamoDB for fault tolerance and high availability. See the AWS documentation for more details.

Advanced Integrations

Moving beyond single-node and in-memory analytics, our advanced integrations showcase how to use cluster computing technologies, and even prepare you for building a highly-available, fault-tolerant ETL for your own cloud-hosted SQL analytics engine.

Loading data into Spark, Redshift and BigQuery is an advanced topic, and requires some knowledge of distributed computing and how the Amazon and Google public clouds work. You can skip this section if that sounds too technical for you. Just rest assured that we made it as easy as possible for your team to load Parse.ly’s raw data into these tools.

Using Spark with S3

Spark picks up your AWS credentials from the operating system environment. We recommend you create a small script called env_parsely. In it, you can do:

export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE;
export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY;

… while replacing the above values with your provisioned AWS access/secret keys.

Then you can do source env_parsely before running Spark commands that require S3 access. You can have this script sourced automatically by all Spark commands by making sure it appears in spark-env.sh. For a Spark installation rooted at /opt/spark, here is how that would look:

cp /opt/spark/conf/spark-env.sh.template /opt/spark/conf/spark-env.sh
cat env_parsely >>/opt/spark/conf/spark-env.sh

Then, /opt/spark/bin/pyspark would open a shell where it would be possible to do the following:

> lines = sc.textFile("s3://parsely-dw-mashable/events/2016/06/01/00")
> lines.take(1)
['{"apikey": "mashable.com", "url": "http://mashable.com/", ...}']

Using Spark Streaming with Kinesis

See Spark’s official docs for now.
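
In the meantime, here is a rough, hedged sketch of what a pyspark consumer could look like, assuming Spark 2.x with the spark-streaming-kinesis-asl package on the classpath; the application name, batch interval, and endpoint are illustrative, and depending on how your record payloads are encoded you may need to supply a custom decoder:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="parsely-kinesis-demo")
ssc = StreamingContext(sc, batchDuration=10)  # 10-second micro-batches

# The application name is used by the underlying Kinesis Client Library
# for checkpointing shard positions in DynamoDB.
stream = KinesisUtils.createStream(
    ssc, "parsely-kinesis-demo", "parsely-dw-mashable",
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, 10)

# Count the events received in each micro-batch.
stream.count().pprint()

ssc.start()
ssc.awaitTermination()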

Using Storm with Kinesis

See the AWS Labs Kinesis Storm Spout for now.

SQL Engines and ETLs

An ETL, or extract-transform-load process, is a way of taking data from Parse.ly’s Data Pipeline and enriching it before you load it into a “final resting place”, where you and your team members might run useful ad-hoc queries against it.

Parse.ly’s raw data schema was specifically designed to make this process easy, especially with modern cloud analytics SQL engines like Amazon Redshift and Google BigQuery.

Real-Time vs Delayed

The first thing you need to decide when building an ETL for your own SQL engine is, “how important is real-time data?”

Real-time Analytics. If you want real-time analytics — that is, data that can be queried almost as fast as it arrives — then you have a limited set of options. You must use Parse.ly’s real-time stream and you must perform streaming writes of that data. You have to ensure your ETL does not add much latency on its own. Your best option here is to wire Kinesis to BigQuery’s “streaming insert” API.

Near-Real-Time Analytics. If you can live with 15-minute to 60-minute delays on your data, then you can use either the Kinesis stream (with batching/buffering) or the S3 bucket (by waiting for new objects to arrive). Both Redshift and BigQuery can handle bulk loads of data every 15 minutes without issue.

24-Hour Delayed Analytics. If you can live with a 24-hour delay on your data, then you can use daily bulk load jobs against your S3 bucket. This is, perhaps, the simplest and most reliable option. Both Redshift and BigQuery can do bulk data loads of gigabytes of data at rest, and this can run as a cron job or similar rather than requiring an always-on pipeline consumer. This is also the option that lets you best take advantage of systems like Amazon EMR and its spot instance cluster computing model.

Redshift vs BigQuery

Redshift and BigQuery are the two most popular SQL engines available on the public cloud market, and they are run by Amazon and Google, respectively.

The raw data is easy to read, since it is delivered as lines of gzip-compressed JSON, and thankfully Amazon Redshift and Google BigQuery each have native support for parsing exactly the kind of data Parse.ly provides. Indeed, the main way Parse.ly does end-to-end testing on its raw data formats is to ensure they properly bulk and stream load into these cloud SQL data stores.

However, these engines also have various data integration options, as will be discussed in the data loading tutorials below.

Loading Data into Redshift

In the case of Redshift, bulk data loads from S3 are a “one-liner”. Simply use the Redshift COPY command like this after creating a schema/table named parsely.rawdata:

COPY parsely.rawdata
FROM 's3://parsely-dw-mashable/events/2016/06/01/12'
CREDENTIALS 'aws_access_key_id=XXXXXXXXXXXXXXXX;aws_secret_access_key=XXXXXXXXXXXXXXXX'
REGION AS 'us-east-1'
GZIP
FORMAT AS json 'auto'
DATEFORMAT 'auto';

Here you would customize your S3 Bucket name, date prefix, and access credentials, as we discussed in the Getting Access page. You could monitor the bulk data load in your Redshift AWS panel.

To test that the data was loaded properly, you can do a global pageview count as follows:

SELECT COUNT(action) as views
FROM parsely.rawdata
WHERE action = 'pageview';

See parsely_raw_data and its example Redshift schema for more information.

Loading Data into BigQuery

In the case of BigQuery, you have two options: bulk load or streaming inserts.

To bulk load data, the easiest option is to transfer your data from S3 to Google Cloud Storage (GCS) using Google’s Storage Transfer service. In this case, you would have a gs:// bucket that is in sync with your s3:// bucket, and could bulk load your data using Google’s bq command-line tool.

You can also do one-off S3-to-GCS copies using gsutil cp:

$ gsutil -m cp -r \
    s3://parsely-dw-mashable/** \
    gs://parsely-dw-mashable/

Once data is in Google Storage, you can use bq to bulk load it quickly. For example:

$ bq load \
    --max_bad_records=1000 \
    --source_format=NEWLINE_DELIMITED_JSON \
    parsely.rawdata \
    gs://parsely-dw-mashable/events/2016/06/01/12 \
    bq.json

Where bq.json is a JSON file containing your BigQuery schema definition.

To test that the data was loaded properly, you can do a global pageview count as follows:

SELECT COUNT(action) as views
FROM parsely.rawdata
WHERE action = 'pageview';

To stream data in, you’d use something like the Python Google BigQuery client library, and run tabledata().insertAll(...) function calls as data arrives on your Amazon Kinesis Stream. We have some example Python code for this in our Github repo, as well.
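
As a rough sketch of that approach, assuming the google-api-python-client package and application-default Google credentials, with placeholder project, dataset, and table names:

from googleapiclient.discovery import build

# Build a BigQuery API (v2) client using application-default credentials.
bigquery = build("bigquery", "v2")

def stream_rows(events):
    """Insert a small batch of parsed Parse.ly events into BigQuery."""
    body = {
        "rows": [
            # A stable insertId lets BigQuery de-duplicate retried inserts.
            {"insertId": e["visitor_site_id"] + e["ts_action"], "json": e}
            for e in events
        ]
    }
    return bigquery.tabledata().insertAll(
        projectId="my-gcp-project",  # placeholder project ID
        datasetId="parsely",
        tableId="rawdata",
        body=body,
    ).execute()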

See parsely_raw_data and its example BigQuery schema for more information.

Other Example Code in parsely_raw_data

We have some example streaming (Kinesis) and bulk (S3) integration code in Python, along with a representation of a single Parse.ly raw event in Python.

Getting Help with Integration

  • If you are already a Parse.ly customer, your site is already instrumented for Parse.ly’s Data Pipeline; no additional integration steps are required to start leveraging your raw data. Simply get in touch with us, and we can discuss how to supply you with your secure Access Key ID and Secret Access Key. From that point forward, you can use the service as you see fit, and you can also explore how to instrument custom events and custom data.
  • If you are not a Parse.ly customer, you simply need to go through our basic integration. Contact us for a demo and we can also walk through the integration steps necessary — which are generally pretty painless.

Last updated: October 23, 2024