RSS News Sources
MySQL Applier For Hadoop: Implementation
This is a follow-up post describing the implementation details of the Hadoop Applier, and the steps to configure and install it. The Hadoop Applier integrates MySQL with Hadoop, providing real-time replication of INSERTs to HDFS, so that the data can be consumed by the data stores working on top of Hadoop. You can learn more about the design rationale and prerequisites in the previous post.
Design and Implementation:
Hadoop Applier replicates rows inserted into a table in MySQL to the Hadoop Distributed File System (HDFS). It uses an API provided by libhdfs, a C library for manipulating files in HDFS.
The library comes pre-compiled with Hadoop distributions. The Applier connects to the MySQL master (or reads a binary log generated by MySQL) and:
- fetches the row insert events occurring on the master
- decodes these events and extracts the data inserted into each field of the row
- uses content handlers to convert the data into the required format and appends it to a text file in HDFS.
Schema equivalence is a simple mapping:
Databases are mapped as separate directories, with their tables as sub-directories. Data inserted into each table is written into text files (named datafile1.txt) in HDFS. The data can be in comma-separated format, or any other delimiter can be used; the delimiter is configurable through command-line arguments.
The diagram explains the mapping between MySQL and HDFS schema.
The file in which the data is stored is named datafile1.txt here; you can name it anything you want. The working directory where this data file goes is base_dir/db_name.db/tb_name.
The timestamp at which the event occurs is included as the first field in each row inserted in the text file.
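The directory mapping and row layout described above can be sketched in a few lines of standalone C++. This is a minimal sketch under stated assumptions, not the Applier's code: the helper names hdfs_table_dir and format_row are hypothetical, field values are assumed to be already converted to strings (the job the Binlog API's converter does in the Applier), and base_dir is assumed to have no trailing slash.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

/* Hypothetical helper: HDFS directory for a table, base_dir/db_name.db/tb_name */
std::string hdfs_table_dir(const std::string &base_dir,
                           const std::string &db_name,
                           const std::string &tb_name)
{
  return base_dir + "/" + db_name + ".db/" + tb_name;
}

/*
  Hypothetical helper: one line of datafile1.txt. The event timestamp comes
  first, then every field value, each preceded by field_delim; the line is
  terminated by row_delim.
*/
std::string format_row(long timestamp,
                       const std::vector<std::string> &fields,
                       const std::string &field_delim,
                       const std::string &row_delim)
{
  std::ostringstream data;
  data << timestamp;
  for (const std::string &value : fields)
    data << field_delim << value;
  data << row_delim;
  return data.str();
}
```

For example, format_row(1369244058, {"1", "abc"}, ",", "\n") yields the line 1369244058,1,abc followed by a newline, which is why a Hive table over this file needs an extra leading column for the timestamp.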
The implementation follows these steps:
- Connect to the MySQL master using the interfaces to the binary log
#include "binlog_api.h"
Binary_log binlog(create_transport(mysql_uri.c_str()));
binlog.connect();
- Register content handlers
/*
  Table_index is a subclass of the Content_handler class in the Binlog API
*/
Table_index table_event_hdlr;
Applier replay_hndlr(&table_event_hdlr, &sqltohdfs_obj);
binlog.content_handler_pipeline()->push_back(&table_event_hdlr);
binlog.content_handler_pipeline()->push_back(&replay_hndlr);
- Start an event loop and wait for the events to occur on the master
while (true)
{
  /*
    Pull events from the master. This is the heart beat of the event listener.
  */
  Binary_log_event *event;
  binlog.wait_for_next_event(&event);
}
- Decode the event using the content handler interfaces
class Applier : public mysql::Content_handler
{
public:
  Applier(Table_index *index, HDFSSchema *mysqltohdfs_obj)
  {
    m_table_index= index;
    m_hdfs_schema= mysqltohdfs_obj;
  }
  mysql::Binary_log_event *process_event(mysql::Row_event *rev)
  {
    int table_id= rev->table_id;
    typedef std::map<long int, mysql::Table_map_event *> Int2event_map;
    /* Look up the table map event for this table id */
    Int2event_map::iterator ti_it= m_table_index->find(table_id);
- Each row event contains multiple rows and fields.
Iterate one row at a time using Row_iterator.
mysql::Row_event_set rows(rev, ti_it->second);
mysql::Row_event_set::iterator it= rows.begin();
do
{
  mysql::Row_of_fields fields= *it;
  long int timestamp= rev->header()->timestamp;
  if (rev->get_event_type() == mysql::WRITE_ROWS_EVENT)
    table_insert(db_name, table_name, fields, timestamp, m_hdfs_schema);
} while (++it != rows.end());
- Get the field data separated by field delimiters and row delimiters.
Each row contains a vector of Value objects. The converter allows us to transform the value into another representation.
mysql::Row_of_fields::const_iterator field_it= fields.begin();
mysql::Converter converter;
std::ostringstream data;
data << timestamp;                /* the timestamp is always the first field */
do
{
  field_index_counter++;
  std::string str;
  converter.to(str, *field_it);   /* convert the Value to its string form */
  data << sqltohdfs_obj->hdfs_field_delim;
  data << str;
} while (++field_it != fields.end());
data << sqltohdfs_obj->hdfs_row_delim;
- Connect to the HDFS file system. If not provided, the connection information (user name, password, host and port) is read from the XML configuration file, hadoop-site.xml.
hdfsFS m_fs= hdfsConnect(host.c_str(), port);
- Create the directory structure in HDFS. Set the working directory and open the file in append mode.
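/* hdfsCreateDirectory also creates any missing parent directories */
hdfsCreateDirectory(m_fs, (stream_dir_path.str()).c_str());
hdfsSetWorkingDirectory(m_fs, (stream_dir_path.str()).c_str());
const char* write_path= "datafile1.txt";
hdfsFile writeFile;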
- Append data at the end of the file.
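std::string row= data.str();
writeFile= hdfsOpenFile(m_fs, write_path, O_WRONLY|O_APPEND, 0, 0, 0);
tSize num_written_bytes= hdfsWrite(m_fs, writeFile, row.c_str(), (tSize) row.length());
hdfsFlush(m_fs, writeFile);       /* push the appended row out to HDFS */
hdfsCloseFile(m_fs, writeFile);   /* release the handle opened above */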
Install and Configure:
Follow these steps to install and run the Applier:
1. Download a Hadoop release (I am using 1.0.4), then configure and install it (for the purpose of the demo, install it in pseudo-distributed mode). The flag 'dfs.support.append' must be set to true while configuring HDFS (hdfs-site.xml). Since append is not supported in Hadoop 1.x, also set the flag 'dfs.support.broken.append' to true.
2. Set the environment variable $HADOOP_HOME to point to the Hadoop installation directory.
3. CMake doesn't come with a 'find' module for libhdfs. Ensure that 'FindHDFS.cmake' is in the CMAKE_MODULE_PATH. You can download a copy here.
4. Edit the file 'FindHDFS.cmake', if necessary, so that HDFS_LIB_PATHS is set to the path of libhdfs.so and HDFS_INCLUDE_DIRS points to the location of hdfs.h. For 1.x versions, the library path is $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib (or $ENV{HADOOP_HOME}/c++/Linux-amd64-64/lib on 64-bit systems), and the header files are contained in $ENV{HADOOP_HOME}/src/c++/libhdfs. For 2.x releases, the header files and libraries can be found in $ENV{HADOOP_HOME}/include and $ENV{HADOOP_HOME}/lib/native, respectively.
For versions 1.x, this patch will fix the paths:
--- a/cmake_modules/FindHDFS.cmake
+++ b/cmake_modules/FindHDFS.cmake
@@ -11,6 +11,7 @@ exec_program(hadoop ARGS version OUTPUT_VARIABLE Hadoop_VERSION
# currently only looking in HADOOP_HOME
find_path(HDFS_INCLUDE_DIR hdfs.h PATHS
$ENV{HADOOP_HOME}/include/
+ $ENV{HADOOP_HOME}/src/c++/libhdfs/
# make sure we don't accidentally pick up a different version
NO_DEFAULT_PATH
)
@@ -26,9 +27,9 @@ endif()
message(STATUS "Architecture: ${arch_hint}")
if ("${arch_hint}" STREQUAL "x64")
- set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
+ set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-amd64-64/lib)
else ()
- set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/lib/native)
+ set(HDFS_LIB_PATHS $ENV{HADOOP_HOME}/c++/Linux-i386-32/lib)
endif ()
message(STATUS "HDFS_LIB_PATHS: ${HDFS_LIB_PATHS}")
5. Since libhdfs is a JNI-based API, it requires the JNI header files and libraries to build. If a FindJNI.cmake module exists in the CMAKE_MODULE_PATH and JAVA_HOME is set, the headers will be included and the libraries linked. If not, you will need to include the headers and load the libraries separately (modify LD_LIBRARY_PATH).
6. Build and install the library 'libreplication', which is used by the Hadoop Applier, using CMake.
- Download a copy of Hadoop Applier from http://labs.mysql.com.
- The 'mysqlclient' library must be installed in the default library paths. You can either download and install it (you can get a copy here), or set the environment variable $MYSQL_DIR to point to the parent directory of the MySQL source code. Make sure to run cmake on the MySQL source directory.
$ export MYSQL_DIR=/usr/local/mysql-5.6
- Run the 'cmake' command on the parent directory of the Hadoop Applier source. This will generate the necessary Makefiles. Make sure to set the cmake option ENABLE_DOWNLOADS=1, which will install Google Test, required to run the unit tests.
$ cmake . -DENABLE_DOWNLOADS=1
- Run 'make' and 'make install' to build and install. This will install the library 'libreplication', which is used by the Hadoop Applier.
7. Set the PATH and CLASSPATH environment variables so that the Hadoop binaries and jars can be found:
$ export PATH=$HADOOP_HOME/bin:$PATH
$ export CLASSPATH=$(hadoop classpath)
8. The code for the Hadoop Applier can be found in /examples/mysql2hdfs in the Hadoop Applier repository. To compile, load the libraries (modify LD_LIBRARY_PATH if required) and run the command 'make happlier' in your terminal. This will create an executable file in the mysql2hdfs directory.
...and then you are done!
Now start Hadoop DFS (namenode and datanode), start a MySQL server as master with row-based replication, start Hive (optional), and run the executable ./happlier, optionally providing the MySQL and HDFS URIs and other available command-line options (run ./happlier --help for more info). For testing purposes, you can use the mtr rpl suite to start the master:
$MySQL-5.6/mysql-test$ ./mtr --start --suite=rpl --mysqld=--binlog_format='ROW' --mysqld=--binlog_checksum=NONE
The Hadoop Applier provides some useful filters as command-line options:
Options and their use:
-r, --field-delimiter=DELIM
  Use DELIM instead of ctrl-A as the field delimiter. DELIM can be a string or an ASCII value in the format '\nnn'. Escape sequences are not allowed. Provide the string by which the fields in a row will be separated. By default, it is set to ctrl-A.
-w, --row-delimiter=DELIM
  Use DELIM instead of LINE FEED as the row delimiter. DELIM can be a string or an ASCII value in the format '\nnn'. Escape sequences are not allowed. Provide the string by which the rows of a table will be separated. By default, it is set to LINE FEED (\n).
-d, --databases=DB_LIST
  Import entries for some databases, optionally including only the specified tables. DB_LIST is made up of one database name, or many names separated by commas. Each database name can optionally be followed by table names; the table names must follow the database name, separated by hyphens.
  Example: -d=db_name1-table1-table2,dbname2-table1,dbname3
-f, --fields=LIST
  Import entries for some fields only in a table. Similar to the cut command, LIST is made up of one range, or many ranges separated by commas. Each range is one of:
    N     N'th byte, character or field, counted from 1
    N-    from N'th byte, character or field, to end of line
    N-M   from N'th to M'th (included) byte, character or field
    -M    from first to M'th (included) byte, character or field
-h, --help
  Display help
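The argument formats of -r/-w, -d and -f can be illustrated with a small standalone parser. This is a hypothetical sketch of how such arguments could be interpreted, not the Applier's actual option handling: the function names split, parse_delim, parse_db_list, parse_field_ranges and field_selected are invented for illustration, '\nnn' is assumed to be a three-digit octal character code (so ctrl-A is written \001), and field positions are assumed to count table columns from 1.

```cpp
#include <cassert>
#include <cstdlib>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

/* Splits s on sep (getline drops a trailing empty piece). */
std::vector<std::string> split(const std::string &s, char sep)
{
  std::vector<std::string> out;
  std::istringstream in(s);
  std::string item;
  while (std::getline(in, item, sep))
    out.push_back(item);
  return out;
}

/*
  -r / -w: DELIM is either a literal string or '\nnn'.
  Assumption: nnn is a three-digit octal character code.
*/
std::string parse_delim(const std::string &arg)
{
  if (arg.size() == 4 && arg[0] == '\\' &&
      arg.find_first_not_of("01234567", 1) == std::string::npos)
    return std::string(1, static_cast<char>(std::strtol(arg.c_str() + 1, nullptr, 8)));
  return arg;
}

/*
  -d: "db1-t1-t2,db2-t1,db3" becomes {db1: [t1, t2], db2: [t1], db3: []}.
  An empty table list stands for "all tables of that database".
*/
std::map<std::string, std::vector<std::string>>
parse_db_list(const std::string &list)
{
  std::map<std::string, std::vector<std::string>> dbs;
  for (const std::string &entry : split(list, ','))
  {
    std::vector<std::string> parts= split(entry, '-');
    if (parts.empty())
      continue;
    dbs[parts[0]].assign(parts.begin() + 1, parts.end());
  }
  return dbs;
}

/*
  -f: cut-style ranges counted from 1, stored as [first, last];
  last == 0 means "to the end of the row".
*/
std::vector<std::pair<int, int>> parse_field_ranges(const std::string &list)
{
  std::vector<std::pair<int, int>> ranges;
  for (const std::string &range : split(list, ','))
  {
    std::string::size_type dash= range.find('-');
    if (dash == std::string::npos)          /* "N" selects a single field */
    {
      int n= std::atoi(range.c_str());
      ranges.emplace_back(n, n);
      continue;
    }
    std::string from= range.substr(0, dash), to= range.substr(dash + 1);
    int first= from.empty() ? 1 : std::atoi(from.c_str());  /* "-M" starts at 1 */
    int last= to.empty() ? 0 : std::atoi(to.c_str());       /* "N-" runs to the end */
    ranges.emplace_back(first, last);
  }
  return ranges;
}

/* True if the field at 1-based position pos falls in any range. */
bool field_selected(const std::vector<std::pair<int, int>> &ranges, int pos)
{
  for (const std::pair<int, int> &r : ranges)
    if (pos >= r.first && (r.second == 0 || pos <= r.second))
      return true;
  return false;
}
```

With this reading, -f 1,3-4,6- keeps fields 1, 3, 4 and everything from field 6 onwards, and the -d example above yields three databases, of which dbname3 has no table filter.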
Integration with Hive: Hive runs on top of Hadoop. It is sufficient to install Hive only on the Hadoop master node. Take note of the default data warehouse directory, set as the property hive.metastore.warehouse.dir in the hive-default.xml.template configuration file. This must be the same as the base directory into which the Hadoop Applier writes.
Since the Applier does not import DDL statements, you have to create similar schemas on both MySQL and Hive, i.e. set up a similar database in Hive using HiveQL (Hive Query Language). Since timestamps are inserted as the first field in the HDFS files, you must take this into account while creating tables in Hive.
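SQL query:
CREATE TABLE t (i INT);
Hive query:
CREATE TABLE t (time_stamp INT, i INT) [ROW FORMAT DELIMITED] STORED AS TEXTFILE;
The ROW FORMAT DELIMITED clause is optional with the Applier's default ctrl-A field delimiter, which is also Hive's default; if you choose another delimiter with -r, specify it with ROW FORMAT DELIMITED FIELDS TERMINATED BY '<delimiter>'.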
Now, when any row is inserted into a table in the MySQL database, a corresponding entry is made in the Hive table. Watch the demo to get a better understanding.
The demo has no audio and is meant to be followed in conjunction with the blog. You can also create an external table in Hive and load data into the tables; it's your choice!
Limitations of the Applier: In the first version we support WRITE_ROWS_EVENTs, i.e. only insert statements are replicated. We have considered adding support for deletes, updates and DDLs as well, but they are more complicated to handle and we are not sure how much interest there is in this. We would very much appreciate your feedback on requirements; please use the comments section of this blog to let us know!
The Hadoop Applier is compatible with MySQL 5.6; however, it does not import events if binlog checksums are enabled. Make sure to set binlog_checksum to NONE on the master, and to run the server in row-based replication mode (binlog_format=ROW).
This innovation includes dedicated contributions from Neha Kumari, Mats Kindahl and Narayanan Venkateswaran. Without them, this project would not have been a success.
Give it a try! You can download a copy from http://labs.mysql.com and get started NOW!
High Availability for Drupal Part 1 - Investigating the Issues
Drupal is one of the most popular Content Management Systems (CMS) and is used increasingly in high-visibility sites, such as www.whitehouse.gov. This has brought a lot of attention to how to get the most performance out of Drupal and how to improve the availability of such sites. In this blog series I'll take you through the basics and on to designing your own HA Drupal site.
But first, we need to understand what the challenges are in getting Drupal (or indeed any CMS) working on multiple servers in such a way as to ensure high availability and performance.
MySQL Book in Chinese
One of my old students and lab assistants stopped by to show his fiancée the BYU-Idaho campus. It was a long trip since he lives in Macao, China.
He kindly brought me a copy of my Oracle Database 11g and MySQL 5.6 Developer Handbook in simplified Chinese. He’s holding it in the photo.
That makes three books translated into Chinese, which made my day. It’ll be interesting to see if the new MySQL Workbench: Data Modeling & Development book gets translated into Chinese too. Oddly, I never hear about this from the publisher first.
The cover emphasized only the Dolphin, not the Oracle logo material. It made me wonder how many MySQL users there might be in China. If anybody from China catches the post, it would be great to hear about the MySQL Community in China.
Likewise, if anybody in China catches the post and reads the book, please let me know if you liked it. Naturally, let me know if you found any problems with it too. By the way, I keep an errata for the book here.
DROP TABLE + mult.DROP INDEXes in one statement? (2 replies)
When I simply enter a DROP TABLE command, I get errors because of still-dependent objects/constraints.
Is there a way to DROP a TABLE and all (unknown) (primary) INDEXES in one command? I mean something like:
DROP TABLE mytab123 INCLUDEDEPENDINGINDEXES;
Peter
Optimize table and fragmentation (3 replies)
I am trying to optimize a database of around 60GB. I ran mysqlcheck with optimize options and individual optimize table statements in mysql prompt. But I still see the tables as fragmented. I am checking the fragmentation by using the below query.
SELECT TABLE_NAME from information_schema.TABLES where TABLE_SCHEMA NOT IN ('mysql' , 'information_schema') and Data_free > 0
Data_Free for all tables returned by the above query is 65.56 MB.
Please help me find out what I am missing.
Thanks,
Kaarthiik
Slow query. Why is it slow? (12 replies)
1 - It takes almost 20 seconds. How else can I optimise?
2 - Why not use covering_index_three?
Thank you
============================================================
SELECT `products_table`.`id`,
`code`,
`class`,
`category`,
`status`,
`price`,
`production_date`,
`products_status`.`title` AS STATUS
FROM products_table
JOIN `products_status`
ON `products_status`.`id` = `products_table`.`status`
WHERE `products_table`.`id` > 0
AND `class` = '6'
AND `category` = 'E'
ORDER BY `products_table`.`id`
LIMIT 50
================================
CREATE TABLE `products_table`
(
`id` INT(11) NOT NULL auto_increment,
`code` VARCHAR(4) NOT NULL,
`class` INT(11) NOT NULL,
`category` VARCHAR(5) NOT NULL,
`price` DECIMAL(6, 2) NOT NULL,
`production_date` DATE NOT NULL,
`quick_description` VARCHAR(255) NOT NULL,
`description` TEXT NOT NULL,
`status` INT(11) NOT NULL,
PRIMARY KEY (`id`),
KEY `production_date` (`production_date`),
KEY `index_price` (`price`),
KEY `class_index` (`class`),
KEY `category_index` (`category`),
KEY `status_index` (`status`),
KEY `covering_index_class` (`class`, `category`, `price`, `production_date`
, `status`),
KEY `covering_index` (`id`, `code`, `class`, `category`, `price`,
`production_date`, `status`),
KEY `covering_index_three` (`id`, `class`, `category`, `price`,
`production_date`, `status`),
KEY `covering_category` (`id`, `category`, `price`, `production_date`,
`status`)
)
engine=innodb
auto_increment=7642518
DEFAULT charset=latin1
=================================
Indexes: http://webmoosh.com/aashghalduni/ax/indexes.png
EXPLAIN: http://webmoosh.com/aashghalduni/ax/explain_sql.png
About Mysql performance improvement (3 replies)
======================================
Issue number 1:
Quote:
Use "EXPLAIN" to see the execution plan and improve the performance. If it doesn't return a great number of 'rows' then it is a good candidate.
This is WRONG advice! Proof: "rows" in example 1 are only 188 and it takes 3.9150 seconds while "rows" in example 2 are 3012128 and it takes 0.3491
What is going on? How can I test, watch and learn from something that doesn't make sense? There is absolutely no relation between those numbers! Even if the number is approximate because of INNODB it does not make ANY sense at all! Most analysis in books or forums are based on that!
EXAMPLE 1: (Query took 3.9150 sec)
Code:
SELECT `products_table`.`id`,
`code`,
`class`,
`category`,
`status`,
`price`,
`production_date`,
`products_status`.`title` AS STATUS
FROM `products_table`
JOIN `products_status`
ON `products_status`.`id` = `products_table`.`status`
WHERE `products_table`.`id` > 0
AND `class` = '1'
AND `production_date` = '1972-01-19'
ORDER BY `products_table`.`id`
LIMIT 50
Type: index_merge, Key: production_date,PRIMARY, Rows: 188
EXAMPLE 2: (Query took 0.3491 sec)
Code:
SELECT `products_table`.`id`,
`code`,
`class`,
`category`,
`status`,
`price`,
`production_date`,
`products_status`.`title` AS STATUS
FROM `products_table` USE INDEX (primary)
JOIN `products_status`
ON `products_status`.`id` = `products_table`.`status`
WHERE `products_table`.`id` > 0
ORDER BY `products_table`.`id`
LIMIT 50
Type: range, Key: PRIMARY, Rows: 3012128, Extra: Using where
Issue number 2:
MySQL is DUMB! Why? In example 3 why doesn't it simply use primary key??
EXAMPLE 3: (Query took 1007.1933 sec)
Code:
SELECT `products_table`.`id`,
`code`,
`class`,
`category`,
`status`,
`price`,
`production_date`,
`products_status`.`title` AS STATUS
FROM `products_table`
JOIN `products_status`
ON `products_status`.`id` = `products_table`.`status`
WHERE `products_table`.`id` > 0
ORDER BY `products_table`.`id`
LIMIT 50
Type: range, Key: covering_index, Rows: 2207384, Extra: Using where; Using index
Primary key works in that case. Proof: Example 4.
EXAMPLE 4: (Query took 0.0026 sec)
Code:
SELECT `products_table`.`id`,
`code`,
`class`,
`category`,
`status`,
`price`,
`production_date`,
`products_status`.`title` AS STATUS
FROM `products_table` USE INDEX ( primary )
JOIN `products_status`
ON `products_status`.`id` = `products_table`.`status`
WHERE `products_table`.`id` > 0
ORDER BY `products_table`.`id`
LIMIT 50
Type: range, Key:PRIMARY, Rows:3016458, EXTRA:Using WHERE
Issue number 3:
Why not use the proper index 'covering_index_three', which is on (id, class, category) columns?
EXAMPLE 5: (Query took 154.00 sec)
Code:
SELECT `products_table`.`id`,
`code`,
`class`,
`category`,
`status`,
`price`,
`production_date`,
`products_status`.`title` AS STATUS
FROM `products_table`
JOIN `products_status`
ON `products_status`.`id` = `products_table`.`status`
WHERE `products_table`.`id` > 0
AND `class` = '2'
AND `category` = 'F'
ORDER BY `products_table`.`id`
LIMIT 50
Type: ref, Key:status_index, ref: testing.products_status.id, Rows:3751, EXTRA:Using WHERE
INDEXES
Code:
CREATE TABLE `products_table`
(
`id` INT(11) NOT NULL auto_increment,
`code` VARCHAR(4) NOT NULL,
`class` INT(11) NOT NULL,
`category` VARCHAR(5) NOT NULL,
`price` DECIMAL(6, 2) NOT NULL,
`production_date` DATE NOT NULL,
`quick_description` VARCHAR(255) NOT NULL,
`description` TEXT NOT NULL,
`status` INT(11) NOT NULL,
PRIMARY KEY (`id`),
KEY `production_date` (`production_date`),
KEY `index_price` (`price`),
KEY `class_index` (`class`),
KEY `category_index` (`category`),
KEY `status_index` (`status`),
KEY `covering_index_class` (`class`, `category`, `price`, `production_date`
, `status`),
KEY `covering_index` (`id`, `code`, `class`, `category`, `price`,
`production_date`, `status`),
KEY `covering_index_three` (`id`, `class`, `category`, `price`,
`production_date`, `status`),
KEY `covering_category` (`id`, `category`, `price`, `production_date`,
`status`)
)
engine=innodb
auto_increment=7642518
DEFAULT charset=latin1
An old note on the Storage Engine API
Whenever I stick my head into the MySQL storage engine API, I’m reminded of a MySQL User Conference from several years ago now.
Specifically, I’m reminded of a slide from an early talk at the MySQL User Conference by Paul McCullagh describing developing PBXT. For “How to write a Storage Engine for MySQL”, it went something like this:
- Develop basic INSERT (write_row) support – INSERT INTO t1 VALUES (42)
- Develop full table scan (rnd_init, rnd_next, rnd_end) - SELECT * from t1
- If you’re sane, stop here.
A lot of people stop at step 3. It’s a really good place to stop too. It avoids most of the tricky parts that are unexpected, undocumented and unlogical (yes, I’m inventing words here).
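To make the two "sane" steps concrete, here is a toy in-memory table whose method names mirror the handler methods on the slide. This is a hypothetical illustration only, not MySQL's handler class: the real API works on rows in the server's record buffer format, rnd_init takes a scan flag, and end of scan is reported with HA_ERR_END_OF_FILE; the ToyEngine class, its string rows and its return codes are simplifications.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

/*
  Hypothetical toy engine: method names mirror the handler methods from the
  slide, but rows are plain strings and signatures are simplified.
*/
class ToyEngine
{
public:
  static constexpr int kOk= 0;
  static constexpr int kEndOfFile= 1;   /* stands in for HA_ERR_END_OF_FILE */

  /* Step 1: INSERT INTO t1 VALUES (42) appends a row. */
  int write_row(const std::string &row)
  {
    rows_.push_back(row);
    return kOk;
  }

  /* Step 2: SELECT * FROM t1 is a full table scan: init, next..., end. */
  int rnd_init()
  {
    cursor_= 0;
    return kOk;
  }

  int rnd_next(std::string *row)
  {
    if (cursor_ >= rows_.size())
      return kEndOfFile;
    *row= rows_[cursor_++];
    return kOk;
  }

  int rnd_end() { return kOk; }

private:
  std::vector<std::string> rows_;
  std::size_t cursor_= 0;
};
```

A scan is then rnd_init, repeated rnd_next calls until the end-of-file code comes back, and rnd_end.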
Non-Deterministic Query in Replication Stream
You might find a warning like the below in your error log:
130522 17:54:18 [Warning] Unsafe statement written to the binary log using statement format since BINLOG_FORMAT = STATEMENT. Statements writing to a table with an auto-increment column after selecting from another table are unsafe because the order in which rows are retrieved determines what (if any) rows will be written. This order cannot be predicted and may differ on master and the slave.
Statement: INSERT INTO tbl2 SELECT * FROM tbl1 WHERE col IN (417,523)
What do MariaDB and MySQL mean by this warning? The server can't guarantee that this exact query, with STATEMENT based replication, will always yield identical results on the slave.
Does that mean that you have to use ROW based (or MIXED) replication? Possibly, but not necessarily.
For this type of query, it primarily refers to the fact that without ORDER BY, rows have no order and thus a result set may show up in any order the server decides. Sometimes it’s predictable (depending on storage engine and index use), but that’s not something you want to rely on. You don’t have to ponder that, as an ORDER BY is never harmful.
Would ORDER BY col solve the problem? That depends!
If col is unique, yes. If col is not unique, then multiple rows could match and they'd still have a non-deterministic order. So in that case you'd need to ORDER BY col, anothercol, where the combination of col and anothercol is unique, to make it absolutely deterministic. The same of course applies if the WHERE clause only referred to a single col value: if multiple rows can match, then it's not unique and it will require an additional column for the sort.
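The ordering argument can be demonstrated outside the server. The hypothetical C++ sketch below models INSERT INTO tbl2 SELECT ... ORDER BY: rows get auto-increment ids in the order the sort emits them, and std::stable_sort stands in for a server that returns tied rows in whatever order it happened to read them. Here pk plays the role of anothercol; the Row type, the helper names and the row values are made up for illustration.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

/* A row of tbl1: col is not unique, pk is the table's primary key. */
struct Row
{
  int col;
  int pk;
};

/*
  Models INSERT INTO tbl2 SELECT ... ORDER BY <less>: rows receive
  auto-increment ids 1, 2, 3, ... in sorted order. Element i of the result
  is the pk of the source row that received id i + 1. std::stable_sort
  keeps tied rows in their input order, standing in for a server that
  returns ties in whatever order it happened to read them.
*/
template <typename Less>
std::vector<int> assign_auto_increment(std::vector<Row> rows, Less less)
{
  std::stable_sort(rows.begin(), rows.end(), less);
  std::vector<int> pk_for_id;
  for (const Row &r : rows)
    pk_for_id.push_back(r.pk);
  return pk_for_id;
}

/* ORDER BY col: rows with equal col values are tied. */
bool by_col(const Row &a, const Row &b) { return a.col < b.col; }

/* ORDER BY col, pk: (col, pk) is unique, so there are no ties. */
bool by_col_pk(const Row &a, const Row &b)
{
  return a.col != b.col ? a.col < b.col : a.pk < b.pk;
}
```

If the master reads the rows as {417, 1}, {417, 2}, {523, 3} and the slave reads them as {417, 2}, {523, 3}, {417, 1}, ORDER BY col alone gives id 1 to pk 1 on the master but to pk 2 on the slave, while ORDER BY col, pk assigns the same ids on both.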
There are other query constructs where going to row based or mixed replication is the only way. But, just because the server tells you it can’t safely replicate a query with statement based replication, that doesn’t mean you can’t use statement based replication at all… there might be another way.
Mac Firewall and Remote DB Access (1 reply)
I find I can only access the database if I drop the Mac firewall entirely. Macs allow you to specify which applications are allowed through the firewall via security settings. I've tried adding all apps on the mac with mysql in their names but still no joy.
How can I have the server running accepting remote access with the firewall up?
Can't get mysqldump to work... (2 replies)
I'm trying to move a MySQL database off of a Linux machine to a Windows machine running MySQL. I'm attempting to do this using 'mysqldump' but I keep getting a syntax error. Any help would be greatly appreciated.
Here is what I'm typing in;
mysql>mysqldump norfolk > dump.sql;
ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your mySQL server version for the right syntax to use near 'mysqldump norfolk > dump.sql' at line 1
I also tried;
mysql>mysqldump --databases norfolk > dump.sql but I get the same error.
Any help would be greatly appreciated!
Thanks
Ryan
