I am trying to parallelize an image-processing technique with Spark. Unlike conventional Spark jobs with millions of tasks, I only want to split the image into as many patches as I have workers (machines) and let each worker process one patch. So one image patch is one task: if I have 12 image patches, I have 12 tasks. The question is how to explicitly control which worker each task is scheduled on. What currently happens is that when I parallelize the image patches, Spark often sends several patches to one or two workers and leaves the others idle. I tried setting the Spark system properties `spark.cores.max` and `spark.default.parallelism`, but that did not help. The only way I have found to spread the tasks across different workers is to enlarge the second parameter of `SparkContext.parallelize`, `numSlices`. Here is the code:
from datetime import datetime
from scipy import misc
from pyspark import SparkContext

# Load the image and compute the patch layout (a 2x2 grid with a 100-pixel border).
img = misc.imread('test_.bmp')
height, width = img.shape
divisions, patch_width, patch_height = partitionParameters(width, height, 2, 2, border=100)

spark = SparkContext(appName="Miner")
# spark.setSystemProperty('spark.cores.max', '1')
spark.setSystemProperty('spark.default.parallelism', '24')

# Ship the full image to every worker once instead of once per task.
broadcast_img = spark.broadcast(img)

start = datetime.now()
print "--------------------", divisions

# Distribute the patch coordinates; numSlices is deliberately much larger
# than the number of patches to force Spark to spread the tasks out.
run = spark.parallelize(divisions, 24).cache()
print "--------------- RDD size: ", run._jrdd.splits().size()

result = run.map(lambda (x, y): crop_sub_img(broadcast_img.value, x, y, patch_width, patch_height, width, height)) \
            .map(lambda ((x, y), subimg): fastSeg.process(subimg, x, y)) \
            .collect()

# Stitch the processed patches back into one image.
img = cat_sub_img(result, width, height)
end = datetime.now()
print "time cost:", (end - start)
As you can see, I only have four patches in divisions, which is a list of (x, y) tuples giving the coordinates of each patch. Only when I set numSlices to a high value like 24, far more than the actual number of tasks in divisions, are most of the workers used. That does not seem reasonable. If I set it to 4, all the tasks are sent to a single worker! There must be some way to control how many tasks one worker accepts. I am not familiar with Spark's internals. Can anyone help me? Thanks.
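For what it's worth, the split itself is even at any numSlices: parallelize cuts the list into numSlices contiguous ranges of near-equal size. The sketch below is an assumption, modeled on the `positions` logic in Spark's Scala `ParallelCollectionRDD.slice`; it shows that with 4 items and numSlices=24 you get 4 singleton partitions plus 20 empty ones, so the extra slices only give the scheduler more tasks to hand out, they do not change how the 4 real patches are divided:

```python
def slice_positions(length, num_slices):
    """(start, end) index of each partition -- an assumption modeled on
    Spark's ParallelCollectionRDD.slice in the Scala source."""
    return [((i * length) // num_slices, ((i + 1) * length) // num_slices)
            for i in range(num_slices)]


def partitions(data, num_slices):
    """Split data the way parallelize(data, num_slices) would."""
    return [data[start:end]
            for start, end in slice_positions(len(data), num_slices)]


divisions = [(0, 0), (0, 1), (1, 0), (1, 1)]  # four patch coordinates

# numSlices=4: one patch per partition -- already perfectly even.
print(partitions(divisions, 4))

# numSlices=24: the same four singletons, padded with 20 empty partitions.
print([p for p in partitions(divisions, 24) if p])
```

So if numSlices=4 still sends everything to one worker, the imbalance comes from task scheduling, not from the partitioning of the data.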
One thought on why this happens: maybe the image is too small, so Spark assumes a single worker can handle the whole job and sends everything to one.
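Another possibility (an assumption, not something I have verified on this cluster) is that only one executor has finished registering with the driver by the time the four tasks are submitted, so they all land on it. Spark has real settings to make the driver wait for executors and to stop it preferring "local" placements; the flags below are genuine Spark configuration keys, but `miner.py` and the chosen values are placeholders:

```
spark-submit \
  --conf spark.scheduler.minRegisteredResourcesRatio=1.0 \
  --conf spark.scheduler.maxRegisteredResourcesWaitingTime=30s \
  --conf spark.locality.wait=0s \
  miner.py
```

With `minRegisteredResourcesRatio=1.0` the driver waits (up to the waiting time) until all executors are registered before scheduling, which gives the scheduler a chance to spread even a 4-task job across all workers.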