摘要: 主要介绍如何通过官方 ETL 工具 Exchange 将业务线上数据从 Neo4j 直接导入到 Nebula Graph 以及在导入过程中遇到的问题和优化方法。

本文首发于 Nebula 论坛:https://discuss.nebula-graph.com.cn/t/topic/2044

Neo4j 导入 Nebula Graph 的实践总结

1 背景

随着业务数据量不断增长,业务对图数据库在线数据实时更新写入和查询的效率要求也不断增加。Neo4j 存在明显性能不足,Neo4j 社区开源版本只支持单机部署,扩展能力存在比较大的问题,无法满足读写性能的线性扩展以及读写分离的业务需求,并且开源版本 Neo4j 对点和边的总数据量也有限制;而 Neo4j 企业版因果集群也存在单机主节点 Cypher 实时写入的性能瓶颈。

相比于 Neo4j,Nebula Graph 最大的特色便是采用 shared-nothing 分布式的架构,无单主写入瓶颈问题,读写支持线性扩展,擅长处理千亿节点、万亿条边的超大规模数据集。

本文主要介绍如何通过官方 ETL 工具 Exchange 将业务线上数据从 Neo4j 直接导入到 Nebula Graph 以及在导入过程中遇到的问题和优化方法。其中绝大部分问题都已经通过论坛发帖的方式得到社区的支持和解决,本文会结合问题进行逐一列举。

2 部署环境

系统环境:

  • CPU name:Intel(R) Xeon(R) Silver 4114 CPU @ 2.20GHz
  • CPU Cores:40
  • Memory Size:376 GB
  • Disk:HDD
  • System:CentOS Linux release 7.4.1708 (Core)

软件环境:

  • Neo4j:3.4 版本,五节点因果集群
  • Nebula Graph:
    • 版本: nebula-graph v1.1.0 源码编译安装,
    • 部署:单台服务器部署三节点 Nebula Graph 集群。
  • Exchange:nebula-java v1.1.0 源码编译 jar 包
  • 数仓环境:
    • hadoop-2.7.4
    • spark-2.3.1

注意:单台机器部署 Nebula 多节点的端口分配:每个 storage 还会将用户配置的端口号 + 1的端口作为内部使用。请参考论坛帖子 nebula从neo4j导入数据出现Get UUID Failed错误

3 全量 & 增量数据导入

3.1 全量导入

根据 Neo4j 点和边的属性信息创建 Nebula Graph 的 Tag 和 Edge 结构,这里需要注意一点,业务可能会根据不同需求只在部分点和边上增加 Neo4j 点和边的属性信息,其他点和边对应的属性为 NULL,所以需要先跟业务明确一下点和边的全部属性信息,避免遗漏属性。Nebula Graph 的 Schema 信息类似 MySQL,支持 Create 和 Alter 添加属性,并且所有的 Tag 和 Edge 的元数据信息是一致的。

1、Nebula Graph 创建 Tag 和 Edge

# 示例
# 创建图空间,10 个分区,3 个 storage 副本。
CREATE SPACE test(partition_num=10,replica_factor=3);
# 选择图空间 test
USE test;
# 创建标签 tagA
CREATE TAG tagA(vid string, field-a0 string, field-a1 bool, field-a2 double);
# 创建标签 tagB
CREATE TAG tagB(vid string, field-b0 string, field-b1 bool, field-b2 double);
# 创建边类型 edgeAB
CREATE EDGE edgeAB(vid string, field-e0 string, field-e1 bool, field-e2 double); 

2、Exchange 导入配置文件

  • Exchange 配置目前不支持 bolt+routing 的方式连接neo4j,如果是因果集群,可以选择一个从节点进行 bolt 方式直连读取数据,减少集群压力。
  • 我们业务的 Neo4j 数据点和边的 vid 是 string 类型,Nebula v1.x 版本还不支持 string 直接当做 vid(v2.0支持),考虑到官方文档中的描述:“当点数量到达十亿级别时,用 hash 函数生成 vid 有一定的冲突概率。因此 Nebula Graph 提供 UUID 函数来避免大量点时的 vid 冲突。” 选择了uuid() 作为转化函数,但是导入效率要比 hash 低,而且 uuid() 在未来版本可能存在兼容问题。
  • partition: 是指 Exchange 从 Neo4j 拉取数据的分页个数。
  • batch: 是指批量插入 Nebula 的 batch 大小。
{
  # Spark relation config
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph relation config
  nebula: {
    address:{
      graph:["xxx.xxx.xxx.xx:3699"]
      meta:["xxx.xxx.xxx.xx:45500"]
    }
    user: user
    pswd: password
    space: test

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  
  # Processing tags
  tags: [
    # Loading tag from neo4j
    {
      name: tagA
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://xxx.xxx.xxx.xxx:7687"
      user: neo4j
      password: neo4j
      exec: "match (n:tagA) where id(n) < 300000000 return n.vid as vid, n.field-a0 as field-a0, n.field-a1 as field-a1, n.field-a2 as field-a2 order by id(n)" fields: [vid, field-a0, field-a1, field-a2] nebula.fields: [vid, field-a0, field-a1, field-a2] vertex: { field: vid policy: "uuid" } partition: 10 batch: 1000 check_point_path: /tmp/test } # Loading tag from neo4j { name: tagB type: { source: neo4j sink: client } server: "bolt://xxx.xxx.xxx.xxx:7687" user: neo4j password: neo4j exec: "match (n:tagB) where id(n) < 300000000 return n.vid as vid, n.field-b0 as field-b0, n.field-b1 as field-b1, n.field-b2 as field-b2 order by id(n)" fields: [vid, field-b0, field-b1, field-b2] nebula.fields: [vid, field-b0, field-b1, field-b2] vertex: { field: vid policy: "uuid" } partition: 10 batch: 1000 check_point_path: /tmp/test } ] # Processing edges edges: [ # Loading edges from neo4j { name: edgeAB type: { source: neo4j sink: client } server: "bolt://xxx.xxx.xxx.xxx:7687" user: neo4j password: neo4j exec: "match (a:tagA)-[r:edgeAB]->(b:tagB) where id(r) < 300000000 return n.vid as vid, n.field-e0 as field-e0, n.field-e1 as field-e1, n.field-e2 as field-e2 order by id(r)" fields: [vid, field-e0, field-e1, field-e2] nebula.fields: [vid, field-e0, field-e1, field-e2] source: { field: a.vid policy: "uuid" } target: { field: b.vid policy: "uuid" } partition: 10 batch: 1000 check_point_path: /tmp/test } ] } 

3、执行导入命令

nohup spark-submit --class com.vesoft.nebula.tools.importer.Exchange --master "local" exchange-1.1.0.jar -c test.conf > test.log & 

4、查看导入 Nebula Graph 的数据量

./bin/db_dump --space=test --db_path=./data/storage/nebula/ --meta_server=127.0.0.1:45500 -limit 0 --mode=stat --tags=tagA,tagB --edges=edgeAB 

注意:Nebula 1.x 版本目前还只能用 db_dump 统计,2.0 会支持 nGQL 命令的方式统计数量。

3.2 增量导入

增量数据导入主要是通过 Neo4j 内部点和边的自增 id() 进行切割,在导入配置文件 exec 项执行 Neo4j Cypher 语句时增加 id() 范围限制,但前提是需要业务停掉删数据操作,因为增量导入时,如果之前的数据被删除后 Neo4j 会复用 id(),这会导致复用 id() 的增量数据导入时查询不到造成数据丢失。当然业务如果有条件支持 Neo4j Nebula 双写的话,增量导入就不会出现这种问题

exec: "match (n:user) where id(n) >= 300000000 and id(n) < 400000000 return xxx order by id(n)" 

请参考论坛帖子 neo4j到nebula如何做增量导入

3.3 导入问题及解决

使用 Exchange 导入过程中遇到两个问题,及时的得到官方 @nicole 的支持和解决,具体请参考下面两个帖子:

  • nebula从neo4j导入数据,部分属性带回车,拼insert报错,有什么好办法解决吗?
  • 使用Exchange 从neo4j导入nebula,label中有些顶点的属性值是null,导致导入失败

问题 1:Exchange 不支持「换行回车」等特殊字符的转义。如下 string 数据中带有回车,在拼接 insert 语句插入时会因为换行导致插入失败。

Neo4j 导入 Nebula Graph 的实践总结

PR:https://github.com/vesoft-inc/nebula-java/pull/203 已经合入 exchange v1.0 分支

问题 2:Exchange 不支持属性为 NULL 的数据导入。前文 3.1 中提到,业务可能会根据不同需求为某些点和边增加属性,这时其他点和边属性则是 NULL,这样在使用 Exchange 导入时会报错。

Neo4j 导入 Nebula Graph 的实践总结

参考帖子 2 给出的修改建议解决:修改 com.vesoft.nebula.tools.importer.processor.Processor#extraValue,增加 NULL 类型的转化值。

case NullType => {
  fieldTypeMap(field) match {
    case StringType => ""
    case IntegerType => 0
    case LongType => 0L
    case DoubleType => 0.0
    case BooleanType => false
  }
} 

4 导入效率优化

关于导入效率的优化,请参考下面两个帖子:

  • 关于使用Exchange从neo4j导入nebula的性能问题
  • 使用exchange并发 spark-submit –master “local\[16\]” 报错

优化 1:通过适当增加导入配置中的 partition 和 batch 值,提升导入效率。
优化 2:如果是 string 类型做 vid 的话,1.x 版本尽量使用 hash() 函数转化,2.0 版本会支持 string id 类型;如果是int类型做vid的话,可以直接使用,不用转化效率更高。
优化 3:官方建议 spark-submit 提交命令 master 配置改为 yarn-cluster, 若不使用 yarn,可配置成 spark://ip:port;我们是通过 spark-submit --master "local[16]" 的方式增加 spark 并发,导入效率比使用 "local" 提升 4 倍+,测试环境单机三节点 HDD 盘 IO 峰值能到 200-300 MB/s。但在指定 --master "local[16]" 并发导入时遇到 hadoop 缓存问题,采用增加 hdfs 配置 fs.hdfs.impl.disable.cache=true 后重启 hadoop 解决。具体请参考第二个帖子。

5 总结

使用 Exchange 从 Neo4j 导入 Nebula Graph 过程中遇到一些问题,通过积极与社区进行沟通得到了官方 @nicole 及其他小伙伴的快速响应和大力支持,这一点在 Neo4j 导入 Nebula Graph 的实践过程中起到了十分关键的作用,感谢社区的大力支持。期待支持 openCypher 的 Nebula Graph 2.0。

6 参考链接

  1. https://nebula-graph.com.cn/posts/how-to-import-data-from-neo4j-to-nebula-graph/
  2. https://github.com/vesoft-inc/nebula-java/tree/v1.0
  3. https://docs.nebula-graph.com.cn/manual-CN/2.query-language/2.functions-and-operators/uuid/
  4. http://arganzheng.life/hadoop-filesystem-closed-exception.html

推荐阅读

  • 在 Spark 数据导入中的一些实践细节
  • Neo4j 导入 Nebula Graph 的实现原理与实践

Neo4j 导入 Nebula Graph 的实践总结的更多相关文章

  1. JS实现公告上线滚动效果

    本文实例为大家分享了JS实现公告上线滚动效果的具体代码,供大家参考,具体内容如下实现的效果如下,新闻公告上下滚动。代......

  2. Node使用koa2实现一个简单JWT鉴权的方法

    JWT 简介什么是 JWT全称 JSON Web Token , 是目前最流行的跨域认证解决方案。基本的实现是服务端......

  3. node.js常用内置模块一

    在使用内模块的时候需要先将所需的内置模块进行引入、OS模块在nodejs中OS模块提供了与操作系统相关的属性和方法/......

  4. 一文秒懂nodejs中的异步编程

    文章目录 简介同步异步和阻塞非阻塞javascript中的回调回调函数的错误处理回调地狱 ES6中的Promise什......

  5. three.js cannon.js物理引擎之Heightfield

    今天郭先生说一说cannon.js物理引擎之Heightfield高度场,学过场论的朋友都知道物理学中把某个物理量在......

  6. 使用nodejs和express搭建http web服务

    目录简介使用nodejs搭建HTTP web服务请求nodejs服务第三方lib请求post获取http请求的正文E......

  7. three.js cannon.js物理引擎之制作拥有物理特性的汽车

    今天郭先生说一说使用cannon.js的车辆辅助类让我们的汽车模型拥有物理特性。效果图如下,在线案例请点击博客原文。......

  8. nodejs中的文件系统

    、目录简介nodejs中的文件系统模块Promise版本的fs文件描述符fs.stat文件状态信息fs的文件读写fs......

  9. 在nodejs中创建cluster

    目录简介cluster集群cluster详解cluster中的eventcluster中的方法cluster中的属性......

  10. pixi.js 自定义光标样式

    pixi 介绍Pixi是一个超快的2D渲染引擎,通过Javascript和Html技术创建动画或管理交互式图像,从而......

随机推荐

  1. Asp.Net 加密解密

    #region DES加密解密 /// /// DES加密 /// /// 待加密字串 /// 32位Key值 ......

  2. Java反射全解析(使用、原理、问题、在Android中的应用)

    前言今天说Java模块内容:反射。反射介绍正常情况下,我们知晓我们要操作的类和对象是什么,可以直接操作这些对象中的变......

  3. python用分数表示矩阵的方法实例

    前言在机器学习中,我们会经常和矩阵打交道。在矩阵的运算中,python默认的输出是浮点数,但是如果我们想要矩阵的元素......

  4. 在PHP中灵活使用foreach+list处理多维数组的方法

    先抛出问题,有时候我们接收到的参数是多维数组,我们需要将他们转成普通的数组,比如: $arr = [ [1, 2......

  5. C#基于jwt实现分布式登录

    一、传统的session登录在服务器存储一份用户登录的信息,这份登录信息会在响应时传递给浏览器,告诉其保存为cook......

  6. python基于爬虫+django,打造个性化API接口

    简述今天也是同事在做微信小程序的开发,需要音乐接口的测试,可是用网易云的开放接口比较麻烦,也不能进行测试,这里也是和......

  7. tcpdump抓包及tshark解包方法介绍

    tshark是wireshark的命令行工具,通过shell命令抓取、解析报文。tcpdump是Linux系统下的抓......

  8. Linux Shell 编程基础详解——吐血整理,墙裂推荐!

    第一部分:Linux Shell 简介Shell 是一个用 C 语言编写的程序,它是用户使用 Linux 的桥梁。S......

  9. Java并发编程实战(5)- 线程生命周期

    在这篇文章中,我们来聊一下线程的生命周期。在这篇文章中,我们来聊一下线程的生命周期。目录概述操作系统中的线程生命周期......

  10. MySql8 WITH RECURSIVE递归查询父子集的方法

    背景开发过程中遇到类似评论的功能是,需要时用查询所有评论的子集。不同数据库中实现方式也不同,本文使用Mysql数据库......