Tuesday 31 March 2020

Shuffle and broadcast joins

A common use case in ETL jobs involves joining new data to either lookup tables or historical data. You need different considerations to guide this process when working with distributed technologies such as Spark, rather than traditional databases that sit on a single machine.
Traditional databases join tables by pairing values on a given column. When all the data sits in a single database, it often goes unnoticed how computationally expensive row-wise comparisons are. When data is distributed across a cluster, the expense of joins becomes even more apparent.
A standard (or shuffle) join redistributes the rows of both tables across the cluster so that rows sharing a join key land on the same node. This is expensive not only because of the computation needed for the row-wise comparisons, but also because data transfer across the network is often the biggest performance bottleneck of a distributed system.
By contrast, a broadcast join remedies this situation when one DataFrame is sufficiently small. A broadcast join duplicates the smaller of the two DataFrames on each node of the cluster, avoiding the cost of shuffling the bigger DataFrame.
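
To make the difference concrete, here is a small sketch (not from the original post) using hypothetical orders and countries DataFrames; the broadcast() hint in Spark SQL forces the small lookup table to be broadcast instead of both sides being shuffled.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Broadcast join sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // A large fact-like DataFrame and a small lookup DataFrame (names are made up)
    val orders = Seq((1, "GB", 10.0), (2, "US", 20.0)).toDF("order_id", "country_code", "amount")
    val countries = Seq(("GB", "United Kingdom"), ("US", "United States")).toDF("country_code", "country_name")

    // Without a hint Spark falls back to a shuffle join unless the smaller side fits under
    // spark.sql.autoBroadcastJoinThreshold (10 MB by default). broadcast() ships the small
    // lookup table to every executor instead of shuffling both sides by the join key.
    val joined = orders.join(broadcast(countries), Seq("country_code"))
    joined.explain()  // the plan should show BroadcastHashJoin rather than SortMergeJoin
    joined.show()

    spark.stop()
  }
}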

Wednesday 3 April 2019

Spark tuning parameters

Spark parameters

Dynamic Executor Allocation
spark.dynamicAllocation.enabled=True
spark.dynamicAllocation.executorIdleTimeout=2m
spark.dynamicAllocation.minExecutors=1
spark.dynamicAllocation.maxExecutors=2000

Better fetch failure handling
spark.max.fetch.failures.per.stage = 10

Scaling the Spark driver
spark.rpc.io.serverThreads = 64

Tuning memory configurations
  1. Enable off-heap memory
  spark.memory.offHeap.enabled = True
  spark.memory.offHeap.size = 3g
  spark.executor.memory = 3g
  spark.yarn.executor.memoryOverhead = 0.1 * (spark.executor.memory + spark.memory.offHeap.size)

  2. Garbage collection tuning
  spark.executor.extraJavaOptions = -XX:ParallelGCThreads=4 -XX:+UseParallelGC

Eliminate Disk I/O bottleneck
1. spark.shuffle.file.buffer=1Mb
   spark.unsafe.sorter.spill.reader.buffer.size=1Mb
2. spark.file.transferTo=false
   spark.shuffle.unsafe.file.output.buffer=5Mb
3. spark.io.compression.lz4.blockSize=512KB

Cache index files on Shuffle Server
spark.shuffle.service.index.cache.entries=2048

Scaling External Shuffle Service
Tune shuffle service worker threads and backlog
spark.shuffle.io.serverThreads=128
spark.shuffle.io.backLog=8192

Configurable shuffle registration timeout and retries
spark.shuffle.registration.timeout = 2m
spark.shuffle.registration.maxAttempts = 5
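
As a rough illustration, here is a sketch (not from the original notes) of how a subset of these settings could be passed when building a SparkSession; they can equally go in spark-defaults.conf or on the spark-submit command line. Keys such as spark.max.fetch.failures.per.stage are not part of stock upstream Spark and will simply be ignored if your build does not support them, and the external shuffle service settings belong on the cluster side rather than in application code.

import org.apache.spark.sql.SparkSession

object TunedSession {
  def main(args: Array[String]): Unit = {
    // Settings must be supplied before the SparkContext is created to take effect
    val spark = SparkSession.builder
      .appName("Tuned session sketch")
      .config("spark.dynamicAllocation.enabled", "true")
      .config("spark.dynamicAllocation.executorIdleTimeout", "2m")
      .config("spark.dynamicAllocation.minExecutors", "1")
      .config("spark.dynamicAllocation.maxExecutors", "2000")
      .config("spark.memory.offHeap.enabled", "true")
      .config("spark.memory.offHeap.size", "3g")
      .config("spark.executor.memory", "3g")
      // 0.1 * (3 GB heap + 3 GB off-heap) is roughly 614 MB of overhead
      .config("spark.yarn.executor.memoryOverhead", "614")
      .config("spark.executor.extraJavaOptions", "-XX:ParallelGCThreads=4 -XX:+UseParallelGC")
      .getOrCreate()

    spark.range(10).show()  // trivial action just to exercise the session
    spark.stop()
  }
}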

XML parsing in Spark using the databricks/spark-xml library

Using databricks/spark-xml to read an XML file into a Spark DataFrame.

Assume the following sample XML:

<?xml version="1.0"?>
<catalog>
    <book id="bk101">
    <author>
        Gambardella, Matthew</author>
        <title>
        XML Developer's Guide</title>
        <genre>
        Computer</genre>
        <price>44.95</price>
        <publish_date>2000-10-01</publish_date>
        <description>


            An in-depth look at creating applications
            with XML.This manual describes Oracle XML DB, and how you can use it to store, generate, manipulate, manage,
            and query XML data in the database.


            After introducing you to the heart of Oracle XML DB, namely the XMLType framework and Oracle XML DB repository,
            the manual provides a brief introduction to design criteria to consider when planning your Oracle XML DB
            application. It provides examples of how and where you can use Oracle XML DB.


            The manual then describes ways you can store and retrieve XML data using Oracle XML DB, APIs for manipulating
            XMLType data, and ways you can view, generate, transform, and search on existing XML data. The remainder of
            the manual discusses how to use Oracle XML DB repository, including versioning and security,
            how to access and manipulate repository resources using protocols, SQL, PL/SQL, or Java, and how to manage
            your Oracle XML DB application using Oracle Enterprise Manager. It also introduces you to XML messaging and
            Oracle Streams Advanced Queuing XMLType support.
        </description>
        </book>
<book id="bk102">
    <author>Ralls, Kim</author>
    <title>Midnight Rain</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-12-16</publish_date>
    <description>A former architect battles corporate zombies,
        an evil sorceress, and her own childhood to become queen
        of the world.</description>
</book>
    <book id="bk103">
        <author>Corets, Eva</author>
        <title>Maeve Ascendant</title>
        <genre>Fantasy</genre>
        <price>5.95</price>
        <publish_date>2000-11-17</publish_date>
        <description>After the collapse of a nanotechnology
            society in England, the young survivors lay the
            foundation for a new society.</description>
    </book>
    <book id="bk104">
        <author>Corets, Eva</author>
        <title>Oberon's Legacy</title>
        <genre>Fantasy</genre>
        <price>5.95</price>
        <publish_date>2001-03-10</publish_date>
        <description>In post-apocalypse England, the mysterious
            agent known only as Oberon helps to create a new life
            for the inhabitants of London. Sequel to Maeve
            Ascendant.</description>
    </book>
    <book id="bk105">
        <author>Corets, Eva</author>
        <title>The Sundered Grail</title>
        <genre>Fantasy</genre>
        <price>5.95</price>
        <publish_date>2001-09-10</publish_date>
        <description>The two daughters of Maeve, half-sisters,
            battle one another for control of England. Sequel to
            Oberon's Legacy.</description>
    </book>
    <book id="bk106">
        <author>Randall, Cynthia</author>
        <title>Lover Birds</title>
        <genre>Romance</genre>
        <price>4.95</price>
        <publish_date>2000-09-02</publish_date>
        <description>When Carla meets Paul at an ornithology
            conference, tempers fly as feathers get ruffled.</description>
    </book>
    <book id="bk107">
        <author>Thurman, Paula</author>
        <title>Splish Splash</title>
        <genre>Romance</genre>
        <price>4.95</price>
        <publish_date>2000-11-02</publish_date>
        <description>A deep sea diver finds true love twenty
            thousand leagues beneath the sea.</description>
    </book>
    <book id="bk108">
        <author>Knorr, Stefan</author>
        <title>Creepy Crawlies</title>
        <genre>Horror</genre>
        <price>4.95</price>
        <publish_date>2000-12-06</publish_date>
        <description>An anthology of horror stories about roaches,
            centipedes, scorpions  and other insects.</description>
    </book>
    <book id="bk109">
        <author>Kress, Peter</author>
        <title>Paradox Lost</title>
        <genre>Science Fiction</genre>
        <price>6.95</price>
        <publish_date>2000-11-02</publish_date>
        <description>After an inadvertant trip through a Heisenberg
            Uncertainty Device, James Salway discovers the problems
            of being quantum.</description>
    </book>
    <book id="bk110">
        <author>O'Brien, Tim</author>
        <title>Microsoft .NET: The Programming Bible</title>
        <genre>Computer</genre>
        <price>36.95</price>
        <publish_date>2000-12-09</publish_date>
        <description>Microsoft's .NET initiative is explored in
            detail in this deep programmer's reference.</description>
    </book>
    <book id="bk111">
        <author>O'Brien, Tim</author>
        <title>MSXML3: A Comprehensive Guide</title>
        <genre>Computer</genre>
        <price>36.95</price>
        <publish_date>2000-12-01</publish_date>
        <description>The Microsoft MSXML3 parser is covered in
            detail, with attention to XML DOM interfaces, XSLT processing,
            SAX and more.</description>
    </book>
    <book id="bk112">
        <author>Galos, Mike</author>
        <title>Visual Studio 7: A Comprehensive Guide</title>
        <genre>Computer</genre>
        <price>49.95</price>
        <publish_date>2001-04-16</publish_date>
        <description>Microsoft Visual Studio 7 is explored in depth,
            looking at how Visual Basic, Visual C++, C#, and ASP+ are
            integrated into a comprehensive development
            environment.</description>
    </book>
</catalog>

To parse this into a Spark DataFrame, create an sbt project with the following structure (Books.xml must sit under src/main/resources so it ends up on the classpath):
src
 - main
   - scala
     - sample
       - Books.scala
   - resources
     - Books.xml
build.sbt

Books.scala

package sample

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Brings the .xml(...) reader method into scope on DataFrameReader
import com.databricks.spark.xml._

object Books {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Books xml Parsing").master("local[*]").getOrCreate()

    // rowTag selects the XML element that becomes a row; with "catalog" the whole
    // catalog is one row holding an array of book structs
    val booksXML: DataFrame = spark.read.option("rowTag", "catalog").xml(getClass.getResource("/Books.xml").getPath)
    booksXML.show()

    booksXML.write.parquet("C:\\Users\\Jijo\\flatten_xml_spark\\books_flatfile")

    spark.stop()
  }
}

build.sbt

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "sample",
      scalaVersion := "2.12.7",
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "xmlparsing",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0",
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % Test,
    libraryDependencies += "com.databricks" %% "spark-xml" % "0.5.0")

Thursday 22 February 2018

Using pytest to check ssh to multiple servers

Use case: check SSH connectivity to multiple clusters in one test


#!/usr/bin/python
import paramiko
import pytest


# Parametrized fixture: the test below runs once per IP address
@pytest.fixture(params=["ip1.ip1.ip1.ip1", "ip2.ip2.ip2.ip2"])
def ssh(request):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(request.param, username="username", key_filename="connect.pem")
    yield ssh
    ssh.close()


def test_hello(ssh):
    stdin, stdout, stderr = ssh.exec_command("echo hello")
    stdin.close()
    assert stderr.read() == b""
    assert stdout.read() == b"hello\n"
Run pytest in the terminal to execute this test.

To check the plan of a pytest run:
pytest --collect-only
A fixture sets up the environment before the test cases execute. Because this fixture is parametrized, the test plan shows more than one call to the test function, one per parameter.
To use a fixture in a test, just pass the fixture name as a function argument.

> pytest --collect-only
collected 2 items
<Module 'Utility_test.py'>
  <Function 'test_hello[10.30.107.85]'>
  <Function 'test_hello[10.20.91.148]'>

Unable to install R packages rgl & qpcR in sparkR

If you encounter the following error while installing the rgl package in SparkR:

configure: using libpng dynamic linkage
checking for X... no
configure: error: X11 not found but required, configure aborted.

This is because rgl needs the X11 (X Window System) and OpenGL development libraries, which are not installed. To resolve, install them:
Ubuntu:

sudo apt-get install libglu1-mesa-dev


Redhat:

sudo yum install mesa-libGL-devel mesa-libGLU-devel libpng-devel

Tuesday 18 July 2017

Sqoop views in Netezza to HDFS

I came across a use case to transfer Netezza tables/views to a Hadoop system. The current flow we are using is:
1. Netezza -> SAN
2. SAN -> S3
3. S3 -> hdfs

The same flow is used in reverse to transfer back to Netezza. After analyzing the use case, the best option I found was Sqoop.

A few notes: if the table in Netezza has no primary key, you will be forced to use the --split-by or -m option; only use --verbose if needed. We are using YARN queues, hence the -Dmapreduce.job.queuename option below; you can ignore it if no queue is set up.

1. Transfer view
# Sqoop does not allow writing into an existing directory, so remove the target directory before transferring
hdfs dfs -rm -R /apps/hive/warehouse/<hivedbname>.db/<hivetablename>

sqoop import -Dmapreduce.job.queuename=q1 --hive-import --hive-database <hivedbname> --hive-table <hivetablename> --driver org.netezza.Driver --direct --connect jdbc:netezza://<host>:<port>/<netezzadbname> --username <netezzauser> --password <netezzapwd> --table <netezza tablename> --target-dir hdfs:///apps/hive/warehouse/<hivedbname>.db/<hivetablename> --split-by <anycolumn>

If we don't use the --driver org.netezza.Driver parameter, the following error is encountered:

2017-07-18 09:34:53,079 ERROR [Thread-16] org.apache.sqoop.mapreduce.db.netezza.NetezzaJDBCStatementRunner: Unable to execute external table export
org.netezza.error.NzSQLException: ERROR:  Column reference "DATASLICEID" not supported for views

at org.netezza.internal.QueryExecutor.getNextResult(QueryExecutor.java:276)
at org.netezza.internal.QueryExecutor.execute(QueryExecutor.java:73)
at org.netezza.sql.NzConnection.execute(NzConnection.java:2673)
at org.netezza.sql.NzStatement._execute(NzStatement.java:849)
at org.netezza.sql.NzPreparedStatament.execute(NzPreparedStatament.java:152)
at org.apache.sqoop.mapreduce.db.netezza.NetezzaJDBCStatementRunner.run(NetezzaJDBCStatementRunner.java:75)

End of LogType:syslog


Instead of the --split-by option we can also use -m 1, which transfers the data with a single mapper and can be a bit slow.

2. Transfer a table
# Sqoop does not allow writing into an existing directory, so remove the target directory before transferring
hdfs dfs -rm -R /apps/hive/warehouse/<hivedbname>.db/<hivetablename>

sqoop import -Dmapreduce.job.queuename=q1 --verbose --hive-import --hive-database jijo --direct --connect jdbc:netezza://<host>:<port>/<netezzadbname> --username <netezzauser> --password <netezzapwd> --table <netezza tablename> --target-dir hdfs:///apps/hive/warehouse/<hivedbname>.db/<hivetablename> -m 1


Running
analyze table <hivedbname>.<hivetablename> compute statistics
would be ideal for Hive running on the Tez execution engine.