從Java走進(jìn)Scala:深入了解Scala并發(fā)性
2003 年,Herb Sutter 在他的文章 “The Free Lunch Is Over” 中揭露了行業(yè)中最不可告人的一個(gè)小秘密,他明確論證了處理器在速度上的發(fā)展已經(jīng)走到了盡頭,并且將由全新的單芯片上的并行 “內(nèi)核”(虛擬 CPU)所取代。這一發(fā)現(xiàn)對(duì)編程社區(qū)造成了不小的沖擊,因?yàn)檎_創(chuàng)建線程安全的代碼,在理論而非實(shí)踐中,始終會(huì)提高高性能開(kāi)發(fā)人員的身價(jià),而讓各公司難以聘用他們??瓷先?,僅有少數(shù)人充分理解了 Java 的線程模型、并發(fā) API 以及 “同步” 的含義,以便能夠編寫(xiě)同時(shí)提供安全性和吞吐量的代碼 —— 并且大多數(shù)人已經(jīng)明白了它的困難所在。
51CTO編輯推薦:Scala編程語(yǔ)言專題
據(jù)推測(cè),行業(yè)的其余部分將自力更生,這顯然不是一個(gè)理想的結(jié)局,至少不是 IT 部門努力開(kāi)發(fā)軟件所應(yīng)得的回報(bào)。
與 Scala 在 .NET 領(lǐng)域中的姐妹語(yǔ)言 F# 相似,Scala 是針對(duì) “并發(fā)性問(wèn)題” 的解決方案之一。在本期文章中,我討論了 Scala 的一些屬性,這些屬性使它更加勝任于編寫(xiě)線程安全的代碼,比如默認(rèn)不可修改的對(duì)象,并討論了一種返回對(duì)象副本而不是修改它們內(nèi)容的***設(shè)計(jì)方案。Scala 對(duì)并發(fā)性的支持遠(yuǎn)比此深遠(yuǎn);現(xiàn)在,我們有必要來(lái)了解一下 Scala 的各種庫(kù)。
并發(fā)性基礎(chǔ)
在深入研究 Scala 的并發(fā)性支持之前,有必要確保您具備了對(duì) Java 基本并發(fā)性模型的良好理解,因?yàn)?Scala 的并發(fā)性支持,從某種程度上說(shuō),建立在 JVM 和支持庫(kù)所提供的特性和功能的基礎(chǔ)之上。為此,清單 1 中的代碼包含了一個(gè)已知的 Producer/Consumer 并發(fā)性問(wèn)題(詳見(jiàn) Sun Java Tutorial 的 “Guarded Blocks” 小節(jié))。注意,Java Tutorial 版本并未在其解決方案中使用 java.util.concurrent 類,而是擇優(yōu)使用了 java.lang.Object 中的較舊的 wait()/notifyAll() 方法:
清單 1. Producer/Consumer(Java5 之前)
- package com.tedneward.scalaexamples.notj5;
- class Producer implements Runnable
- {
- private Drop drop;
- private String importantInfo[] = {
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too"
- };
- public Producer(Drop drop) { this.drop = drop; }
- public void run()
- {
- for (int i = 0; i < importantInfo.length; i++)
- {
- drop.put(importantInfo[i]);
- }
- drop.put("DONE");
- }
- }
- class Consumer implements Runnable
- {
- private Drop drop;
- public Consumer(Drop drop) { this.drop = drop; }
- public void run()
- {
- for (String message = drop.take(); !message.equals("DONE");
- message = drop.take())
- {
- System.out.format("MESSAGE RECEIVED: %s%n", message);
- }
- }
- }
- class Drop
- {
- //Message sent from producer to consumer.
- private String message;
- //True if consumer should wait for producer to send message,
- //false if producer should wait for consumer to retrieve message.
- private boolean empty = true;
- //Object to use to synchronize against so as to not "leak" the
- //"this" monitor
- private Object lock = new Object();
- public String take()
- {
- synchronized(lock)
- {
- //Wait until message is available.
- while (empty)
- {
- try
- {
- lock.wait();
- }
- catch (InterruptedException e) {}
- }
- //Toggle status.
- empty = true;
- //Notify producer that status has changed.
- lock.notifyAll();
- return message;
- }
- }
- public void put(String message)
- {
- synchronized(lock)
- {
- //Wait until message has been retrieved.
- while (!empty)
- {
- try
- {
- lock.wait();
- } catch (InterruptedException e) {}
- }
- //Toggle status.
- empty = false;
- //Store message.
- this.message = message;
- //Notify consumer that status has changed.
- lock.notifyAll();
- }
- }
- }
- public class ProdConSample
- {
- public static void main(String[] args)
- {
- Drop drop = new Drop();
- (new Thread(new Producer(drop))).start();
- (new Thread(new Consumer(drop))).start();
- }
- }
Java 教程 “缺陷”
好奇的讀者可能會(huì)將此處的代碼與 Java Tutorial 中的代碼進(jìn)行比較,尋找它們之間有哪些不同;他們會(huì)發(fā)現(xiàn)我并未 “同步” put 和 take 方法,而是使用了存儲(chǔ)在 Drop 中的 lock 對(duì)象。其原因非常簡(jiǎn)單:對(duì)象的監(jiān)測(cè)程序永遠(yuǎn)都不會(huì)封裝在類的內(nèi)部,因此 Java Tutorial 版本允許此代碼打破此規(guī)則(顯然很瘋狂):
- public class ProdConSample
- {
- public static void main(String[] args)
- {
- Drop drop = new Drop();
- (new Thread(new Producer(drop))).start();
- (new Thread(new Consumer(drop))).start();
- synchronized(drop)
- {
- Thread.sleep(60 * 60 * 24 * 365 * 10); // sleep for 10 years?!?
- }
- }
- }
通過(guò)使用私有對(duì)象作為鎖定所依托的監(jiān)測(cè)程序,此代碼將不會(huì)有任何效果。從本質(zhì)上說(shuō),現(xiàn)在已經(jīng)封裝了線程安全的實(shí)現(xiàn);然后,它才能依賴客戶機(jī)的優(yōu)勢(shì)正常運(yùn)行。
注意:我在此處展示的代碼對(duì) Sun 教程解決方案做了少許修改;它們提供的代碼存在一個(gè)很小的設(shè)計(jì)缺陷(參見(jiàn) Java 教程 “缺陷”)。
Producer/Consumer 問(wèn)題的核心非常容易理解:一個(gè)(或多個(gè))生產(chǎn)者實(shí)體希望將數(shù)據(jù)提供給一個(gè)(或多個(gè))使用者實(shí)體供它們使用和操作(在本例中,它包括將數(shù)據(jù)打印到控制臺(tái))。Producer 和 Consumer 類是相應(yīng)直觀的 Runnable-實(shí)現(xiàn)類:Producer 從數(shù)組中獲取 String,并通過(guò) put 將它們放置到 Consumer 的緩沖區(qū)中,并根據(jù)需要執(zhí)行 take。
問(wèn)題的難點(diǎn)在于,如果 Producer 運(yùn)行過(guò)快,則數(shù)據(jù)在覆蓋時(shí)可能會(huì)丟失;如果 Consumer 運(yùn)行過(guò)快,則當(dāng) Consumer 讀取相同的數(shù)據(jù)兩次時(shí),數(shù)據(jù)可能會(huì)得到重復(fù)處理。緩沖區(qū)(在 Java Tutorial 代碼中稱作 Drop)將確保不會(huì)出現(xiàn)這兩種情況。數(shù)據(jù)破壞的可能性就更不用提了(在 String 引用的例子中很困難,但仍然值得注意),因?yàn)閿?shù)據(jù)會(huì)由 put 放入緩沖區(qū),并由 take 取出。
關(guān)于此主題的全面討論請(qǐng)閱讀 Brian Goetz 的 Java Concurrency in Practice 或 Doug Lea 的 Concurrent Programming in Java(參見(jiàn) 參考資料),但是,在應(yīng)用 Scala 之前有必要快速了解一下此代碼的運(yùn)行原理。
當(dāng) Java 編譯器看到 synchronized 關(guān)鍵字時(shí),它會(huì)在同步塊的位置生成一個(gè) try/finally 塊,其頂部包括一個(gè) monitorenter 操作碼,并且 finally 塊中包括一個(gè) monitorexit 操作碼,以確保監(jiān)控程序(Java 的原子性基礎(chǔ))已經(jīng)發(fā)布,而與代碼退出的方式無(wú)關(guān)。因此,Drop 中的 put 代碼將被重寫(xiě),如清單 2 所示:
清單 2. 編譯器失效后的 Drop.put
- // This is pseudocode
- public void put(String message)
- {
- try
- {
- monitorenter(lock)
- //Wait until message has been retrieved.
- while (!empty)
- {
- try
- {
- lock.wait();
- } catch (InterruptedException e) {}
- }
- //Toggle status.
- empty = false;
- //Store message.
- this.message = message;
- //Notify consumer that status has changed.
- lock.notifyAll();
- }
- finally
- {
- monitorexit(lock)
- }
- }
wait() 方法將通知當(dāng)前線程進(jìn)入非活動(dòng)狀態(tài),并等待另一個(gè)線對(duì)該對(duì)象調(diào)用 notifyAll()。然后,通知的線程必須在能夠繼續(xù)執(zhí)行的時(shí)候嘗試再次獲取監(jiān)控程序。從本質(zhì)上說(shuō),wait() 和 notify()/notifyAll() 允許一種簡(jiǎn)單的信令機(jī)制,它允許 Drop 在 Producer 和 Consumer 線程之間進(jìn)行協(xié)調(diào),每個(gè) put 都有相應(yīng)的 take。
本文的 代碼下載 部分使用 Java5 并發(fā)性增強(qiáng)(Lock 和 Condition 接口以及 ReentrantLock 鎖定實(shí)現(xiàn))提供 清單 2 的基于超時(shí)的版本,但基本代碼模式仍然相同。這就是問(wèn)題所在:編寫(xiě)清單 2 這樣的代碼的開(kāi)發(fā)人員需要過(guò)度專注于線程和鎖定的細(xì)節(jié)以及低級(jí)實(shí)現(xiàn)代碼,以便讓它們能夠正確運(yùn)行。此外,開(kāi)發(fā)人員需要對(duì)每一行代碼刨根知底,以確定是否需要保護(hù)它們,因?yàn)檫^(guò)度同步與過(guò)少同步同樣有害。
現(xiàn)在,我們來(lái)看到 Scala 替代方案。
#p#
良好的 Scala 并發(fā)性 (v1)
開(kāi)始應(yīng)用 Scala 并發(fā)性的一種方法是將 Java 代碼直接轉(zhuǎn)換為 Scala,以便利用 Scala 的語(yǔ)法優(yōu)勢(shì)來(lái)簡(jiǎn)化代碼(至少能簡(jiǎn)化一點(diǎn)):
清單 3. ProdConSample (Scala)
- object ProdConSample
- {
- class Producer(drop : Drop)
- extends Runnable
- {
- val importantInfo : Array[String] = Array(
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too"
- );
- override def run() : Unit =
- {
- importantInfo.foreach((msg) => drop.put(msg))
- drop.put("DONE")
- }
- }
- class Consumer(drop : Drop)
- extends Runnable
- {
- override def run() : Unit =
- {
- var message = drop.take()
- while (message != "DONE")
- {
- System.out.format("MESSAGE RECEIVED: %s%n", message)
- message = drop.take()
- }
- }
- }
- class Drop
- {
- var message : String = ""
- var empty : Boolean = true
- var lock : AnyRef = new Object()
- def put(x: String) : Unit =
- lock.synchronized
- {
- // Wait until message has been retrieved
- await (empty == true)
- // Toggle status
- empty = false
- // Store message
- message = x
- // Notify consumer that status has changed
- lock.notifyAll()
- }
- def take() : String =
- lock.synchronized
- {
- // Wait until message is available.
- await (empty == false)
- // Toggle status
- empty=true
- // Notify producer that staus has changed
- lock.notifyAll()
- // Return the message
- message
- }
- private def await(cond: => Boolean) =
- while (!cond) { lock.wait() }
- }
- def main(args : Array[String]) : Unit =
- {
- // Create Drop
- val drop = new Drop();
- // Spawn Producer
- new Thread(new Producer(drop)).start();
- // Spawn Consumer
- new Thread(new Consumer(drop)).start();
- }
- }
Producer 和 Consumer 類幾乎與它們的 Java 同類相同,再一次擴(kuò)展(實(shí)現(xiàn))了 Runnable 接口并覆蓋了 run() 方法,并且 — 對(duì)于 Producer 的情況 — 分別使用了內(nèi)置迭代方法來(lái)遍歷 importantInfo 數(shù)組的內(nèi)容。(實(shí)際上,為了讓它更像 Scala,importantInfo 可能應(yīng)該是一個(gè) List 而不是 Array,但在***次嘗試時(shí),我希望盡可能保證它們與原始 Java 代碼一致。)
Drop 類同樣類似于它的 Java 版本。但 Scala 中有一些例外,“synchronized” 并不是關(guān)鍵字,它是針對(duì) AnyRef 類定義的一個(gè)方法,即 Scala “所有引用類型的根”。這意味著,要同步某個(gè)特定的對(duì)象,您只需要對(duì)該對(duì)象調(diào)用同步方法;在本例中,對(duì) Drop 上的 lock 字段中所保存的對(duì)象調(diào)用同步方法。
注意,我們?cè)?await() 方法定義的 Drop 類中還利用了一種 Scala 機(jī)制:cond 參數(shù)是等待計(jì)算的代碼塊,而不是在傳遞給該方法之前進(jìn)行計(jì)算。在 Scala 中,這被稱作 “call-by-name”;此處,它是一種實(shí)用的方法,可以捕獲需要在 Java 版本中表示兩次的條件等待邏輯(分別用于 put 和 take)。
***,在 main() 中,創(chuàng)建 Drop 實(shí)例,實(shí)例化兩個(gè)線程,使用 start() 啟動(dòng)它們,然后在 main() 的結(jié)束部分退出,相信 JVM 會(huì)在 main() 結(jié)束之前啟動(dòng)這兩個(gè)線程。(在生產(chǎn)代碼中,可能無(wú)法保證這種情況,但對(duì)于這樣的簡(jiǎn)單的例子,99.99% 沒(méi)有問(wèn)題。)
但是,已經(jīng)說(shuō)過(guò),仍然存在相同的基本問(wèn)題:程序員仍然需要過(guò)分擔(dān)心兩個(gè)線程之間的通信和協(xié)調(diào)問(wèn)題。雖然一些 Scala 機(jī)制可以簡(jiǎn)化語(yǔ)法,但這目前為止并沒(méi)有相當(dāng)大的吸引力。
Scala 并發(fā)性 v2
Scala Library Reference 中有一個(gè)有趣的包:scala.concurrency。這個(gè)包包含許多不同的并發(fā)性結(jié)構(gòu),包括我們即將利用的 MailBox 類。
顧名思義,MailBox 從本質(zhì)上說(shuō)就是 Drop,用于在檢測(cè)之前保存數(shù)據(jù)塊的單槽緩沖區(qū)。但是,MailBox ***的優(yōu)勢(shì)在于它將發(fā)送和接收數(shù)據(jù)的細(xì)節(jié)完全封裝到模式匹配和 case 類中,這使它比簡(jiǎn)單的 Drop(或 Drop 的多槽數(shù)據(jù)保存類 java.util.concurrent.BoundedBuffer)更加靈活。
清單 4. ProdConSample, v2 (Scala)
- package com.tedneward.scalaexamples.scala.V2
- {
- import concurrent.{MailBox, ops}
- object ProdConSample
- {
- class Producer(drop : Drop)
- extends Runnable
- {
- val importantInfo : Array[String] = Array(
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too"
- );
- override def run() : Unit =
- {
- importantInfo.foreach((msg) => drop.put(msg))
- drop.put("DONE")
- }
- }
- class Consumer(drop : Drop)
- extends Runnable
- {
- override def run() : Unit =
- {
- var message = drop.take()
- while (message != "DONE")
- {
- System.out.format("MESSAGE RECEIVED: %s%n", message)
- message = drop.take()
- }
- }
- }
- class Drop
- {
- private val m = new MailBox()
- private case class Empty()
- private case class Full(x : String)
- m send Empty() // initialization
- def put(msg : String) : Unit =
- {
- m receive
- {
- case Empty() =>
- m send Full(msg)
- }
- }
- def take() : String =
- {
- m receive
- {
- case Full(msg) =>
- m send Empty(); msg
- }
- }
- }
- def main(args : Array[String]) : Unit =
- {
- // Create Drop
- val drop = new Drop()
- // Spawn Producer
- new Thread(new Producer(drop)).start();
- // Spawn Consumer
- new Thread(new Consumer(drop)).start();
- }
- }
- }
此處,v2 和 v1 之間的惟一區(qū)別在于 Drop 的實(shí)現(xiàn),它現(xiàn)在利用 MailBox 類處理傳入以及從 Drop 中刪除的消息的阻塞和信號(hào)事務(wù)。(我們可以重寫(xiě) Producer 和 Consumer,讓它們直接使用 MailBox,但考慮到簡(jiǎn)單性,我們假定希望保持所有示例中的 Drop API 相一致。)使用 MailBox 與使用典型的 BoundedBuffer(Drop)稍有不同,因此我們來(lái)仔細(xì)看看其代碼。
MailBox 有兩個(gè)基本操作:send 和 receive。receiveWithin 方法僅僅是基于超時(shí)的 receive。MailBox 接收任何類型的消息。send() 方法將消息放置到郵箱中,并立即通知任何關(guān)心該類型消息的等待接收者,并將它附加到一個(gè)消息鏈表中以便稍后檢索。receive() 方法將阻塞,直到接收到對(duì)于功能塊合適的消息。
因此,在這種情況下,我們將創(chuàng)建兩個(gè) case 類,一個(gè)不包含任何內(nèi)容(Empty),這表示 MailBox 為空,另一個(gè)包含消息數(shù)據(jù)(Full。
put 方法,由于它會(huì)將數(shù)據(jù)放置在 Drop 中,對(duì) MailBox 調(diào)用 receive() 以查找 Empty 實(shí)例,因此會(huì)阻塞直到發(fā)送 Empty。此時(shí),它發(fā)送一個(gè) Full 實(shí)例給包含新數(shù)據(jù)的 MailBox。
take 方法,由于它會(huì)從 Drop 中刪除數(shù)據(jù),對(duì) MailBox 調(diào)用 receive() 以查找 Full 實(shí)例,提取消息(再次得益于模式匹配從 case 類內(nèi)部提取值并將它們綁到本地變量的能力)并發(fā)送一個(gè) Empty 實(shí)例給 MailBox。
不需要明確的鎖定,并且不需要考慮監(jiān)控程序。
#p#
Scala 并發(fā)性 v3
事實(shí)上,我們可以顯著縮短代碼,只要 Producer 和 Consumer 不需要功能全面的類(此處便是如此) — 兩者從本質(zhì)上說(shuō)都是 Runnable.run() 方法的瘦包裝器,Scala 可以使用 scala.concurrent.ops 對(duì)象的 spawn 方法來(lái)實(shí)現(xiàn),如清單 5 所示:
清單 5. ProdConSample, v3 (Scala)
- package com.tedneward.scalaexamples.scala.V3
- {
- import concurrent.MailBox
- import concurrent.ops._
- object ProdConSample
- {
- class Drop
- {
- private val m = new MailBox()
- private case class Empty()
- private case class Full(x : String)
- m send Empty() // initialization
- def put(msg : String) : Unit =
- {
- m receive
- {
- case Empty() =>
- m send Full(msg)
- }
- }
- def take() : String =
- {
- m receive
- {
- case Full(msg) =>
- m send Empty(); msg
- }
- }
- }
- def main(args : Array[String]) : Unit =
- {
- // Create Drop
- val drop = new Drop()
- // Spawn Producer
- spawn
- {
- val importantInfo : Array[String] = Array(
- "Mares eat oats",
- "Does eat oats",
- "Little lambs eat ivy",
- "A kid will eat ivy too"
- );
- importantInfo.foreach((msg) => drop.put(msg))
- drop.put("DONE")
- }
- // Spawn Consumer
- spawn
- {
- var message = drop.take()
- while (message != "DONE")
- {
- System.out.format("MESSAGE RECEIVED: %s%n", message)
- message = drop.take()
- }
- }
- }
- }
- }
spawn 方法(通過(guò)包塊頂部的 ops 對(duì)象導(dǎo)入)接收一個(gè)代碼塊(另一個(gè) by-name 參數(shù)示例)并將它包裝在匿名構(gòu)造的線程對(duì)象的 run() 方法內(nèi)部。事實(shí)上,并不難理解 spawn 的定義在 ops 類的內(nèi)部是什么樣的:
清單 6. scala.concurrent.ops.spawn()
- def spawn(p: => Unit) = {
- val t = new Thread() { override def run() = p }
- t.start()
- }
……這再一次強(qiáng)調(diào)了 by-name 參數(shù)的強(qiáng)大之處。
ops.spawn 方法的一個(gè)缺點(diǎn)在于,它是在 2003 年 Java 5 concurrency 類還不可用的時(shí)候編寫(xiě)的。特別是,java.util.concurrent.Executor 及其同類的作用是讓開(kāi)發(fā)人員更加輕松地生成線程,而不需要實(shí)際處理直接創(chuàng)建線程對(duì)象的細(xì)節(jié)。幸運(yùn)的是,在您自己的自定義庫(kù)中重新創(chuàng)建 spawn 的定義是相當(dāng)簡(jiǎn)單的,這需要利用 Executor(或 ExecutorService 或 ScheduledExecutorService)來(lái)執(zhí)行線程的實(shí)際啟動(dòng)任務(wù)。
事實(shí)上,Scala 的并發(fā)性支持超越了 MailBox 和 ops 類;Scala 還支持一個(gè)類似的 “Actors” 概念,它使用了與 MailBox 所采用的方法相類似的消息傳遞方法,但應(yīng)用更加全面并且靈活性也更好。但是,這部分內(nèi)容將在下期討論。
結(jié)束語(yǔ)
Scala 為并發(fā)性提供了兩種級(jí)別的支持,這與其他與 Java 相關(guān)的主題極為類似:
首先,對(duì)底層庫(kù)的完全訪問(wèn)(比如說(shuō) java.util.concurrent)以及對(duì) “傳統(tǒng)” Java 并發(fā)性語(yǔ)義的支持(比如說(shuō)監(jiān)控程序和 wait()/notifyAll())。
其次,這些基本機(jī)制上面有一個(gè)抽象層,詳見(jiàn)本文所討論的 MailBox 類以及將在本系列下一篇文章中討論的 Actors 庫(kù)。
兩個(gè)例子中的目標(biāo)是相同的:讓開(kāi)發(fā)人員能夠更加輕松地專注于問(wèn)題的實(shí)質(zhì),而不用考慮并發(fā)編程的低級(jí)細(xì)節(jié)(顯然,第二種方法更好地實(shí)現(xiàn)了這一目標(biāo),至少對(duì)于沒(méi)有過(guò)多考慮低級(jí)細(xì)節(jié)的人來(lái)說(shuō)是這樣的。)
但是,當(dāng)前 Scala 庫(kù)的一個(gè)明顯的缺陷就是缺乏 Java 5 支持;scala.concurrent.ops 類應(yīng)該具有 spawn 這樣的利用新的 Executor 接口的方法。它還應(yīng)該支持利用新的 Lock 接口的各種版本的 synchronized。幸運(yùn)的是,這些都是可以在 Scala 生命周期中實(shí)現(xiàn)的庫(kù)增強(qiáng),而不會(huì)破壞已有代碼;它們甚至可以由 Scala 開(kāi)發(fā)人員自己完成,而不需要等待 Scala 的核心開(kāi)發(fā)團(tuán)隊(duì)提供給他們(只需要花費(fèi)少量時(shí)間)。
【相關(guān)閱讀】