Monday, February 15, 2016

How Sqoop Import Works - Session 3

Like Hadoop, Sqoop is written in Java. It imports a table from a database by running a MapReduce job that reads rows from the table and writes the records to HDFS.





Sqoop uses the Java Database Connectivity (JDBC) API, which allows applications to access data stored in an RDBMS.

Based on the URL in the connection string, Sqoop predicts which driver it should load. For cases where Sqoop does not know which JDBC driver to load, users can tell Sqoop explicitly which driver to use.
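A minimal sketch of the idea of picking a driver from the connect string. This is only an illustration and not Sqoop's actual lookup code: the class name DriverGuessSketch, the method guessDriver and the prefix table are made up for this example, and the example URLs are placeholders.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustration only: NOT Sqoop's real driver lookup.
final class DriverGuessSketch {
    // Hypothetical prefix-to-driver table, filled with commonly used driver class names.
    private static final Map<String, String> KNOWN_DRIVERS = new LinkedHashMap<>();
    static {
        KNOWN_DRIVERS.put("jdbc:mysql:", "com.mysql.jdbc.Driver");
        KNOWN_DRIVERS.put("jdbc:postgresql:", "org.postgresql.Driver");
        KNOWN_DRIVERS.put("jdbc:oracle:", "oracle.jdbc.OracleDriver");
    }

    /** Returns the driver class for a connect string, or empty if the prefix is unknown. */
    static Optional<String> guessDriver(String connectString) {
        for (Map.Entry<String, String> entry : KNOWN_DRIVERS.entrySet()) {
            if (connectString.startsWith(entry.getKey())) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Known prefix: a driver class name is found.
        System.out.println(guessDriver("jdbc:mysql://localhost/testdb"));
        // Unknown prefix: nothing is found, so the user has to name the driver.
        System.out.println(guessDriver("jdbc:exampledb://localhost/testdb"));
    }
}
```

When the lookup comes back empty, the user has to supply the driver class. In Sqoop 1 this is done with the --driver argument.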



The Sqoop client first retrieves the table's metadata. It uses JDBC to examine the list of all columns and their types, and maps them to the fields of a generated table-specific class that holds one record, e.g. for the Widget class:
public Integer get_id();
public String get_widget_name();
public java.math.BigDecimal get_price();
public java.sql.Date get_design_date();
public Integer get_version();
public String get_design_comment();

More critical to the import are the serialization methods that form the DBWritable interface.
The Widget class implements DBWritable, which allows it to interact with JDBC:
public void readFields(ResultSet _dbResults) throws SQLException;
public void write(PreparedStatement _dbStmt) throws SQLException;

The JDBC ResultSet interface provides a cursor that retrieves records from a query.

readFields() -> populates the fields of the Widget object with the columns from one row of the ResultSet’s data.
write() -> allows Sqoop to insert new Widget rows into a table; it is used in a Sqoop export.
Input format -> DataDrivenDBInputFormat -> reads sections of a table from the database by partitioning a query over several map tasks.
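The readFields() and write() methods described above can be sketched roughly as follows. This is a hand-written approximation, not the code Sqoop actually generates. It assumes the columns arrive in the same order as the getters listed earlier, it skips NULL handling, and it leaves out "implements DBWritable" so that it compiles with only the JDK.

```java
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Sketch of a generated record class. The real class would also implement
// Hadoop's DBWritable interface; that is omitted here to avoid the Hadoop dependency.
class Widget {
    private Integer id;
    private String widget_name;
    private BigDecimal price;
    private Date design_date;
    private Integer version;
    private String design_comment;

    public Integer get_id() { return id; }
    public String get_widget_name() { return widget_name; }
    public BigDecimal get_price() { return price; }
    public Date get_design_date() { return design_date; }
    public Integer get_version() { return version; }
    public String get_design_comment() { return design_comment; }

    // Import side: copy the row at the cursor's current position into the fields.
    // Assumed column order: id, widget_name, price, design_date, version, design_comment.
    // NULL columns are not handled in this sketch.
    public void readFields(ResultSet dbResults) throws SQLException {
        this.id = dbResults.getInt(1);
        this.widget_name = dbResults.getString(2);
        this.price = dbResults.getBigDecimal(3);
        this.design_date = dbResults.getDate(4);
        this.version = dbResults.getInt(5);
        this.design_comment = dbResults.getString(6);
    }

    // Export side: bind the fields to the parameters of an INSERT statement.
    // Assumes id and version are non-null (unboxing a null would fail).
    public void write(PreparedStatement dbStmt) throws SQLException {
        dbStmt.setInt(1, id);
        dbStmt.setString(2, widget_name);
        dbStmt.setBigDecimal(3, price);
        dbStmt.setDate(4, design_date);
        dbStmt.setInt(5, version);
        dbStmt.setString(6, design_comment);
    }
}
```

During an import, readFields() would be called once for each row the cursor returns. During an export, write() would be called once for each record to be inserted.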

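Reading a table is done with a simple query -> SELECT col1,col2,col3,... FROM tableName
Performance can be improved by dividing this query across multiple map tasks using a splitting column. Using metadata about the table, Sqoop guesses a good splitting column (typically the primary key, if one exists). The minimum and maximum values of that column are retrieved and, together with the number of map tasks, determine the query each task issues.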

Example of splitting a query across multiple map tasks:
Suppose the widgets table has 100,000 entries (with the id column running from 0 to 99,999). If we have 5 map tasks, DataDrivenDBInputFormat will generate five queries, one per task:
SELECT id,widget_name… FROM widgets WHERE id>=0 AND id<20000
SELECT id,widget_name… FROM widgets WHERE id>=20000 AND id<40000
SELECT id,widget_name… FROM widgets WHERE id>=40000 AND id<60000
SELECT id,widget_name… FROM widgets WHERE id>=60000 AND id<80000
SELECT id,widget_name… FROM widgets WHERE id>=80000 AND id<100000
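The range arithmetic behind queries like these can be sketched as below. This is a simplified stand-in and not the real DataDrivenDBInputFormat code: the class SplitQuerySketch and the method splitConditions are hypothetical names, and only integer columns are handled.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative range splitter; NOT the actual DataDrivenDBInputFormat implementation.
final class SplitQuerySketch {

    /**
     * Divides the inclusive range [minId, maxId] into at most numSplits
     * contiguous ranges and returns one WHERE condition per non-empty range.
     */
    static List<String> splitConditions(String column, long minId, long maxId, int numSplits) {
        if (numSplits < 1 || maxId < minId) {
            throw new IllegalArgumentException("need numSplits >= 1 and maxId >= minId");
        }
        long total = maxId - minId + 1; // number of candidate values in the range
        List<String> conditions = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            long lo = minId + total * i / numSplits;       // inclusive lower bound
            long hi = minId + total * (i + 1) / numSplits; // exclusive upper bound
            if (lo < hi) { // skip empty ranges when there are more tasks than values
                conditions.add(column + " >= " + lo + " AND " + column + " < " + hi);
            }
        }
        return conditions;
    }

    public static void main(String[] args) {
        // ids 0..99,999 divided over 5 map tasks, as in the example above
        for (String condition : splitConditions("id", 0, 99_999, 5)) {
            System.out.println("WHERE " + condition);
        }
    }
}
```

Each range includes its lower bound and excludes its upper bound, so every id lands in exactly one task.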

The choice of splitting column is essential for efficiently parallelizing work. Users can specify a splitting column when running an import job (the --split-by argument) to tune the job to the data’s actual distribution.
If the import job is run as a single task (-m 1), no splitting is performed.
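To see why the distribution of the splitting column matters, here is a small sketch that counts how many rows fall into each equal-width range. The class SplitSkewSketch, the method rowsPerSplit and the sample ids are invented for this illustration and are not part of Sqoop.

```java
import java.util.Arrays;

// Illustration of skew: equal-width ranges do not guarantee equal row counts.
final class SplitSkewSketch {

    /** Counts how many ids fall into each of numSplits equal-width ranges over [minId, maxId]. */
    static int[] rowsPerSplit(long[] ids, long minId, long maxId, int numSplits) {
        int[] counts = new int[numSplits];
        long total = maxId - minId + 1;
        for (long id : ids) {
            int split = (int) ((id - minId) * numSplits / total);
            counts[split]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] evenIds = {10, 35, 60, 85};  // spread across the whole range
        long[] skewedIds = {1, 2, 3, 99};   // clustered at the low end
        System.out.println(Arrays.toString(rowsPerSplit(evenIds, 0, 99, 4)));   // [1, 1, 1, 1]
        System.out.println(Arrays.toString(rowsPerSplit(skewedIds, 0, 99, 4))); // [3, 0, 0, 1]
    }
}
```

With the skewed ids, one task gets most of the rows while two tasks get none, so the job runs only as fast as the busiest task. A column with evenly spread values avoids this.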

Controlling the Import
Sqoop does not need to import the entire table at once. If rows with ids up to 100,000 have already been imported, the user can specify a WHERE clause (for example, --where "id > 100000") to import only the new rows. User-specified conditions are applied before the task splitting is performed.
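A rough sketch of what "applied before the task splitting" means in terms of SQL. The user's condition goes into the query that finds the range of the splitting column, and also into each task's query. The class WhereClauseSketch and its methods are hypothetical, the split range in main is a made-up value, and the exact SQL Sqoop emits may differ.

```java
// Illustration only: shows where a user-supplied condition would sit in the queries.
final class WhereClauseSketch {

    /** Query that finds the range of the split column, restricted by the user's filter. */
    static String boundsQuery(String table, String splitColumn, String userWhere) {
        return "SELECT MIN(" + splitColumn + "), MAX(" + splitColumn + ") FROM " + table
                + " WHERE (" + userWhere + ")";
    }

    /** Query for one map task: the user's filter AND that task's split range. */
    static String taskQuery(String columns, String table, String userWhere, String splitCondition) {
        return "SELECT " + columns + " FROM " + table
                + " WHERE (" + userWhere + ") AND (" + splitCondition + ")";
    }

    public static void main(String[] args) {
        String userWhere = "id > 100000";
        System.out.println(boundsQuery("widgets", "id", userWhere));
        // Hypothetical split range for the first map task
        System.out.println(taskQuery("id, widget_name", "widgets", userWhere,
                "id >= 100001 AND id < 120001"));
    }
}
```

Because the bounds are computed over the filtered rows only, the map tasks divide up just the new rows rather than the whole table.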

Imports and Consistency
Map tasks reading from a database in parallel run in separate processes, so they cannot share a single database transaction. To get a consistent snapshot of the table, any processes that update its rows must be disabled during the Sqoop import.

Direct Mode Imports
Sqoop lets the user choose from several strategies for performing an import.
Some databases offer faster ways to extract data than the JDBC-based Sqoop import. For example, MySQL's mysqldump application can read from a table with greater throughput than a JDBC channel, but it cannot import large objects such as CLOB and BLOB columns. Direct mode must be specifically enabled by the user with the --direct argument. Even in direct mode, the metadata is still read using JDBC.
