The two kinds of coprocessors
1. Observers: allow the cluster to behave differently during normal client operations.
2. Endpoints: allow you to extend the cluster's capabilities, exposing new operations to client applications.
Observers
How does it work? Let's compare the normal lifecycle of a request with the lifecycle of a request intercepted by a RegionObserver.
Lifecycle of a normal request
1. Client sends a put request
2. Request is dispatched to the appropriate RegionServer and region.
3. The region receives the put(), processes it, and constructs a response.
4. The final result is returned to the client.
Lifecycle of a request with a RegionObserver registered
1. Client sends a put request
2. Request is dispatched to the appropriate RegionServer and region.
3. CoprocessorHost intercepts the request and invokes prePut() on each RegionObserver registered on the table.
4. Unless interrupted by a **prePut()**, the request continues to the region and is processed normally.
5. The result produced by the region is once again intercepted by the CoprocessorHost. This time **postPut()** is called on each registered RegionObserver.
6. Assuming no postPut() interrupts the response, the final result is returned to the client.
There are three types of observers
- RegionObserver: This observer hooks into the stages of data access and manipulation. For example: Get, Put, Delete, Scan, and so on.
- WALObserver: The write-ahead log (WAL) also supports an observer coprocessor. The only available hooks are pre- and post-WAL write events.
- MasterObserver: For hooking into DDL events, such as table creation or schema modifications. For example: postDeleteTable() (see the sketch right after this list).
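For comparison, here is a minimal MasterObserver sketch. It is not part of the example deployed below; the class name LoggingMasterObserver is made up for illustration, and the hook signature shown assumes the HBase 0.96 API.
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
// Hypothetical MasterObserver that hooks the table-deletion DDL event
public class LoggingMasterObserver extends BaseMasterObserver {
  // postDeleteTable() is called after the master has deleted a table
  @Override
  public void postDeleteTable(
      ObserverContext<MasterCoprocessorEnvironment> ctx,
      TableName tableName) throws IOException {
    System.out.println("Table deleted: " + tableName.getNameAsString());
  }
}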
Example of RegionObserver
package org.apache.hadoop.hbase.coprocessor.example;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
// To use a RegionObserver, we extend BaseRegionObserver
public class HelloWorldCoprocessor extends BaseRegionObserver {

  // prePut() is invoked before each Put is applied to the region
  @Override
  public void prePut(
      ObserverContext<RegionCoprocessorEnvironment> e,
      Put put,
      WALEdit edit,
      Durability durability)
      throws IOException {
    // Create a folder named 'coprocessor' on HDFS before the put() is executed
    FileSystem fs = e.getEnvironment().getRegion().getFilesystem();
    fs.mkdirs(new Path("hdfs:///hbase/coprocessor"));
  }

  // preGet() is invoked before each Get is processed by the region
  @Override
  public void preGet(
      ObserverContext<RegionCoprocessorEnvironment> e,
      Get get,
      List<KeyValue> result)
      throws IOException {
    byte[] testme = Bytes.toBytes("r1");
    // If the requested row is 'r1', add an extra KeyValue to the result
    if (Bytes.equals(get.getRow(), testme)) {
      KeyValue kv = new KeyValue(get.getRow(), testme, testme,
          Bytes.toBytes(System.currentTimeMillis()));
      result.add(kv);
    }
  }
}
Then we compile it into a .jar file.
You can download the HBase source and put your Java file under the path below:
hbase-0.96.0/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor.
Then execute **mvn package** under hbase-0.96.0/hbase-examples.
The .jar file will be written to hbase-0.96.0/hbase-examples/target
Coprocessor Deployment
HBase provides two options for deploying coprocessor extensions:
1. Load from configuration: loading happens when the master or region servers start up.
If loaded in this manner, the coprocessors are active on all regions of all tables.
2. Load from table attribute: the coprocessor is loaded dynamically when the table is (re)opened.
This is configured on a per-table basis via the shell command 'alter' with 'table_att'.
# Load from configuration: add the property below to hbase-site.xml
# The property name is chosen by the following rules:
# hbase.coprocessor.region.classes: for RegionObservers and Endpoints
# hbase.coprocessor.master.classes: for MasterObservers
# hbase.coprocessor.wal.classes: for WALObservers
# The value is the fully qualified class name; if multiple classes are specified for loading, the class names must be comma-separated.
<property>
  <name>hbase.coprocessor.region.classes</name>
  <value>org.apache.hadoop.hbase.coprocessor.example.HelloWorldCoprocessor</value>
</property>
***** ATTENTION *****
The jar file must be copied to every server in your cluster and placed where HBase can find it,
for example under $HBASE_HOME/lib/.
After the configuration is set, you need to restart HBase with the commands below:
stop-hbase.sh //stop hbase
start-hbase.sh //re-start hbase
***** ATTENTION *****
# Load from the shell
# This is configured on a per-table basis
// Create a table named 'table1', with column family 'f1'
hbase(main):014:0> create 'table1','f1'
0 row(s) in 0.4360 seconds
=> Hbase::Table - table1
// Use the alter shell command to set up the coprocessor; the table must be disabled first
hbase(main):015:0> disable 'table1'
0 row(s) in 1.3170 seconds
// 'coprocessor' => 'jar file location | class name | priority | arguments'
hbase(main):017:0> alter 'table1', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/hbase-examples-v2.jar|org.apache.hadoop.hbase.coprocessor.example.HelloWorldCoprocessor|1001|'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.2470 seconds
// You can use the describe shell command to check the settings
hbase(main):018:0> describe 'table1'
DESCRIPTION ENABLED
'table1', {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///hbase/hbase-examples-v2.jar|org.apache false
.hadoop.hbase.coprocessor.example.HelloWorldCoprocessor|1001|'}, {NAME => 'f1', DATA_BLOCK_ENCOD
ING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => '
NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65
536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}
1 row(s) in 0.0420 seconds
# Remember to enable 'table1' when done
hbase(main):022:0> enable 'table1'
0 row(s) in 1.3650 seconds
***** PS. We haven't yet succeeded in loading the coprocessor from the shell... still trying :P
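As an alternative to the shell, the same table attribute can be set from Java through the client API. Below is a minimal sketch, assuming the HBase 0.96 client classes; AttachCoprocessor is a made-up name, and the jar path and priority mirror the alter command above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
// Attach HelloWorldCoprocessor to 'table1' as a table attribute via the Java client API
public class AttachCoprocessor {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    byte[] table = Bytes.toBytes("table1");
    admin.disableTable(table);                            // the table must be disabled first
    HTableDescriptor desc = admin.getTableDescriptor(table);
    desc.addCoprocessor(
        "org.apache.hadoop.hbase.coprocessor.example.HelloWorldCoprocessor",
        new Path("hdfs:///hbase/hbase-examples-v2.jar"),  // jar location on HDFS
        1001,                                             // priority
        null);                                            // no extra arguments
    admin.modifyTable(table, desc);                       // push the new schema to the regions
    admin.enableTable(table);
    admin.close();
  }
}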
Test your coprocessor
- The coprocessor should create a new folder named 'coprocessor' under hdfs:///hbase/ before the put shell command is executed.
- The coprocessor should add an extra KeyValue to the result of a get command when the row name is 'r1'.
First we test the prePut() function.
# Make sure there is no folder named 'coprocessor' under hdfs:///hbase/
ubuntu@ip-10-232-158-223:~$ hadoop fs -ls hdfs:///hbase
Found 9 items
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:34 hdfs:///hbase/.tmp
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:32 hdfs:///hbase/WALs
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 08:26 hdfs:///hbase/archive
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:32 hdfs:///hbase/corrupt
drwxr-xr-x - ubuntu supergroup 0 2013-12-19 10:05 hdfs:///hbase/data
-rw-r--r-- 3 ubuntu supergroup 99428 2013-12-26 05:00 hdfs:///hbase/hbase-examples-v2.jar
-rw-r--r-- 3 ubuntu supergroup 42 2013-12-19 10:05 hdfs:///hbase/hbase.id
-rw-r--r-- 3 ubuntu supergroup 7 2013-12-19 10:05 hdfs:///hbase/hbase.version
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:43 hdfs:///hbase/oldWALs
# Then we put a row of data into table1
hbase(main):023:0> put 'table1','r1','f1:1','value1'
0 row(s) in 0.0880 seconds
# Check the Hadoop file system again
# Yes!! A folder named 'coprocessor' has been created.
ubuntu@ip-10-232-158-223:~$ hadoop fs -ls hdfs:///hbase
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:34 hdfs:///hbase/.tmp
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:32 hdfs:///hbase/WALs
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 08:26 hdfs:///hbase/archive
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 08:35 hdfs:///hbase/coprocessor
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 06:32 hdfs:///hbase/corrupt
drwxr-xr-x - ubuntu supergroup 0 2013-12-19 10:05 hdfs:///hbase/data
-rw-r--r-- 3 ubuntu supergroup 99428 2013-12-26 05:00 hdfs:///hbase/hbase-examples-v2.jar
-rw-r--r-- 3 ubuntu supergroup 42 2013-12-19 10:05 hdfs:///hbase/hbase.id
-rw-r--r-- 3 ubuntu supergroup 7 2013-12-19 10:05 hdfs:///hbase/hbase.version
drwxr-xr-x - ubuntu supergroup 0 2013-12-26 08:32 hdfs:///hbase/oldWALs
# Now we test our preGet() function.
# Check the data we have using the scan shell command
hbase(main):025:0> scan 'table1'
ROW COLUMN+CELL
r1 column=f1:1, timestamp=1388046967610, value=value1
r2 column=f1:2, timestamp=1388047108045, value=value2
2 row(s) in 0.1070 seconds
# Now get the row named 'r1'; the coprocessor should add an extra KeyValue to the result
hbase(main):027:0> get 'table1','r1'
COLUMN CELL
r1:r1 timestamp=9223372036854775807, value=\x00\x00\x01C.\x0E\xF4\x01
f1:1 timestamp=1388046967610, value=value1
3 row(s) in 0.0150 seconds
# Get the row named 'r2'; the result should only contain the data stored in 'r2'
hbase(main):028:0> get 'table1','r2'
COLUMN CELL
f1:2 timestamp=1388047108045, value=value2
1 row(s) in 0.0090 seconds
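The same check can also be done from a Java client instead of the shell. Here is a minimal sketch, assuming the HBase 0.96 client API; GetRowTest is a made-up class name. It fetches row 'r1' from 'table1' and prints every cell, so the injected r1:r1 KeyValue should show up alongside f1:1.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
// Fetch row 'r1' and print all cells, including the one injected by preGet()
public class GetRowTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "table1");
    Result result = table.get(new Get(Bytes.toBytes("r1")));
    for (Cell cell : result.listCells()) {
      System.out.println(
          Bytes.toString(CellUtil.cloneFamily(cell)) + ":" +
          Bytes.toString(CellUtil.cloneQualifier(cell)) + " = " +
          Bytes.toStringBinary(CellUtil.cloneValue(cell)));
    }
    table.close();
  }
}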
Endpoints
To be continued.......