On Data Lakes and Stream Ingestion

Vojtech Tuma
11 min readMay 3, 2022

--

Cloud architectures often contain some kind of generic event ingestion service and a data lake or data warehouse for storage of these.

The events in question may be trackings of UI actions and impressions, monitorings of IoT devices, business logic actions reported by other components.

I would like to list a few thoughts and recommendations of what to keep in mind when designing and using a platform for this purpose. As the text is rather long, I’ve diluted it with a few boomer memes.

Reference Architecture

The platform for event ingestion and analytical processing consists of multiple parts, each of which can be filled by different technologies.

For simplicity, let’s consider the following components:

  1. Event Source: IoT device, client application, external service,
  2. Reception Layer: a simple microservice which receives messages from Event Source and passes it on, possibly doing authentication, routing, …
  3. Streaming Layer: a message queue, such as Kafka or RabbitMQ, which facilitates stream processing. Usually accompanied with some suite for processing and dashboards, such as Spark Streaming and Grafana,
  4. Lake Ingestor: a component for reading the data from Stream and intelligently storing into the Data Lake,
  5. Data Lake: a technology for analytical processing, BigQuery, S3, Postgres, Clickhouse, Redshift, …

Recommendations

Do not omit any piece of the Reference Architecture

  • Some teams build a data platform without having a real data producer — this blunder should be obvious.
  • Omitting the Reception Layer and writing directly into the queue should be done only if you have full control over the Event Source, and don’t expect too many distinct Event Sources. Otherwise, any downtimes and migrations, or even changes to data representation, routing or internal processing logic will become painful.
  • Without the Streaming Layer, you need to write from Reception directly to the Data Lake — which for some particular Lakes may be ok (e.g. Postgres), but others (S3) prefer bulk writes. Therefore, you will either have too small batches, or increase the risk of a data loss. Additionally, you lose the ability to analyse the data in an online fashion — while that may not be a 0-day requirement, it is very good to keep the door open.
  • While some folks like to say that Kafka + Streaming is all you need, it may for some usecases that have a non-temporal access pattern or need to read back cause huge costs and intractable compute times — so don’t omit the Data Lake.

I’m implicitly assuming this is a “push” model where Event Source sends requests to the Reception Layer. However, everything is identically applicable to a “pull” model, where some service actively crawls external Event Sources.

Rely on existing tools

There are folks who write their message queue, e.g., for high-performance trading, and it may be justified. But most often it isn’t.

Less obvious is queue-to-queue replication. In some cases, you want a short distance between the Reception Layer (which usually has no practical persistence on its own) and the queue, to lose no data — so it’s a good idea to have your message queue distributed together with your Reception Layer, and then one centralised queue where you run stream processing and from which you ingest into the Lake.

However, you have a choice — either have independent instances of the queues, and have your own consumer-producer that moves the data from the remote queues to the central one, or rely on tools like Kafka Mirror Maker or the underlying routing capabilities of the queue such as with NATS. So far, whenever there was an official way of solving this, it performed better for me than trying to do it on my own.

Least obvious is the case for the Lake Ingestor. Kafka Connect is a great tool, but essentially forces you to go with Avro, and all extensions have to be written in a particular way. Operating it also has some particular quirks, and you may prefer e.g. a Spark Streaming cluster instead, for whatever reason. If an existing tool works for you, go for it, but implementing this part on your own may be well justified, and is often not much extra work.

Tracing

Tracing, as represented e.g. by Jaeger, is a means of monitoring and analysing complex distributed systems. The idea behind is “provide information about states of requests as they flow through the system”. Data Lake Ingestion is, when under heavy loads and a big number of producers or consumers, a complex system. However, we often need to have not only a “sampling picture” obtained from explicit tracing of a fraction of requests, but a complete statistical insight.

At a minimum, every message should have created_at, received_at, and stored_at, filled respectively at the Event Source, Reception Layer and Lake Ingestor. None can be really omitted — the Event Source can often not be trusted to be in sync, and delays between Reception and Ingestor are often tricky, occurring only with some problematic messages or at particular times. The cost of having those extra timestamps is in storage, but these are nicely compressible time series which good storage can exploit.

You can also add machine ids of the Reception/Ingestion instances, to help identify which messages are causing bottlenecks and delay processing.

Schema

There are two schools of thought which I call json school and schema school. The json one, also called schema on read, prefers not to enforce schema upon ingestion, as that involves a risk of data loss if the data does not fit the schema, requires bookkeeping, a more complicated code, etc. The schema school, also called schema on write, uses an enforced-schema format such as Protobuf or Avro, and holds schemata in some central registry or catalog (accompanied with consumer/producer libraries), possibly coupled with validations beyond just schema.

I’m a firm adherent of the schema school — mostly for the fact that in the realm of Data Science, data is more often read than written (as otherwise, your business plan is likely bad), and with known and managed schemata, this is simplified. No matter what your schema registry is, you should aim for descriptive names of fields, possibly with longer additional comments. Ideally, coupled together with some data catalog such as DataHub or Amundsen. A terrible experience during data analytical work is when you come to a datastream without any guidance of just what is there.

This is a similar line of thought as can be found in some programming style guides (e.g., Google’s) which says to optimise your code for the reader, not for the writer.

Of course, setting up the schema registry and data catalog takes time and comes with an operational overhead — so this assumes you have more than one data stream, and that the data is of some value or potential. Everyone in the ecosystem will have to stick to those libraries, and would not be able to just json.loads— which, arguably, is a good thing in the long run. The data loss problem can be addressed by storing unparseable data into a dedicated fallback location, in their original format, along with alerting for the volume. This is universally better — if there is a data problem (schema change), you need to know about it as soon as possible, not when the first daily metrics compute job breaks.

Topic Homogeneity

Kafka allows you to organise your data into topics, which allows e.g. more granular message consumption or operational configuration. Within a topic, you can additionally partition data, which affects consumption parallelisation options. That, however, does not always give a clear guidance on how to divide data among topics.

Kafka scales topic counts pretty well, reportedly hundreds of thousands — so that should not be a problem. Humans scale less well, having too many topics may create a cognitive overload and chaos.

My rule of thumb here is to have a 1–1 relation between topics and schemata. For example:

  • You have mobile applications reporting UI actions — those come from iOs and Android clients. They differ in some headers (OS version, device parameters), but the message body is essentially the same (“a red button was clicked at this time”); so you would group those two into a single topic. In case you expect a consumer dedicated only to one platform, it may make sense to use platform when partitioning — but unless well justified I tend to stay away from such optimisations and rely on random/round-robin instead.
  • You have a mobile application reporting UI actions and heartbeats. While a certain header part (identification of the device) is the same, the message body is wholly different (one is event, the other telemetry), and one can hardly imagine usage combining those two — so separate in two topics.

This rule of thumb pays well to optimising both stream consumers (due to fewer message drops or fewer consumer threads) and storage (due to columnar compression). Additionally, it makes it simpler to describe the topics — reducing the cognitive load on the users searching for topics which are relevant.

Schema evolution plays a role here — in general, incremental schema extension which add a few fields make sense to be done within the existing topic, but a major refactor should end up in a _V{n+1} topic instead, ideally produced concurrently to the previous one for a while.

That brings up the practice to always name your new topics _V1, even if you don’t expect schema evolution.

Tiers within Data Lake

I really like the bronze-silver-gold terminology around Data Lake tiers. While it sounds appealing to have just a single “data lake” bucket and dump everything there, you’ll quickly run into problems around access, data lineage, whether a certain table should be backed up or whether it can be recomputed, cognitive load, etc. A good system for tracking and exploring data lineage addresses those somehow, but starting with some top-level classification always helps.

The classification is as follows:

  • Bronze layer consists of data as they were ingested, without any additional processing or filtering applied.
  • Silver layer derives from bronze via deduplication, cleaning, transforming, … This is what users (Data Scientists, Data Analysts, ML Researchers) start working with.
  • Gold layer emerges from silver with more involved jobs that include joins, forecasts, aggregations, … Those are typically intended for exports to Business Analysts or dashboards.

This has the following properties:

  • You know you should never delete from Bronze, as that can mean a loss of potentially non-reingestable data. On the other hand, you can usually delete from Silver, because anything can be re-computed again (assuming you git). As for Gold, you can delete unless you have some audit-like requirements.
  • A lot of preprocessing code is fixed in the Bronze-Silver ETL layer, simplifying subsequent analyses. It is ok to have multiple Silver tables derived from one Bronze — you can view Silver as materialised views from SQL world.
  • A lot of entity/fact definition is formalised in the Gold layer, again reducing the need to describe and catalogise code/sql snippets to just describing/catalogising tables, which, in my opinion, is simpler.
  • A troubleshooting along lineage, that is, “which component nulled this field?”, becomes simpler as you simply query the respective tables in all three layers and compare field by field. If the problem is already in Bronze, it means it happened before Lake; otherwise one of your internal jobs has caused it.

You can and should partition those layers differently. A rule of thumb is:

  • Bronze is partitioned by ingestion time,
  • Silver is partitioned by event time,
  • Gold is partitioned by entity id.

This also gives a simple rule to “is my table rather Silver or Gold?”. That works better than using “is my table exported or not?” — I mean, it often makes good sense to treat a table as Silver despite it ends up in some dashboards or other exports. The important distinction is between ingestion time and event time — you can usually rely on an event being reported after it has happened, but you don’t have a physical guarantee that you will receive this in a timely fashion. For the user, it is ideal to have data partitioned nicely in e.g. days, but what to do when events get delayed by more than a day? I like to address this by having the delayed field in my Silver tables, and a dual logic in the ETL job for Bronze-Silver transformation — you read the data as they are ingested and store them to the correct Silver partition, with the delayed field set accordingly. Optionally, this delayed is exposed as a partition column. And then, in individual Gold tables, you define how delayed their computation is / what range of delayed messages they consider during recomputations or backfills.

It may sound like a good idea to partition by event time already in the bronze layer, but I view two disadvantages there:

  • When a delayed event comes, you should notify all your downstream dependencies — but there is no information they can use to efficiently pick only the delayed events out from all events that happened at that time.
  • The storage is inefficient — ideally, you want to have bulk files; but writing events back to older partitions usually creates single-event files. This does not manifest in my strategy, because the Bronze tables have all events in bulks, and Silver/Gold tables have bigger chunks recomputed.

Furthermore, always append to your Bronze layer, never rewrite existing data. Otherwise, you risk your Silver/Gold tables not being recomputable. This is another reason for keeping track of both ingestion time and event time inside the data — e.g., if the data you work with are measurements from sensors that may get corrected backwards, you need to know which value came later, and you also need to keep the original value to explain why some reports changed later on.

Conclusion

Ingesting larger volumes of data for open-ended analytical usage is a tricky business — decisions you make will become written in the data, affecting indirectly all subsequent jobs, queries, reports, dashboards, et cetera. In other words, addressing technical debt comes not with just the coding work, but also with migrations of data, addressing report discrepancies, re-tuning alerts. Regardless of whether those best recommendations fit your usecase and culture or not, question whether what you do locks you technically in, and whether you lose any observability potential and history tracking. No matter how much you think before refactoring, you will do some steps wrongly, and being able to minimise damage is more important than getting it seemingly right.

--

--

Vojtech Tuma
Vojtech Tuma

Written by Vojtech Tuma

#books - #running - #pullups - #boardGames - #dataScience - #programming - #trolling - #etc

Responses (2)