2014-12-31 20 views
23

Bağlantı Esneklik:RavenDB Akış - Öyle gibi 2 veritabanları arasında veri yüklemek dönüştürmek ve göçe RavenDB içinde Akış işlevini kullandığınız

var query = originSession.Query<T>(IndexForQuery); 

using (var stream = originSession.Advanced.Stream(query)) 
{ 
    while (stream.MoveNext()) 
    { 
     var streamedDocument = stream.Current.Document; 

     OpenSessionAndMigrateSingleDocument(streamedDocument); 
    } 
} 

sorun koleksiyonlarından biri milyonlarca olmasıdır satırların ve biz aşağıdaki biçimde bir IOException almaya devam:

Application: MigrateToNewSchema.exe 
Framework Version: v4.0.30319 
Description: The process was terminated due to an unhandled exception. 
Exception Info: System.IO.IOException 
Stack: 
    at System.Net.ConnectStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32) 
    at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32) 
    at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef) 
    at System.IO.StreamReader.Read(Char[], Int32, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char) 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal() 
    at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read() 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader) 
    at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext() 
    at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext() 
    at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection() 
    at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore) 
    at MigrateToNewSchema.Program.Main(System.String[]) 

Bu akış içine ve geçici bağlantı sorunları dönemin bu tür üzerinde meydana gelecektir elbette oldukça uzun bir yol olur (tamamlanması için saat sürer). Ancak, yeniden denediğimizde, Query kullanıyor olduğumuzdan, sıfırdan başlamalıyız. Sonuç olarak, tüm Stream sırasında bir bağlantı hatası varsa, tekrar denemeye ve tekrar bitene kadar tekrar denemeliyiz.

ETag akışını etkin bir şekilde yeniden başlatmak için belirli bir noktada kullanabileceğinizi biliyorum, ancak aktarılan sonuçları filtrelememiz ve doğru koleksiyonu belirtmemiz gereken Query ile aşırı yüklenme yok.

Yani, RavenDB'de, bağlantının iç esnekliğini (bağlantı dizesi özelliği, iç ayarlar vb.) Iyileştirmek veya bir hata üzerinde bir akışı etkin bir şekilde "kurtarmak" için bir yol var mı?

+0

ben keşfettim [Veri Abonelikler] (http://ravendb.net/docs/article-page/3.0/csharp/client-api/data- abonelikler/nasıl oluşturulur-veri-abonelik), RavenDb 3.0 özelliği, belirtilen ölçütlerle eşleşen bir belge koleksiyonunda yineleme yapmak için güvenilir bir mekanizma sağlar ve bıraktığınız yerden kolayca almanızı sağlar. Birisi, bu özelliğin bu soruyu nasıl yanıtlayabileceğini gösteren bazı kod örneklerini bir araya getirmeye istekli olsaydı, bu ödülün layık olduğunu düşünürdüm. – StriplingWarrior

+0

Bir sorguyu kullanarak bağlandınız mı? Daha etkisiz olmasına rağmen, bu bir göçtür, bu nedenle hafıza bir sorun değildir - neden ham doküman koleksiyonlarını yineleme ve bellek içi filtreleme, böylece bir Etag'da devam edebilirsiniz? Tüm akışları nasıl ele alıyorum, asla sorgu kullanmıyorum. – kamranicus

+0

@StriplingWarrior Bir süre oldu :-) Artık RavenDB kullanan şirket için çalışmıyorum ama bu hala beni ilgilendiriyor, bu yüzden bugün veri aboneliği koduna bir cevap yazacağım. –

cevap

2

@StriplingWarrior adlı öneriye göre, Data Subscriptions kullanarak çözümü yeniden oluşturdum.

Bu yaklaşımı kullanarak, tüm 2 milyon satırın üzerinde yineleme yapabildim (her ne kadar itiraf etmeden öğeye göre daha az işlemle);

    sadece abonelik kaldırılacak:
  1. Partiler "kuyruk" bir kez
    1. abone (çoğu standart kuyruklar gibi) kabul: Biz akışları kullanarak aynı mantık uygulamak çalışıyorduk yardım ederdi burada 2 puan IObserver<T>, ayarlanacak bu onay için başarıyla tamamlanmalıdır.
    2. Bu bilgiler müvekkilim müşteri abonelik işlenen son başarılı konumunu etkilemeden yeniden sağlar ziyade sunucu tarafından işlenir sizinle abonelikler oluşturabilir çünkü @StriplingWarrior belirtildiği gibi
    3. See here for more details
  2. mülk seviyesine kadar filtreler, aboneliğin kendisinde bir istisna durumunda daha küçük bir sonuç kümesiyle yeniden oynatılabilir.
    1. İlk nokta, gerçekten bunun yerine geçer; ama bize Akış API

test ortamı 2000000 kayıtların bir koleksiyonu karşı varsayılan ayarlarla (bir windows hizmeti olarak çalışan yerel makine,) bir RavenDB 3.0 veritabanı görmedim ek esneklik sağlar.Bu koleksiyona abone

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    using (var bulkInsert = store.BulkInsert()) 
    { 
     for (var i = 0; i != recordsToCreate; i++) 
     { 
      var person = new Person 
      { 
       Id = Guid.NewGuid(), 
       Firstname = NameGenerator.GenerateFirstName(), 
       Lastname = NameGenerator.GenerateLastName() 
      }; 

      bulkInsert.Store(person); 
     } 
    } 
} 

sonra bir durumdur:

Kod kukla kayıtlarını oluşturmak için

using (IDocumentStore store = GetDocumentStore()) 
{ 
    store.Initialize(); 

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>()); 

    var personSubscription = store.Subscriptions.Open<Person>(
     subscriptionId, new SubscriptionConnectionOptions() 
    { 
     BatchOptions = new SubscriptionBatchOptions() 
     { 
      // Max number of docs that can be sent in a single batch 
      MaxDocCount = 16 * 1024, 
      // Max total batch size in bytes 
      MaxSize = 4 * 1024 * 1024, 
      // Max time the subscription needs to confirm that the batch 
      // has been successfully processed 
      AcknowledgmentTimeout = TimeSpan.FromMinutes(3) 
     }, 
     IgnoreSubscribersErrors = false, 
     ClientAliveNotificationInterval = TimeSpan.FromSeconds(30) 
    }); 

    personSubscription.Subscribe(new PersonObserver()); 

    while (true) 
    { 
     Thread.Sleep(TimeSpan.FromMilliseconds(500)); 
    } 
} 

Not PersonObserver; Bu yüzden böyle IObserver sadece temel bir uygulamasıdır:

public class PersonObserver : IObserver<Person> 
{ 
    public void OnCompleted() 
    { 
     Console.WriteLine("Completed"); 
    } 

    public void OnError(Exception error) 
    { 
     Console.WriteLine("Error occurred: " + error.ToString()); 
    } 

    public void OnNext(Person person) 
    { 
     Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'"); 
    } 
} 
+1

Nice write-up. Bir 'Görev' (veya belirli bir 'CancellationToken' temelli bir 'Görev 'oluştur) ve' (gerçek) 'yerine görev beklemenin faydalı olduğunu buldum. Bu şekilde, çağıran kod tüm iş parçacığını veya prosesi öldürmeden operasyonu güvenli bir şekilde iptal edebilir. Ayrıca, tüm hedef belgelere "tamamlandığında" göçün bilinmesine yardımcı olmak için ETag tabanlı bir mekanizma geliştirdim, böylece kendini durdurabilir, ancak oldukça zor ve her açıdan mükemmel değildir. – StriplingWarrior