livy指南

Livy是一个基于Spark的开源REST服务,它能够通过REST的方式将代码片段或是序列化的二进制代码提交到Spark集群中去执行。它提供了以下这些基本功能:

  • 提交Scala、Python或是R代码片段到远端的Spark集群上执行;
  • 提交Java、Scala、Python所编写的Spark作业到远端的Spark集群上执行;
  • 提交批处理应用在集群中运行。

livy安装

livy安装很简单,从官网下载zip包。
解压,设置spark和hadoop的环境。

1
2
export SPARK_HOME=spark的路径
export HADOOP_CONF_DIR=/etc/hadoop/conf (hadoop配置路径)

然后在livy的bin目录下执行:
livy-server start

livy配置

在conf/livy.conf文件中有配置,常用的配置:

配置信息 含义
livy.server.host = 0.0.0.0 livy服务启动的host
livy.server.port = 80 启动端口号
livy.spark.master = yarn livy执行spark master
livy.spark.deploy-mode = client spark启动模式
livy.repl.enable-hive-context = true 启动hiveContext
livy.ui.enabled = true livy ui页面

还有其他的配置,用户可以根据自己需求自行配置。
另外,当集群开启了kerberos验证,执行spark任务时报gss错误,需要在配置中加上:

1
2
livy.server.launch.kerberos.keytab = /conf/xxx.keytab
livy.server.launch.kerberos.principal = xxx@HADOOP.COM

Read more
airflow使用指南

airflow 是一个编排、调度和监控workflow的平台,由Airbnb开源。airflow 将workflow编排为tasks组成的DAGs,调度器在一组workers上按照指定的依赖关系执行tasks。同时,airflow 提供了丰富的命令行工具和简单易用的用户界面以便用户查看和操作,并且airflow提供了监控和报警系统。

airflow 核心概念

  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。
  • Operators:可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求, SqlOperator 用于执行SQL命令…同时,用户可以自定义Operator,这给用户提供了极大的便利性。
  • Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node。
  • Task Instance:task的一次运行。task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
  • Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 TaskA >> TaskB,表明TaskB依赖于TaskA。

通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 workflow了。

operators

下面讲解下常见的operator,以及如何使用,注意点。

Read more
Google Spanner

原始译文厦门大学林子雨老师翻译,见Google Spanner (中文版)

简介

Spanner是谷歌公司研发的、可扩展的、多版本、全球分布式、同步复制数据库。它是第一个把数据分布在全球范围内的系统,并且支持外部一致性的分布式事务。本文描述了Spanner的架构、特性、不同设计决策的背后机理和一个新的时间API,这个API可以暴露时钟的不确定性。这个API及其实现,对于支持外部一致性和许多强大特性而言,是非常重要的,这些强大特性包括:非阻塞的读、不采用锁机制的只读事务、原子模式变更。

Spanner是个可扩展,多版本,全球分布式还支持同步复制的数据库。他是Google的第一个可以全球扩展并且支持外部一致的事务。Spanner能做到这些,离不开一个用GPS和原子钟实现的时间API。这个API能将数据中心之间的时间同步精确到10ms以内。因此有几个给力的功能:无锁读事务,原子schema修改,读历史数据无block。

功能

从高层看Spanner是通过Paxos状态机将分区好的数据分布在全球的。数据复制全球化的,用户可以指定数据复制的份数和存储的地点。Spanner可以在集群或者数据发生变化的时候将数据迁移到合适的地点,做负载均衡。

spanner提供一些有趣的特性:

  • 应用可以细粒度的指定数据分布的位置。精确的指定数据离用户有多远,可以有效的控制读延迟(读延迟取决于最近的拷贝)。指定数据拷贝之间有多远,可以控制写的延迟(写延迟取决于最远的拷贝)。还要数据的复制份数,可以控制数据的可靠性和读性能。(多写几份,可以抵御更大的事故)
  • Spanner还有两个一般分布式数据库不具备的特性:读写的外部一致性,基于时间戳的全局的读一致。这两个特性可以让Spanner支持一致的备份,一致的MapReduce,还有原子的Schema修改。

这些特性都得益于spanner有个全球时间同步机制,可以在数据提交的时候给出一个时间戳。因为时间是系列化的,所以才有外部一致性。这个很容易理解,如果有两个提交,一个在T1,一个在T2。那有更晚的时间戳那个提交是正确的。

与关系型数据库和nosql对比

https://www.infoq.cn/article/growth-path-of-spanner


HBase case之scan batch

标签数据存储在Hbase中,为了加速标签探索功能,会每天导出一份全量数据表。有时候用户也会进行特定标签勾选导出需求。

导出遇到的问题

导出后,出现部分用户,同一个用户多条数据(两条为主),见下图:

从图中可以看出,出现了重复用户uid。

Read more
HBase系列之snapshot

snapshot(快照)基础原理

snapshot是很多存储系统和数据库系统都支持的功能。一个snapshot是一个全部文件系统、或者某个目录在某一时刻的镜像。实现数据文件镜像最简单粗暴的方式是加锁拷贝(之所以需要加锁,是因为镜像得到的数据必须是某一时刻完全一致的数据),拷贝的这段时间不允许对原数据进行任何形式的更新删除,仅提供只读操作,拷贝完成之后再释放锁。这种方式涉及数据的实际拷贝,数据量大的情况下必然会花费大量时间,长时间的加锁拷贝必然导致客户端长时间不能更新删除,这是生产线上不能容忍的。

snapshot机制并不会拷贝数据,可以理解为它是原数据的一份指针。在HBase这种LSM类型系统结构下是比较容易理解的,我们知道HBase数据文件一旦落到磁盘之后就不再允许更新删除等原地修改操作,如果想更新删除的话可以追加写入新文件(HBase中根本没有更新接口,删除命令也是追加写入)。这种机制下实现某个表的snapshot只需要给当前表的所有文件分别新建一个引用(指针),其他新写入的数据重新创建一个新文件写入即可。

snapshot流程主要涉及3个步骤:

  1. 加一把全局锁,此时不允许任何的数据写入更新以及删除

  2. 将Memstore中的缓存数据flush到文件中(可选)

  3. 为所有HFile文件分别新建引用指针,这些指针元数据就是snapshot

snapshot作用

  • 备份:通常情况下,对重要的业务数据,建议至少每天执行一次snapshot来保存数据的快照记录,并且定期清理过期快照,这样如果业务发生重要错误需要回滚的话是可以回滚到之前的一个快照点的。
  • 迁移:可以使用ExportSnapshot功能将快照导出到另一个集群,实现数据的迁移
Read more
HBase系列之region-split上

Region自动切分是HBase能够拥有良好扩张性的最重要因素之一,也必然是所有分布式系统追求无限扩展性的一副良药。那么region又是如何自动切分的呢,触发的条件又是什么?

Region切分触发策略

在最新稳定版(1.2.6)中,HBase已经有多达6种切分触发策略。即RegionSplitPolicy的实现子类共有6个,如下类图:

当然,每种触发策略都有各自的适用场景,用户可以根据业务在表级别选择不同的切分触发策略。常见的切分策略如下图:

ConstantSizeRegionSplitPolicy

0.94版本前默认切分策略。这是最容易理解但也最容易产生误解的切分策略,从字面意思来看,当region大小大于某个阈值(hbase.hregion.max.filesize)之后就会触发切分,实际上并不是这样,真正实现中这个阈值是对于某个store来说的,即一个region中最大store的大小大于设置阈值之后才会触发切分。另外一个大家比较关心的问题是这里所说的store大小是压缩后的文件总大小还是未压缩文件总大小,实际实现中store大小为压缩后的文件大小(采用压缩的场景)。ConstantSizeRegionSplitPolicy相对来来说最容易想到,但是在生产线上这种切分策略却有相当大的弊端:切分策略对于大表和小表没有明显的区分。阈值(hbase.hregion.max.filesize)设置较大对大表比较友好,但是小表就有可能不会触发分裂,极端情况下可能就1个,这对业务来说并不是什么好事。如果设置较小则对小表友好,但一个大表就会在整个集群产生大量的region,这对于集群的管理、资源使用、failover来说都不是一件好事。

Read more
HBase系列之compact

在介绍HBase Compaction之前,我们先来看一下HBase是如何存储和操作数据。
HBase数据存储

如上图所示,HRegionServer负责打开region,并创建对应的HRegion实例。当HRegion打开之后,它会为每个表的HColumnFamily创建一Store实例,ColumnFamily是用户在创建表时定义好的,ColumnFamily在每个region中和Store实例一一对应。每个Store实例包含一个或者多个StoreFile实例,StoreFile是对实际存储数据文件HFile的轻量级封装。每个Store对应一个MemStore(也就是写内存)。一个HRegionServer共享一个HLog实例。

当我们不停地往HBase中写入数据,也就是往MemStore写入数据,HBase会检查MemStore是否达到了需要刷写到磁盘的阈值(更多关于MemStore刷写的信息,可以参考HBase Reference Guide关于MemStore的介绍)。如果达到刷写的条件,MemStore中的记录就会被刷写到磁盘,形成一个新的StoreFile。可想而知,随着MemStore的不断刷写,会形成越来越多的磁盘文件。然而,对于HBase来说,当每个HStore仅包含一个文件时,才会达到最佳的读效率。因此HBase会通过合并已有的HFile来减少每次读数据的磁盘寻道时间,从而提高读速度,这个文件合并过程就称为Compaction。在这里需要说明的是,显然磁盘IO也是有代价的,如果使用不慎的话,不停地重写数据可能会导致网络和磁盘过载。换句话说,compaction其实就是用当前更高的磁盘IO来换取将来更低的磁盘寻道时间。因此,何时执行compaction,其实是一个相当复杂的决策。

Compaction会从一个region的一个store中选择一些hfile文件进行合并。合并说来原理很简单,先从这些待合并的数据文件中读出KeyValues,再按照由小到大排列后写入一个新的文件中。之后,这个新生成的文件就会取代之前待合并的所有文件对外提供服务。HBase的compaction分为minor和major两种,每次触发compact检查,系统会自动决定执行哪一种compaction(合并)。有三种情况会触发compact检查:

  • MemStore被刷写到磁盘;
  • 用户执行shell命令compact、major_compact或者调用了相应的API;
  • HBase后台线程周期性触发检查。

除非是用户使用shell命令major_compact或者调用了majorCompact() API(这种情况会强制HBase执行major合并),在其他的触发情况下,HBase服务器会首先检查上次运行到现在是否达到一个指定的时限。如果没有达到这个时限,系统会选择执行minor合并,接着检查是否满足minor合并的条件。

major合并中会删除那些被标记为删除的数据、超过TTL(time-to-live)时限的数据,以及超过了版本数量限制的数据,将HStore中所有的HFile重写成一个HFile。如此多的工作量,理所当然地,major合并会耗费更多的资源,合并进行时也会影响HBase的响应时间。在HBase 0.96之前,默认每天对region做一次major compact,现在这个周期被改成了7天。然而,因为major compact可能导致某台server短时间内无法响应客户端的请求,如果无法容忍这种情况的话,可以关闭自动major compact,改成在请求低谷期手动触发这一操作。

Minor Compaction是指选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,在这个过程中不会处理已经Deleted或Expired的Cell。一次Minor Compaction的结果是更少并且更大的StoreFile。

Read more
八大基础分析模型

模型是指对于某个实际问题或客观事物、规律进行抽象后的一种形式化表达方式。任何模型都有三个部分组成:目标、变量和关系。
通俗来讲:

  • 目标:这个模型是干嘛用的,要解决什么问题。
  • 变量:自变量、因变量、中介变量,总之就是,明确变量,改变变量,即可直接呈现结果,实现目标。
  • 关系:可以理解为对目标和变量进行组织。
    下面介绍常用的八种基础分析模型,可能大家也都了解。而且这些模型,其实也在不断的优化,并且又有了一些新特性,比如用户分群模型中的“新增后”、事件模型中的“活跃比”

用户模型

“用户”是以人为中心的数据分析平台的最小单元,对单个用户画像构建越完整,数据多维交叉的分析能力才能凸显。

事件模型

用户在产品上的行为(所有代码的交互)都是会被记录的,怎样标记是事件模型的核心,它是漏斗模型,自定义留存模型,全行为路径分析模型的数据源。

活跃比:某一时间区间内触发某事件的人数占该时间区间内活跃人数的百分比。

Read more
传统的MapReduce框架慢在哪里

本文转载于传统的MapReduce框架慢在那里,翻译自Shark: SQL and Rich Analytics at Scale的论文第七章节,从理论上讨论了相比于Hive,Shark的优势在哪里,原文可见http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-214.pdf.

为什么之前的MapReduce系统比较慢

常理上有几个理由使得MapReduce框架慢于MPP数据库:

  • 容错所引入的昂贵数据实体化 (data materialization)开销。
  • 孱弱的数据布局 (data layout),比如缺少索引。
  • 执行策略的开销[1 2]

而我们对于Hive的实验也进一步证明了上述的理由,但是通过对Hive“工程上”的改进,如改变存储引擎(内存存储引擎)、改善执行架构(partial DAG execution)能够缩小此种差距。同时我们也发现一些MapReduce实现的细节会对性能有巨大的影响,如任务调度的开销,如果减小调度开销将极大地提高负载的均衡性。

中间结果输出:类似于Hive这样的基于MapReduce的查询引擎,往往会将中间结果实体化 (materialize)到磁盘上:

  • 在MapReduce任务内部,为了防止Reduce任务的失败,Map通常会把结果存储在磁盘上。
  • 通常一些查询在翻译到MapReduce任务的时候,往往会产生多个stage,而这些串联的stage则又依赖于底层文件系统(如HDFS)来存储每一个stage的输出结果。

对于第一种情况,Map的输出结果存储在磁盘上是为了确保能够有足够的空间来存储这些大数据批量任务的输出。而Map的输出并不会复制到不同的节点上去,因此如果执行Map任务的节点失效的话仍会造成数据丢失[3]。由此可以推出,如果将这部分输出数据缓存在内存中,而不是全部输出到磁盘上面也是合理的。Shark Shuffle的实现正是应用了此推论,将Map的输出结果存储在内存中,极大地提高Shuffle的吞吐量。通常对于聚合 (aggregation)和过滤之类的查询,它们的输出结果往往远小于输入,这种设计是非常合理的。而SSD的流行,也会极大地提高随机读取的性能,对于大数据量的Shuffle,能够获得较大的吞吐量,同时也拥有比内存更大的空间。

Read more
Performance optimizations in Apache Impala

历史和动机

SQL on Apache Hadoop

  • SQL
  • Run on top of HDFS
  • Supported various file formats
  • Converted query operators to map-reduce jobs
  • Run at scale
  • Fault-tolerant
  • High startup-costs/materialization overhead (slow…)

impala performance optimizations

query planning

2-phase cost-based optimizer

  • Phase 1: Generate a single node plan (transformations, join ordering, static partition pruning, runtime filters)
  • Phase 2: Convert the single node plan into a distributed query execution plan (add exchange nodes, decide join strategy)
    Query fragments (units of work):
  • Parts of query execution tree
  • Each fragment is executed in one or more impalads
Read more