40 GB veri üzerinde hesaplamalar yapıyorum. Her dosya, json satırlarını içeren sıkıştırılmış bir gzip dosyasıdır. Her dosyada en fazla 500.000 satır veya yaklaşık 500 MB bulunur. 128 cpu ve 1952 GB bellek ile çalışan bir Amazon örneğim var. Yapmaya çalıştığım şey, her dosyayı olabildiğince çabuk işlemek.Python Çoklu İşlem Havuzu Yeterli İşlemler Oluşturmuyor
def initializeLock(l):
global lock
lock = l
if __name__ == '__main__':
directory = '/home/ubuntu/[directory_containing_files]/*.gz'
file_names = glob.glob(directory)
lock = Lock()
pool = Pool(initializer=initializeLock, initargs=(lock,))
pool.map(do_analysis, file_names)
pool.close()
pool.join()
Ne olmasını beklediğiniz süreçlerin büyük miktarda içindir oluşturulacak ve her biri bir dosya işler:
böyle çoklu işlem Havuzları kullanıyorum. Gerçekte olan şey başlangıçta 100'ün üzerinde süreç yaratılıyor. Bu noktada hafızamın yaklaşık% 85'ini kullanıyorum, bu harika! Sonra her biri tamamlandı. Sonunda çalışan işlemlerin miktarı yaklaşık 10'a düşüyor. Bu noktada, sadece belleğin% 5'ini kullanıyorum. Periyodik olarak ek işlemler başlatılır, ancak hiçbir zaman 100 ya da daha fazla çalışmaya devam etmez. Bu kadar büyük belleğe sahip bu boş hafızaya sahibim, ama çoğu zaman en fazla 10 işlemde çalışıyorum.Tüm dosyalar tamamlanana kadar 100 işlemi çalıştırmaya devam etmek için nasıl bir fikir edineceğinize dair bir fikir var mı?
DÜZENLEME:
ben uygulamaya bazı günlük ekledi. Başlangıçta 127 işlem yükler, sanırım bunun 128 CPU'su var ve süreçler yüklendiğinde kullanımdaydı. Bazı işlemler başarıyla tamamlanır ve sonuç kaydedilir. Daha sonra bir noktada tüm çalışan işlemlerin birkaçı bitiyor. Kaç dosya bittiğini kontrol ettiğimde 127'den sadece 22'si tamamlandı. Sonra sadece 5-10 işlemlerini kullanarak çalışır ve bunların hepsi başarıyla bitirir. Düşünüyorum belki hafızası tükenmiş ve çöküyor. Ama neden? Çok fazla belleğim ve çok fazla CPU'm var.
DÜZENLEME 2:
Yani sorunu buldum. Sorun, do_analysis yönteminde bir kilit oluşturuyordu ve tüm süreçler aynı zamanda bitiyordu ve kilidin serbest kalmasını bekliyordu. Süreçler durmadı, uyuyorlardı. Yani bu beni başka bir soruya getiriyor: Burada ana amacım birçok json satırı olan her bir dosyayı almak, json satırındaki ID özelliğini almak ve daha sonra aynı kimliğe sahip diğer satırları içeren bir dosyaya eklemektir. Eğer dosya mevcut değilse onu yaratıyorum. Yaptığım dosyaya, başka bir işlem tarafından erişilmesini önlemek için erişildiği zaman bir kilit oluşturuldu. İşte benim kodum.
for key, value in dataframe.iteritems():
if os.path.isfile(file_name):
lock.acquire()
value.to_csv(filename), mode='a', header=False, encoding='utf-8')
lock.release()
else:
value.to_csv(filename), header=True, encoding='utf-8')
yüzden şimdi dosyalara eklenecek yaratıcı bir şekilde düşünmeye çalışıyorum, ama diğer her sürecini engelleme değilim. Çok fazla veri ile uğraşıyorum ve iki dosyaya aynı anda erişilmesi gereken şans düşük, ama yine de olacak. Bu yüzden, bir dosya eklendiğinde, başka bir işlemin bu dosyayı açmaya çalışmadığından emin olmam gerekir.
Havuzun argümanlarına işlem sayısını eklemeyi deneyin (initializer = initializeLock, process = 100, initargs = (kilit,)) –
, "pool.map" yerine "pool.imap_unordered" işlevini kullanmayı düşündünüz mü? –
@SeregaLuchko Bunu denedim. Aynı şey olsa da oldu. – Gabriel