2013-05-26 22 views
64

stm, ağ-kanalı ve kanal kullanan benim GHC Haskell uygulamasında, her bir soket için runTCPServer kullanarak otomatik olarak çatallanan bir dizilim var. Teller, bir yayın TChanının kullanımıyla diğer iplikler ile iletişim kurabilir.Bir işlem borusu, aynı türdeki 2 IO kaynağı

Bu benim kanal "zincirini" kurmak istiyorum ne sergiliyor:

enter image description here

Peki, biz burada var iki kaynak bir Packet nesneyi üretmek (her yardımcı kanallarına bağlı) 'dir hangi encoder kabul edip ByteString'a dönüşecek ve ardından soketi gönderecektir. İki girdinin birleştirilmesiyle verimli (performans endişe verici) bir çok zorluk yaşadım.

Eğer birisi bana doğru yönde işaret ederse çok memnun olurum. bir girişim yapmadan bu soruyu göndermek için kabayım olurdu yana


, ben daha önce burada ne denedim koyacağım;

Bir TMChan'dan (kapatılabilir kanal) bir Kaynak üreten (bloke eden) bir işlevi yazdım/yayınladım;

-- | Takes a generic type of STM chan and, given read and close functionality, 
-- returns a conduit 'Source' which consumes the elements of the channel. 
chanSource 
    :: (MonadIO m, MonadSTM m) 
    => a     --^The channel 
    -> (a -> STM (Maybe b)) --^The read function 
    -> (a -> STM())  --^The close/finalizer function 
    -> Source m b 
chanSource ch readCh closeCh = ConduitM pull 
    where close  = liftSTM $ closeCh ch 
      pull  = PipeM $ liftSTM $ readCh ch >>= translate 
      translate = return . maybe (Done()) (HaveOutput pull close) 

Benzer şekilde, bir Chan'ı bir lavaboya dönüştürmek için bir işlev;

-- | Takes a stream and, given write and close functionality, returns a sink 
-- which wil consume elements and broadcast them into the channel 
chanSink 
    :: (MonadIO m, MonadSTM m) 
    => a     --^The channel 
    -> (a -> b -> STM()) --^The write function 
    -> (a -> STM())  --^The close/finalizer function 
    -> Sink b m() 
chanSink ch writeCh closeCh = ConduitM sink 
    where close = const . liftSTM $ closeCh ch 
      sink = NeedInput push close 
      write = liftSTM . writeCh ch 
      push x = PipeM $ write x >> return sink 

Daha sonra birleştirme kaynakları basittir; 2 maddeyi (gerçekten yapmak istemediğim, ama ne halt), yeni eşyalarını daha sonra bir kaynak ürettiğim bir listeye koyabiliyorum; Ben bu işlevler typecheck yapımında başarılı iken

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns 
-- a source which consumes the elements of the channel. 
mergeSources 
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m) 
    => [Source (ResourceT m) a]    --^The list of sources 
    -> ResourceT m (Source (ResourceT m) a) 
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn 
    where push c s = s $$ chanSink c writeTMChan closeTMChan 
      fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x 
      retn c = return $ chanSource c readTMChan closeTMChan 

, ben typecheck için bu fonksiyonların herhangi kullanımını almakla başarısız oldu;

-- | Helper which represents a conduit chain for each client connection 
serverApp :: Application SessionIO 
serverApp appdata = do 
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast 
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata 
    mergsrc $$ protocol $= encoder =$ appSink appdata 
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan 
      mergsrc = mergeSources [appSource appdata $= decoder, chansrc] 

-- | Structure which holds mutable information for clients 
data SessionState = SessionState 
    { _ssBroadcast  :: TMChan Packet --^Outbound packet broadcast channel 
    } 

makeLenses ''SessionState 

-- | A transformer encompassing both SessionReader and SessionState 
type Session m = ReaderT SessionReader (StateT SessionState m) 

-- | Macro providing Session applied to an IO monad 
type SessionIO = Session IO 

Bu yöntemin her nasılsa kusurlu olduğunu görüyorum - birçok ara liste ve dönüşüm var. Bu performans için iyi olamaz. Rehberlik istemek.


PS. Anlayabildiğimden, bunun bir kopyası değil; Fusing conduits with multiple inputs, benim durumumda olduğu gibi her iki kaynak da aynı tür üretiyor ve Packet nesnesinin hangi kaynaktan üretildiğini umurumda değilken, beklemediğim sürece bir başkası tüketilmeye hazır nesneler var.

PPS. Lens'in örnek kod kullanımından (ve dolayısıyla bilgi ihtiyacından) dolayı özür dilerim.

+2

'stm-conduit' paketinden' Data.Conduit.TMChan 'kullanmadığınız bir neden var mı? "Birleştirme Kaynakları" dahil olmak üzere tanımladığınız tüm işlevlere sahiptir. –

+0

Aslında var - Kaynakların kapanmasıyla birlikte her ikisini de birleştiren kaynağı istiyorum. Stm-conduit paketi, refcount'lar kullanır (ve istenen kaynağı kapatmak için en son kaynağın kapanmasını bekler). Her iki kaynak geçersiz olduktan sonra hemen kapatarak, bana tüm TMChan'ı kapattığımda, her soketi de zamanında kapatmamı sağlar. – kvanberendonck

+3

Boşta bir düşünce: Eğer TMChan birleştirme kaynakları yakalarsanız, ref sayma maddelerini dışarı atın ve tüm kaynakları kapatmak için decRefCount refcount biti kodu ile değiştirin ne olur? – Iain

cevap

1

herhangi bir yardım olmadığını bilmiyorum, ama ben Iain'ın öneri uygulamaya çalıştı ve kısa sürede kanallardan herhangi yaptığı gibi durur o mergeSources' bir varyantını yaptı:

mergeSources' :: (MonadIO m, MonadBaseControl IO m) 
       => [Source (ResourceT m) a] --^The sources to merge. 
       -> Int --^The bound of the intermediate channel. 
       -> ResourceT m (Source (ResourceT m) a) 
mergeSources' sx bound = do 
    c <- liftSTM $ newTBMChan bound 
    mapM_ (\s -> resourceForkIO $ 
        s $$ chanSink c writeTBMChan closeTBMChan) sx 
    return $ sourceTBMChan c 

(Bu basit bir ek olduğunu mevcut here).mergeSources sürümü için

Bazı yorumlar (inanmayarak ile götürün, ben de bir şey anlamadı olabilir):

  • yerine ...TBMChan ait ...TMChan tehlikeli görünüyor kullanma. Yazarlar okuyucudan daha hızlıysa, yığınınız patlayacak. Diyagramınıza bakıldığında, TCP akranız yeterince hızlı veri okumadığı takdirde, bunun kolayca gerçekleşebileceği görülmektedir. Bu yüzden ...TBMChan kesinlikle büyük ama sınırlı bir sınır ile kullanın.
  • MonadSTM m kısıtlamasına ihtiyacınız yoktur. Tüm STM şeyler serverApp yılında mergeSources' kullanırken Belki bu biraz yardımcı olacaktır

    liftSTM = liftIO . atomically 
    

    ile IO içine sarılır. Sadece kozmetik bir sorun

  • , ben nedeniyle (->) r monadın üzerinde liftA2 onun kullanımına

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn 
    

    okumak çok zor

    bulundu.

    daha uzun olurdu, ama okumak daha kolay olurdu.

belki de serverApp oynamak mümkün olacak bir kendi kendine yeten bir proje oluşturmak olabilir mi?

+0

Tavsiye için teşekkürler. Aklımda tutulacağım (yakında sorunu tekrar gözden geçirmem gerekecek). – kvanberendonck