Gnoti

머신러닝을 활용한 빅데이터 분석 #3

Apachi nifi에서 취득한 json 데이터를 kafka broker를 이용해 메세징 처리합니다.

이후, druid indexing처리를 이용하여 분석을 위한 데이터셋를 작성합니다.

전체 분석 flow

#1 Kafka topic 만들기

이하 명령을 실행하여 suicide4 라는 카프카 항목을 만들고 여기에 데이터를 보내십시오

## list kafka topic
bin/kafka-topics.sh –list –zookeeper localhost

## delete kafka topic
bin/kafka-topics.sh –delete –zookeeper localhost –topic suicides2

## 위의 명령으로 삭제 되지 않을 경우, zookeeper shell을 이용하여 삭제
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[suicides3, suicides4, __consumer_offsets]
[zk: localhost:2181(CONNECTED) 3] rmr /brokers/topics/suicides3
[zk: localhost:2181(CONNECTED) 4] rmr /brokers/topics/suicides4
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics
[__consumer_offsets]
[zk: localhost:2181(CONNECTED) 6]

## create kafka topic
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic suicides4

#2 Druid Kafka ingestion

Druid의 Kafka 인덱싱 서비스를 사용하여 #1에서 작성한 suicides4에서 메시지를 수집합니다.

vi sucides4-kafka-supervisor.json

{

  “type”: “kafka”,

  “dataSchema”: {

    “dataSource”: “suicides4”,

    “parser”: {

      “type”: “string”,

      “parseSpec”: {

        “format”: “json”,

        “timestampSpec”: {

          “column”: “time”,

          “format”: “auto”

        },

        “dimensionsSpec”: {     

        “timestampSpec”: {

          “column”: “time”,

          “format”: “auto”

         },

          “dimensions”: [“time”,”country”,”year”,”sex”,”agegroup”,”count_of_suicides”,“population”,

 “suicide_rate”,”country_year_composite_key”,”HDI_for_year”,”gdp_for_year”,”gdp_per_capita”,”generation”]

        }

      }

    },

    “metricsSpec” : [],

    “granularitySpec”: {

      “type”: “uniform”,

      “segmentGranularity”: “DAY”,

      “queryGranularity”: “NONE”,

      “rollup”: false

    }

  },

  “tuningConfig”: {

    “type”: “kafka”,

    “reportParseExceptions”: false

  },

  “ioConfig”: {

    “topic”: “suicides4”,

    “replicas”: 2,

    “taskDuration”: “PT10M”,

    “completionTimeout”: “PT20M”,

    “consumerProperties”: {

      “bootstrap.servers”: “localhost:9092”

    }

  }

}

## Enable Druid Kafka ingestion
curl -XPOST -H’Content-Type: application/json’ -d @/home/min/work/sucides4-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor

#3 Result

kafka consumer 확인

./bin/kafka-console-consumer.sh –bootstrap-server localhost:9092 –topic suicides4

답글 남기기