Переменка.
Тренировался в способах импорта данных в Elasticsearch, так же хотелось поиграться с "интенсивными и объемными" данными. Попросил данные у метрологов за месяц. Отдали с улыбкой, сказав, что какая-то жутко прогрессивная и дорогая система визуализации данных не справилась с их объемами, поэтому анализ больше чем за 1 день сделать не получается.
Получите и распишитесь.
Характеристика данных:
1. Объем данных 7.2 гб
Старый десктоп и пара ноутбуков (2-4 CPU, 4Gb RAM) собранные в кластер делают месячную выборку по 4м параметрам за несколько секунд.
Схема или, в терминах эластика, mapping:
Тренировался в способах импорта данных в Elasticsearch, так же хотелось поиграться с "интенсивными и объемными" данными. Попросил данные у метрологов за месяц. Отдали с улыбкой, сказав, что какая-то жутко прогрессивная и дорогая система визуализации данных не справилась с их объемами, поэтому анализ больше чем за 1 день сделать не получается.
Получите и распишитесь.
Характеристика данных:
1. Объем данных 7.2 гб
2. Количество записей 122 000 000
3. Количество параметров: 713
4. Период месяц
Старый десктоп и пара ноутбуков (2-4 CPU, 4Gb RAM) собранные в кластер делают месячную выборку по 4м параметрам за несколько секунд.
Схема или, в терминах эластика, mapping:
"sdku_type": {
"_all" : {"enabled" : false},
"_source" : {"enabled" : false},
"properties": {
"@timestamp": {
"type": "date",
"format": "dateOptionalTime"
},
"field": {
"type": "string"
},
"value":{
"type": "double"
}
}
}
Скрипт для импорта:
#!/usr/bin/python
import csv, sys, time, json, elasticsearch
from elasticsearch import helpers
es = elasticsearch.Elasticsearch(['localhost:9200'])
csvfile = 'MyTable.csv'
jdata = dict()
actions = list()
i = 0
with open(csvfile, 'rb') as file :
line = csv.reader(file, delimiter = ',', skipinitialspace = True)
for row in line :
i += 1
ptime = time.strptime(row[0][0:19], "%Y-%m-%d %H:%M:%S")
ctime = time.strftime('%Y-%m-%dT%H:%M:%S', ptime)
jdata = { '@timestamp': ctime, 'field': row[1], 'value': float(row[2]) }
action = { '_index': 'sdku', '_type': 'sdku_type', '_id': i, '_source': json.dumps(jdata, separators=(',', ':'))}
actions.append(action)
if i % 1000000 == 0:
elasticsearch.helpers.bulk(es, actions)
print "Indexed %d, working on next 100000" %(i)
actions = list()
elasticsearch.helpers.bulk(es, actions)
print "Indexed %d, finishing." %(i)
Результат:
Возникла дилемма - показать результат и оказаться втянутым в работу метрологической службы или сделать вид, что проиграл?
No comments:
Post a Comment