Spark Sql Auto Broadcast Join Tuning

12.04.2020
  1. Broadcast Join In Spark
  2. Spark Sql Join
  3. Spark Sql Auto Broadcast Join Tuning Free
  4. Spark Broadcast Join Example

Spark Configuration

  • spark.sql.broadcastTimeout: 300: Timeout in seconds for the broadcast wait time in broadcast joins.
  • spark.sql.autoBroadcastJoinThreshold: 10485760 (10 MB): Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting.
  • A broadcast join is possible when the join is one of CROSS, INNER, LEFT ANTI, LEFT OUTER or LEFT SEMI and the right join side can be broadcast, i.e. its size is less than spark.sql.autoBroadcastJoinThreshold, or when the join is one of CROSS, INNER or RIGHT OUTER and the left join side can be broadcast, i.e. its size is less than spark.sql.autoBroadcastJoinThreshold.
  • Broadcast joins are also known as map-side joins in Spark SQL. Findings like these usually fall into a study category rather than a single topic, so the goal of Spark SQL's Performance Tuning Tips and Tricks chapter is to have a single place for the so-called tips and tricks.
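The eligibility conditions in the list above can be sketched as a small decision function. This is a simplified model of the rules as stated here, not Spark's actual planner code; the function name `broadcastable_sides` and its parameters are hypothetical, and table sizes are assumed to be known in bytes.

```python
# Join types for which each side may be broadcast, as listed above.
RIGHT_SIDE_JOINS = {"CROSS", "INNER", "LEFT ANTI", "LEFT OUTER", "LEFT SEMI"}
LEFT_SIDE_JOINS = {"CROSS", "INNER", "RIGHT OUTER"}

# Default value of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_THRESHOLD = 10485760


def broadcastable_sides(join_type, left_bytes, right_bytes,
                        threshold=DEFAULT_THRESHOLD):
    """Return which sides ("left", "right") qualify for broadcast.

    Hypothetical helper that mirrors the rules in the text only.
    """
    if threshold < 0:  # a threshold of -1 disables automatic broadcasting
        return []
    join_type = join_type.upper()
    sides = []
    if join_type in LEFT_SIDE_JOINS and left_bytes < threshold:
        sides.append("left")
    if join_type in RIGHT_SIDE_JOINS and right_bytes < threshold:
        sides.append("right")
    return sides


one_mb = 1024 * 1024
five_gb = 5 * 1024 ** 3
print(broadcastable_sides("LEFT OUTER", five_gb, one_mb))
print(broadcastable_sides("RIGHT OUTER", one_mb, one_mb))
print(broadcastable_sides("INNER", one_mb, one_mb, threshold=-1))
```

For a LEFT OUTER join only the right side is a candidate, for a RIGHT OUTER join only the left side, and a threshold of -1 rules out both.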

Spark SQL uses broadcast join (aka broadcast hash join) instead of hash join to optimize join queries when the size of one side of the data is below spark.sql.autoBroadcastJoinThreshold. Broadcast join can be very efficient for joins between a large table (fact) and relatively small tables (dimensions), which can then be used to perform a star-schema join.

Join operations are often the biggest source of performance problems, and even full-blown exceptions, in Apache Spark. After this talk, you will understand the two most basic methods Spark employs for joining dataframes, down to the level of detail of how Spark distributes the data within the cluster.

Join on numeric fields whenever possible, and cast strings to integers if they are used in a join condition (if possible). You can also manually instruct a query to broadcast. This article touched on some of the "lower-hanging fruit" options, such as tuning a Spark SQL query and modifying Spark run-time parameters.

Spark is a component of IBM® Open Platform with Apache Spark and Apache Hadoop. Apache Spark is a fast and general-purpose cluster computing system that allows you to process massive amounts of data using your favorite programming languages, including Java, Scala and Python.
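To make the broadcast hash join idea concrete, here is a minimal sketch in plain Python rather than Spark. It only illustrates the build and probe phases on in-memory lists; the function name `broadcast_hash_join` and the sample rows are hypothetical, and a real Spark job would distribute the large side across executors.

```python
from collections import defaultdict


def broadcast_hash_join(large_rows, small_rows, key):
    """Inner-join two lists of dicts by hashing the small side."""
    # Build phase: in Spark, this table would be copied to every executor.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    # Probe phase: each large-side row is matched locally, with no shuffle.
    joined = []
    for row in large_rows:
        for match in lookup.get(row[key], []):
            joined.append({**match, **row})
    return joined


# Hypothetical fact (large) and dimension (small) tables.
facts = [
    {"store_id": 1, "amount": 10},
    {"store_id": 2, "amount": 5},
    {"store_id": 3, "amount": 7},
]
stores = [
    {"store_id": 1, "city": "Oslo"},
    {"store_id": 2, "city": "Lima"},
]

for row in broadcast_hash_join(facts, stores, "store_id"):
    print(row)
```

Because the small table is fully available wherever a large-side row lives, the large side never has to be repartitioned by key, which is the source of the speedup over a shuffle-based join.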

The main component of spot-ml uses Spark and Spark SQL to analyze network events and identify those considered the most unlikely or most suspicious.

To run spot-ml with its best performance and scalability, it will probably be necessary to configure Yarn, Spark and Spot. Here are our recommended settings.

General Yarn tuning

The spot-ml Spark application has been developed and tested on CDH Yarn clusters. Careful tuning of the Yarn cluster may be necessary before analyzing large amounts of data with spot-ml.

For small data sets, under 100 GB of parquet files, default Yarn configurations should be enough. If users try to analyze hundreds of gigabytes of data in parquet format, the defaults will probably not work; Yarn will most likely start killing containers or terminate the application with Out Of Memory errors.

For more on how to tune Yarn for general use, we suggest these links:

Users need to keep in mind that getting a Spark application running, especially for big data sets, might take more than one try before producing any results.

Configuring Spot's Usage of Spark

When running Spark on Yarn, users can set a number of properties to get the best performance and consume resources more effectively. Since not all clusters are the same and not all users plan to have the same computation capacity, we have created variables that users need to configure before running spot-ml.

After installing spot-setup, users will find the spot.conf file under the /etc folder. This file contains all the configuration required to run spot-ml, as explained in INSTALL.md. The file has a section for Spark properties; below is the explanation for each of those variables:


Besides the variables in spot.conf, users can modify the rest of the properties in ml_ops.sh based on their needs.

Setting Spark properties

After the Yarn cluster has been tuned, the next step is to set Spark properties by assigning the right values to the Spark variables in spot.conf.

Number of Executors, Executor Memory, Executor Cores and Executor Memory Overhead

The first thing users need to know is how to set the number of executors, the memory per executor and the number of cores. To get those numbers, users should know the total memory available per node after Yarn tuning. This total memory is determined by the yarn.nodemanager.resource.memory-mb property, and the total number of available cores is given by yarn.nodemanager.resource.cpu-vcores.

Depending on the total physical memory available for Yarn containers, the memory per executor can determine the starting point for setting the total number of executors. To calculate the memory per executor and the number of executors, we suggest that users follow these steps:

  1. Divide the total physical memory per node by a number between 3 and 5.
  2. If the result of the division is equal to or bigger than 30 GB, continue to calculate the number of executors.
  3. If the result of the division is less than 30 GB, try a smaller divisor.
  4. The result of the division will be the memory per executor. Multiply the divisor used in the first step by the number of nodes.
  5. The result of step 4 could be the total number of executors, but users need to reserve resources for the application driver. Depending on the result of the multiplication, we recommend subtracting 2 or 3 executors.

See example below:

Having a cluster with 9 nodes, each node with 152 GB physical memory available: 152/5 ~ 30 GB per executor. 5 executors x 9 nodes = 45 executors; 45 - 2 = 43 executors.

In the previous example, taking off 2 executors ensures enough memory for the application driver.
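The steps and the worked example above can be sketched as a short calculation. The function name `plan_executors` and its parameters are hypothetical, and the sketch assumes the divisor is tried from 5 down to 3 and that memory is rounded down to whole gigabytes.

```python
def plan_executors(node_memory_gb, nodes, min_executor_gb=30, driver_reserve=2):
    """Return (executors_per_node, memory_per_executor_gb, total_executors).

    Tries divisors 5, 4, 3 in that order and keeps the first one that
    leaves at least min_executor_gb per executor. If none does, the
    result for divisor 3 is returned.
    """
    for per_node in (5, 4, 3):
        memory_gb = node_memory_gb // per_node
        if memory_gb >= min_executor_gb:
            break
    # Reserve a few executors' worth of resources for the driver.
    total = per_node * nodes - driver_reserve
    return per_node, memory_gb, total


# The example from the text: 9 nodes with 152 GB each.
per_node, memory_gb, total = plan_executors(152, 9)
print(per_node, memory_gb, total)
```

With 152 GB per node and 9 nodes this reproduces the figures from the example: 5 executors per node, 30 GB each, 43 executors in total.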

Users can determine the number of cores per executor from the total number of executors and the available vcpus:

  1. Divide the total available vcpus in the cluster by the number of executors.

Example:

Having a total of 432 vcpus, 48 per node: 432/43 ~ 10 cores per executor.

Although it sounds like a good idea to allocate all the available cores, we have seen cases where many cores per executor cause Spark to assign more tasks to every executor, which can potentially cause OOM errors. We recommend keeping the number of cores in close proportion to executor memory.

Lastly, for overhead memory we recommend using something between 8% and 10% of executor memory.
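The core and overhead arithmetic can be checked with a couple of lines of Python. Both helper names are hypothetical, and expressing the overhead in megabytes is an assumption made for illustration only.

```python
def cores_per_executor(total_vcpus, executors):
    """Whole number of cores each executor can receive."""
    return total_vcpus // executors


def memory_overhead_mb(memory_gb, percent=10):
    """Overhead as a percentage of memory, in MB (integer arithmetic)."""
    return memory_gb * 1024 * percent // 100


# The example from the text: 432 vcpus, 43 executors, 30 GB per executor.
print(cores_per_executor(432, 43))
print(memory_overhead_mb(30, percent=8), memory_overhead_mb(30, percent=10))
```

For a 30 GB executor, the 8% to 10% range works out to roughly 2.4 GB to 3 GB of overhead.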

Following the example, the values for the Spark variables in spot.conf would look like this:

Driver Memory, Driver Maximum Results and Driver Memory Overhead

The spot-ml application executes actions such as .collect, .orderBy and .saveAsTextFile, so we recommend assigning a considerable amount of memory to the driver.

The same way, driver maximum results should be enough for the serialized results.

Depending on the user's data volume, these two properties can be as small as tens of gigabytes for the driver and a couple of gigabytes for driver maximum results, or grow up to 50 GB and 8 GB respectively. Users can follow these steps to determine the amount of memory for the driver and for maximum results:

  1. If executor memory is equal to or bigger than 30 GB, make driver memory the same as executor memory and set driver maximum results to a value equal to or bigger than 6 GB.
  2. If executor memory is less than 30 GB but the data to be analyzed is equal to or bigger than 100 GB, set driver memory to something between 30 GB and 50 GB. Driver maximum results should be equal to or bigger than 8 GB.

Following the example in the previous section, with 9 nodes, 43 executors and 30 GB of memory per executor, we can set driver memory to 30 GB and driver maximum results to 8 GB.

Memory overhead for driver can be set to something between 8% and 10% of driver memory.
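The two driver sizing rules above can be written down as a small function. This is only a sketch: the name `driver_settings` is hypothetical, it returns the minimum values each rule allows, and it returns None for the case the rules do not cover (small executors with less than 100 GB of data).

```python
def driver_settings(executor_memory_gb, data_gb):
    """Return (driver_memory_gb, max_results_gb) minimums, or None."""
    if executor_memory_gb >= 30:
        # Rule 1: driver matches executor memory, results at least 6 GB.
        return executor_memory_gb, 6
    if data_gb >= 100:
        # Rule 2: driver between 30 and 50 GB, results at least 8 GB.
        return 30, 8
    # The rules in the text do not cover this case.
    return None


print(driver_settings(30, 500))
print(driver_settings(20, 200))
print(driver_settings(20, 50))
```

These are lower bounds; the example in the text picks 8 GB for driver maximum results with 30 GB executors, which is above the 6 GB minimum of the first rule.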

This is what the Spark variables look like for driver properties:

Representation of memory allocation in driver node.


For more information about Spark properties click here.

Spark autoBroadcastJoinThreshold in spot-ml

After Spark LDA runs, the Topics Matrix and Topics Distribution are joined with the original data set, i.e. NetFlow records, DNS records or Proxy records, to determine the probability of each event happening. This joining process is similar to joining a big data set with a lookup table. In this case, the big data set is the entire set of records, and the lookup table is a dictionary of documents and probabilities per topic, or words and probabilities per topic.

Because of the possible diversity of documents/IPs, the lookup table containing the document probability distribution can grow to something bigger than 10 MB. Taking into account that 10 MB is Spark's default auto broadcast threshold for joins, a join with a lookup table bigger than that threshold will result in the execution of a traditional join with lots of shuffling.

The correct setting of SPK_AUTO_BRDCST_JOIN_THR and PRECISION can help to always broadcast the document probability distribution lookup table and avoid slow joins.

As a first step, users need to decide whether they want to change from 64 bit floating point probabilities to 32 bit floating point probabilities; if users change from 64 to 32 bit, the document probability distribution lookup table will be roughly half the size and more easily broadcast.

If users want to cut payload memory consumption roughly in half, they should set the precision option to 32.

PRECISION='32'

If users prefer to keep 64 bit floating point numbers, they should set precision option to 64 (default).

PRECISION='64'

Given the approximate number of distinct IPs in every batch or data set being analyzed, users should set SPK_AUTO_BRDCST_JOIN_THR to something that can fit the document probability distribution lookup table.

For instance, if a user knows there can be 2,000,000 distinct IP addresses and is using 20 Topics, the document probability distribution lookup table can grow to something like 190 bytes per row if using PRECISION as 64 bit and 110 bytes per row if using 32 bit option.

Document probability distribution lookup table record example:

(192.169.111.110, [0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05, 0.05]

In that case, users should set the auto broadcast join threshold to something that can fit about 362 MB (380000000 bytes) for 64 bit floating point numbers or about 210 MB (220000000 bytes) for 32 bit floating point numbers.

PRECISION='32'

SPK_AUTO_BRDCST_JOIN_THR='220000000'
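The sizing above can be reproduced with a short calculation. The function name `lookup_table_bytes` is hypothetical, and the 30 bytes of per-row overhead (the IP string and container overhead) is an assumption inferred from the 190 and 110 bytes-per-row figures quoted above, not a measured value.

```python
def lookup_table_bytes(distinct_ips, topics=20, precision=64, row_overhead=30):
    """Estimate the size in bytes of the document probability lookup table.

    row_overhead is an assumed fixed cost per row, chosen so that the
    per-row sizes match the figures quoted in the text.
    """
    bytes_per_row = topics * (precision // 8) + row_overhead
    return distinct_ips * bytes_per_row


# The example from the text: 2,000,000 distinct IPs and 20 topics.
size_64 = lookup_table_bytes(2_000_000, precision=64)
size_32 = lookup_table_bytes(2_000_000, precision=32)
print(size_64, size_32)
```

The threshold should be at least as large as the estimate for the chosen precision, since a table above the threshold falls back to a shuffle join.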

Known Spark error messages running spot-ml

Out Of Memory Error

This issue includes java.lang.OutOfMemoryError: Java heap space and java.lang.OutOfMemoryError: GC overhead limit exceeded. An OOME can have many different causes, but we have identified a couple of reasons for this error in spot-ml.

The main reason for this error in spot-ml is the ML algorithm returning large results for word probabilities per topic. Since the ML algorithm results are broadcast, each executor needs more memory.


Another possible reason for this error is that the driver is running out of memory; try increasing driver memory.

Container killed by Yarn for exceeding memory limits. X.Y GB of X GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead


This issue is caused by certain operations, mainly during the join of document probabilities per topic with the rest of the data (the scoring stage). If users receive this error, they should try increasing memory overhead up to 10% of executor memory, or increase executor memory.

org.apache.spark.serializer.KryoSerializer


KryoSerializer can cause issues if the value of property spark.kryoserializer.buffer.max is not large enough for the data being serialized. Try increasing it up to 2 GB, keeping in mind the total available memory.
