Apache NiFi: automate your data flow!

August 2020

We have all encountered data feeds that had to be imported, deduped, split, merged, converted to a different format, exported, or verified.

Data flow management and data wrangling are common tasks; the typical problems are well known, and the pattern of data exchange between systems is nothing new. The practical implementation depends on the scale, the volume of data, and your business needs:

  • You can write custom code to handle each feed individually.
    This approach works well if you only process a few feeds at most and have a lot of custom requirements. The rollout of new feeds may be slow; you have to remember and account for gotchas (for example, commas inside CSV fields or special characters), and you have to code and test basic tasks like reading from disk, logging, and exporting. As the number of feeds grows, this solution may become hard to maintain.
  • You can write generic code to handle any feed based on the given schema.
    Great if you have particular business requirements, already have the internal infrastructure that suits your needs, and managing data feeds is a critical part of your business. Flexible yet gives you complete control over the process.
  • You can use ETL (extract-transform-load) tools to wrangle your data and do something else with your time.
    The best approach if you have a lot of feeds and need to roll out new ones quickly.

Apache NiFi is an excellent solution for the last two options: it is powerful right out of the box, and it also lets you write a custom feed processor.

Apache NiFi

Donated by the NSA to the Apache Software Foundation back in 2014, NiFi (or as it was known originally, "Niagarafiles") quickly graduated to a Top-Level Project in 2015.

Billed as an "easy to use system to process and distribute data," NiFi automates data flow between various systems. It is built on the Flow-Based Programming (FBP) paradigm - a concept introduced by J. Paul Morrison in the 1960s. In a nutshell, multiple black box processors exchange messages via external connections using concurrent scheduling and shared storage. Processors can be rearranged and connected in different ways without changing the logic inside, and the program flow runs as long as there is data to be processed.
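To make the FBP idea concrete, here is a minimal sketch in plain Python. It is not NiFi code: the names run_processor and run_pipeline are made up for illustration, and queues stand in for NiFi connections. Each black box runs in its own thread and only knows about its input and output queue.

```python
import queue
import threading

SENTINEL = object()  # marks the end of the stream


def run_processor(transform, inbox, outbox):
    """A black box: read packets from inbox, transform them, write to outbox."""
    while True:
        packet = inbox.get()
        if packet is SENTINEL:
            outbox.put(SENTINEL)  # pass the end-of-stream marker downstream
            break
        outbox.put(transform(packet))


def run_pipeline(packets, transforms):
    """Connect the processors with queues and run each one in its own thread."""
    queues = [queue.Queue() for _ in range(len(transforms) + 1)]
    threads = [
        threading.Thread(target=run_processor, args=(t, queues[i], queues[i + 1]))
        for i, t in enumerate(transforms)
    ]
    for th in threads:
        th.start()
    for p in packets:
        queues[0].put(p)
    queues[0].put(SENTINEL)

    results = []
    while True:
        item = queues[-1].get()
        if item is SENTINEL:
            break
        results.append(item)
    for th in threads:
        th.join()
    return results


# The processors can be rearranged without touching their internals.
print(run_pipeline(["  a ", " b"], [str.strip, str.upper]))
```

The processors never call each other directly, which is what lets them be reconnected in a different order.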

Familiarizing yourself with FBP principles makes for an easier understanding of NiFi.

Concurrency allows data packets to be processed in parallel by multiple black boxes, making NiFi fast and efficient. On average hardware, NiFi can process 50 to 100 MB of data per second; throughput can be scaled out further by clustering, or scaled up by adding more memory to the VM.

Once NiFi is installed and configured, open your browser and use the web interface to define data flows. Black boxes, aka processors, can be dragged to the canvas, configured, and connected for continuous data flow. There are multiple ways to accomplish a task, and once you are satisfied with a specific sequence, you can save it as a template for even faster feed onboarding.

Available Black Boxes

The following processor categories are currently available:

  • Consume data: read from disk, retrieve from database, download from FTP, pull from Kafka or Twitter, get from AWS
  • Export data: save to disk, send an email, put to FTP, insert into a database, post to AWS
  • Transform data: compress/uncompress, XSLT/JOLT, encrypt, use regular expressions to manipulate the text
  • Route data: redirect data flow based on attributes, validity, or uniqueness of the content
  • Split or merge data: split or merge incoming content
  • Update data: query, insert, update, delete content
  • Extract attributes from data

Setting Up Processors and Data Flow

The simplest flow parses incoming data, performs some operations on it, and outputs the final result.

NiFi provides data Readers and Writers for various formats: CSV, JSON, Avro, etc. Readers use a schema to parse incoming data, while Writers can use a different schema to modify the output. Schemas can be stored in a Schema Registry or typed inline, depending on your needs; a Writer can also inherit the schema from the Reader if no changes to the record structure are desired.
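The reader/writer split can be illustrated outside NiFi with a short Python sketch. The schemas, field names, and helper functions below are hypothetical and only mimic the idea: one schema drives parsing, and a different one shapes the output.

```python
import csv
import io
import json

# Hypothetical reader schema: field name -> type used to parse the value.
READER_SCHEMA = {"id": int, "name": str, "price": float}
# Hypothetical writer schema: the output keeps only a subset of the fields.
WRITER_SCHEMA = ["id", "price"]


def read_records(csv_text, schema):
    """Parse CSV text into typed records according to the reader schema."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [{name: cast(row[name]) for name, cast in schema.items()} for row in reader]


def write_records(records, fields):
    """Serialize records as JSON, keeping only the writer schema's fields."""
    return json.dumps([{f: r[f] for f in fields} for r in records])


# Note the comma inside the quoted "name" value: the CSV parser handles it.
csv_text = 'id,name,price\n1,"Widget, large",9.5\n2,Gadget,3.0\n'
records = read_records(csv_text, READER_SCHEMA)
print(write_records(records, WRITER_SCHEMA))
```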

The NiFi term for a unit of data is a FlowFile, which consists of the actual content and its attributes, or content metadata. Attributes can be built-in, like file name and size, or custom. Custom attributes may be extracted from the content or derived from the values of built-in attributes.

The NiFi Expression Language lets you split, merge, evaluate, and convert values when setting attributes. Once an attribute is created, it travels along with the FlowFile content until the end of the sequence. Typical uses of attributes include changing the output file name and routing.
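As a rough mental model (not NiFi's actual classes), a FlowFile can be pictured as content plus a dictionary of attributes. In the Python sketch below, the FlowFile dataclass, the with_output_name helper, and the output_name attribute are all made up for illustration; the helper derives a custom attribute from the built-in file name, much as an Expression Language statement would.

```python
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Simplified stand-in for a NiFi FlowFile: content plus attributes."""
    content: bytes
    attributes: dict = field(default_factory=dict)


def with_output_name(flowfile, suffix):
    """Derive a custom attribute from the built-in 'filename' attribute."""
    stem = flowfile.attributes["filename"].rsplit(".", 1)[0]
    updated = dict(flowfile.attributes, output_name=f"{stem}{suffix}")
    # The content is untouched; only the metadata changes.
    return FlowFile(flowfile.content, updated)


ff = FlowFile(b"id,name\n1,Widget\n", {"filename": "feed.csv"})
ff = with_output_name(ff, "_clean.json")
print(ff.attributes["output_name"])  # feed_clean.json
```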

Each processor comes with predefined configuration options that include the available routing. Traffic can be redirected based on processor status (e.g., success/failure), built-in attributes (e.g., file name, date, size), or custom attributes (e.g., matching rules on content).
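The routing decision itself boils down to a function from attributes to a named route. The sketch below is plain Python under assumed names: the attribute keys (status, fileSize, record.type) and the route names are hypothetical and are not taken from any specific processor.

```python
def route(attributes, max_size=1_000_000):
    """Pick a route name for a FlowFile based on its attributes."""
    # Processor status: failures go to their own route first.
    if attributes.get("status") == "failure":
        return "failure"
    # Built-in style attribute: oversized files are handled separately.
    if int(attributes.get("fileSize", 0)) > max_size:
        return "large"
    # Custom attribute, e.g. one extracted from the content upstream.
    if attributes.get("record.type") == "order":
        return "orders"
    return "unmatched"


print(route({"status": "success", "fileSize": "2048", "record.type": "order"}))
```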

RecordPath Language can be used to configure processors and specify fields inside the record that should be used for routing, data manipulation, and transformation.

Additionally, the QueryRecord processor integrates NiFi with Apache Calcite, which allows you to write SQL queries against the incoming data flow. For example, SQL can be used to dedupe incoming records.
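A dedupe query of this kind can be tried out locally before it goes into a flow. The sketch below uses Python's built-in sqlite3 module as a stand-in for Calcite, so the SQL dialect may differ in details; QueryRecord exposes the incoming records as a table named FLOWFILE, and the id and email columns here are made-up sample data.

```python
import sqlite3

# Sample records with one exact duplicate (hypothetical data).
records = [
    (1, "alice@example.com"),
    (2, "bob@example.com"),
    (1, "alice@example.com"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE FLOWFILE (id INTEGER, email TEXT)")
conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?)", records)

# Keep one copy of each distinct record.
DEDUPE_SQL = "SELECT DISTINCT id, email FROM FLOWFILE ORDER BY id"
deduped = conn.execute(DEDUPE_SQL).fetchall()
print(deduped)
```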

The initial processor in the sequence can be scheduled to run at intervals or specified times. Sequences can be started all at once or a few processors at a time, which is helpful during debugging.

Speaking of debugging, setting a processor's Bulletin Level to DEBUG provides detailed diagnostic information.

Writing a custom processor

If you have written feed processing applications before, take a look at your desired flow before deciding on writing a custom processor. It's possible that NiFi provides a perfectly valid solution for accomplishing what you need but in slightly different ways. There is a learning curve in thinking "in NiFi," but you'll get it after the first few practice processes!

For example, let's say you need to select a record set from the database every second Tuesday and then execute a specific operation for each record. Once data is processed, each result should be emailed to a designated record keeper.

In NiFi that flow would look like this:

  • ExecuteSQL processor to select data using cron scheduling
  • PartitionRecord processor to split the dataset into separate processing streams, one for each record type
  • Once streams are processed, send them through ConvertRecord to move from binary Avro to human-readable JSON
  • Generate the output and use PutEmail to send it out
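The partition-then-convert part of that flow can be sketched in plain Python to show what happens to the data. This is only an illustration, not NiFi code: the record fields and helper names are hypothetical, and printing stands in for sending an email.

```python
import json
from collections import defaultdict


def partition_records(records, key):
    """Group records into separate streams by the value of one field."""
    groups = defaultdict(list)
    for record in records:
        groups[record[key]].append(record)
    return dict(groups)


def to_json(records):
    """Render one stream as human-readable JSON."""
    return json.dumps(records, indent=2)


# Hypothetical result set from the database query.
rows = [
    {"type": "invoice", "id": 1, "amount": 120.0},
    {"type": "refund", "id": 2, "amount": 15.5},
    {"type": "invoice", "id": 3, "amount": 80.0},
]

streams = partition_records(rows, "type")
for record_type, group in streams.items():
    body = to_json(group)  # this would become the email body
    print(f"{record_type}: {len(group)} record(s)")
```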

If you still feel that a custom processor is the way to go, write one in Java, following the excellent developer documentation.

Use Case Examples

  • Download the feed file from SFTP, validate it, and insert it in the database after truncating old data.
  • Pull data via REST API, aggregate it and generate a summary report to be delivered by email.
  • Consume incoming data, splitting it into buckets by record type, deduping each bucket. Save duplicates in a separate folder.
  • Enrich incoming data via data lookups
  • Emit a message to Apache Kafka when a specific logging event occurs

Gotchas

As a good practice, review a processor's documentation before turning it on.

For example, GetFile can look for a file to process in a root folder only, or recursively search all subdirectories. By default, recursion is turned on. If you leave a generic file name mask and have GetFile running recursively on a timer with a 0-sec wait, it may lead to rather unintended consequences.
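The difference between the two modes is easy to underestimate. The following Python sketch does not use NiFi at all; it simply shows, with a hypothetical find_files helper and a throwaway directory, how a generic mask picks up far more than expected once recursion is on.

```python
import tempfile
from pathlib import Path


def find_files(root, mask="*", recursive=True):
    """List files matching the mask, in the root only or in all subfolders."""
    root = Path(root)
    matches = root.rglob(mask) if recursive else root.glob(mask)
    return sorted(p.relative_to(root).as_posix() for p in matches if p.is_file())


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "archive" / "2019").mkdir(parents=True)
    (root / "feed.csv").write_text("new data")
    (root / "archive" / "2019" / "old.csv").write_text("already processed")

    root_only = find_files(root, recursive=False)
    everything = find_files(root, recursive=True)

print(root_only)   # ['feed.csv']
print(everything)  # ['archive/2019/old.csv', 'feed.csv']
```

With recursion on, the archived file is picked up again along with the new feed, which is rarely what you want on a tight polling schedule.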

Similarly, thoughtlessly running processors that execute SQL statements can bring down your database due to the amount of traffic generated.

While NiFi does provide logging capabilities, make sure someone is watching or gets notified if a failure occurs - posting a message to a Slack channel works very well for additional monitoring.

Give it a try!

NiFi is fast, flexible, and cuts the time needed to bring up new feeds or modify existing ones. It does the heavy lifting for common data manipulation tasks.

I also love that non-developers can understand the visual data flow and make modifications if needed, without having to rely on software engineering. With sufficient training, more team members are empowered to make changes in data processing logic or debug easily resolvable issues - like missing files or unexpected file formats.