2012-06-27 23 views
10

Bir dosyaya yazılan verileri gerçek zamanlı olarak, node.js kullanarak okumak için en iyi yolu bulmalıyım. Sorun şu ki, Node zor bir problemi ele almak için en iyi yöntemi bulmakta kullanılan hızlı hareket eden bir gemidir.Node.js'yi kullanarak bir dosyayı gerçek zamanlı okuma

Ben bunu bir metin dosyasına yapar bu şey sonuçlarını yazmanın bir şey yapıyor ve bir java süreç var
Do İstiyor Ne. Genellikle 5 dakikadan 5 saate kadar her şeyi alır, veriler tüm zaman boyunca yazılır ve oldukça yüksek verim oranlarına (yaklaşık 1000 satır/saniye) kadar çıkabilir.

Bu dosyayı gerçek zamanlı olarak okumak istiyorum ve ardından düğümü kullanarak verileri birleştirip istemciye çizilebilecek bir yuvaya yaz.

İstemci, grafikler, prizler ve toplama mantığı tamamlandı ancak dosyayı okumak için en iyi yaklaşım konusunda kafam karıştı.

Ben denedim (veya en azından ile oynanan) mı ne
FIFO - Bunun bu anda Perl kullanarak implemted nasıl aslında bu kullanarak düğüm için bir fifo yazmak ve okumak için benim Java sürecini söyleyebilir Ancak, diğer her şey düğümde çalıştığı için kodun üzerinden geçilmesi mantıklıdır.

Unix Sockets - Yukarıdaki gibi.

fs.watchFile - Bu, ihtiyacımız olan şey için işe yarayacak mı?

fs.createReadStream - bu watchFile'dan daha mı iyi? Bir kesmek gibi görünüyor.

, aslında Unix Yuva yönelmek ediyorum benim Soru
nedir, bu hızlı seçenek gibi görünüyor. Ancak, düğümü gerçek zamanlı olarak dosyaları okumak için daha iyi yerleşik özellikler var mı?

cevap

6

ağınızdaki bir sistem çökmesi veya üyelerden birinin durumunda akışının bir kaybını önlemek için verilerinizin kalıcı deposu olarak dosyayı tutmak istiyorsanız Çalışan süreçlerin ölmesi durumunda, bir dosyaya yazmaya devam edip ondan okumaya devam edebilirsiniz.

Bu dosyayı, Java işleminizden üretilen sonuçların kalıcı bir şekilde saklanması gerekmiyorsa, Unix soketine sahip olmak hem kolaylık hem de performans açısından çok daha iyidir.

fs.watchFile() dosya sistemi bunu raporları gibi dosya istatistikleri üzerinde çalıştığı için size gereken ve bunu zaten yazılırken dosyayı okumak istiyorum, çünkü istediğin bu değil ne değildir.

KISA GÜNCELLEME: Ben önceki paragrafta dosya istatistikleri kullanarak fs.watchFile() itham rağmen, aşağıda benim örnek kodda aynı şeyi kendim yapmış olduğunu fark etmek çok üzgünüm! Her ne kadar okuyucuları "dikkat et" konusunda uyarmıştım. çünkü iyi test etmeden sadece birkaç dakika içinde yazmıştım; hala, sistem destekliyorsa watchFile veya fstatSync yerine fs.watch() kullanarak daha iyi yapılabilir. Bir dosyadan yazma/okuma için

, ben sadece benim mola eğlenmek için aşağıda yazdım:

test fs-writer.js: Eğer Java dosyası yazmak beri [Buna gerek kalmayacak süreç]

var fs = require('fs'), 
    lineno=0; 

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); 

stream.on('open', function() { 
    console.log('Stream opened, will start writing in 2 secs'); 
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); 
}); 

test fs-reader.js: [Kendine iyi bak, bu sadece gösteri err nesneleri kontrol dir]

var fs = require('fs'), 
    bite_size = 256, 
    readbytes = 0, 
    file; 

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); 

function readsome() { 
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense! 
    if(stats.size<readbytes+1) { 
     console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); 
     setTimeout(readsome, 3000); 
    } 
    else { 
     fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); 
    } 
} 

function processsome(err, bytecount, buff) { 
    console.log('Read', bytecount, 'and will process it now.'); 

    // Here we will process our incoming data: 
     // Do whatever you need. Just be careful about not using beyond the bytecount in buff. 
     console.log(buff.toString('utf-8', 0, bytecount)); 

    // So we continue reading from where we left: 
    readbytes+=bytecount; 
    process.nextTick(readsome); 
} 
!

nextTick'u güvenle kullanmaktan kaçının ve bunun yerine readsome() numaralı telefonu arayın. Hala burada senkronizasyon yaptığımız için, herhangi bir anlamda gerekli değildir. Sadece beğendim. : P

DÜZENLEME Oliver Lloyd

Yukarıdaki örnek alınarak ancak CSV veri okumak için genişleterek verir:

var lastLineFeed, 
    lineArray; 
function processsome(err, bytecount, buff) { 
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); 

    if(lastLineFeed > -1){ 

     // Split the buffer by line 
     lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); 

     // Then split each line by comma 
     for(i=0;i<lineArray.length;i++){ 
      // Add read rows to an array for use elsewhere 
      valueArray.push(lineArray[i].split(',')); 
     } 

     // Set a new position to read from 
     readbytes+=lastLineFeed+1; 
    } else { 
     // No complete lines were read 
     readbytes+=bytecount; 
    } 
    process.nextTick(readFile); 
} 
+0

Bu, sorumu doğrudan adresleyen iyi bir örnektir. Her seferinde sadece bir satırı işlemek için geliştirmeyi gerektirir, ancak tartışmasız bu iyi bir şeydir; Düğümün mevcut fs arabiriminin olmaması, tamamen kodlanabilir olması gerektiğinden, tam kod yazmam gerekirse bile, tam ihtiyacım olanı elde edebilirim. –

+0

Yukarıdaki örneği bir CSV dosyasıyla çalışmak için genişlettim. –

+0

Bu, düğümü olarak çalıştırıldığında kesinlikle çalışır, ancak bu kodu app.js'ye nasıl koyabilirim ve sonucu html sayfasında alabilir miyim? – sand

4

Neden tail -f hack olduğunu düşünüyorsunuz?

İyi bir örnek bulduğumda, benzer bir şey yaparım. node.js ve WebSocket'e ile Gerçek zamanlı çevrimiçi aktivite monitör örneği:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

Sadece sana 0.8.0 altında çalışır bir örnek kod yazdım, bu yanıt eksiksiz yapmak için - (http sunucusu kesmek belki de).

bir çocuk süreç kuyruğu ile çalışan kökenli ve çocuk süreci üç akışları ile EventEmitter (biz bizim durumumuzda stdout'u kullanın) olduğundan sadece on

dosya adıyla bir dinleyici ekleyebilirsiniz: tailServer.js

kullanımı: node tailServer /var/log/filename.log

var http = require("http"); 
var filename = process.argv[2]; 


if (!filename) 
    return console.log("Usage: node tailServer filename"); 

var spawn = require('child_process').spawn; 
var tail = spawn('tail', ['-f', filename]); 

http.createServer(function (request, response) { 
    console.log('request starting...'); 

    response.writeHead(200, {'Content-Type': 'text/plain' }); 

    tail.stdout.on('data', function (data) { 
     response.write('' + data);     
    }); 
}).listen(8088); 

console.log('Server running at http://127.0.0.1:8088/'); 
+0

tail-f ile Benim endişe dosya yazılmadan önce veri olup değilse okuma süreci, aktif olmasını gerektirir olmasıdır kayıp. Kullanım durumum, verilerin yazılmasından sonra okunmuş olabileceği şekildedir. Güncellemeye +1 için +1, ancak yazmanın ve okunmanın aynı kaynaktan kontrol edildiği yer için iyi bir çözüm. –

+0

watchFile da olay odaklı, ancak belgelere göre nost kararlı. Yukarıdaki örnek, üst düzey kodda yoklama yoluyla dosya değişikliklerini yönetir. Benim için kesmek gibi görünüyor. Ama senin için işe yarıyorsa, bunu yapmak çok güzel. Aksi halde, eğer mevcut değilse dosyaya dokunabilirdiniz ve herhangi bir veri kaybetmezdiniz ve wc -l message.text | awk '{print $ 1}' 've' tail -f -n 'ye verin – vik

1

bu modül prensibi @hasanyasin bir uygulamasıdır göstermektedir olup:

https://github.com/felixge/node-growing-file

+0

Teşekkürler, bu burada iyi çalışır gibi görünüyor ve felixge'in diğer projeleri sağlam olduğundan bu modülü denemekten mutluluk duyuyorum. –

0

@hasanyasin'den cevabı aldım ve sardım Modüler bir söze kadar. Temel fikir, dosyadan okunan dizilmiş arabelleğe sahip bir şey yapan bir dosya ve işleyici işlevini iletmenizdir. İşleyici işlevi true değerini döndürürse, dosya okunmayı durdurur. İşleyici yeterince hızlı dönmüyorsa, okumayı öldürecek bir zaman aşımı da ayarlayabilirsiniz.

Çözüm(), zaman aşımı nedeniyle çağrılırsa, söz vereni true olarak döner, aksi halde false döner.

Kullanım örneği için alt kısma bakın.

// https://stackoverflow.com/a/11233045 

var fs = require('fs'); 
var Promise = require('promise'); 

class liveReaderPromiseMe { 
    constructor(file, buffStringHandler, opts) { 
     /* 
      var opts = { 
       starting_position: 0, 
       byte_size: 256, 
       check_for_bytes_every_ms: 3000, 
       no_handler_resolution_timeout_ms: null 
      }; 
     */ 

     if (file == null) { 
      throw new Error("file arg must be present"); 
     } else { 
      this.file = file; 
     } 

     if (buffStringHandler == null) { 
      throw new Error("buffStringHandler arg must be present"); 
     } else { 
      this.buffStringHandler = buffStringHandler; 
     } 

     if (opts == null) { 
      opts = {}; 
     } 

     if (opts.starting_position == null) { 
      this.current_position = 0; 
     } else { 
      this.current_position = opts.starting_position; 
     } 

     if (opts.byte_size == null) { 
      this.byte_size = 256; 
     } else { 
      this.byte_size = opts.byte_size; 
     } 

     if (opts.check_for_bytes_every_ms == null) { 
      this.check_for_bytes_every_ms = 3000; 
     } else { 
      this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; 
     } 

     if (opts.no_handler_resolution_timeout_ms == null) { 
      this.no_handler_resolution_timeout_ms = null; 
     } else { 
      this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; 
     } 
    } 


    startHandlerTimeout() { 
     if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { 
      var that = this; 
      this._handlerTimer = setTimeout(
       function() { 
        that._is_handler_timed_out = true; 
       }, 
       this.no_handler_resolution_timeout_ms 
      ); 
     } 
    } 

    clearHandlerTimeout() { 
     if (this._handlerTimer != null) { 
      clearTimeout(this._handlerTimer); 
      this._handlerTimer = null; 
     } 
     this._is_handler_timed_out = false; 
    } 

    isHandlerTimedOut() { 
     return !!this._is_handler_timed_out; 
    } 


    fsReadCallback(err, bytecount, buff) { 
     try { 
      if (err) { 
       throw err; 
      } else { 
       this.current_position += bytecount; 
       var buff_str = buff.toString('utf-8', 0, bytecount); 

       var that = this; 

       Promise.resolve().then(function() { 
        return that.buffStringHandler(buff_str); 
       }).then(function(is_handler_resolved) { 
        if (is_handler_resolved) { 
         that.resolve(false); 
        } else { 
         process.nextTick(that.doReading.bind(that)); 
        } 
       }).catch(function(err) { 
        that.reject(err); 
       }); 
      } 
     } catch(err) { 
      this.reject(err); 
     } 
    } 

    fsRead(bytecount) { 
     fs.read(
      this.file, 
      new Buffer(bytecount), 
      0, 
      bytecount, 
      this.current_position, 
      this.fsReadCallback.bind(this) 
     ); 
    } 

    doReading() { 
     if (this.isHandlerTimedOut()) { 
      return this.resolve(true); 
     } 

     var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; 
     if (max_next_bytes) { 
      this.fsRead((this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size); 
     } else { 
      setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); 
     } 
    } 


    promiser() { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.resolve = resolve; 
      that.reject = reject; 
      that.doReading(); 
      that.startHandlerTimeout(); 
     }).then(function(was_resolved_by_timeout) { 
      that.clearHandlerTimeout(); 
      return was_resolved_by_timeout; 
     }); 
    } 
} 


module.exports = function(file, buffStringHandler, opts) { 
    try { 
     var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); 
     return live_reader.promiser(); 
    } catch(err) { 
     return Promise.reject(err); 
    } 
}; 

Sonra böyle yukarıdaki kodu kullanın:

var fs = require('fs'); 
var path = require('path'); 
var Promise = require('promise'); 
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); 

var ending_str = '_THIS_IS_THE_END_'; 
var test_path = path.join('E:/tmp/test.txt'); 

var s_list = []; 
var buffStringHandler = function(s) { 
    s_list.push(s); 
    var tmp = s_list.join(''); 
    if (-1 !== tmp.indexOf(ending_str)) { 
     // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms 
     // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true 
     return true; 
     // you can also return a promise: 
     // return Promise.resolve().then(function() { return true; }); 
    } 
}; 

var appender = fs.openSync(test_path, 'a'); 
try { 
    var reader = fs.openSync(test_path, 'r'); 
    try { 
     var options = { 
      starting_position: 0, 
      byte_size: 256, 
      check_for_bytes_every_ms: 3000, 
      no_handler_resolution_timeout_ms: 10000, 
     }; 

     liveReadAppendingFilePromiser(reader, buffStringHandler, options) 
     .then(function(did_reader_time_out) { 
      console.log('reader timed out: ', did_reader_time_out); 
      console.log(s_list.join('')); 
     }).catch(function(err) { 
      console.error('bad stuff: ', err); 
     }).then(function() { 
      fs.closeSync(appender); 
      fs.closeSync(reader); 
     }); 

     fs.write(appender, '\ncheck it out, I am a string'); 
     fs.write(appender, '\nwho killed kenny'); 
     //fs.write(appender, ending_str); 
    } catch(err) { 
     fs.closeSync(reader); 
     console.log('err1'); 
     throw err; 
    } 
} catch(err) { 
    fs.closeSync(appender); 
     console.log('err2'); 
    throw err; 
}