Gnoti

빅데이터 분석을 위한 준비 Kafka + Python 연결

LABORATORY

[ Bigdata&Machine Learning ]

Kafka + Python 연결 테스트

#1 Start zookeeper & kafka

[root@zepp zookeeper-3.4.14]# vi conf/zoo.cfg

[root@zepp zookeeper-3.4.14]# bin/zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /home/min/zookeeper-3.4.14/bin/../conf/zoo.cfg

Starting zookeeper … STARTED

 

[root@zepp kafka_2.12-2.2.0]# bin/kafka-server-start.sh config/server.properties

[2019-04-19 18:12:03,572] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)

[2019-04-19 18:12:04,104] INFO starting (kafka.server.KafkaServer)

[2019-04-19 18:12:04,105] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)

[2019-04-19 18:12:04,128] INFO [ZooKeeperClient] Initializing a new session to localhost:2181. (kafka.zookeeper.ZooKeeperClient)

#2 create topic

[root@zepp kafka_2.12-2.2.0]# ./bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic suicides

Created topic suicides.

[root@zepp kafka_2.12-2.2.0]# bin/kafka-topics.sh –list –zookeeper localhost

suicides

#3 python용 kafka 설치

Collecting kafka

  Downloading https://files.pythonhosted.org/packages/21/71/73286e748ac5045b6a669c2fe44b03ac4c5d3d2af9291c4c6fc76438a9a9/kafka-1.3.5-py2.py3-none-any.whl (207kB)

    100% |████████████████████████████████| 215kB 12.2MB/s

Installing collected packages: kafka

Successfully installed kafka-1.3.5

(base) [min@zepp ~]$

#4 verify message

[root@zepp kafka_2.12-2.2.0]# bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic suicides

#5  python source

#6 Result

빅데이터 분석을 위한 준비 Kafka

LABORATORY

[ Bigdata&Machine Learning ]

Kafka는 대용량 데이터를 위한 분산형 스트리밍 플랫폼입니다.

Kafka clustring Test 수행해 봅니다.

전체Archtecture

#1 ip setting

3대의 Centos 7.3 준비

server.1=192.168.0.145:2888:3888
server.2=192.168.0.174:2888:3888
server.3=192.168.0.175:2888:3888

#2 setting zookeeper & kafka

## download zookeeper & kafka
wget http://mirror.navercorp.com/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz tar zxvf kafka_2.12-2.2.0.tgz mv kafka_2.12-2.2.0 kafka

configure zookeeper  (1,2,3 server)[root@kafka1 min]# vi kafka/config/zookeeper.properties

# zookeeper servers
server.1=192.168.0.145:2888:3888
server.2=192.168.0.174:2888:3888
server.3=192.168.0.175:2888:3888

### configure kafka
[root@kafka1 min]# vi kafka/config/server.properties

broker.id=1 ## id를 server별로 설정
listeners=PLAINTEXT://:9092
zookeeper.connect=192.168.0.145:2181,192.168.0.174:2181,192.168.0.175:2181
delete.topic.enable=true

#3 start zookeeper & kafka

#### start zookeeper server and kafka server

[root@kafka1 min]# kafka/bin/zookeeper-server-start.sh -daemon ./kafka/config/zookeeper.properties

[root@kafka1 min]# kafka/bin/kafka-server-start.sh -daemon ./kafka/config/server.properties

#4 Verify all brokers

[root@kafka1 min]#  ./kafka/bin/zookeeper-shell.sh 192.168.0.145:2181 ls /brokers/ids

Connecting to 192.168.0.145:2181

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[1, 2, 3]

#5 Create topic

### Create topic

kafka/bin/kafka-topics.sh –create –zookeeper 192.168.0.145:2181,192.168.0.175:2181,192.168.0.175:2181 –partitions 3 –replication-factor 3 –topic suicides

 

### List topic

kafka/bin/kafka-topics.sh –list –zookeeper 192.168.0.145:2181,192.168.0.175:2181,192.168.0.175:2181

 

### descirbe topic

[root@kafka1 min]# kafka/bin/kafka-topics.sh –describe –zookeeper 192.168.0.145:2181,192.168.0.175:2181,192.168.0.175:2181

Topic:suicides  PartitionCount:3        ReplicationFactor:3     Configs:

        Topic: suicides Partition: 0    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2

        Topic: suicides Partition: 1    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3

        Topic: suicides Partition: 2    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1

#6 produce & consume

### produce

kafka/bin/kafka-console-producer.sh –broker-list 192.168.0.145:9092 –topic suicides

### consume

kafka/bin/kafka-console-consumer.sh –bootstrap-server 192.168.0.145:9092 –topic suicides –from-beginning

머신러닝을 활용한 빅데이터 분석 #4

LABORATORY

[ Bigdata&Machine Learning ]

druid에 수집된 데이터셋을 superset에서 시각화를 통한 분석을 수행합니다.

전체 분석 flow

#1 druid 수집

머신러닝을 활용한 빅데이터 분석 #3 에서 수집된 데이터셋의 확인

## druid-kafka indexing list
http://192.168.0.166:8081/#/indexing-service

## dsql에서 데이터셋 확인

select * from suicides4kafka

#2 Druid-Superset

Druid에서 수집된 데이터셋을 Superset과 연결하고 데이터 분석을 수행합니다.

#2-1 Connect Superset-Druid

Druid Cluster와의 연결을 위한 설정을 수행합니다.

연결이 완료되면 Datasources 스캔을 통해 Druid Datasources설정 메뉴에서 데이터셋 스키마를 확인 할 수 있습니다.

#2-2 Analysis

Superset에서 간단하게 수집된 데이터셋에 대한 내용을 확인해 봅니다.

#3 Result

전체 flow

Apachi nifi -> kafka -> druid -> superset

머신러닝을 활용한 빅데이터 분석 #3

LABORATORY

[ Bigdata&Machine Learning ]

Apachi nifi에서 취득한 json 데이터를 kafka broker를 이용해 메세징 처리합니다.

이후, druid indexing처리를 이용하여 분석을 위한 데이터셋를 작성합니다.

전체 분석 flow

#1 Kafka topic 만들기

이하 명령을 실행하여 suicide4 라는 카프카 항목을 만들고 여기에 데이터를 보내십시오

## list kafka topic
bin/kafka-topics.sh –list –zookeeper localhost

## delete kafka topic
bin/kafka-topics.sh –delete –zookeeper localhost –topic suicides2

## 위의 명령으로 삭제 되지 않을 경우, zookeeper shell을 이용하여 삭제
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[suicides3, suicides4, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 3] rmr /brokers/topics/suicides3
[zk: localhost:2181(CONNECTED) 4] rmr /brokers/topics/suicides4
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets]
[zk: localhost:2181(CONNECTED) 6]

## create kafka topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic suicides4

#2 Druid Kafka ingestion

Druid의 Kafka 인덱싱 서비스를 사용하여 #1에서 작성한 suicides4에서 메시지를 수집합니다.

vi sucides4-kafka-supervisor.json

{

  “type”: “kafka”,

  “dataSchema”: {

    “dataSource”: “suicides4”,

    “parser”: {

      “type”: “string”,

      “parseSpec”: {

        “format”: “json”,

        “timestampSpec”: {

          “column”: “time”,

          “format”: “auto”

        },

        “dimensionsSpec”: {     

        “timestampSpec”: {

          “column”: “time”,

          “format”: “auto”

         },

          “dimensions”: [“time”,”country”,”year”,”sex”,”agegroup”,”count_of_suicides”,“population”,

 “suicide_rate”,”country_year_composite_key”,”HDI_for_year”,”gdp_for_year”,”gdp_per_capita”,”generation”]

        }

      }

    },

    “metricsSpec” : [],

    “granularitySpec”: {

      “type”: “uniform”,

      “segmentGranularity”: “DAY”,

      “queryGranularity”: “NONE”,

      “rollup”: false

    }

  },

  “tuningConfig”: {

    “type”: “kafka”,

    “reportParseExceptions”: false

  },

  “ioConfig”: {

    “topic”: “suicides4”,

    “replicas”: 2,

    “taskDuration”: “PT10M”,

    “completionTimeout”: “PT20M”,

    “consumerProperties”: {

      “bootstrap.servers”: “localhost:9092”

    }

  }

}

## Enable Druid Kafka ingestion
curl -XPOST -H’Content-Type: application/json’ -d @/home/min/work/sucides4-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor

#3 Result

kafka consumer 확인

./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic suicides4

머신러닝을 활용한 빅데이터 분석 #2

LABORATORY

[ Bigdata&Machine Learning ]

Apachi nifi를 이용해서 csv파일을 로드하고 json형태의 데이타로 data transform을 구현합니다.

전체 분석 flow

#1 Dataset 개요

https://www.kaggle.com/russellyates88/suicide-rates-overview-1985-to-2016
시간,장소, 연령 등의 요소가 포함된 데이터 셋을 분석하여 자살율 증가를 예방함을 목적으로 합니다. 

데이터 파일의 구성요소
 

country, year, sex, age group, count of suicides, population, suicide rate, country-year composite key, HDI for year, gdp_for_year, gdp_per_capita, generation (based on age grouping average).

#2 Load Data with NIFI

프로세서 flow

GetFile -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> PublishKafka

#2-1 GetFile Processor

Apache NiFi 프로세서는 데이터 흐름을 만드는 블록입니다. 모든 프로세서는 출력 흐름 파일 생성에 기여하는 각각의 기능을 가지고 있습니다. 아래 이미지에 표시된 데이터 흐름은 GetFile 프로세서를 사용하여 한 sucides cvs 파일을 가져 와서 PutFile 프로세서를 사용하여 다른 디렉터리에 저장합니다.

Input Directory, File Filter란에 수집대상 cvs파일의 Directory 및 파일명을 설정합니다.

#2-2 InferAvroSchema Processor

입력 받은 콘텐츠에서 Avro 스키마를 자동으로 생성합니다.

CSV Header Definition
 

country,year,sex,agegroup,count_of_suicides,population,suicide_rate,country_year_composite_key,HDI_for_year,gdp_for_year,gdp_per_capita,generation

#2-3 ConvertCSVToAvro Processor

Avro 스키마에 따라 CSV 파일을 Avro로 변환합니다.

#2-4 ConvertAvroToJSON Processor

Avro 레코드를 JSON 객체로 변환합니다. 이 프로세서는 Avro 필드를 JSON 필드에 직접 매핑하여 결과 JSON이 Avro 문서와 동일한 계층 구조를 갖도록합니다.

#2-5 PublishKafka Processor

Kafka Producer를 사용하여 Stream Data를 Apache Kafka에 메시지로 보냅니다. 

#3 Result

프로세서 flow

GetFile -> InferAvroSchema -> ConvertCSVToAvro -> ConvertAvroToJSON -> PublishKafka

머신러닝을 활용한 빅데이터 분석 #1

LABORATORY

[ Bigdata&Machine Learning ]

분석 Suicide Rates Overview 1985 to 2016 

 

Suicide Rates Data를 csv형태로 취득한 후, json변환을 거쳐 메세지큐를 통해 Druid에 저장하고 Machine Learning으로 분석 가공한 후, 시각화처리를 수행합니다.

분석에 필요한 솔루션

http://kafka.apache.org/
https://nifi.apache.org/
http://druid.io/
https://scikit-learn.org/
https://superset.incubator.apache.org/

전체 분석 flow

#1 Start Druid

./bin/supervise -c quickstart/tutorial/conf/tutorial-cluster.conf

**Druid 의 경우, zookeeper의 선행 시작이 필수.

#2 Start Kafka broker

./bin/kafka-server-start.sh config/server.properties

#3 Start Nifi

./bin/nifi.sh start

ELK설치 9편 (Cluster구성)

LABORATORY

[ ELK ]

3개의 데이터 전용 노드, 1개의 마스터 전용 노드로 구성합니다.
Master Node가 설치된 서버에는 Kibana, Logstash 을 설치합니다.
데이터는 Master Node 를 통해서만 색인됩니다.

서버스펙

운영 서버에 필요한 CPU, RAM, 저장장치는 총4대
  – OS : Centos 7
  – CPU : 4core
  – RAM : 4GB
  – JAVA : Openjdk 8

Cluster 설치

es-master는 마스터
es-client1는 데이터, es-client2는 데이터, es-client3는 데이터

cluster.name: es-demo
node.name: node-1   (각각node-2, node-3으로 설정)
node.master: true      (마스터 노드만 true로 설정, 데이터 노드는 false)
node.data: false         (마스터 노드만 false로 설정, 데이터 노드는 true)

기동 장애 발생 시 대응..

rm -rf /var/lib/elasticsearch/nodes/0
로 데이터 삭제 후, 재설정

실행 결과

es-client1, es-client2, es-client3에 각각 분산되어 데이터 수집

보나스 테스트(Metricbeat로 apache,system 모니터링)

[root@localhost modules.d]# metricbeat modules list
Enabled:
apache
system

vi apache.yml

# Module: apache
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/6.4/metricbeat-module-apache.html

– module: apache
#metricsets:
metricsets: [“status”]
# – status
period: 10s
hosts: [“http://127.0.0.1”]
server_status_path: “server-status”
#username: “user”
#password: “secret”

vi system.yml

# Module: system
# Docs: https://www.elastic.co/guide/en/beats/metricbeat/6.4/metricbeat-module-system.html

– module: system
period: 10s
metricsets:
– cpu
– load
– memory
– network
– process
– process_summary
– core
– diskio
– socket
process.include_top_n:
by_cpu: 5 # include top 5 processes by CPU
by_memory: 5 # include top 5 processes by memory

– module: system
period: 1m
metricsets:
– filesystem
– fsstat
processors:
– drop_event.when.regexp:
system.filesystem.mount_point: ‘^/(sys|cgroup|proc|dev|etc|host|lib)($|/)’

– module: system
period: 15m
metricsets:
– uptime

실행 결과

es-client1, es-client2, es-client3에 각각 분산된 system, apache 데이터 결과

ELK설치 8편 (Heartbeat Monitoring)

LABORATORY

[ ELK ]

동작 시간 모니터링으로
활성 상태를 탐지하고 서비스가 가능한지 모니터링합니다

Heartbeat 설치

Heartbeat 다운로드 및 설치

설치파일 다운로드

https://artifacts.elastic.co/downloads/beats/heartbeat/heartbeat-6.4.2-x86_64.rpm

설치

# yum install heartbeat-6.4.2-x86_64.rpm

환경설정

# heartbeat setup –template -E output.logstash.enabled=false -E ‘output.elasticsearch.hosts=[“192.168.0.113:9200”]’

키바나 대시보드 템플릿 추가

# heartbeat setup –dashboards Loading dashboards (Kibana must be running and reachable) Loaded dashboards

대시보드 kibana 접속설정 및 데이터 수집 elasticsearch 경로 설정

http, icmp 형태로 서버의 서비스 유무를 확인

Heartbeat 실행

# /usr/share/heartbeat/bin/heartbeat -e -c ./heartbeat.yml -d “publish”

Heartbeat 모니터링 대시보드

ELK설치 7편 (Winlogbeat로 Windows모니터링)

LABORATORY

[ ELK ]

Windows 기반 인프라의 상태를 확인하기 위해 Winlogbeat를 설치하고 Windows 이벤트 로그를 수집합니다

Winlogbeat 설치

설치파일 다운로드 

https://artifacts.elastic.co/downloads/beats/winlogbeat/winlogbeat-6.4.2-windows-x86_64.zip

Powershell로 설치

.\install-service-winlogbeat.ps1

대시보드 템플릿 템플릿 셋업

 .\winlogbeat.exe setup –dashboards

Winlogbeat 환경설정

대시보드 kibana 접속설정 및 데이터 수집 elasticsearch 경로 설정

Winlogbeat 모니터링 대시보드

ELK설치 6편 (Metricbeat)

LABORATORY

[ ELK ]

Metricbeat은 다양한 시스템 및 서비스 메트릭을 수집하여 지정된 출력 대상에 제공하는 경량 log수집기입니다.

Metricbeat은 사용자 환경의 다른 서버에 설치되며 성능 모니터링뿐만 아니라 서버에서 실행중인 다른 외부 서비스의 성능 모니터링에도 사용됩니다. 예를 들어, Metricbeat을 사용하여 시스템 CPU, 메모리 등을 모니터링하고 분석 할 수 있습니다. 

서버스펙

운영 서버에 필요한 CPU, RAM, 저장장치
  – OS : Centos 7
  – CPU : 4core
  – RAM : 4GB
  – JAVA : Openjdk 8

Metricbeat 설치

curl -L -O https://artifacts.elastic.co/downloads/beats/metricbeat/metricbeat-6.4.2-x86_64.rpm

rpm -vi metricbeat-6.4.2-x86_64.rpm

# vi /etc/metricbeat/metricbeat.yml

kibana 대시보드, 수집된 데이터를 elasticsearch로 보내기 위한 설정

시스템 모듈 활성화

metricbeat modules enable system

Metricbeat 실행

#metricbeat setup

#metricbeat -e -c /etc/metricbeat/metricbeat.yml

Windows, Centos 복수대의 서버에 Metricbeat를 실행한 결과