Искра: найдите пары, имеющие хотя бы n общих атрибутов?
У меня есть набор данных, состоящий из (sensor_id, timestamp, data)
(sensor_id
- идентификаторы IoT-устройств, отметка времени - это UNIX-время, а данные - это хеш MD5 их вывода в то время). В таблице нет первичного ключа, но каждая строка уникальна.
Мне нужно найти все пары sensor_id
s1
и s2
, так что эти два датчика имеют как минимум n
(n=50
) записи (timestamp, data)
, общие между ними, т.е. на n
different в случаях, когда они выдавали одни и те же данные за одну и ту же метку времени.
Для ощущения величин данных у меня есть 10B строк и ~ 50M различных sensor_ids
, и я считаю, что существует около ~ 5M пар датчиков, которые излучали одни и те же данные на одной отметке времени не менее 50 раз.
Какой лучший способ сделать это в Spark? Я пробовал различные подходы (group-by (timestamp, data)
и/или самосоединение), но они являются чрезмерно дорогостоящими по сложности.
Ответы
Ответ 1
Это псевдокод, абстрагирующийся от Искры. Сначала вы можете отсортировать свой набор данных:
select id, timestamp, data order by timestamp, data, id
Пример 10 строк:
s1,100,a #1
s2,100,a #2
s3,100,a #3
s4,100,b #4
s1,101,a #5
s3,101,b #6
s4,101,b #7
s2,101,a #8
s3,102,b #9
s4,102,b #10
Теперь итерация сверху вниз, и пока метка времени и данные совпадают с предыдущей строкой ввода списка записей.
В нашем примере строки 1-3 образуют такой список, поэтому мы уже видим некоторые потенциальные пары:
s1, s2
s1, s3
s2, s3
Строка №4 - это всего лишь одна запись с (100, b), мы можем пропустить ее.
Строка № 5 только одна запись с (101, а), мы можем пропустить ее.
Строка №6 и # 7 - новая пара:
s3, s4
Также # 9 и # 10 образуют пару
Объединяя все это вместе, можно легко подсчитать пары:
s1, s2
s1, s3
s2, s3
s3, s4
s3, s4
Преимущество этого метода заключается в том, что если вы можете сортировать файл, вы можете разбить отсортированный набор данных на несколько меньших фрагментов (фрагменты должны быть разделены на границах группы - т.е. # 1,2,3 должны быть в одном куске), вычислить пары и соединить конечные результаты в качестве последнего шага.
Надеюсь, это поможет.
Ответ 2
Если мое понимание верное, я могу достичь этого, используя простой код ниже,
test("Spark: Find pairs having atleast n common attributes"){
/**
* s1,1210283218710,34
s1,1210283218730,24
s1,1210283218750,84
s1,1210283218780,54
s2,1210283218710,34
s2,1210283218730,24
s2,1210283218750,84
s2,1210283218780,54
s3,1210283218730,24
s3,1210283218750,84
s3,1210283218780,54
*/
val duplicateSensors = sc.textFile("sensor_data")
.map(line => line.split(",")).map(ar=>((ar(1),ar(2)),ar(0) )) // (ts,val),sid
.aggregateByKey(List.empty[String])(_ :+_,_:::_)// grouped(ts,val)(List(n sid))
.flatMapValues(l => l.sorted.combinations(2))// (ts,val)(List(2 sid combination))
.map(_._2).countByValue() // List(s1, s3) -> 3, List(s2, s3) -> 3, List(s1, s2) -> 4 (2sensors, no of common entries)
// Now Do the filter .... grater than 50
duplicateSensors.foreach(println)
}
И вы получите пар с общим количеством атрибутов.
Ответ 3
Вот как я это сделаю.
Сначала создайте некоторые поддельные данные:
#!/usr/bin/env python3
import random
fout = open('test_data.csv','w')
i=0
for x in range(100000):
if i>=1000000:
break
for y in range(random.randint(0,100)):
i = i + 1
timestamp = x
sensor_id = random.randint(0,50)
data = random.randint(0,1000)
fout.write("{} {} {}\n".format(timestamp,sensor_id,data))
Теперь вы можете обрабатывать данные следующим образом.
Если количество строк равно N, количество уникальных временных меток равно T, а ожидаемое количество датчиков на метку времени равно S, тогда сложность каждой операции будет такой же, как в комментариях
import itertools
#Turn a set into a list of all unique unordered pairs in the set, without
#including self-pairs
def Pairs(x):
temp = []
x = list(x)
for i in range(len(x)):
for j in range(i+1,len(x)):
temp.append((x[i],x[j]))
return temp
#Load data
#O(N) time to load data
fin = sc.textFile("file:///z/test_data.csv")
#Split data at spaces, keep only the timestamp and sensorid portions
#O(N) time to split each line of data
lines = fin.map(lambda line: line.split(" ")[0:2])
#Convert each line into a timestamp-set pair, where the set contains the sensor
#O(N) time to make each line into a timestamp-hashset pair
monosets = lines.map(lambda line: (line[0],set(line[1])))
#Combine sets by timestamp to produce a list of timestamps and all sensors at
#each timestamp
#O(TS) time to place each line into a hash table of size O(T) where each
#entry in the hashtable is a hashset combining
timegroups = sets.reduceByKey(lambda a,b: a | b)
#Convert sets at each timestamp into a list of all pairs of sensors that took
#data at that timestamp
#O(T S^2) time to do all pairs for each timestamp
shared = timegroups.flatMap(lambda tg: PairsWithoutSelf(tg[1]))
#Associate each sensor pair with a value one
#O(T S^2) time
monoshared = shared.map(lambda x: (x,1))
#Sum by sensor pair
#O(T S^2) time
paircounts = monoshared.reduceByKey(lambda a,b: a+b)
#Filter by high hitters
#O(<S^2) time
good = paircounts.filter(lambda x: x[1]>5)
#Display results
good.count()
Временные сложности немного волнистые, поскольку я работаю над этим ответом вроде поздно, но узкие места должны быть видимы, по крайней мере.