Category Archives: tungsten-replicator

Analytical Replication Performance from MySQL to Vertica on MemCloud

I’ve recently been trying to improve the performance of the Vertica replicator, particularly in the form of the new single schema replication. We’ve done a lot in the new Tungsten Replicator 5.3.0 release to improve (and ultimately support) the new single schema model.

As part of that, I’ve also been personally looking at Kodiak MemCloud as a deployment platform. The people at Kodiak have been really helpful (disclaimer: I’ve worked with some of them in the past). MemCloud is a high-performance cloud platform that is based on hardware with high-speed (and high-volume) RAM, SSDs and fast Ethernet connections. This means that even without any adjustment and tuning you’ve got a fast platform to work on.

However, if you are willing to put in some extra time, you can tune things further. Once you have a super quick environment, you find you can tweak and update the settings a little more because you have more options available.  Ultimately you can then make use of that faster environment to stretch things a little bit further. And that’s exactly what I did when trying to determine how quickly I could get data into Vertica from MySQL.

In fact, the first time I ran my high-load test suite on MemCloud infrastructure, replicating data from MySQL into Vertica, I made this comment to my friend at Kodiak:

The whole thing went so quick I thought it hadn’t executed at all!

In general, there are two things you want to test when using replication to move data from a transactional environment into an analytical one:

  • Latency of moving the data
  • Apply rate for moving the data

The two are subtly different. The first measures how long it takes to get the data from the source to the target. The second measures how much data you can move in a set period of time.

Depending on your deployment and application, either, or both, can be critical. For example, if you are using analytics to perform real-time analysis and charging on your data, the first is the most important, because you want the information to be up to date as quickly as possible. If you are performing log analysis or looking at longer-term trends, the second is probably more important. You may not worry about being a few seconds behind, but you do want many thousands of transactions to be transferred. I concentrated on the latter rather than the former, because latency in the batch applier is something you can control by setting the batch interval.

So what did I test?

At a basic level, I was replicating data from MySQL directly into Vertica. That is, extracting data from the MySQL binary log and writing it into tables within a three-node HPE Vertica cluster. Each node runs in MemCloud with 64GB of RAM and 2TB of SSD disk space. I’ve deliberately made no configuration changes to Vertica at this point.

The first thing I did was set up a basic replication pipeline between the two. Replication into Vertica works by batch-loading data through CSV files into Vertica staging tables and then ‘materialising’ the changes into the carbon copy tables. Because it’s done in batches, the latency is effectively governed by the batch apply settings, which were configured for 10,000 rows or 5 seconds.

To generate the load, I’ve written a script that inserts 100,000 rows of random data, then updates about 70% of those rows at random and deletes the other 30%. So each schema generates about 200,000 rows of changes for each load, which is designed to test the specific batch replication scenario. Ultimately it does this across multiple schemas (all with the same structure). I use this specific pattern because I can match it against the replication statistics (rather than transaction statistics) and effectively monitor the apply rate into Vertica from MySQL.
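
For illustration, here’s a minimal sketch of that kind of load pattern. This is not the author’s actual script: the Node.js mysql2 client, the connection details, and the message column name on the msg table are all assumptions based on the sample data shown later.

// Illustrative load generator only (not the author's script). Assumes Node.js
// with the 'mysql2/promise' client, an auto-increment id column on msg (so
// inserting 0 generates a new id, as in the sample inserts later), and a
// 'message' text column.
const mysql = require('mysql2/promise');

function randomString(len) {
  const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
  let s = '';
  for (let i = 0; i < len; i++) s += chars[Math.floor(Math.random() * chars.length)];
  return s;
}

async function loadSchema(schema, rows) {
  const conn = await mysql.createConnection(
    { host: 'mysql-source', user: 'tester', password: 'secret', database: schema });
  const ids = [];

  // Insert `rows` rows of random data
  for (let i = 0; i < rows; i++) {
    const [res] = await conn.execute('INSERT INTO msg VALUES (0, ?)', [randomString(24)]);
    ids.push(res.insertId);
  }

  // Update roughly 70% of the inserted rows and delete the other 30%,
  // giving roughly 2x `rows` row changes per schema per run
  for (const id of ids) {
    if (Math.random() < 0.7) {
      await conn.execute('UPDATE msg SET message = ? WHERE id = ?', [randomString(24), id]);
    } else {
      await conn.execute('DELETE FROM msg WHERE id = ?', [id]);
    }
  }
  await conn.end();
}

loadSchema('sales1', 100000).catch(console.error);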

The first time I ran the process of just generating the data and inserting it into MySQL, the command returned almost immediately. I seriously thought it had failed because I couldn’t believe I’d just inserted 200,000 rows into MySQL that quickly. Meanwhile, over on the Vertica side, I was monitoring the apply process through the trepctl perf command, which provides live output from the process. For a second I saw the blip as the data was replicated and then applied; it was so quick I thought a single row (or even a failed transaction) had caused it.

The first time I ran the tests, I got some good results with 20 simultaneous schemas:

  • 460,000 rows/minute from a single MySQL source into Vertica. 

Then I doubled up the source MySQL servers, so two servers, 40 simultaneous schemas, and ultimately writing in about 8 million rows:

  • 986,000 rows/minute into Vertica across 40 schemas from 2 sources

In both cases, the latency was between 3 and 7 seconds for each batch write (remember we are handling 10,000 rows or 5s per batch). We are also doing this across *different* schemas at this stage. These are not bad figures.

I did some further tweaks, this time reconfiguring the batch writes to use larger blocks and longer intervals. This increases the potential latency (because there will be bigger gaps between writes into Vertica), but increases the overall row-apply performance. Now we are handling 100,000 rows or 10s intervals. The result? A small bump for a single source server:

  • 710,000 rows/minute into Vertica across 20 schemas from 1 source

Latency has increased though, with us topping out at around 11.5s for the write when performing the very big blocks. Remember this is single-source, and so I know that the potential is there to basically double that with a second MySQL source since the scaling seems almost linear.

Now I wanted to move on to test a specific scenario I added into the applier, which is the ability to replicate from multiple source schemas into a single target schema. Each source is identical, and to ensure that the ‘materialise’ step works correctly, and that we can still analyse the data, a filter is inserted into the replication that adds the source schema to each row.

The sample data inserts look like this:

insert into msg values (0,"RWSAjXaQEtCf8nf5xhQqbeta");
insert into msg values (0,"4kSmbikgaeJfoZ6gLnkNbeta");
insert into msg values (0,"YSG4yeG1RI6oDW0ohG6xbeta");

With the filter, what gets inserted is:

0, "RWSAjXaQEtCf8nf5xhQqbeta", "sales1"
0,"4kSmbikgaeJfoZ6gLnkNbeta", "sales1"

And so on, where ‘sales1’ is the source schema, added as an extra column to each row.
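
To give a flavour of how a filter like this works (this is only a sketch of the idea, not the shipped rowadddbname filter, and the column-append step is deliberately elided), a Tungsten JavaScript filter receives each THL event through its filter() function and can walk the row changes like this:

// Sketch only: walks row-change events and picks up the source schema name.
// The actual construction of the new ColumnSpec/ColumnVal objects is omitted;
// the shipped rowadddbname filter handles that detail.
function filter(event)
{
    data = event.getData();
    if (data == null) return;

    for (i = 0; i < data.size(); i++)
    {
        d = data.get(i);

        // Only row-based events carry column data we can extend
        if (d instanceof com.continuent.tungsten.replicator.dbms.RowChangeData)
        {
            rowChanges = d.getRowChanges();
            for (j = 0; j < rowChanges.size(); j++)
            {
                oneRowChange = rowChanges.get(j);
                schemaName = oneRowChange.getSchemaName();

                // Here the real filter appends a "dbname" column spec and adds
                // schemaName as the value for every row in
                // oneRowChange.getColumnValues().
            }
        }
    }
}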

This introduces two things we need to handle on the Vertica side:

  1. We now have to merge taking into account the source schema (since the ID column of the data could be the same across multiple schemas). For example, whereas before we did DELETE WHERE ID IN (xxxx), now we have to do DELETE WHERE ID IN (xxxx) AND dbname = 'sales1'.
  2. It increases the contention ratio on the Vertica table because we now effectively write into only one table. This increases the locks and the extents processed by Vertica.

The effect of this change is that the overall apply rate slows down slightly due to the increased contention on a single table. Same tests, 20 schemas from one MySQL source database and we get the following:

  • 760,000 rows/minute into Vertica with a single target table

This is actually not as bad as I was expecting when you consider that we are modifying every row of incoming data, and are no longer able to multi-thread the apply.

I then tried increasing that using the two sources and 40 schemas into the same single table. Initially, the performance was no longer linear, and I failed to get any improvement beyond about 10% above that 760K/min figure.

Now it was time to tune other things. First of all, I changed some of the properties on the Vertica side in terms of the queries I was running, tweaking the selection and query parameters for the DELETE operations. For batch loading, what we do is DELETE and then INSERT, or, in some cases, DELETE and UPDATE if you’ve configured it that way. Tweaking the subquery that is being used increased the performance a little.

Changing the projections used also increased the performance of the single schema apply. But the biggest gains, perhaps unsurprisingly, were to change the way the tables were defined in the first place and to use partitions in the table definition. To do this, I modified the original filter that was adding the schema name, and instead had it add the schema hash, a unique Java ID for the string. Then I created the staging and base tables in Vertica using the integer hash as the partition. Then I modified the queries to ensure that the partitions would be used effectively.
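
As a rough sketch of that change (assuming the ‘unique Java ID’ is something like Java’s String.hashCode(); the real filter may differ), the filter swaps the schema name for an integer hash before it is written out:

// Inside the same row-change loop as the earlier sketch: instead of adding the
// schema name as a text column...
schemaName = oneRowChange.getSchemaName();

// ...add an integer hash of it, so that the Vertica staging and base tables
// can be partitioned on an integer column. String.hashCode() is an assumption.
schemaHash = new java.lang.String(schemaName).hashCode();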

The result was that the 760k/min rate was now scalable. I couldn’t get any faster when writing into a single schema, but the rate remains relatively constant whether I am replicating five schemas into a single one, or 40 schemas from two or three sources into the same target schema. In fact, it does ultimately start to dip slightly as you add more source schemas.

Even better, the changes I’d made to the queries and the overall Vertica batch applier also improved the speed of the standard (i.e. multi-schema) applier. I also added partitions on the ID field to improve the general apply rate. After testing for a couple of days, the average rate was:

  • 1,900,000 rows/minute from a single MySQL source into Vertica.

This was also scalable. Five MySQL sources elicited a rate of 8.8 million rows/minute into Vertica, making the applier rate linear with a 1% penalty for each additional MySQL source. The latency stayed the same, hovering around the same level as before of around 11s most of the time. Occasionally you’d get a spike because Vertica was having trouble keeping up.

Essentially, we are replicating data from MySQL into Vertica for analytics at a rate I simply called ‘outstandingly staggering’. I still do.

The new single schema applier, database name filter (rowadddbname) and performance improvements are all incorporated into the new Tungsten Replicator.

Keynote and Session at Percona Live Dublin 2017

On Sunday I will travel over to Dublin for Percona Live 2017.

I have two sessions: a keynote on Wednesday morning, where I’ll be talking about all the fun new stuff we have planned at Continuent and some new directions we’re working on.

I also have a more detailed session on our new appliers for Kafka, Elasticsearch and Cassandra; that’s on Tuesday morning.

If you haven’t already booked to come along, feel free to use the discount code SeeMeSpeakPLE17 which will get you 15% off!

Upcoming Webinar, 19th July, What is New in Tungsten Replicator 5.2 and Tungsten Clustering 5.2?

Continuent Tungsten 5.2 is just around the corner. This is one of our most exciting Tungsten product releases for some time!

In this webinar we’re going to have a look at a host of new features in the new release, including:

  • Three new Replication Applier Targets (Kafka, Cassandra and Elasticsearch)
  • Improvements to our core command-line tools trepctl and thl
  • New foundations for our filtering services
  • Improvements to the compatibility between replication and clustering

This webinar is going to be a packed session and we’ll show all the exciting stuff, with more in-depth follow-up sessions in the coming weeks.

 

You’ll also learn about some more exciting changes coming in the upcoming Tungsten releases (5.2.1 and 5.3), and our major Tungsten 6.0 release due out by the end of the year.

So come and join us to get the lowdown on everything related to Tungsten Replicator 5.2 and Tungsten Clustering 5.2 on Wednesday, July 19, 2017, 9:00 AM – 9:30 AM PDT. Register at https://attendee.gotowebinar.com/register/4108437731342545667

New Continuent Webinar Wednesdays and Training Tuesdays

We are just starting to get into the swing of setting up our new training and webinar schedule.

Initially, there will be one webinar session (typically on a Wednesday) and one training session (on a Tuesday) every week from now on. We’ll be covering a variety of different topics in each.

Typically our webinars will be about products and features, comparisons to other products, mixed in with product news (new releases, new features) and individual sessions based on what is going on at Continuent and in the market in general.

Our training, by comparison, is going to be a hands-on, step-by-step sequence covering all of the different areas of our products. So we’ll cover everything from the basics of how the products work, how to deploy them, typical functionality (switching, start/stop, etc.), and troubleshooting.

All of the sessions are going to be recorded, and we’ll produce a suitable archive page so that you can go and view past sessions. Need a refresher on re-provisioning a node in your cluster? There’s going to be a video for it and documentation to back it up.

Our first webinar is actually next Thursday (the Wednesday rule wouldn’t be a good one without an exception) and is all about MySQL Multi-Site/Multi-Master Done Right:

In this webinar, we discuss what makes Tungsten Clustering better than other alternatives (AWS RDS, Galera, MySQL InnoDB Cluster, and Percona XtraDB Cluster), especially for geographically distributed multi-site deployments, both for disaster recovery and multi-site/multi-master needs.

If you want to attend, please go ahead and register using this link: http://go.continuent.com/n0JI04Q03EAV0RD0i000Vo4

Keep your eyes peeled for the other upcoming sessions. More details soon.

Extending the Tungsten Replicator Core JS Filter Functionality

Tungsten Replicator has a really cool feature in that we can filter data as it goes past on the wire.

The replicator itself is written entirely in Java, and writing filters for it is not as straightforward as it looks, which is why the much better option is to use the JavaScript mechanism and write filters with that instead. I’ll save the details of how you can write filters to process and massage data for another time, but right now I wanted to find a good way of improving that JavaScript environment.

There are filters, for example, where I want to be able to load JSON option and configuration files, or write out JSON versions of information, and plenty more.

Mozilla’s Rhino JS engine is used to provide the internal JS environment for running filters. Rather than creating a Rhino JS environment that can do whatever it wants, we create a JS instance specifically for executing the required functions within the filter. One of these instances is created for each filter that is configured in the system (and for each batch instance too).

The reason we do this is that, for each filter, we want every transaction event that appears in the THL to be passed through the JS instance, where the filter() function in the JS filter is executed with a single argument: the event data.

The limitation of this model is that we don’t get the full Rhino environment, because we execute the JS function directly, so certain top-level items and functions like load() or require(), or utilities like JSON.stringify(), are not available. We could address that by changing the way we do the configuration, but that could start to get messy quickly, while also complicating the security aspects of how we execute these components.
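
For example, a filter that tries something like the following (the path here is made up) will typically fail with a ‘not defined’ error, because neither load() nor JSON exists as a global in the embedded instance:

// Fails in the embedded filter environment as described above: Rhino reports
// that "load" and "JSON" are not defined.
function filter(event)
{
    load("/opt/continuent/share/mylibrary.js");
    logger.info(JSON.stringify(event.getData()));
}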

There are some messy ways in which we could get round this, but in the end, because I also wanted to add some general functionality into the filters system shared across all JS instances, I chose instead to just load a set of utility functions, written in JavaScript, into the JS instance for the filter. The wonderful thing about JS is that we can write all of the functions in JS, even for classes, methods and functions that aren’t provided elsewhere.

So I chose the path of least resistance, which means loading and executing a core JS file before loading and executing the main filter JS. We can place into that core JS file all of the utility functions we want to be available to all of the filters.

So, to enable this, the first thing we do is update the core Java code so that when we load the filter JS we load our core utility JS first. That occurs in replicator/src/java/com/continuent/tungsten/replicator/filter/JavaScriptFilter.java, within the prepare() function, which is where we instantiate the JS environment.

String coreutilssrc = properties.getString("replicator.filter.coreutils");

// Import the standard JS utility script first
try
 {
 // Read and compile the core script functions
 BufferedReader inbase = new BufferedReader(new FileReader(coreutilssrc));
 script = jsContext.compileReader(inbase, scriptFile, 0, null);
 inbase.close();

 script.exec(jsContext, scope);
 }
catch (IOException e)
 {
 throw new ReplicatorException("Core utility library file not found: "
 + coreutilssrc, e);
 }
catch (EvaluatorException e)
 {
 throw new ReplicatorException(e);
 }

This is really straightforward: we obtain the path to the core utilities script from the configuration file (we’ll look at how we define that later), and then compile it within the jsContext object, where our JavaScript is being executed. We add some sensible error checking, but otherwise this is simple.

It’s important to note that this is designed to load that core file *before* the main filter file just in case we want to use anything in there.

Next, we can add that configuration line into a default config by creating a suitable ‘template’ file for tpm, which we do by creating the file replicator/samples/conf/filters/default/coreutils.tpl. I’ve put it into the filters section because it only applies to filter environments.

The content is simple; it’s just the line with the location of our core utility script:

# Defines the core utility script location
replicator.filter.coreutils=${replicator.home.dir}/support/filters-javascript/coreutils.js

And lastly, we need the script itself, replicator/support/filters-javascript/coreutils.js:

// Core utility JavaScript and functions for use in filters
//
// Author: MC Brown (9af05337@opayq.com)


// Simulate the load() function to load additional external JS scripts

function load(filename) {
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(filename)));

    var sb = "";
    while((line = file.readLine()) != null)
        {
            sb = sb + line + java.lang.System.getProperty("line.separator");
        }

    eval(sb);
}

// Read a file and evaluate it as JSON, returning the evaluated portion

function readJSONFile(path)
{
    var file = new java.io.BufferedReader(new java.io.FileReader(new java.io.File(path)));

    var sb = "";
    while((line = file.readLine()) != null)
        {
            sb = sb + line + java.lang.System.getProperty("line.separator");
        }

    jsonval = eval("(" + sb + ")");

    return jsonval;
}

// Class for reconstituting objects into JSON

JSON = {
    parse: function(sJSON) { return eval('(' + sJSON + ')'); },
    stringify: (function () {
      var toString = Object.prototype.toString;
      var isArray = Array.isArray || function (a) { return toString.call(a) === '[object Array]'; };
      var escMap = {'"': '\\"', '\\': '\\\\', '\b': '\\b', '\f': '\\f', '\n': '\\n', '\r': '\\r', '\t': '\\t'};
      return function stringify(value) {
        if (value == null) {
          return 'null';
        } else if (typeof value === 'number') {
          return isFinite(value) ? value.toString() : 'null';
        } else if (typeof value === 'boolean') {
          return value.toString();
        } else if (typeof value === 'object') {
          if (typeof value.toJSON === 'function') {
            return stringify(value.toJSON());
          } else if (isArray(value)) {
            var res = '[';
            for (var i = 0; i < value.length; i++)
              res += (i ? ', ' : '') + stringify(value[i]);
            return res + ']';
          } else if (toString.call(value) === '[object Object]') {
            var tmp = [];
            for (var k in value) {
              if (value.hasOwnProperty(k))
                tmp.push(stringify(k) + ': ' + stringify(value[k]));
            }
            return '{' + tmp.join(', ') + '}';
          }
        }
        return '"' + value.toString() + '"';
      };
    })()
  };

For the purposes of validating my process, there are three functions:

  • load() – which loads an external JS file and executes it, so that we can load other JS scripts and libraries.
  • readJSONFile() – which loads a JSON file and returns it as a JSON object.
  • JSON class – which does two things: it provides a JSON.parse() method for parsing JSON strings into JS objects, and a JSON.stringify() method which will turn a JS object back into JSON

Putting all of this together gives you a replicator where we now have some useful functions to make writing JavaScript filters easier. I’ve pushed all of this up into my fork of the Tungsten Replicator code here: https://github.com/mcmcslp/tungsten-replicator/tree/jsfilter-enhance
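
As a quick usage example (hypothetical: the file path and option names are made up), a filter can now pull its settings from a JSON file in prepare() and use JSON.stringify() for debug output:

// Hypothetical filter showing the coreutils functions in use.
var options;

function prepare()
{
    // readJSONFile() and JSON.stringify() come from coreutils.js
    options = readJSONFile("/opt/continuent/share/myfilter-options.json");
    logger.info("Filter options: " + JSON.stringify(options));
}

function filter(event)
{
    if (options.logSeqno)
    {
        logger.info("Processing event " + event.getSeqno());
    }
}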

Now, one final note. Because of the way load() works, running an eval() on the code to import it, there is one final step needed to make loaded functions usable. To explain what I mean, let’s say you’ve written a new JS filter using the above version of the replicator.

In your filter you include the line:

load("/opt/continuent/share/myreallyusefulfunctions.js");

Within that file, you define a function called runme():

function runme()
{
     logger.info("I'm a bit of text");
}

Now within myreallyusefulfunctions.js I can call that function fine:

runme();

But from within the JS filter, runme() will raise an unknown function error. The reason is that we eval()‘d the source file within the load() function, and so its context is wrong.

We can fix that within myreallyusefulfunctions.js by exporting the name explicitly:

if (runme.name) this[runme.name] = runme;

This points the parent namespace at the runme() defined in this context; we put that line at the end of the myreallyusefulfunctions.js script and everything is fine.

I’m lazy, and I haven’t written a convenient function for it, but I will in a future blog.

Now we’ve got this far, let’s start building some useful JS functions and functionality to make it all work nicely…


Tungsten Replicator 3.0 is Cloudera Enterprise 5 Certified

One of the key platforms I’ve been testing on for MySQL to Hadoop replication has been Cloudera, largely driven by customer requirements, but it’s also one of the easiest ways to get started with Hadoop.

What I’m even more pleased about is that we are proud to announce that Tungsten Replicator 3.0 is certified for use on the new Cloudera Enterprise 5 platform. That means we’re confident that you can replicate your data from MySQL to Cloudera 5 and have it work without causing problems or difficulties in the Hadoop loading and materialisation.

Cloudera is a great product, and we’re very happy to be working so effectively with the new Cloudera Enterprise 5. Cloudera certainly makes the core operation of managing and monitoring your Hadoop cluster so much easier, while still providing core functionality from the Hadoop family like Hive, HBase and Impala.

What I’m really interested in is the support for Spark, which will allow much easier live-querying and access to data.  That should make some data processing and live data views much easier to build and query further down the line.


Continuent Replication to Hadoop – Now in Stereo!

Hopefully by now you have already seen that we are working on Hadoop replication. I’m happy to say that it is going really well. I’ve managed to push a few terabytes of data and different data sets through into Hadoop on Cloudera, HortonWorks, and Amazon’s Elastic MapReduce (EMR). For those who have been following my long association with the IBM InfoSphere BigInsights Hadoop product, I’m pleased to say that it’s working there too. I’ve had to adapt Robert’s original script to work with the different versions of the underlying Hadoop tools and systems. The actual performance and process is unchanged; you just use a different JS-based batchloader script to work with the different tools.

Robert has also been simplifying some of the core functionality, such as configuring some fixed pre-determined formats, so you no longer have to explicitly set the field and record separators.

I’ve also been testing the key feature of being able to integrate the provisioning of information using Sqoop, merging that original Sqooped data into Hadoop, and then following up with the change data that the replicator is transferring over. The system works exactly as I’ve just described: start the replicator, Sqoop the data, materialise the view within Hadoop. It’s that easy; in fact, if you want a deeper demonstration of all of these features, we’ve got a video from my recent webinar session:

Real Time Data Loading from MySQL to Hadoop with New Tungsten Replicator 3.0

If you can’t spare the time, but still want to know about our Hadoop applier, try our short 5-minute video:

Real-time data loading into Hadoop with Tungsten Replicator

While you’re there, check out the Clustering video I did at the same time:

Continuent Tungsten Clustering

And of course, don’t forget that you can see the product and demos live by attending Percona Live in Santa Clara this week (1st-4th April).


Real-Time Replication from MySQL to Cassandra

Earlier this month I blogged about our new Hadoop applier; I published the docs for that this week (http://docs.continuent.com/tungsten-replicator-3.0/deployment-hadoop.html) as part of the Tungsten Replicator 3.0 documentation (http://docs.continuent.com/tungsten-replicator-3.0/index.html). It contains some additional interesting nuggets that will appear in future blog posts.

The main part of that functionality, the actual applier for Hadoop, is based around a JavaScript applier engine; there will eventually be docs for that as part of the Batch Applier content (http://docs.continuent.com/tungsten-replicator-3.0/deployment-batchloading.html). The core of this system is that it takes the information from the THL data stream and the CSV file that was written by the batch applier system, and runs the commands necessary to load it into Hadoop and perform any necessary merges.

I wanted to see how easy it would be to take that same flexible system and bend it to another database; in my case, I chose Cassandra.

For the record, it took me a couple of hours to have this working, and I’m guessing another hour will file down some of the rough edges.

Cassandra is interesting as a database because it mixes a big distributed key/value store with a close-enough-to-SQL interface in the form of CQL. And that means we can make use of CQL to help us perform the merging into the final tables in a manner not dissimilar to the method we use for loading into Vertica.

Back to the JavaScript batch loader: the applier provides five different implementable functions (all technically optional) that you can use at different stages of the apply process. These are:

  • prepare() – called once when the applier goes online and can be used to create temporary directories or spaces
  • begin() – called at the start of each transaction
  • apply() – called at the end of the transaction once the data file has been written, but before the commit
  • commit() – called after each transaction commit has taken place; this is where we can consolidate info.
  • release() – called when the applier goes offline

We can actually align these functions with a typical transaction – prepare() happens before the statements even start, begin() is the same as BEGIN, apply() happens immediately before COMMIT and commit() happens just after. release() can be used to do any clean up afterwards.
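
Putting those hooks together, a skeleton JS batch applier looks roughly like this; the csvinfo argument and runtime.exec() call mirror the cassandra.js script shown below, and the rest is placeholder:

// Skeleton batch applier: every function is optional, and each maps onto the
// batch 'transaction' lifecycle described above.
function prepare()
{
    // Called once when the applier goes online; create temporary space etc.
    runtime.exec("mkdir -p /tmp/batch-staging");
}

function begin()
{
    // Called at the start of each batch transaction
}

function apply(csvinfo)
{
    // Called once the CSV file for this transaction has been written, but
    // before the commit; this is where we load the file into the target.
    sqlParams = csvinfo.getSqlParameters();
    csv_file = sqlParams.get("%%CSV_FILE%%");
    schema = csvinfo.schema;
    table = csvinfo.table;
    // e.g. runtime.exec("/path/to/loader-script.sh " + schema + " " + table + " " + csv_file);
}

function commit()
{
    // Called after the transaction commit; consolidate/merge the staged data here
}

function release()
{
    // Called when the applier goes offline; clean up
}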

So let’s put this into practice and use it for Cassandra.

The basic process for loading is as follows:

  1. Write a CSV file to load into Cassandra
  2. Load the CSV file into a staging table within Cassandra; this is easy using the 'COPY tablename FROM filename' CQL statement.
  3. Merge the staging table data with a live table to create a carbon copy of our MySQL table content.

For the loading portion, what we’ll do is load the CSV into a staging table, and then we’ll merge the staging table and live table data together during the commit stage of our batch applier. We’ll return to this in more detail.

For the merging, we’ll take the information from the staging table, which includes the sequence number and operation type, and then write the ‘latest’ version of that row and put it into the live table. That gives us a structure like this:

Cassandra Loader

Tungsten Replicator is going to manage this entire process for us – all we need to do is install the replicators, plug in these custom bits, and let it run.

As with the Hadoop applier, what we’re going to do is use the batch applier to generate only insert and delete rows; UPDATE statements will be converted into a delete of the original version and insert of the new version. So:

INSERT INTO sample VALUES (1,'Message')

Is an insert…

DELETE FROM sample WHERE id = 1

Is a delete, and:

UPDATE sample SET message = 'Now you see me' WHERE id = 1

is actually:

DELETE FROM sample WHERE id = 1
INSERT INTO sample VALUES (1,'Now you see me')

This turns the problem of doing updates (which are expensive in big data stores, particularly Hadoop, which doesn’t support updating existing data) into a more efficient delete and insert.

In the CSV data itself, this is represented by prefixing every row with three fields:

optype, sequence number, unique id

Optype is ‘D’ for a delete and ‘I’ for an insert, and is used to identify what needs to be done. The sequence number is the unique transaction ID from the replicator THL. This number increases by one for every transaction, which means we can always identify the ‘latest’ version of a row; that is important to us when processing the transaction into Cassandra. The unique ID is the primary key (or compound key) from the source data. We need this to ensure we update the right row. To replicate data in this way, we must have a primary key on the data. If you don’t have primary keys, you are probably in a world of hurt anyway, so it shouldn’t be a stretch.

One difficulty here is that we need to cope with an idiosyncrasy of Cassandra, which is that by default Cassandra orders fields in the ‘tables’ (really collections of key/values) so that integers and numbers appear first in the table and text appears last. This is an optimisation that Cassandra makes that complicates things for us, but only in a very small way. For the moment, we’ll handle it by assuming that we are loading only one table with a known format into Cassandra. We could handle multiple tables by using a simple IF statement in the JS and using different formats for that, or we could actually extract the info from the incoming data; I’m going to skip that because it keeps us away from the cool element of actually getting the data in.

Within Cassandra, then, we have two tables: the table we are loading data into, and the staging table that we load the CSV data into. For our sample, the live schema is 'sample', the live table is 'sample' and the staging table is 'staging_sample'.

The definitions for these in Cassandra are, for the live sample table:

 CREATE TABLE sample (
 id int,
 message text,
 PRIMARY KEY (id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

And for the staging_sample table:

CREATE TABLE staging_sample (
 optype text,
 seqno int,
 fragno int,
 id int,
 message text,
 PRIMARY KEY (optype, seqno, fragno, id)
 ) WITH
 bloom_filter_fp_chance=0.010000 AND
 caching='KEYS_ONLY' AND
 comment='' AND
 dclocal_read_repair_chance=0.000000 AND
 gc_grace_seconds=864000 AND
 index_interval=128 AND
 read_repair_chance=0.100000 AND
 replicate_on_write='true' AND
 populate_io_cache_on_flush='false' AND
 default_time_to_live=0 AND
 speculative_retry='99.0PERCENTILE' AND
 memtable_flush_period_in_ms=0 AND
 compaction={'class': 'SizeTieredCompactionStrategy'} AND
 compression={'sstable_compression': 'LZ4Compressor'};

I’ve put both tables into a ‘sample’ collection.

Remember that idiosyncrasy I mentioned? Here it is: a bare table loading from CSV will actually order the data as:

seqno,uniqno,id,optype,message

This is Cassandra’s way of optimising integers over text to speed up lookups, but for us it is a minor niggle. Right now, I’m going to handle it by assuming we are replicating only one schema/table and that we know what the structure of that looks like. Longer term, I want to pull it out of the metadata, but that’s a refinement.

So let’s start by having a look at the basic JS loader script; it’s really the component that is going to handle the core element of the work, managing the CSV files that come in from the batch engine and applying them into Cassandra. Remember, there are five functions that we can define, but for the purposes of this demonstration we’re going to use only two of them: apply(), which will load the CSV file into Cassandra, and commit(), which will perform the steps to merge the staged data.

The apply() function does two things: it identifies the table and schema, and then runs the command to load this data into Cassandra through the cqlsh command-line tool. We can’t actually run CQL directly from the command line, but I wrote a quick shell script that pipes CQL from the command line into a running cqlsh.

The commit() function on the other hand is simpler, although it does a much more complicated job using another external script, this time written in Ruby.

So this gives us a cassandra.js script for the batch applier that looks like this:

function apply(csvinfo)
{
    // Identify the CSV file, schema and table for this batch...
    sqlParams = csvinfo.getSqlParameters();
    csv_file = sqlParams.get("%%CSV_FILE%%");
    schema = csvinfo.schema;
    table = csvinfo.table;
    // ...and load the CSV into the staging table via the cqlsh wrapper script
    runtime.exec("/opt/continuent/share/applycqlsh.sh " + schema + ' "copy staging_' + table + " (optype,seqno,uniqno,id,message) from '" + csv_file + "';\"");
}

function commit()
{
    // Merge the staged rows into the live table (and empty the staging table)
    runtime.exec("/opt/continuent/share/merge.rb " + schema);
}

So, the apply() function is called for each event as written into the THL from the MySQL binary log, and the content of the CSV file generated at that point contains the contents of the THL event; if it’s one row, it’s a one-row CSV file; if it’s a statement or transaction that created 2000 rows, it’s a 2000 row CSV file.

The csvinfo object that is provided contains information about the batch file that is written, including, as you can see here, the schema and table names, and the sequence number. Note that we could, at this point, pull out table info, but we’re going to concentrate on pulling a single table here just for demo purposes.

The CQL for loading the CSV data is:

COPY staging_tablename (optype,seqno,uniqno,id,message) from 'FILENAME';

This says: copy the specific columns, in this order, from the file into the specified table. As I mentioned, currently this is hard-coded into the applier JS, but it would be easy to handle more complex schemas and structures.
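
If you wanted to go beyond the hard-coded column list, the ‘simple IF statement in the JS’ idea mentioned earlier boils down to picking the column list per table; here’s a rough sketch using a lookup map instead (the extra ‘orders’ entry and its columns are made up):

// Sketch: choose the COPY column list per table rather than hard-coding it.
var tableColumns = {
    "sample": "optype,seqno,uniqno,id,message",
    "orders": "optype,seqno,uniqno,order_id,customer_id,total"
};

function apply(csvinfo)
{
    sqlParams = csvinfo.getSqlParameters();
    csv_file = sqlParams.get("%%CSV_FILE%%");
    schema = csvinfo.schema;
    table = csvinfo.table;

    columns = tableColumns[table];
    runtime.exec("/opt/continuent/share/applycqlsh.sh " + schema +
        ' "copy staging_' + table + " (" + columns + ") from '" + csv_file + "';\"");
}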

The commit() function is even simpler, because it just calls a script that will do the merging for us - we’ll get to that in a minute.

So here’s the script that applies an arbitrary CQL statement into Cassandra:

#!/bin/bash
SCHEMA=$1;shift
echo "$*" |cqlsh -k $SCHEMA tr-cassandra2

Really simple, but gets round a simple issue.

The script that does the merge work is more complex; in other environments we might be able to do this all within SQL, but CQL is fairly limited, with no sub-queries. So we do it long-hand using Ruby. The basic sequence is quite simple, and runs in three steps:

  1. Delete every row mentioned in the staging table with an optype of D and a matching unique key.
  2. Insert the *last* version of an insert for each unique ID; the last version will be the latest one in the output. We can pick this out by just iterating over every insert and picking the one with the highest sequence number, as generated by the THL transaction ID.
  3. Delete the content from the staging table because we’ve finished with it. That empties the staging table ready for the next set of transactions.

That file looks like this:

#!/usr/bin/ruby

require 'cql'

client = Cql::Client.connect(hosts: ['192.168.1.51'])
client.use('sample')

rows = client.execute("SELECT id FROM staging_sample where optype = 'D'")

deleteids = Array.new()

rows.each do |row|
  puts "Found ID #{row['id']} has to be deleted"
  deleteids.push(row['id'])
end

deleteidlist = deleteids.join(",")

client.execute("delete from sample where id in (#{deleteidlist})");
puts("delete from sample where id in (#{deleteidlist})");
rows = client.execute("SELECT * FROM staging_sample where optype = 'I'");

updateids = Hash.new()
updatedata = Hash.new()

rows.each do |row|
  id = row['id']
  puts "Found ID #{id} seq #{row['seqno']} has to be inserted"
  if updateids[id]
    if updateids[id] < row['seqno']
      updateids[id] = row['seqno']
      row.delete('seqno')
      row.delete('fragno')
      row.delete('optype')
      updatedata[id] = row
    end
  else
    updateids[id] = row['seqno']
    row.delete('seqno')
    row.delete('fragno')
    row.delete('optype')
    updatedata[id] = row
  end
end

updatedata.each do |rowid,rowdata|
  puts "Should update #{rowdata['id']} with #{rowdata['message']}"
  collist = rowdata.keys.join(',')
  colcount = rowdata.keys.length
  substbase = Array.new()
  #  (1..colcount).each {substbase.push('?')}
  rowdata.values.each do |value|
    if value.is_a?(String)
      substbase.push("'" + value.to_s + "'")
    else
      substbase.push(value)
    end
  end

  substlist = substbase.join(',')

  puts('Column list: ',collist)
  puts('Subst list: ',substlist)
  cqlinsert = "insert into sample ("+collist+") values ("+substlist+")"
  puts("Statement: " + cqlinsert)
  client.execute(cqlinsert)
end

client.execute("delete from staging_sample where optype in ('D','I')")

Again, currently this is hard-coded, but I could easily have got the schema/table name from the JS batch applier - the actual code is table agnostic and will work with any table.

So, I’ve set up two replicators (one uses cassandra.js rather than hadoop.js but works the same way) and copied applycqlsh.sh and merge.rb into /opt/continuent/share.

And we’re ready to run. Let’s try it:

mysql> insert into sample values (0,'First Message');
Query OK, 1 row affected (0.01 sec)

We’ve inserted one row. Let’s go check Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+---------------
489 | First Message

Woohoo - data from MySQL straight into Cassandra.

Now let’s try updating it:

mysql> update sample set message = 'Updated Message' where id = 489;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

And in Cassandra:

cqlsh:sample> select * from sample;

id  | message
-----+-----------------
489 | Updated Message

Bigger woohoo. Not only am I loading data directly into Cassandra, but I can update it as well. Now I can have a stream of updates and information within MySQL replicated over to Cassandra for whatever analysis or information I need, without any issues.

Cool huh? I certainly think so (OK, but I’m biased).

Now I haven’t tested it, but this should just as easily work from Oracle; I’ll be testing that and let you know.

Any other database destinations people would like to see for replicating into? If so, let me know and I’ll see what I can do.