In this session, we'll dig a little deeper into partitioners and cover why someone might want to use one. So far we've seen the different sorts of partitioners, but we haven't seen why they should really be used. In the last session, we saw that Spark makes a few different kinds of partitioners available out of the box to users. These were hash partitioners and range partitioners. And we got a little bit of a sense of when one might be used versus the other. We also looked at the sorts of operations that may introduce new partitioners, that may carry along an existing partitioner, or that could even completely discard a partitioner. Said simply, the biggest reason why someone would want to care about partitioning is that partitioning can bring enormous performance gains, especially in the face of operations that may cause shuffles. The basic intuition is that if you can somehow optimize for data locality, then you can prevent a lot of network traffic from even happening. We know that network traffic means huge latencies. So partitioning can mean significantly reduced latencies which, of course, translates into better performance. As usual, let's look at an example to see how much partitioning can actually help performance. Let's return to the groupByKey versus reduceByKey example that we saw in previous sessions. When we left off with this example, we were pretty happy with the 3x performance boost from using reduceByKey instead of groupByKey. However, we can still do better than that. You might not believe it, but we can use range partitioners to optimize our earlier use of reduceByKey, so that it doesn't have to involve shuffling over the network at all. Here's how we do it. All that we've done differently in this example is that we've started by partitioning our initial dataset. So all of this right here, this is still the same from before. So we start by creating a simple range partitioner here. 
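As a rough sketch, the partitioned version could look something like the following. The `Purchase` type, the sample data, and the local master setting are assumptions made for illustration, and the sketch assumes Spark is on the classpath. One deliberate choice: it uses `mapValues` to build the pair RDD, because `mapValues` keeps the partitioner, whereas a plain `map` would discard it.

```scala
import org.apache.spark.{RangePartitioner, SparkConf, SparkContext}

object RangePartitionedReduce {
  // Hypothetical record type standing in for the purchase data from earlier sessions.
  case class Purchase(customerId: Int, price: Double)

  // Returns customerId -> (number of purchases, total spent).
  def purchasesPerCustomer(sc: SparkContext): Map[Int, (Int, Double)] = {
    // Made-up sample data, only for illustration.
    val purchases = sc.parallelize(Seq(
      Purchase(100, 12.5), Purchase(300, 22.25), Purchase(100, 20.0),
      Purchase(200, 8.0), Purchase(300, 8.5)))

    val pairs = purchases.map(p => (p.customerId, p.price))

    // Ask for 8 partitions; the partitioner samples the keys of `pairs`
    // to choose its range boundaries.
    val tunedPartitioner = new RangePartitioner(8, pairs)

    // Partition once, then persist so the partitioned data is reused
    // instead of being re-partitioned each time it is needed.
    val partitioned = pairs.partitionBy(tunedPartitioner).persist()

    partitioned
      .mapValues(price => (1, price)) // mapValues keeps the partitioner
      .reduceByKey { case ((n1, t1), (n2, t2)) => (n1 + n2, t1 + t2) }
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying this out on one machine.
    val conf = new SparkConf().setAppName("range-partitioned-reduce").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try purchasesPerCustomer(sc).foreach(println)
    finally sc.stop()
  }
}
```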
It has 8 partitions, because, perhaps, this number of partitions makes sense for our cluster. And then we pass this RangePartitioner that we just created into our partitionBy function. And finally we persist() this newly partitioned RDD. Then we just do the same thing that we did previously: we make a pair RDD and then we use reduceByKey(), and that's it. Seems simple enough, but how does it compare when we run it on the cluster? Well, it's significantly faster, this is the result here. We have almost a 9x speedup over the groupByKey version. So 9x speedup over this one here. Now imagine that these computations weren't running in seconds, but in minutes or hours. This 9x speedup is a pretty big difference then. Okay, so let's look at another example. This one comes from the Learning Spark book, which is a really great book. And you can find this example on pages 61 to 64 of the Learning Spark book. So let's imagine that we have an application. And let's say that this application is a media application where users can, for example, subscribe to topics for articles. They want to read articles, so they say I'm interested in a certain topic. And let's imagine that in this application, we have a huge table of user information. And we store that big table in memory in the form of a pair RDD. We can call this big dataset userData here, and since it's a pair RDD, its key is a UserID and its value is UserInfo, where UserInfo contains a list of topics that the specific user subscribes to. Now imagine that every now and then this application has to combine this big dataset of userData with a smaller dataset representing events that have happened in the last five minutes. We'll call the smaller dataset events here, and it too has the key UserID. But its value instead is of type LinkInfo here, which represents the links that that user has clicked on in the past five minutes. 
And in this example we may wish to count how many users visited a link that was not related to one of the topics that they've subscribed to. So this is kind of reminiscent of the CFF application that we saw in previous sessions. We know that we're going to have to join these two datasets somehow, because we're going to want to match the UserInfo to the LinkInfo for those users that have been active in the past five minutes. So what might this program look like? Well, here's the full thing implemented on one slide. Of course, the most important part is this line here, where we take the userData and then we do an inner join with events. Now we have a new pair RDD called joined, which contains all the users that have been active in the past five minutes and what they've been clicking on. Now all we have to do is filter that dataset down to figure out if people are clicking on things that they're not subscribed to. So we do that here, and finally, we just count it up to see what that number is. So what do you think about this program? Does it seem all right? Logically it seems like it should do what we want it to do, right? Well, although it might do the right thing logically, it'll be really inefficient. This is because each time this processNewLogs function is invoked, the join has no idea how any of the keys are partitioned in these datasets. And so all that Spark can do is hash all of the keys of both datasets, and then send the elements with the same key hash across the network to be on the same machine. Then once that's done, it can join together the elements with the same key on that machine. And it does this every time this method is invoked, even though this really huge pair RDD called userData doesn't change. So there's no reason to keep sending this thing around if it's always the same, right? It shouldn't have to keep moving all this data around the network. To give you some sense of what this looks like, here's a little diagram. 
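As a rough sketch, the program just described might look something like the following. The fields of `UserInfo` and `LinkInfo`, the sample data, and passing both RDDs in as parameters are assumptions made for illustration, and the sketch assumes Spark is on the classpath. Note that userData has no partitioner here, so the join has to shuffle both datasets on every call.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object OffTopicVisits {
  type UserID = Int
  // Hypothetical shapes for the two value types named in the example.
  case class UserInfo(topics: Set[String])
  case class LinkInfo(url: String, topic: String)

  // Counts click events whose topic is not one the user subscribed to.
  def processNewLogs(userData: RDD[(UserID, UserInfo)],
                     events: RDD[(UserID, LinkInfo)]): Long = {
    // Inner join: keeps only users that appear in both datasets.
    // Neither side has a known partitioner, so both are shuffled.
    val joined: RDD[(UserID, (UserInfo, LinkInfo))] = userData.join(events)
    joined
      .filter { case (_, (user, link)) => !user.topics.contains(link.topic) }
      .count()
  }

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying this out on one machine.
    val conf = new SparkConf().setAppName("off-topic-visits").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data, only for illustration.
      val userData = sc.parallelize(Seq(
        1 -> UserInfo(Set("scala", "spark")),
        2 -> UserInfo(Set("music")))).persist()
      val events = sc.parallelize(Seq(
        1 -> LinkInfo("/a", "spark"),
        1 -> LinkInfo("/b", "sports"),
        2 -> LinkInfo("/c", "news")))
      println("Visits to non-subscribed topics: " + processNewLogs(userData, events))
    } finally sc.stop()
  }
}
```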
This is the userData dataset, and then you have the joined dataset here. Basically what's happening is that we have this big shuffle between userData and joined. While it might not seem like it, fixing this is actually quite easy. As you might have guessed, all we have to do is partition the really big userData RDD right at the beginning of the program. So you remember here, we create the userData RDD. So all we have to do is add these two lines here to userData. That is, we just call partitionBy, and we pass to it a new HashPartitioner with some fixed number of partitions. In this case, we say we want 100 partitions. And after that, all we have to do is just persist() the userData. So, since we called partitionBy while we were building up userData, Spark now knows that it's hash partitioned. And therefore calls to join on this userData thing here can take advantage of this partitioning. So now, when we call userData.join(events), like we saw in the original program, Spark only has to shuffle the events RDD, sending events with specific user IDs to the machine that contains the corresponding hash partition of userData. So the bottom line is that the small pair RDD, events, should now be the only one that's shuffled, and not the big RDD full of user data. Or shown more visually, now that the userData pair RDD is pre-partitioned, Spark only has to shuffle the events RDD. This means that events with specific user IDs are the only things that have to be sent to the machines with the corresponding hash partitions of userData. No more shuffling that big userData RDD. Now, let's look back at our example using groupByKey. Let's try to understand a little bit about what's happening with these partitioners under the hood. We recall that grouping all the values of the key-value pairs with the same key requires collecting all of the key-value pairs with the same key on the same machine. 
That's a mouthful. By default, grouping is done using a hash partitioner with the default parameters. And then the resulting RDD is configured to use that same hash partitioner that was used to construct it. This is what's going on under the hood. At this point, you might be asking yourself, well, goodness, how do I know when a shuffle will occur? Well, there's a simple rule of thumb to help determine when a shuffle might occur. A shuffle can occur when a resulting RDD depends on other elements from either the same RDD or another RDD. So consider certain operations like join. Just given how they fundamentally work, they have to depend on data from other parts of the same RDD or from other RDDs. This should, by default, trigger some sort of warning in your mind that, aha, shuffling could occur. What can I do to reduce it? And, of course, the answer is that if you intelligently partition your data, you can either greatly reduce or completely prevent shuffling. There are also other tricks and even methods that can help you figure out when a shuffle has been either planned or executed. On the one hand, you could look at the return type of certain transformations like we saw earlier in this session. Sometimes you might see a return type called ShuffledRDD here. Of course, this is evidence that a shuffle has either already happened or is planned. There's also this very handy method called toDebugString that you can call on an RDD to see its execution plan. This will give all kinds of information about how your job is planned. And you can keep an eye out for warning signs like the ShuffledRDDs here. But perhaps the best thing to do is just keep in mind which operations might cause a shuffle. You can usually tell by logically thinking through what the method does. In any case, here's a handy list of operations that might cause a shuffle. Of course, the usual suspects are here. We have the different kinds of joins. 
We have groupByKey, reduceByKey, and combineByKey. And then there are other operations like repartition or coalesce. And as their names suggest, these operations sound like they have something to do with partitioning. So, of course, if they are partitioning data, they're going to be moving data around. So these are the sorts of operations that may possibly cause a shuffle. However, if I've hammered anything into your head during this session, the one thing that you should try to remember is that it is sometimes possible to avoid a network shuffle by partitioning. There are two common scenarios where you can avoid network shuffling by partitioning. The first is when you use an operation like groupByKey on pre-partitioned RDDs. So in this example here, the values are computed locally, since they've already been pre-partitioned, or pre-shuffled, so to speak. So the work can all be done on the local partitions on the worker nodes, without workers having to shuffle their data again to one another. And in this case, the only time the data has to be moved is when the final reduced values have to be sent back from the worker nodes to the driver node. The other common scenario here has to do with pre-partitioning before doing joins. So, we can completely avoid shuffling by pre-partitioning the two joined RDDs with the same partitioner. Of course, you must also ensure that the pre-partitioned RDDs are cached following the partitioning. This makes it possible to compute the entire join locally without any network shuffling, since the data that must be joined together from both pair RDDs has already been relocated to live on the same node in the same partition. So you don't need to move the data around in this case. So if you take anything away from this session, you should remember that how you organize your data on your cluster really matters. 
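To make the join scenario a bit more concrete, here is a minimal sketch of pre-partitioning the big userData RDD with a HashPartitioner of 100 partitions and persisting it, as in the earlier example. The case classes, the sample data, and the local master setting are assumptions made for illustration, and the sketch assumes Spark is on the classpath. It also prints the partitioners and the toDebugString plan, so you can check for yourself what Spark has planned.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PrePartitionedJoin {
  type UserID = Int
  // Hypothetical shapes for the two value types named in the example.
  case class UserInfo(topics: Set[String])
  case class LinkInfo(url: String, topic: String)

  // Partition the big dataset once, up front, and keep it cached.
  def buildUserData(sc: SparkContext,
                    users: Seq[(UserID, UserInfo)]): RDD[(UserID, UserInfo)] =
    sc.parallelize(users)
      .partitionBy(new HashPartitioner(100))
      .persist()

  def main(args: Array[String]): Unit = {
    // Local master is an assumption for trying this out on one machine.
    val conf = new SparkConf().setAppName("pre-partitioned-join").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      // Made-up sample data, only for illustration.
      val userData = buildUserData(sc, Seq(
        1 -> UserInfo(Set("scala", "spark")),
        2 -> UserInfo(Set("music"))))
      val events = sc.parallelize(Seq(
        1 -> LinkInfo("/a", "spark"),
        2 -> LinkInfo("/c", "news")), numSlices = 4)

      // userData already has a partitioner, so the join can reuse it
      // and only the small events RDD needs to be shuffled.
      val joined = userData.join(events)

      println(userData.partitioner)                        // Some(HashPartitioner)
      println(joined.partitioner == userData.partitioner)  // same partitioner reused
      println(joined.toDebugString)                        // the planned lineage
    } finally sc.stop()
  }
}
```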
You can go from repeatedly shuffling a large dataset while trying to join it with a smaller dataset to not having to shuffle the large dataset at all, all just by organizing and partitioning your data intelligently from the beginning of your job. And remember that we saw speedups of up to around 10x on small examples, just by trying to ensure that data isn't transmitted over the network to other machines when it doesn't have to be. If we think back to the latency numbers that we learned in the first week, you should have the intuition that this 9x to 10x speedup could make a big difference in your day-to-day work. If you're trying to run a job that should complete in 4 hours, but you miss an opportunity to partition data or optimize away a shuffle, it could take 40 hours instead. So this is why partitioning is important.
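To get a feel for the difference, here is a toy model in plain Scala, with no Spark involved. It simply counts how many records sit on a different node than the one their key hashes to, once for a big dataset loaded in arbitrary blocks and once for the same dataset pre-partitioned by hash. The node count, dataset sizes, and initial placements are all made up, and a real shuffle has costs this model ignores, so treat the numbers as an illustration and not as a prediction of speedup. The hashing rule is the non-negative modulo of the key's hash code, which is the rule Spark's HashPartitioner applies.

```scala
object ShuffleCostModel {
  // Non-negative modulo of the key's hash code.
  def hashPartition(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Each record is (node it currently lives on, key).
  // A record "moves" if its key hashes to a different node.
  def recordsMoved(placed: Seq[(Int, Int)], numNodes: Int): Int =
    placed.count { case (node, key) => hashPartition(key, numNodes) != node }

  val numNodes = 4

  // Big dataset: 1000 user keys, loaded in contiguous blocks of 250 per node.
  val userDataAsLoaded: Seq[(Int, Int)] =
    (0 until 1000).map(k => (k / 250, k))

  // The same keys after a one-time partitionBy with a hash partitioner.
  val userDataPrePartitioned: Seq[(Int, Int)] =
    (0 until 1000).map(k => (hashPartition(k, numNodes), k))

  // Small dataset: 20 events that all arrive on node 0.
  val events: Seq[(Int, Int)] = (0 until 20).map(i => (0, i * 37))

  // Per join call: both sides shuffled vs. only the small side shuffled.
  val costWithoutPartitioning: Int =
    recordsMoved(userDataAsLoaded, numNodes) + recordsMoved(events, numNodes)
  val costWithPrePartitioning: Int =
    recordsMoved(userDataPrePartitioned, numNodes) + recordsMoved(events, numNodes)

  def main(args: Array[String]): Unit = {
    println(s"records moved per join, no partitioning:  $costWithoutPartitioning") // 765
    println(s"records moved per join, pre-partitioned: $costWithPrePartitioning")  // 15
  }
}
```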