Базы данных: введение, часть восьмая

Илья Тетерин
2012-11-19

(use arrow keys or PgUp/PgDown to move slides)

Много структурированных данных

Hadoop / MapReduce

Column Oriented Databases

Bigtable / Hbase / Cassandra

Map / Reduce

MapReduce - очень мощный механизм обработки данных, когда мы можем выделить фазы вычислений по типам - зависят от одного элемента, зависят от многих элементов.


У нас есть список значений (записей).

У хотим что-то посчитать по этому списку.

Введем понятие mapper - преобразует одно исходное значение в набор промежуточных (0 .. K значений).

Введем понятие reducer - имея на входе N промежуточных значений выдает на выход 0 .. M результатов.


Примеры алгоритмов - max / min / avg / sum ... top 10 значений по какой-то выборке етс

Map

отображение (map) - делаем новый список, элементы которого получены применением функции к элементам исходного списка

Filter

фильтрация (filter) — каждый элемент списка проверяется на соответствие функции-предикату, и если элемент не соответствует, он выбрасывается из списка

Reduce

свертка (reduce / fold) - "сворачиваем" множество значений в одно, комбинируя последовательно элементы множества

Требуются:
Множество элементов: что мы будем "сворачивать"
Начальный (пустой) элемент (unit): 1 - умножение, 0 - сложение, пустая строка - склеивание строк етс.
функция двух параметров - комбинатор:
v0 = combinator (unit, element);
далее vi = combinator(Vprev, element);

Возведем в квадрат - обычный подход - цикл (python)
def square(x):
    return x*x
vals = [1, 2, 3, 4]
newvals = []
for v in vals:
    newvals.append(square(v))
map встроен в Python в 1993? году - воспользуемся:
def map(f, s):
    result = []
    for x in s:
            result.append(f(x))
    return result

vals = [1, 2, 3, 4]
newvals = map(square,vals)

Функциональная библиотека на Java

interface Filter<T> { boolean fits(T t); }
interface Function<X, Y> { Y apply(X arg); }
interface BiFunction<X, Y, Z> { Z apply(X x, Y y); }

public static <X, Y> List<Y> map(List<X> items, Function<X, Y> fu) {
  final List<Y> out = new ArrayList<Y>(items.size());
  for (final X item : items)
    out.add(fu.apply(item));
  return out; 
}

public static <X> List<X> filter(List<X> items, Filter<X> fi) {
  final List<X> out = new ArrayList<X>();
  for (final X item : items)
    if (fi.fits(item))
      out.add(item);
  return out; 
}

public static <U, X> U reduce(List<X> items, BiFunction<U, X, U> fu, U unit) {
  U out = unit;
  for (final X item : items)
    out = fu.apply(out, item);
  return out; 
}
Функция высшего порядка - функция, принимающая в качестве аргумента (или возвращающая) другую функцию ...
final static Function<Integer, Integer> square = 
new Function<Integer, Integer>() {
  public Integer apply(final Integer arg) {
    return arg * arg;
  }};

final static Filter<Integer> evenFilter = 
new Filter<Integer>() {
  public boolean fits(final Integer integer) {
    return integer % 2 == 0;
  }};

final List<Integer> list = new ArrayList<Integer>();
for(int i = 0; i < 50; i++) 
  list.add(i);

final List<Integer> squaresOfEvens = map(filter(list, evenFilter), square);

final List<Integer> evens = filter(list, evenFilter);
final List<Integer> squaresOfEvens = map(evens, square);

final List<Integer> squares = new ArrayList<Integer>(); for (final Integer integer : list) { if (integer % 2 == 0) { squares.add(integer); // ой!!! забыл в квадрат возвести }}

Плюсы

Минусы

Готовые библиотеки:

LambdaJ is a library that makes easier to to manipulate collections in a pseudo-functional and statically typed way.
Guava-libraries contains several of Google's core libraries that we rely on in our Java-based projects: collections, caching, primitives support, concurrency libraries, common annotations, string processing, I/O, and so forth.

А в чем изюминка?

Детали map / reduce реализации отдельны и скрыты от разработчика...

... следовательно могут оптимизироваться отдельно ...

Многопоточность?

Распределенность?

Реализация многопоточного map

final static int threads = 50;
public static <X, Y> List<Y> mapParallel(
      final List<X> items, final Function<X, Y> fu) 
    throws InterruptedException, ExecutionException {
  // нарежем на кусочки по количеству тредов
  final int chunkSize = items.size() % threads;
  final List<List<X>> chunks = split(items, chunkSize);
  // подготовим исполнение в threads потоков
  final ExecutorService executorService = Executors.newFixedThreadPool(threads);

  final Function<List<X>, Callable<List<Y>>> toCallable 
      = new Function<List<X>, Callable<List<Y>>>() {
    public Callable<List<Y>> apply(final List<X> arg) {
      return new Callable<List<Y>>() {
        public List<Y> call() throws Exception {
          return map(arg, fu); // исполнение через простую map
        } }; } }; 

  // сконвертиуем кусочки и отправим на вычисление
  final List<Future<List<Y>>> futures = executorService
    .invokeAll(map(chunks, toCallable)); // отправим на исполнение
  executorService.shutdown(); // выключим многотредность

  final List<Y> out = new ArrayList<Y>(items.size());
  for (final Future<List<Y>> item : futures) // соберем результаты
      out.addAll(item.get()); // Future.get подождет, если результат еще не готов
  return out; }

... реализация же скрыта от разработчика, да? ...

final static int MAP_MAX_IN_SINGLE_THREAD = 500;
final static int MAP_MIN_CHUNK_PER_THREAD = 100;

public static <X, Y> List<Y> map(List<X> items, Function<X, Y> fu) {
  if (items.size() > MAP_MAX_IN_SINGLE_THREAD &&
       items.size() > MAP_MIN_CHUNK_PER_THREAD)
    return mapParallel(items, fu);

  final List<Y> out = new ArrayList<Y>(items.size());
  for (final X item : items)
    out.add(fu.apply(item));
  return out;
}

Мы сделали (и спрятали от разработчика) многопоточное исполнение вычисления

... отладили и проверили ...

... автоматом встроили в кучу мест, где уже используется...

... автоматически включающееся от размер коллекции.

Кто хочет поискать по всему коду циклы for и встроить по тем местам многопоточность?

MapReduce - мощная вычислительная модель.

Разделение кода

вычисление - бизнес-логика:
фунции, фильтры, комбинаторы
исполнение - библиотека:
map, filter, reduce

Это позволяет кодировать разным людям разные части системы ... и оптимизировать код по частям.

Кому-то интересно возиться с библиотеками, кому-то интересно писать код и получать результаты от бизнес-части

MapReduce by Google

В 2004-ом году Google публикует статью MapReduce: Simplified Data Processing on Large Clusters by Jeffrey Dean и Sanjay Ghemawat (pdf / slides).

Рассматривается вычислительная модель MapReduce и внутренняя реализация, распараллеливающая вычисление в кластера большого размера.


Помните реализацию многопоточного map? Хотите сами написать и отладить?

А можете сами написать свои filter / map / reduce и отладить локально на одной машине?

А потом отправляете их в кластер на вычисление и библиотека позаботится об использовании тысяч машин.

Задачи

Требования

Вход

Наборы ключ-значение (строка - строка)

Программист предоставляет

map (in_key, in_value) -> list(out_key, intermediate_value)
и
reduce (out_key, list(intermediate_value)) -> list(out_value)

Система

Example: Count word occurrences

map(String input_key, String input_value):
    // input_key: document name
    // input_value: document contents
    for each word w in input_value:
      EmitIntermediate(w, "1");

reduce(String output_key, Iterator intermediate_values):
    // output_key: a word
    // output_values: a list of counts
    int result = 0;
    for each v in intermediate_values:
      result += ParseInt(v);
    Emit(AsString(result));

Pseudocode: See appendix in paper for real code

Важные детали

Для чего используется
Литература по MapReduce

Hadoop / Hadoop MapReduce

Hadoop Wiki: PoweredBy

Facebook, Amazon, Adobe, AOL, Baidu, EBay (532 node, 8*532 core, 5.3Pb), Google - для студентов, IBM, Last.fm, LinkedIn, Yahoo (100 000+ cores, 40 000+ nodes) ...

Amazon предлагает облако Hadoop машин - Amazon Elastic MapReduce ... с ценами от $0.015 (45 копеек) за час машинного времени.

MongoDB ... MapReduce

Production Deployments: Disney, GitHub, FourSquare, SourceForge, etc...

Уже упоминалась в лекции № 05.

Как поставить mongo?

Quickstart OS X

$ brew update
$ brew install mongodb
...
mongod run --config /usr/local/Cellar/mongodb/2.0.1-x86_64/mongod.conf

В конфиге указываем
# Store data in /usr/local/var/mongodb instead of the default /data/db
dbpath = /usr/local/var/mongodb
# Only accept local connections
bind_ip = 127.0.0.1

pulser-osx:~ pulser$ mongo
MongoDB shell version: 2.0.1
connecting to: test
> show dbs
admin   (empty)
csc 0.203125GB
local   (empty) 
> use csc
switched to db csc
> db.student.count()
41

Как работать с MongoDB?

Пишем скрипт на javascript ... и кидаем через command-line

  1 t = db.student;
  2 t.drop();
  3 
  4 t.save({name:"Аганезов Сергей",department:"UN",vuz:"ИТМО"})
  5 t.save({name:"Альперович Семен",department:"SE",vuz:"СПбГПУ"})


 48 var m = function () {
 49     emit(this.vuz, {fio:this.name});
 50 };
 51 
 52 var r = function (key, values) {
 53     var sum = 0;
 54     var fu = function (doc) { sum = sum + 1; };
 55     values.forEach(fu);
 56     return {vuz:key, count:sum};
 57 };
 58 
 59 var op = db.student.mapReduce(m, r, {out:"mr_result"});
 60 var done = db.mr_result.find("this.value.count > 0");
 61 
 62 done.forEach(printjson);

Пример вывода

pulser-osx:tmp pulser$ mongo csc insert.js 
MongoDB shell version: 2.0.1
connecting to: csc
{ "_id" : "ГУАП", "value" : { "vuz" : "ГУАП", "count" : 2 } }
{ "_id" : "ИТМО", "value" : { "vuz" : "ИТМО", "count" : 20 } }
{ "_id" : "СПбГПУ", "value" : { "vuz" : "СПбГПУ", "count" : 9 } }
{ "_id" : "СПбГУ", "value" : { "vuz" : "СПбГУ", "count" : 6 } }

Литература:

Простыми словами.

Существует облако ...

Вы пишете последовательность шагов map-reduce-map-map-reduce.

Вы определяете входную коллекцию и название коллекции, куда положить результат.

Вы собираете свои шаги в пакет (скрипт в случае mongo, jar в случае hadoop)

Отправляете скрипт и конфиг в облако на вычисление.

Забираете результат из коллекции результатов.


Облако берет конфиг и пакет к себе, раскладывает его по нодам.

На каждой ноде прогоняет локальный _кусочек_ коллекции через map и кладет в коллекцию временных результатов.

Для каждого reduce обеспечивает что все value для конкретного ключа попадут в один инстанс reduce. Для этого коллекция временных результатов сортируется и собирается на новых нодах.

По окончании всех шагов облако складывает результаты в указанную коллекцию.

This slide intentionally left blank

Bigtable:
A Distibuted Storage System for Structured Data

В 2006 году опубликована статья labs.google.com/papers/bigtable.html.

Описывается система, позволившая хранить петабайты данных на тысячах серверов и используемая в 60+ проектах.

Предоставляет API распределенной, сортированной Map с произвольными текстовыми данными в качестве значений.

Модель данных

Bigtable - распределенная, разряженная, многомерная сортированная Map (ассоциативный массив).

Ключ массива - id ряда, колонка, отметка времени (timestamp).

Значение - произвольный массив байт.

(
  row:string,
  column:string,
  time:int64
) -> string

Ряд

Ключ - произвольная строка с максимальной длиной 64K.

Обычно ключ 10-100 символов.

Любая операция в пределах ключа - атомарна с точки зрения пользователя.

Так проще пользователям понимать поведение системы при многих потоках обновления.

Данные сортируются в соответствии с порядком ключей (по алфавиту строчного представления ключа).

Данные автоматически нарезаются на поддиапазоны ключей - tablet - блок данных для раскладки в кластер и регулировки нагрузки.

Таким образом - запросы по малым диапазонам ключей (Иванов-Ивановы) можно обработать на 1 машине.

Подбирая ключи - можно обеспечить локальность данных и данные будут лежать в одном или соседних tablets, например обратная запись домена в качестве ключа: ru.yandex, ru.yandex.company

Колонки (family:qualifier)

Column family:

Column qualifier:

language:id="en_US"
anchor:ru.yandex.company="http://company.yandex.ru/public/articles/"
anchor:com.google.labs="http://labs.google.com/papers/bigtable.html"

Доступ и хранение - на уровне отдельных column family.

Один процесс - пишет base column family (CF) - contents

Другой на основании этой CF создает anchor CF etc.

Source: Google Bigtable PDF

Timestamp

Пример: для CF content выставляем N=3 ... и получается, что в архиве хранятся последние три версии страницы, что видел crawler.

Модель в картинках

HBase: The Definitive Guide by Lars George, Sept 2011

Запись в Bigtable

// Open the table
Table *T = OpenOrDie("/bigtable/web/webtable");

// Write a new anchor and delete an old anchor
RowMutation r1(T, "com.cnn.www");
r1.Set("anchor:www.c-span.org", "CNN");
r1.Delete("anchor:www.abc.com");
Operation op;
Apply(&op, &r1);

Чтение из Bigtable

Scanner scanner(T);
ScanStream *stream;
stream = scanner.FetchColumnFamily("anchor");
stream->SetReturnAllVersions();
scanner.Lookup("com.cnn.www");
for (; !stream->Done(); stream->Next()) {
  printf("%s %s %lld %s\n",
    scanner.RowName(),
    stream->ColumnName(),
    stream->MicroTimestamp(),
    stream->Value()); } 

Кирпичики

Bigtable:

Ключевые понятия

SSTable (sorted string table)

HBase - http://hbase.apache.org/

HBase - открытая, доступная реализация Bigtable

HBase is the Hadoop database.

Use it when you need random, realtime read/write access to your Big Data.

This project's goal is the hosting of very large tables -- billions of rows X millions of columns -- atop clusters of commodity hardware.

HBase is an open-source, distributed, versioned, column-oriented store modeled after Google' Bigtable: A Distributed Storage System for Structured Data

HBase provides Bigtable-like capabilities on top of Hadoop.

Книга: HBase: The Definitive Guide by Lars George, Sept 2011

NB: документация утверждает, что если у вас сотни Mb данных и десятки миллионов рядов - не используйте, данных мало, кластер простаивать будет :).

HBase - history

source: HBase: The Definitive Guide by Lars George, Sept 2011

Архитектура

source: HBase Architecture 101 - Storage by Lars George

HFile - SSTable

source: HBase Architecture 101 - Storage by Lars George

Write Ahead Log

Список литературы

labs.google.com/papers/bigtable.html
Bigtable: A Distributed Storage System for Structured Data by Fay Chang, et.

HBase: The Definitive Guide: Intro by Lars George, at OReilly

HBase: The Definitive Guide by Lars George, Sept 2011

Блоги:
HBase Architecture 101 - Storage by Lars George
HBase Architecture 101 - Write-ahead-Log by Lars George
BigTable Model with Cassandra and HBase by Ricky Ho
HBase vs. BigTable Comparison by Lars George
WTF is a SuperColumn? An Intro to the Cassandra Data Model by Arin Sarkissan
The Underlying Technology of Messages by Kannan Muthukkaruppan
Myths Rumors Fud Hate NoSQL Cassandra vs hbase by Edward Capriolo
HBase vs Cassandra by Jesse Shieh
Real Life Cassandra by Dave Gardner

Немножко баловства. / Hbase: Quickstart

wget http://www.sai.msu.su/apache/hbase/hbase-0.94.2/hbase-0.94.2.tar.gz
tar -xzvf hbase-0.94.2.tar.gz
cd hbase-0.94.2

mkdir /hbase.pulser
chmod a+w /hbase.pulser

vim conf/hbase-site.xml
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///hbase.pulser/hbase</value>
  </property>
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/hbase.pulser/zookeeper</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>localhost</value>
  </property>
</configuration>

# закомментируйте 127.0.1.1 !!! на Ubuntu
vim /etc/hosts 
# 127.0.1.1 my-server-name-here
pulser@nas:~/my.hbase/hbase-0.94.2$ ./bin/start-hbase.sh 
starting master, logging to /home/pulser/my.hbase/hbase-0.94.2/logs/hbase-master.out

pulser@nas:~/my.hbase/hbase-0.94.2$ ./bin/hbase shell
hbase(main):001:0>help

hbase(main):053:0> create 'test', 'cfi', 'cfs'
hbase(main):054:0> put 'test', 'pu', 'cfi:login', 'pulser'
hbase(main):055:0> put 'test', 'pu', 'cfi:sname', 'Teterin'
hbase(main):056:0> put 'test', 'pu', 'cfi:name', 'Ilja'

hbase(main):058:0> put 'test', 'pu', 'cfi:name', 'Ilya'
hbase(main):059:0> scan 'test'
ROW                    COLUMN+CELL                                                    
 pu                    column=cfi:login, timestamp=1353318747337, value=pulser        
 pu                    column=cfi:name, timestamp=1353318837414, value=Ilya           
 pu                    column=cfi:sname, timestamp=1353318767678, value=Teterin       

hbase(main):063:0> get 'test', 'pu', {COLUMN=>'cfi:name', VERSIONS=>5}
COLUMN                 CELL                                                           
 cfi:name              timestamp=1353318837414, value=Ilya                            
 cfi:name              timestamp=1353318783486, value=Ilja  

hbase(main):064:0> put 'test', 'pu', 'cfs:01', 'vovan'
hbase(main):065:0> put 'test', 'pu', 'cfs:03', 'piter'
hbase(main):066:0> put 'test', 'pu', 'cfs:02', 'kolyan'

Cassandra / http://cassandra.apache.org/

Column oriented хранилище, изначально из Facebook

Отказ от Single Point Of Failure (SPOF) - все ноды равны.

Каждая нода "поддерживает" несколько других (служит backup нодой для M предыдущих по кольцу).

Ноды выстроены в hash-ring - можете по hash (2^127) = равномерно, но никаких range scan

Можете положить токены по префиксам ключей - получите range scan, но неравномерную раскладку по нодам

N-R-W = настраиваемая consistency.

N - колво нод в реплике, R - сколько нод спросим при чтении, W - на сколько нод запишем значение.

N < R + W => жесткая полная consistency!

N-R-W настраивается на каждую column family.

Запись можно делать на любую ноду кластера.

Read Repair - если R ноды "не согласны" - выпихиваем самое свежее на них (в фоне).

Hinted Handoff - (надо записать в B, B недоступен, пишем в C и говорим "отдай B, когда вернется")

Brisk (http://www.datastax.com/docs/0.8/brisk/about_brisk) - Brisk is an open-source Hadoop and Hive distribution developed by DataStax that utilizes Apache Cassandra for its core services and storage. Brisk provides Hadoop MapReduce capabilities using CassandraFS, an HDFS-compatible storage layer inside Cassandra.

Итого:

Вопросы?