2011-07-20 10 views
8

Psycopg2'yi kullanarak aynı ağdaki iki makine arasındaki bazı verilerin bir kopyasını yapmak için bir komut dosyası yazıyorum. Benboru gönderimleri psycog2 ile python'da COPY

psql -c -h remote.host "COPY table TO STDOUT" | psql -c "COPY table FROM STDIN" 

ile kopyasını yapar bazı eski, çirkin bash yerine gidiyorum Bu kopyasını yapmak için en basit ve most efficient şekilde hem benziyor.

buf = StringIO() 

from_curs = from_conn.cursor() 
to_curs  = to_conn.cursor() 

from_curs.copy_expert("COPY table TO STDOUT", buf) 
buf.seek(0, os.SEEK_SET) 
to_curs.copy_expert("COPY table FROM STDIN", buf) 

... ama bu bellekte/diske tüm verileri kaydetme içerir: O kadar olduğu gibi, bir StringIO veya geçici-dosya ile python çoğaltmak kolay.

Böyle bir kopyada bir Unix borusunun davranışını taklit etmenin bir yolu var mı? POpen içermeyen bir unix-pipe nesnesi bulamıyorum - Belki de en iyi çözüm sadece POpen ve subprocess kullanmaktır.

+0

Aşağıdaki çözüm işe yarayıp yaramadığını os.pipe() kullanabilirsiniz. – agf

cevap

0

Sen okuma yazma desteklemek için sınıflandırma olduğunuz bir deque kullanabilirsiniz: okuyucu yazar çok daha hızlı ise

from collections import deque 
from Exceptions import IndexError 

class DequeBuffer(deque): 
    def write(self, data): 
     self.append(data) 
    def read(self): 
     try: 
      return self.popleft() 
     except IndexError: 
      return '' 

buf = DequeBuffer() 

ve tablo büyük, deque hala büyük alacak ama her şeyi saklamaktan daha küçük olacak.

Ayrıca, deque boş olduğunda, return '' boş olduğunu bilmiyorum, ancak boş olana kadar yeniden denemek yerine güvenli olduğunu bilmiyorum. Eğer işe yararsa beni bilgilendir.

Kopyalamanın tamamlandığından emin olduğunuzda, özellikle komut dosyası yalnızca bu noktada çıkmıyorsa, del buf'u unutmayın.

12

Aramalarınızdan birini ayrı bir konuya yerleştirmeniz gerekecektir.

#!/usr/bin/python 
import psycopg2 
import os 
import threading 

fromdb = psycopg2.connect("dbname=from_db") 
todb = psycopg2.connect("dbname=to_db") 

r_fd, w_fd = os.pipe() 

def copy_from(): 
    cur = todb.cursor() 
    cur.copy_from(os.fdopen(r_fd), 'table') 
    cur.close() 
    todb.commit() 

to_thread = threading.Thread(target=copy_from) 
to_thread.start() 

cur = fromdb.cursor() 
write_f = os.fdopen(w_fd, 'w') 
cur.copy_to(write_f, 'table') 
write_f.close() # or deadlock... 

to_thread.join() 
+0

Bu harika bir çözüm! Bir Thread nesnesini tanıtmanın neden gerekli olduğunu merak ediyorum. – Demitri

+3

@Demitri, 'copy_from() ve' copy_to() 'komutları engelliyor; İşlem bitene kadar geri dönmezler. İlk aramayı ana iş parçasında yapsaydık, sadece borudaki verileri beklerdik ve diğer aramayı yapmak için hiçbir zaman kontrolümüzü geri alamazdık. –

+0

Ah, anlıyorum. Yeni ipliğin üzerinde hala duracak, ancak ana ipliğin boruyu kendi boş zamanlarında beslemesine izin verin. Teşekkürler. – Demitri