Distributed Joins, Process Skew

This post builds on concepts introduced in the Distributed Joins, The Basics and Distributed Joins, Modeling for Colocation posts.  I'm assuming some familiarity with table skew, where a distribution key is chosen in which some key values have significantly more rows than average, so that one SPU or disk belonging to a snippet processor holds more data than the rest.  I've also talked about how joining two large distributed tables requires them to be distributed, or redistributed, on some or all of the join keys.  But what is not always obvious is that even when a table itself has a nice even distribution, a join may cause that table to redistribute in temp space on join keys that do NOT have even distribution.

I'll start with our simple Customer and Order table example.  Let's say, for whatever reason, customer is distributed by customer_key and order is distributed by order_key.  You look at the table distribution and everything looks nice and even, so you think all is well.  But then you join these two large tables and it's just not as fast as you expected.  A very common problem, especially in data marts and data warehouses, is orders with no customer.  Maybe these were cash orders, or for some reason they have no identified customer, so you assign them to an “Unknown” customer record.  Now suppose 20% of all orders have the “Unknown” customer.  To execute this join, the plan naturally redistributes the order table by customer_key, and suddenly 20% of the order data lands on one snippet processor's disk.  In my environment, a 10400 with 432 active SPUs, even distribution would put only about 0.23% of the data on any one of them.  So this snippet processor has roughly 87 times the data, and 87 times the work, of the average snippet processor.  And since you are only as good as your slowest snippet processor, that snippet in your query plan will take about 87 times as long as it would if everything were evenly distributed.  This has nothing to do with how evenly the data is distributed in the table itself.
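To make that concrete, here is a minimal sketch of the setup (illustrative DDL only; I'm using orders as the table name since ORDER is a reserved word):

    -- Each table looks perfectly even when distributed on its own key.
    CREATE TABLE customer (
        customer_key  INTEGER,
        customer_name VARCHAR(100)
    ) DISTRIBUTE ON (customer_key);

    CREATE TABLE orders (
        order_key    INTEGER,
        customer_key INTEGER,       -- -1 = "Unknown" customer on 20% of rows
        amount       NUMERIC(12,2)
    ) DISTRIBUTE ON (order_key);

    -- To co-locate this join, the plan redistributes orders on customer_key,
    -- and every customer_key = -1 row hashes to the same data slice.
    SELECT c.customer_key, SUM(o.amount) AS total_amount
    FROM orders o
    JOIN customer c ON c.customer_key = o.customer_key
    GROUP BY c.customer_key;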

Another situation that is hard to follow is understanding the distribution at each step, because you can hit skew when the results of the table A to table B step are joined with the results of the table C to table D step.  This matters because even when a table seems too small to need redistribution for a join (you expect it to be broadcast), in more complex queries it might first be joined to something else big and then joined on a key with a skew problem.

So, how do you solve this?  One approach is to keep these dimension-type tables small so they always broadcast.  A quick hint: you can set enable_factrel_planner = true and then set factrel_size_threshold high, like 15000000, which says anything under 15 million rows is more of a dimension, so broadcast or redistribute it, while tables over 15M rows are big facts the planner should try to leave in place rather than redistribute.  Also try setting spu_to_spu_broadcast to something small like .1 or .001 to help encourage broadcasting of smaller tables (these can be set in the config or at the session level).  But as mentioned above, if these small tables act as a sort of intersection point between large tables, i.e. everything ties to them, you may still hit a skew problem like the one above.
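At the session level, that might look like the following (a sketch using the example values above; exact names and behavior can vary by NPS release, so test in your own environment):

    SET enable_factrel_planner = true;      -- plan around a fact/dimension split
    SET factrel_size_threshold = 15000000;  -- under 15M rows: treat as a dimension
    SET spu_to_spu_broadcast = 0.001;       -- make broadcasts look cheap to the planner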

The prescribed solution is to scatter your “Unknown” keys over numerous records by having a range of keys.  For example, if -1 is the standard key for your single “Unknown” customer, you may need to have 10,000 “Unknown” customer records and have the keys range from -1 to -10,000.  You will then have to spread out your child or fact type records over these many “unknown” parent or dimension rows.
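A sketch of what that could look like (the numbers helper table is hypothetical; build a 10,000-row sequence however you prefer):

    -- Create 10,000 "Unknown" customer rows with keys -1 .. -10000,
    -- assuming a helper table numbers(n) holding 1..10000.
    INSERT INTO customer (customer_key, customer_name)
    SELECT -n, 'Unknown'
    FROM numbers
    WHERE n BETWEEN 1 AND 10000;

    -- Scatter the fact rows across those keys using something unique like order_key.
    UPDATE orders
    SET customer_key = -(1 + MOD(ABS(order_key), 10000))
    WHERE customer_key = -1;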

Another approach is to break up your query: in one pass, join everything but the “Unknown” rows to the nicely spread-out customer keys; in a second pass, handle just the “Unknown” customer, hopefully broadcasting that one (or few) low-cardinality record; and then union the results back together.
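For our example, the two-pass version might look like this (a sketch; column list abbreviated):

    -- Pass 1: real customers; redistribution on customer_key stays even.
    SELECT o.order_key, c.customer_key, c.customer_name
    FROM orders o
    JOIN customer c ON c.customer_key = o.customer_key
    WHERE o.customer_key <> -1
    UNION ALL
    -- Pass 2: the lone "Unknown" row is tiny, so it should broadcast.
    SELECT o.order_key, c.customer_key, c.customer_name
    FROM orders o
    JOIN customer c ON c.customer_key = o.customer_key
    WHERE o.customer_key = -1;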

Unfortunately, both of these methods can be a little problematic for certain reporting tools.  I'm currently working on some ways to spread the unknowns virtually, without keeping 9,999 extra records in the customer table, but I haven't completed the testing on that yet.  Hopefully I'll have more in the future.  But understanding the problem is half the battle.  A little tip for tracking this type of skew down: make physical versions of the tables or intermediate results, distributed the way they are in the join step or snippet giving you problems.  (Note that the snippet redistributing into the skewed distribution is slow, in addition to the joining snippet.)  Once you have the results or table copies physically redistributed, look at the distribution.  If you see a skew problem there, you have a process skew problem during that step in your original query.
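In practice that check looks something like this (datasliceid is the built-in column identifying the data slice a row lives on):

    -- Materialize the intermediate result the way the suspect snippet distributes it.
    CREATE TABLE skew_check AS
    SELECT *
    FROM orders
    DISTRIBUTE ON (customer_key);

    -- One data slice with far more rows than the rest = process skew in that step.
    SELECT datasliceid, COUNT(*) AS rows_on_slice
    FROM skew_check
    GROUP BY datasliceid
    ORDER BY rows_on_slice DESC;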

There are other things that can cause process skew that I'll discuss in later posts, but this data skew after on-the-fly redistribution is the most common.


8 Responses to Distributed Joins, Process Skew

  1. Anoop says:

    Hi,
    I would like to know how Netezza handles temporary tables. If I need to select fewer than two million records and do operations on some columns depending on user input, what is the best choice: a CREATE TEMPORARY TABLE in DDL, then INSERT the data into that table and UPDATE in a loop, or CREATE TEMPORARY TABLE AS SELECT … and then loop the UPDATEs? Which will be more efficient? In this scenario the joining columns are dynamic and all are varchar fields. How can we distribute the data efficiently?
    Thanks in advance

    • NZGuy says:

      Creating the table first or doing a CTAS will perform the same, especially if you cast the output into the data types you want in the CTAS (i.e. select first_name::varchar(40)). However, if you can manage to do your update of the data as part of the select, that will be more efficient than doing an update after the data is written to the table. Remember, behind the scenes an update marks the old row as deleted and inserts a new one. If you are updating a large percentage of rows, you will actually be scanning both the old and new rows off of disk and then, of course, filtering out the deleted rows. That can affect performance until you groom the table (removing the deleted rows). But frankly, unless your table is very wide, 2 million rows is very small for Netezza.
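      For example, a sketch of folding the transform into the CTAS (the transforms here are just stand-ins for whatever your user-driven logic is):

          -- Do the "update" work in the SELECT itself, casting to the types you want.
          CREATE TEMPORARY TABLE work_customer AS
          SELECT customer_key,
                 first_name::VARCHAR(40)             AS first_name,
                 UPPER(TRIM(last_name))::VARCHAR(40) AS last_name
          FROM customer
          DISTRIBUTE ON (customer_key);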

  2. Ranga says:

    Hi,

    Thanks for all the useful information here, both wide and deep.
    My question is: is it possible to overdo the distribution? For example, in your example of customer, order and order_items, let us say we add a sku table. Would it make sense to distribute based on:
    customer -> distributed on customer_key
    order -> distributed on customer_key, sku_key
    order_item -> distributed on customer_key, sku_key
    sku -> distributed on sku_key

    The report queries will always be run for a short list of customer_group_codes. There is a one-to-many relationship between customer_group_code and customer_key. Would adding the sku_key to the order and order_item tables be considered overdoing the distribution, or is it recommended?

    What if we are going to run some reports that generate aggregate numbers by customer_group_code? Would that change the answer?

  3. NZGuy says:

    An important thing to remember: distribution is based on a hash (an algorithm that converts values to integer numbers with very even distribution across different values, i.e. so that Andy and Anders hash to very different values even though the first three characters are the same). Also remember the hash happens on the concatenation of the distribution keys. This is not like indexes: using a subset of the distribution columns in your join has zero co-location benefit. You want to find a distribution that is common across ALL the tables. So in what you are suggesting, only a join between order and order_item would be co-located. Joining customer to order would redistribute order on customer_key on the fly (assuming order is too big to broadcast).

    Given your scenario, it seems the best choice is to distribute customer, order, and order_item on customer_key. You can then join all three, or any two, of these tables without redistribution. For sku, I would hope this is a relatively small table (< 10M rows) that would broadcast easily. If so, distribution is of no benefit in the above scenario, and you can distribute on sku_key (if it distributes evenly) or on random. If sku is large, there is no way to avoid at least one on-the-fly redistribution. If customer_group_code is relatively low cardinality, and assuming you are only using summable aggregates (nothing like count(distinct …)), I would expect each SPU to produce a relatively small aggregate that is then merged together quickly at the end on the host. So to sum it up, you are looking for common distribution. Composite key distribution is rarely of benefit unless that composite is common across many tables.
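    In DDL terms, a sketch of that layout (columns abbreviated to the keys):

        CREATE TABLE customer   (customer_key INTEGER, customer_group_code INTEGER)
            DISTRIBUTE ON (customer_key);
        CREATE TABLE orders     (order_key INTEGER, customer_key INTEGER)
            DISTRIBUTE ON (customer_key);
        CREATE TABLE order_item (order_key INTEGER, customer_key INTEGER, sku_key INTEGER)
            DISTRIBUTE ON (customer_key);
        -- sku is small enough to broadcast, so even/random distribution is fine.
        CREATE TABLE sku        (sku_key INTEGER, sku_name VARCHAR(100))
            DISTRIBUTE ON RANDOM;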

    • Ranga says:

      Thanks for the clarification. I have a follow-up question. The sku table has 4M rows for a total byte size of 200MB. If I query for three customer_group_codes that have about 20 customer_keys between them, and the 20 customers hold about 100K SKUs between them, how does the broadcasting work? Would the entire sku table (all 200MB) get copied over to each of the 20 SPUs processing the customers? What kind of load would that put on the internal network fabric? Would that be an acceptable load?

      In the above scenario, since the sku table would be spread over all the SPUs, data would start flying from every SPU to every other SPU. I am assuming that would jam the internal network fabric. How about distributing on a dummy key whose value is the same on every single row? That way the data only has to flow from one SPU to the 20 other SPUs! What are your thoughts on this?

      Thanks in advance.

      • NZGuy says:

        The network fabric on TwinFin is 10Gbit. So that means it can move data between s-blades at a rate of about 1.3 gigabytes per second. Broadcasting even the entire 200MB is no strain at all. How it does it can differ query to query, plan to plan, but I have seen pre-broadcasts that were smart enough to filter on a set derived via a join. If I understand your model, though, the skus are not directly related to the customers but go through the order items, so I expect the full table to broadcast. Also, coming from one SPU doesn't help; in fact it harms. Typically it does a broadcast onto the link, where each SPU contributes its share of the data into the network fabric, and each subscribes, if you will, to the full set. The thing to remember about the network switches at the center of the backside network is that they can transfer 10Gbit from any and every port to any and every port concurrently, so no jamming happens. Big switches can have total backplane speeds in the terabits per second. In fact, if the table were on one SPU, you would limit the read and the network transfer to a single disk and a single network port.

        So overall, the broadcast of even the full 4M rows and 200MB of data is not only an acceptable load, I would call it a minimal load; that broadcast snippet should happen in sub-seconds.
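        Back of the envelope: at that rate, broadcasting the whole table is roughly 200MB / 1.3GB/s ≈ 0.15 seconds on the wire.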

        Now, all this said, if querying on a limited set of customers is common, you might look at getting more index-like behavior by using clustered base tables, via the ORGANIZE ON clause in your table creation.

        • Ranga says:

          Thanks for the detailed explanation of the network fabric. I could not find that information anywhere, let alone the intuitive explanation you have given. This is excellent.

          In my case, querying for a subset of customers is the norm (multiple parallel queries, each for a different subset of customers). To leverage the organize statement, should it be distribute on random and organize on sku_key, or distribute on sku_key and organize on sku_key?

          Thanks in advance.

          • NZGuy says:

            Actually, sku I would just leave distributed on random and not organize on anything. I would try organizing customer on customer_key, and order and order_item on customer_key plus whatever date field you probably have, with all three distributed on customer_key. Hopefully the condition on customer group will predetermine the set of customers and zone-map down to just those rows on all three tables. If it doesn't, you might consider organizing customer on customer group and denormalizing the customer group field into order and order_item, IF this field never changes. Also worth noting: if you are not already on NPS 7, it should help your scenario. It's supposed to be able to use only the SPUs with data related to the query, hopefully resulting in higher concurrency/throughput.
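            For example, something like this (an illustrative sketch; swap in whatever date column you actually have):

                CREATE TABLE order_item (
                    order_key    INTEGER,
                    customer_key INTEGER,
                    sku_key      INTEGER,
                    order_date   DATE
                )
                DISTRIBUTE ON (customer_key)
                ORGANIZE ON (customer_key, order_date);

                -- For an existing table, groom rewrites rows into the organizing order.
                GROOM TABLE order_item RECORDS ALL;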

            In general, information is hard to come by on Netezza these days. It seemed a lot more open before IBM. We get a lot of our info from private presentations from Netezza/IBM, issues we open with support, and the training docs, which often have unique secrets.
