7. HBase实战之谷粒微博

HBase实战之谷粒微博

1. 需求分析

  1. 微博内容的浏览,数据库表设计。
  2. 用户社交体现:关注用户,取关用户。
  3. 拉取关注的人的微博内容。

2. 代码实现

2.1 代码设计总览
  1. 创建命名空间以及表名的定义
  2. 创建微博内容表
  3. 创建用户关系表
  4. 创建用户微博内容接收邮件表
  5. 发布微博内容
  6. 添加关注用户
  7. 移除(取关)用户
  8. 获取关注的人的微博内容
  9. 测试

img

创建guli-weibo的maven工程,引入以下依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>

然后在resources文件夹添加hbase-site.xml文件

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://hadoop:9000/HBase</value>
    </property>

    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>

    <!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>

    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>hadoop,hadoop101,hadoop102</value>
    </property>

    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/opt/module/zookeeper-3.4.6/zkData</value>
    </property>
</configuration>

工程文件结构如下:

在这里插入图片描述

定义常量接口Constants

package com.atguigu.constants;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

/**
 * @Date 2021/1/30 21:46
 * @Version 10.21
 * @Author DuanChaojie
 */
public interface Constants {

    Configuration CONFIGURATION = HBaseConfiguration.create();
    /**
     * 命名空间
     */
    String NAMESPACE = "weibo";

    /**
     * 内容表
     */
    String CONTENT_TABLE = "weibo:content";
    String CONTENT_TABLE_CF = "info";
    int CONTENT_TABLE_VERSIONS = 1;

    /**
     * 用户关系表
     */
    String RELATION_TABLE = "weibo:relations";
    String RELATION_TABLE_CF1 = "attends";
    String RELATION_TABLE_CF2 = "fans";
    int RELATION_TABLE_VERSIONS = 1;

    /**
     * 收件箱表
     */
    String INBOX_TABLE = "weibo:inbox";
    String INBOX_TABLE_CF = "info";
    int INBOX_TABLE_VERSIONS = 2;
}
2.2 创建命名空间以及表名的定义
/**
 * @Date 2021/1/30 21:47
 * @Version 10.21
 * @Author DuanChaojie
 */
public class HBaseUtil {

    /**
     * 1.创建命名空间
     * 2.判断表是否存在
     * 3.创建表(三张表)
     */


    public static void createNameSpace(String nc) throws IOException {
        // 1.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);

        // 2.获取Admin对象
        Admin admin = connection.getAdmin();

        // 3.构建命名空间描述器

        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create(nc).build();

        // 4.创建命名空间
        admin.createNamespace(namespaceDescriptor);

        // 5.关闭资源
        admin.close();
        connection.close();
    }


    public static boolean isTableExist(String tableName) throws IOException {
        // 1.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);

        // 2.获取Admin对象
        Admin admin = connection.getAdmin();

        // 3.判断是否存在
        boolean exists = admin.tableExists(TableName.valueOf(tableName));

        // 4.关闭资源
        admin.close();
        connection.close();

        return exists;
    }

    public static void createTable(String tableName,int versions,String... cfs) throws IOException {
        // 1.判断是否传入了列簇信息
        if (cfs.length <= 0){
            System.out.println("请设置列簇信息!");
            return;
        }
        // 2.判断是否存在该表
        if (isTableExist(tableName)){
            System.out.println(tableName + "表已存在!");
            return;
        }
        // 3.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
        // 4.获取Admin对象
        Admin admin = connection.getAdmin();
        // 5.创建表描述器
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        // 6.添加列簇信息
        for (String cf : cfs) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
            // 7.设置版本
            hColumnDescriptor.setMaxVersions(versions);

            hTableDescriptor.addFamily(hColumnDescriptor);
        }

        // 8.创建表的操作
        admin.createTable(hTableDescriptor);

        // 9.关闭资源
        admin.close();
        connection.close();

    }
}
2.3 发布微博
package com.atguigu.dao;

import com.atguigu.constants.Constants;
import javafx.scene.control.Tab;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;

/**
 * @Date 2021/1/30 21:47
 * @Version 10.21
 * @Author DuanChaojie
 */
public class HBaseDao {
    /**
     * 1.发布微博
     * 2.删除微博
     * 3.关注用户
     * 4.取关用户
     * 5.获取用户的的初始化页面
     * 6.获取用户的微博详情
     */

    public static void publishWeiBo(String uid,String content) throws IOException {
        // 1.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);

        // 第一部分:操作微博内容表
        // 1.获取微博内容表对象
        Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));

        // 2.获取当前时间戳
        long ts = System.currentTimeMillis();

        // 3.获取rowkey
        String rowKey = uid + "_" + ts;

        // 4.创建Put对象
        Put conPut = new Put(Bytes.toBytes(rowKey));

        // 5.给put对象赋值
        conPut.addColumn(Bytes.toBytes(Constants.CONTENT_TABLE_CF),Bytes.toBytes("content"),Bytes.toBytes(content));

        // 6.执行插入数据的操作
        conTable.put(conPut);

        // 第二部分:操作微博收件箱表
        // 1.获取用户关系表对象
        Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));

        // 2.获取当前发布微博人的fans列簇数据
        Get get = new Get(Bytes.toBytes(uid));
        Result result = relaTable.get(get);

        // 3.创建一个集合,用于存放微博内容表的Put对象
        ArrayList<Put> inboxPuts = new ArrayList<Put>();
        // 4.遍历粉丝
        for (Cell cell : result.rawCells()) {
            // 5.构建微博收件箱表的Put对象
            Put inboxPut = new Put(CellUtil.cloneQualifier(cell));

            // 6.给收件箱表的Put对象赋值
            inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),Bytes.toBytes(uid),Bytes.toBytes(rowKey));

            // 7.将收件箱表的Put对象存入集合
            inboxPuts.add(inboxPut);
        }

        // 8.判断是否有粉丝
        if (inboxPuts.size() > 0) {
            // 获取收件箱对象
            Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));

            // 执行收件箱表数据插入操作
            inboxTable.put(inboxPuts);

            // 关闭收件箱表
            inboxTable.close();
        }

        // 关闭资源
        connection.close();
        relaTable.close();
        conTable.close();
    }
}
2.4 删除微博
//....
2.5 关注用户
 public static void addAttends(String uid,String... attends) throws IOException {

        //校验是否添加了待关注的人
        if (attends.length <= 0) {
            System.out.println("请选择待关注的人!");
            return;
        }

        // 获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);

        // 第一部分:操作用户关系表
        // 1.获取用户关系表对象
        Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));

        // 2.创建一个集合,用于存放用户关系表的Put对象
        ArrayList<Put> relaPuts = new ArrayList<Put>();

        // 3.创建操作着的Put对象
        Put uidPut = new Put(Bytes.toBytes(uid));

        // 4.循环创建被关注的着Put对象
        for (String attend : attends) {
            // 5.给操作者的Put对象赋值
            uidPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF1),Bytes.toBytes(attend),Bytes.toBytes(attend));

            // 6.创建被关注着的Put对象
            Put attendPut = new Put(Bytes.toBytes(attend));

            // 7.给被关注着的Put对象赋值
            attendPut.addColumn(Bytes.toBytes(Constants.RELATION_TABLE_CF2),Bytes.toBytes(uid),Bytes.toBytes(uid));

            // 8.将被关注着的Put对象放入集合
            relaPuts.add(attendPut);

        }
        // 9.将操作者的Put对象添加至集合
        relaPuts.add(uidPut);

        // 10.执行用户关系表的插入数据操作
        relaTable.put(relaPuts);

        // 第二部分:操作收件箱表
        // 1.获取微博内容表的对象
        Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));

        // 2.创建收件箱表的Put对象
        Put inboxPut = new Put(Bytes.toBytes(uid));

        // 3.循环attends,获取每个被关注着的近期发布的微博
        for (String attend : attends) {

            // 4.获取当前被关注着的近期发布的微博scan
            Scan scan = new Scan(Bytes.toBytes(attend + "_"), Bytes.toBytes(attend + "|"));
            ResultScanner resultScanner = conTable.getScanner(scan);
            // 5.对获取的值进行遍历

            // 定义一个时间戳
            long ts = System.currentTimeMillis();

            for (Result result : resultScanner) {
                // 6.给收件箱的Put对象赋值
                inboxPut.addColumn(Bytes.toBytes(Constants.INBOX_TABLE_CF),Bytes.toBytes(attend),ts++,result.getRow());
            }

        }
        // 7.判断当前的Put对象是否为空
        if (!inboxPut.isEmpty()){


            // 获取收件箱表对象
            Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));

            // 插入数据
            inboxTable.put(inboxPut);

            // 关闭收件箱表连接
            inboxTable.close();
        }

        // 关闭资源
        relaTable.close();
        conTable.close();
        connection.close();
    }
2.6 取关用户
    public static void deleteAttends(String uid,String... dels) throws IOException {

        if (dels.length <= 0) {
            System.out.println("请添加待取关的用户!");
            return;
        }

        // 1.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);

        // 第一部分:操作用户关系表
        // 1.获取用户关系表对象
        Table relaTable = connection.getTable(TableName.valueOf(Constants.RELATION_TABLE));

        // 2.创建一个集合,用于存放用户关系表的Delete对象
        ArrayList<Delete> relaDeletes = new ArrayList<Delete>();

        // 3.创建操作者的Delete对象
        Delete uidDelete = new Delete(Bytes.toBytes(uid));

        // 4.循环创建被取关的Delete对象
        for (String del : dels) {
            // 5.给操作者的Delete对象赋值
            uidDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF1),Bytes.toBytes(del));
            // 6.创建被取关的Delete对象
            Delete delDelete = new Delete(Bytes.toBytes(del));
            // 7.给被取关者的Delete对象赋值
            delDelete.addColumns(Bytes.toBytes(Constants.RELATION_TABLE_CF2),Bytes.toBytes(uid));
            // 8.将被取关者的Delete对象添加至集合
            relaDeletes.add(delDelete);
        }
        // 9.将操作者的Delete对象添加至集合
        relaDeletes.add(uidDelete);

        // 10.执行用户关系表的删除操作
        relaTable.delete(relaDeletes);

        // 第二部分:操作收件箱表

        // 1.获取收件箱表对象
        Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));

        // 2.创建操作者的Delete对象
        Delete inboxDelete = new Delete(Bytes.toBytes(uid));

        // 3.给操作者的Delete赋值
        for (String del : dels) {
            inboxDelete.addColumns(Bytes.toBytes(Constants.INBOX_TABLE_CF),Bytes.toBytes(del));
        }
        // 4.执行收件箱表的删除操作
        inboxTable.delete(inboxDelete);

        // 关闭资源
        relaTable.close();
        inboxTable.close();
        connection.close();
    }
2.7 获取用户的的初始化页面
    public static void getInit(String uid) throws IOException {
        // 1.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
        // 2.获取收件箱表对象
        Table inboxTable = connection.getTable(TableName.valueOf(Constants.INBOX_TABLE));

        // 3.获取微博内容表对象
        Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));

        // 4.创建收件箱表Get对象,并获取数据(设置最大版本)
        Get inboxGet = new Get(Bytes.toBytes(uid));
        inboxGet.setMaxVersions();
        Result result = inboxTable.get(inboxGet);

        // 5.遍历获取的数据
        for (Cell cell : result.rawCells()) {
            // 6.构建微博内容表Get对象
            Get contGet = new Get(CellUtil.cloneValue(cell));

            // 7.获取该Get对象的数据内容
            Result contResult = conTable.get(contGet);
            // 8.解析内容并打印
            for (Cell conCell : contResult.rawCells()) {
                System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(conCell)));
                System.out.println("CF:"+Bytes.toString(CellUtil.cloneFamily(conCell)));
                System.out.println("CN:"+Bytes.toString(CellUtil.cloneQualifier(conCell)));
                System.out.println("Value:"+Bytes.toString(CellUtil.cloneValue(conCell)));
            }

        }

        // 9.关闭资源
        inboxTable.close();
        conTable.close();
        connection.close();
    }
2.8 获取用户的微博详情
   public static void getWeiBO(String uid) throws IOException {
        // 1.获取Connection对象
        Connection connection = ConnectionFactory.createConnection(Constants.CONFIGURATION);
        // 2.获取微博内容表对象
        Table conTable = connection.getTable(TableName.valueOf(Constants.CONTENT_TABLE));

        // 3.构建Scan对象
        Scan scan = new Scan();
        // 构建过滤器
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator(uid + "_"));

        scan.setFilter(rowFilter);
        // 4.获取数据
        ResultScanner resultScanner = conTable.getScanner(scan);

        // 5.解析数据并打印
        for (Result result : resultScanner) {
            for (Cell conCell : result.rawCells()) {
                System.out.println("RK="+Bytes.toString(CellUtil.cloneRow(conCell)));
                System.out.println("CF="+Bytes.toString(CellUtil.cloneFamily(conCell)));
                System.out.println("CN="+Bytes.toString(CellUtil.cloneQualifier(conCell)));
                System.out.println("Value="+Bytes.toString(CellUtil.cloneValue(conCell)));
            }
        }

        // 6.关闭资源
        conTable.close();
        connection.close();
    }

3. 测试

启动集群

在这里插入图片描述

package com.atguigu.test;

import com.atguigu.constants.Constants;
import com.atguigu.dao.HBaseDao;
import com.atguigu.utils.HBaseUtil;

import java.io.IOException;

/**
 * @Date 2021/1/31 19:30
 * @Version 10.21
 * @Author DuanChaojie
 */
public class TestWeiBo {
    public static void init(){
        try {
            // 创建命名空间
            HBaseUtil.createNameSpace(Constants.NAMESPACE);
        } catch (IOException e) {
            e.printStackTrace();
        }

        try {
            // 创建微博内容表
            HBaseUtil.createTable(Constants.CONTENT_TABLE,Constants.CONTENT_TABLE_VERSIONS,Constants.CONTENT_TABLE_CF);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 创建用户关系表
        try {
            HBaseUtil.createTable(Constants.RELATION_TABLE,Constants.RELATION_TABLE_VERSIONS,Constants.RELATION_TABLE_CF1,Constants.RELATION_TABLE_CF2);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // 创建收件箱表
        try {
            HBaseUtil.createTable(Constants.INBOX_TABLE,Constants.INBOX_TABLE_VERSIONS,Constants.INBOX_TABLE_CF);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // 初始化
        init();
        // 1001发微博
        HBaseDao.publishWeiBo("1001","赶紧下课ba!");
        // 1002关注1001和1003
        HBaseDao.addAttends("1001","1002","1003");
        // 获取1002初始化页面
        HBaseDao.getInit("1002");

        System.out.println("----------------------------------------------");
        // 1003发布3条微博,同时1001发布2条微博
        HBaseDao.publishWeiBo("1003","谁说的赶紧下课!");
        Thread.sleep(10);
        HBaseDao.publishWeiBo("1001","我没说话!");
        Thread.sleep(10);
        HBaseDao.publishWeiBo("1003","那谁说的!");
        Thread.sleep(10);
        HBaseDao.publishWeiBo("1002","就是不下课!");

        HBaseDao.publishWeiBo("1002","爱咋咋地!");
        System.out.println("发送消息完毕!!!");

        // 获取1002初始化页面
        HBaseDao.getInit("1002");
        // 1002取关1003
        HBaseDao.deleteAttends("1002","1003");
        // 获取1002初始化页面
        HBaseDao.getInit("1002");
        // 1002再次关注1003
        HBaseDao.addAttends("1002","1003");
        // 获取1002初始化页面
        HBaseDao.getInit("1002");
    }
}