Big Data Migration Strategy

I analyzed three Big Data migrations performed at Twitter, Facebook and Netflix, as well as the one we did at eHarmony, and found enough commonality among them to distill a repeatable strategy. It consists of a five-step process that moves Big Data from a source to a target database:

  1. Replication to Staging
  2. Bulk Copy to Target
  3. Downtime at Source
  4. Merge Staging with Bulk Copy
  5. Go Live on Target

For brevity, let’s coin this repeatable Big Data migration pattern RCDML, after its five steps (Replicate, Copy, Downtime, Merge, go Live):

RCDML - Big Data Migration Strategy

Let’s go over some key points of the RCDML process flow:

  • The Replication starts right before the Bulk Copy.
  • Replication stream is directed into the Staging Database.
  • Bulk Copy is pushed directly into the Target Database.
  • The reason for the above split is to ensure the checkpoints and validations are isolated to each data set.
  • Optional Downtime begins immediately after the Bulk Copy, and it ends with Merge.
  • Validation should happen before and after the Merge ensuring that record counts match.
  • During the Merge, replicated data from staging overwrites the bulk copied data.
  • Go Live.

My conclusion is that the RCDML process can work at any scale. In other words, the downtime is proportional to the amount of data generated by the live changes, not to the size of the full Big Data set, because the Bulk Copy takes place outside of the downtime period.

Obviously, the Bulk Copy should be completed as fast as possible to keep the replicated data set small, which reduces the time it takes to Merge the two data sets later. Since most Big Data platforms ship high-performance bulk loading tools, we can parallelize them, achieve good throughput, and keep the replication window small.

There is also something else very interesting here. I used to think that Replication always belonged to the DBA team, and that DBAs should just pick an off-the-shelf replication tool and bolt it onto the source database. What I learned is actually quite the opposite, so let’s talk about this in the next chapter.

Replication

Big Data Migration - Replication

I prefer to develop a queue-based Replication process, place it directly into the main Data Service, and channel all writes to the data layer through it. This creates an embedded high-availability infrastructure that evolves together with the Data Service and never lags behind.

The Replication Dispatch is where you plug in the replication queues. Each queue can be switched Off autonomously to shut down the data flow to its database during a maintenance window. Once the maintenance is complete, the queue is turned back On to process the replication backlog.

An alternative method would be to bolt a separate replication mechanism onto an existing production database, outside of the main data path. However, be aware of the pitfall in doing this: your development team will most likely be distant from the implementation and maintenance of this bolt-on replication. Instead, it will be handed over to the DBA/Infrastructure team, and subtle nuances will slip through the cracks, potentially causing data issues down the line.

Your DBA/Infrastructure team will also be fully dependent on the third-party support services for this replication tool. This will drain your team of invaluable problem-solving skills and instead turn them into remote hands for the replication vendor.

I believe that a successful replication process should be developed in-house and placed directly in the hands of the core data services team, and it should be maintained by that development team rather than the Infrastructure/DBA team.

Regardless of the Replication method you choose, it should be turned On right before the Bulk Copy begins to ensure some overlap exists in the data. This overlap will be resolved during the Merge process.

I hope I’ve convinced you that replication should be handled by the core data services development team. But what about Bulk Copy? Who should be responsible for it – DBAs or developers? Let’s find out!

Bulk Copy

It would be impossible to cover all of the bulk copy tools here because there are so many database platforms you might be migrating from and to. Instead, I’d like to focus on the fundamental principles of getting a consistent Big Data snapshot and moving it from database A to database B.

However, I still think it’s valuable to present a concrete visual example to cover these principles. Since my Big Data Migration Strategy experience is with an Oracle RDBMS on SAN-based shared storage, let’s use that as the baseline for this chapter:

Bulk Copy

Before we go over the above Bulk Copy process flow, I’d like to emphasize that Replication has to be On before the Bulk Copy begins. This ensures that there is some overlap in the two data sets which we resolve during the Merge phase of the RCDML process.

The objective is to copy a very big data set from the source database PROD to the target database TARGET. The fundamental principles to follow are:

  1. Consistent Data Set
  2. Low Impact on Source
  3. Transform only on Target
  4. One Hop Data Transfer
  5. Isolate Bulk Copy and Replication

Consistent Data Set

It’s imperative to extract a fully consistent data set from the source database. This makes the data validation phase not just easier but possible, because you need to be able to trust your extraction process. A basic verification that the row counts on source and target match is both simple and invaluable.
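As a minimal sketch of that verification – assuming a database link SRC_LINK back to the source and an illustrative table name MATCHES – the check can be as simple as:

```sql
-- Hypothetical row-count comparison; MATCHES and SRC_LINK are illustrative names.
-- Both counts should be taken against the same consistent point in time.
SELECT (SELECT COUNT(*) FROM matches@src_link) AS source_rows,
       (SELECT COUNT(*) FROM matches)          AS target_rows
  FROM dual;
```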

Low Impact on Source

The bulk copy tools are fully capable of consuming 100% of the I/O bandwidth at the source database. For this reason it’s important to find the right balance of throughput and I/O utilization by throttling the bulk copy process.

I prefer to monitor response times directly at the source database storage subsystem, with the goal of keeping them below an accepted baseline such as 10ms; a sketch of such a check follows.
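One way to keep an eye on that baseline from inside Oracle – a sketch, not the exact tooling used here – is to watch the average wait times for the common I/O events:

```sql
-- Average wait per I/O event in milliseconds; throttle the bulk copy down
-- if these creep above the ~10ms baseline.
SELECT event,
       ROUND(time_waited_micro / NULLIF(total_waits, 0) / 1000, 2) AS avg_wait_ms
  FROM v$system_event
 WHERE event IN ('db file sequential read', 'db file scattered read', 'direct path read');
```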

Which brings us to the second interesting point: I used to think that Bulk Copy belonged to the ETL development team. Today, I think it really belongs to the DBA team, because DBAs are closer to the storage and the database and can clearly see the impact the Bulk Copy tools have on these components.

Sure, we can have DBAs and Developers work together on this, but, in reality, unless they sit in the same room – it never works. There needs to be 100% ownership established for anything to be accomplished properly.

So instead of creating a finger pointing scenario it’s best to ensure that the fingers are always pointed back at us. If there is no one else to blame – stuff gets magically done!

Transform only on Target

It’s often necessary to transform the data during migration. After all, how often do you get a chance to touch every single row and fix the data model?

There are two places where this transformation can be accomplished – during extract or during reload. I think it’s best to leave the extracted data set in the exact same state as it was found in the source database and instead do all transformation during reload, directly on the target database.

Doing the transformation during reload on the target database sets up an iterative development cycle for the transformation rules and removes the need for a fresh source extract on each cycle. It also keeps the data validation process possible, because the extracted data will mirror the source.

One Hop Data Transfer

Physically moving Big Data is a monumental task. For this reason the unload process should deposit the data someplace where the reload process can read it directly. The best way to set up this One Hop Data Transfer is through Network Attached Storage (NAS) over a 10 Gigabit Ethernet network, which is now widely available.

Isolate Bulk Copy and Replication

It’s very important to measure each data set independently using a simple count, and then count the overlap between them. This makes the validation of the Merge process as simple as adding the two counts and subtracting the overlap, as the sketch below shows.
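Here is a sketch of that arithmetic, assuming the bulk-copied and replicated rows land in separate illustrative schemas (BULK_COPY and STAGING) that share a MATCH_ID key:

```sql
-- Count each data set and their overlap; the expected post-merge row count
-- is bulk_rows + repl_rows - overlap_rows. All names are illustrative.
SELECT
  (SELECT COUNT(*) FROM bulk_copy.matches) AS bulk_rows,
  (SELECT COUNT(*) FROM staging.matches)   AS repl_rows,
  (SELECT COUNT(*)
     FROM staging.matches s
    WHERE EXISTS (SELECT 1
                    FROM bulk_copy.matches b
                   WHERE b.match_id = s.match_id)) AS overlap_rows
  FROM dual;
```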

Downtime

If the Replication is set up directly in the core data service, and if it’s utilizing a queue, we can pause it and begin the Merge process immediately without any downtime.

On the other hand, if the replication is a bolt-on process reading transaction logs from the primary database, then the only option is to shut down and do the merge during a real outage.

That’s because the writes and reads in this case go directly into the primary database rather than through a switched data service, and as such require a hard stop and a configuration change to point to a different database – that is, if the objective is to switch to the target database following the merge.

Merge

The first step is to get the counts of the Bulk Copy rows and the Replication rows, as well as the count of their overlap.

Next, it’s most often faster and easier to delete the overlap from the Bulk Copy set and then simply insert/append the Replication set into it.
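A minimal sketch of that delete-then-append step, using the same illustrative schema and column names as before:

```sql
-- Remove the overlap from the bulk-copied set, then append the replicated set.
DELETE FROM bulk_copy.matches b
 WHERE EXISTS (SELECT 1
                 FROM staging.matches s
                WHERE s.match_id = b.match_id);

INSERT /*+ APPEND */ INTO bulk_copy.matches
SELECT * FROM staging.matches;

COMMIT;
```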

Finally, a validation of the counts either gives the green light for the Go-Live or makes this run just another rehearsal.

Speaking of rehearsals – there should be at least three, with the final rehearsal scheduled within one week of the Go-Live date.

Having the last rehearsal as close to production date as possible ensures that the data growth variance is accounted for in the final schedule. And it pins the migration process in the team’s collective memory.

Go Live

The process of going live on the new target database is directly related to, and dependent on, the Replication option chosen earlier. We covered this topic in the Downtime phase.

Specifically, if the replication is queued, switched and built into the core data service, then going live is as simple as setting the new database as primary in the dispatch configuration. This also means there was no downtime during the merge.

On the other hand, if the replication is the bolt-on, read-the-transaction-logs type, then the Downtime is already active, and we need to update the configuration files for each individual service, pointing them at the new database.

Conclusion

In summary:

  • There are 5 phases of the RCDML Big Data migration process, and with careful planning it’s possible to make it work at any scale.
  • The two most critical components of any Big Data migration strategy are Replication and Bulk Copy, and giving the responsibility for these components to the right team can directly affect the Downtime and Go Live schedule.


Breaking Oracle: Beyond 4294967295 Rows

It’s Tue Nov 4 10:55 2014. I finally get “4294967295 rows created” after 1 hour and 50 minutes have elapsed. That’s 2³² – 1 – the number Oracle engineers thought you’d never reach using the Data Pump tool. But we reached and went beyond it a long time ago. In fact, we are so far beyond it that we doubled that number, and we keep adding over 10 million rows to it daily.

And today we are decoupling our matching data store, consisting of 8.9 billion matches, from our customer-facing database. All this data is going into its own database cluster. The end goal? Tackle the “Big Data” problem and scale the matching microservices independently.

It’s Not What You Did, Son!

What we are doing is not groundbreaking. Twitter did something similar: they moved their “largest (and most painful to maintain) table – the statuses table, which contains all tweets and retweets” from MySQL to Cassandra back in 2010. Netflix moved their users’ Movie Queue from SimpleDB to Cassandra back in 2013 using a similar approach.

But as Viggo Tarasov said, “It’s not what you did, son. It’s who you did it to”! In this case we are doing the migration on an Oracle-centric platform, but instead of relying on the “Big Iron” replication technologies we developed our own migration service and drafted a repeatable technique for dealing with the Big Data problem.

This technique combines batch and realtime processing to apply the same set of functions to the whole data set and produce a consistent result. You could even argue that it’s a pattern similar to the one used in the Lambda Architecture, but instead of a query being the end result, we are after a whole data set conforming to the new data model.

What’s The Plan?

The migration plan we came up with is a 5-step process:

eHarmony Matching Database Migration

  1. At T0 we turn on an event-driven “Dual-Write” migration service to mirror the writes from our “legacy” database to the new database cluster. These writes go into a dedicated “empty” staging schema. If it’s an update on the source DB and no record exists in the staging schema, we create a new one. This migration service applies the same transformation rules as the “Backfill” process (see 3), and the two sets of data are merged afterwards.
  2. At T1 we create a consistent “as-of” snapshot of our “legacy” database, mount it on a standalone node and extract the data to shared storage.
  3. Start the “Backfill” batch process and load the data as of T1 into the new database cluster, applying transformation rules, deduplication, partitioning and re-indexing at the same time. The “Backfill” data lands in its own dedicated schema, separate from the “Dual-Write” schema (see 1).
  4. Once the “Backfill” batch process finishes, we briefly pause the “Dual-Write” migration service and merge its data with the “Backfill” schema. “Dual-Write” data overwrites any overlap with the “Backfill” data because it’s the latest version from the “legacy” database.
  5. Finally, we restart the “Dual-Write” migration service and continue mirroring the writes to keep the two databases in sync.

Once the two databases are in sync we start to deploy new microservices on the new datastore. We then sample these microservices for a small cohort of users carefully watching for any failures and performance issues.

In the event of a failure we simply revert to using the old service/datastore, fix the problem and repeat the process until our confidence level is high (all the while the Dual-Write migration service keeps the two data stores in sync). The obvious end result is switching all users to the new datastore.

What follows is a peek into the tooling and instrumentation methods that got us through steps 2–5.

Instrument or Else!

Last night we did steps 1 and 2, and the historical data extract finished in 8 hours 8 minutes and 11 seconds – that’s 13 hours faster than the last rehearsal, during which we had some problems with our shared storage system. We keep the prior stats for reference right in the “run-doc” comment section (more on the “run-doc” structure a little further down).

To create a consistent snapshot we placed the source database in backup mode and took a storage-level (SAN) snapshot of the underlying volumes. We then presented the snapshot to the staging database node and restored the database, bringing it to a consistent state. At that point we started an extract using Oracle’s Data Pump tool with 16 parallel workers, dumping the data to shared storage via NFS.
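For illustration only – the directory object, schema and job names below are made up, and the actual run may well have used the expdp command line instead – a Data Pump export with 16 parallel workers submitted through the DBMS_DATAPUMP PL/SQL API looks roughly like this:

```sql
DECLARE
  h NUMBER;
BEGIN
  -- Schema-mode export job; the job name is illustrative.
  h := DBMS_DATAPUMP.OPEN(operation => 'EXPORT',
                          job_mode  => 'SCHEMA',
                          job_name  => 'MATCH_EXTRACT');

  -- Dump files go to a directory object pointing at the NFS share;
  -- the %U wildcard lets each parallel worker write its own file.
  DBMS_DATAPUMP.ADD_FILE(handle    => h,
                         filename  => 'matches_%U.dmp',
                         directory => 'NFS_DUMP_DIR');

  -- Export only the schema that owns the matching data (name is illustrative).
  DBMS_DATAPUMP.METADATA_FILTER(handle => h,
                                name   => 'SCHEMA_EXPR',
                                value  => q'[IN ('MATCH')]');

  -- 16 parallel workers, as in the run described above.
  DBMS_DATAPUMP.SET_PARALLEL(handle => h, degree => 16);

  DBMS_DATAPUMP.START_JOB(h);
  DBMS_DATAPUMP.DETACH(h);
END;
/
```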

For any long-running process (more than a few minutes) we also snapshot the underlying system performance metrics using continuous monitoring we developed in-house – this gives us an early warning if the system is struggling for resources while running the job:

eHarmony Matching Database Monitoring


Without this instrumentation we’d be flying blind and would only know there is an issue after the prior rehearsal’s elapsed time has passed.

I Like to Copy/Paste

So far so good – we are ahead of schedule and I am now driving the “Backfill” process which consists of steps 8 through 16:

eHarmony Matching Database Migration Schedule


The “run-doc” is a series of copy/paste commands that call independent shell scripts – you can string them together in a master script or run them one at a time. Each individual script checks for completion errors and emails the result on exit.

The run-doc limits the error-prone decision making process during production rollout and lets you focus on the task at hand even when you are half asleep. The last thing you want to be doing when tired is doubting yourself while figuring out the next step.

Keep Your Hands Off!

Part of the migration process is the transformation of the legacy data into a newly designed data model capable of absorbing the Big Data flow and conforming it to the new set of stringent constraints. You can do the transformation during extract or reload.

Our testing showed that applying these data transformation rules during the “Backfill/reload” process performed faster because our target database cluster had more CPU power than the standalone staging server where we extract the legacy data. It also helps when the original extract mirrors the legacy data just in case there are any questions during the data validation phase.

Needle In A Haystack!

Our new data model and indexing strategy revealed a problem – duplicates! Take 8.9 billion matches, double them for indexing by flipping the USER1 and USER2 IDs (you can now look up by either ID), and what you get is a cross-match duplicate in the rare case when two distinct matches were mistakenly made for the same pair of people:

Source to Target Row Mapping


I thought it was an easy problem to solve … and it was – during the first rehearsal, when I simply used a GROUP BY to find the duplicates. But during our second rehearsal we must have hit a critical mass: the same exact query plan generated twice the amount of sorting – all of it on disk.

We ran out of TEMP space. I was getting nowhere – three runs, each lasting over 40 minutes, failing on space every time. I gave TEMP 2TB of space, it consumed it, and I threw in the towel.

Thankfully Oracle has a powerful set of analytic functions that can deal with this problem more efficiently than a simple GROUP BY ever could – specifically ROW_NUMBER() OVER (partition-clause order-by-clause), which assigns a unique number to each row within the ordered data partition defined by the partition clause.

When ROW_NUMBER() is applied to a data set with duplicates, the dups resolve to a row number greater than 1 and are easy to filter out with a simple WHERE clause predicate. Running this filter on the full data set of 17.8 billion matches took only 50 minutes [Elapsed: 00:50:02.89].
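A sketch of that filter expressed as a delete – the table and column names are illustrative, not the production schema:

```sql
-- Within each user pair, number the rows; anything with rn > 1 is a duplicate.
DELETE FROM matches
 WHERE match_id IN (
   SELECT match_id
     FROM (SELECT match_id,
                  ROW_NUMBER() OVER (PARTITION BY user1_id, user2_id
                                         ORDER BY match_id) AS rn
             FROM matches)
    WHERE rn > 1);
```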

Do You Have The Brakes?

After purging the duplicates we moved on to the indexing process, using a custom-developed package that takes a table name, the number of concurrent workers, and the ID of the database node on which to run the index rebuild as its input parameters. It spawns X index rebuild workers that read a queue table containing a dynamically built list of table partitions to work on.

We can stop/start the index rebuild workers at any time and they’ll pick up where they left off. This capability was essential during the testing phase and allowed us to carefully tune the index rebuild parameters by monitoring their effect on the database cluster and the storage subsystem.

Are We There Yet?

Thankfully it’s easy to answer, because (1) the index rebuild workers keep a progress log (using autonomous transactions) and (2) we can simply look at the queue table:

Index Rebuild Progress


This level of instrumentation tells us exactly what table partition we are at, the counts of finished vs pending and an average time it took to rebuild all indexes for each partition:

Per Partition Index Rebuild Stats

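The screenshot above comes from our own tooling, but a roll-up over a queue table of that shape could look something like the following (the table and column names are hypothetical, with DATE columns for the start/finish times):

```sql
-- Finished vs pending partitions per table, plus the average rebuild time in minutes.
SELECT table_name,
       SUM(CASE WHEN status = 'DONE'    THEN 1 ELSE 0 END) AS finished,
       SUM(CASE WHEN status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
       ROUND(AVG((finished_at - started_at) * 24 * 60), 1) AS avg_minutes_per_partition
  FROM rebuild_queue
 GROUP BY table_name;
```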

It’s also easy to tell when it’s all done by checking the autonomous transaction table log:

Autonomous Logging Table


At the end we also get a per table summary with a total time spent on it:

Index Rebuild Summary Stats


Do You Know What You Are Doing?

The Oracle optimizer doesn’t, unless you give it the table/index statistics it needs to come up with the correct query plan. Getting these stats means running a gather-statistics process that can take a full day to complete.

This is not optimal during the production migration, so instead we gathered these stats ahead of time, exported them, and now we simply import them using DBMS_STATS.IMPORT_SCHEMA_STATS – all it takes is a few seconds. Just watch out for the auto-generated hash partition names [SYS_P%]; we had to write a small procedure to rename them to match the names in the statistics.
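A sketch of that import, assuming the statistics were exported earlier into a statistics table via DBMS_STATS.EXPORT_SCHEMA_STATS (the schema and statistics-table names are illustrative):

```sql
BEGIN
  -- Import pre-gathered optimizer statistics in seconds instead of
  -- re-gathering them during the production window. Names are illustrative.
  DBMS_STATS.IMPORT_SCHEMA_STATS(
    ownname => 'MATCH',            -- schema being migrated
    stattab => 'MIGRATION_STATS',  -- statistics table populated ahead of time
    statown => 'MATCH');           -- owner of the statistics table
END;
/
```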

The Moment Of Truth

At this point the Backfill activities are over – the data set is clean, partitioned and indexed. We briefly stop the Dual-Write migration service and take a count of rows:

  • Backfill Historical Matches: 8867335138
  • Dual-Write Online Matches: 23046176

Then we purge all overlapping matches from the Historical data set, driving the delete by the match_id values in the Online data set and using a nested loop via a hint [USE_NL], as that was the most efficient query plan for this operation during our testing phase.
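A sketch of that purge, with illustrative schema names (BACKFILL for the historical set, DUALWRITE for the online set):

```sql
-- Delete historical rows whose match_id also exists in the dual-write set,
-- nudging the optimizer toward a nested-loop plan. Names are illustrative.
DELETE /*+ USE_NL(h) */ FROM backfill.matches h
 WHERE EXISTS (SELECT 1
                 FROM dualwrite.matches o
                WHERE o.match_id = h.match_id);
```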

As a side note we hit 30K IOPS during this operation with service times under a millisecond:

3PAR Hits 30K IOPS


The next step, merging the two sets of data, is a simple matter of doing a direct-path insert [INSERT APPEND PARALLEL] with 32 slaves and letting Oracle manage the index maintenance – a direct-path insert puts index partitions into an unusable state during the data load, but it also keeps track of its changes via UNDO segments and resolves the delta after the load completes. It took only 1.5 minutes (01:30.13) to insert the 23046176 rows and do the index maintenance.
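And a sketch of that direct-path, parallel append (again with illustrative names):

```sql
-- Direct-path, parallel append of the dual-write rows into the backfill set;
-- Oracle resolves the index delta after the load completes.
ALTER SESSION ENABLE PARALLEL DML;

INSERT /*+ APPEND PARALLEL(t, 32) */ INTO backfill.matches t
SELECT /*+ PARALLEL(s, 32) */ *
  FROM dualwrite.matches s;

COMMIT;
```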

The last step is to simply flip the synonyms for the database access points that the Online/Batch APIs hit and turn the system over for use. Mission accomplished!

Summary

In summary – our Big Data restructuring approach breaks the data set up into a historical “as-of” snapshot and a live data stream. The historical data set is handled by a batch process, and the live data by a migration service that gets events from a message broker (we used HornetQ). The two sets are merged afterwards, and the message broker handles the backlog of the live data stream during the merge.
