<Read-Write Lock>

(1)ReentrantReadWriteLock

Read-Write Lockは「読む処理」と「書く処理」に分けてロックを取ります。
「読む処理」X「書く処理」または「書く処理」X「書く処理」の時にはロックをかけますが 「読む処理」X「読む処理」の時にはロックをかけません。
読み込み処理が多い時または読み込み処理に時間がかかる場合など、一般的な排他制御より効率が期待されます。 java.util.concurrent.locks.ReentrantReadWriteLockクラスを使えば、Read-Write Lockの機能が容易に実現できます。
Read-Write Lockのパターンは

lock
try {
  処理
} finally {
  unlock
}

のようにfinally節で必ずロックを開放しなければいけません。

●Read-Write Lockを利用するクラス import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Data { private final char[] buffer; private final ReadWriteLock lock = new ReentrantReadWriteLock(true); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); public Data(int size) { this.buffer = new char[size]; } public String read() throws InterruptedException { readLock.lock(); try { return new String(doRead()); } finally { readLock.unlock(); } } public void write(String line) throws InterruptedException { writeLock.lock(); try { int index = 0; for (int i=0; i<buffer.length;i++) { if (index >= line.length()) { index = 0; } doWrite(line.charAt(index), i); index++; } } finally { writeLock.unlock(); } } private char[] doRead() throws InterruptedException { char[] newbuf = new char[buffer.length]; for (int i=0; i<buffer.length; i++) { newbuf[i] = buffer[i]; } Thread.sleep(50); return newbuf; } private void doWrite(char c, int index) throws InterruptedException { buffer[index] = c; Thread.sleep(50); } } ●Readerスレッドクラス public class ReaderThread extends Thread { private final Data data; public ReaderThread(Data data) { this.data = data; } public void run() { try { while (true) { String readbuf = data.read(); System.out.println(Thread.currentThread().getName() + " reads " + readbuf); } } catch (InterruptedException e) { } } } ●Writerスレッドクラス import java.util.Random; public class WriterThread extends Thread { private static final Random random = new Random(); private final Data data; private final String chars; public WriterThread(Data data, String chars) { this.data = data; this.chars = chars; } public void run() { try { while (true) { char[] writebuf = new char[10]; for (int i=0; i<writebuf.length; i++) { writebuf[i] = nextchar(); } data.write(new String(writebuf)); Thread.sleep(random.nextInt(3000)); } } catch (InterruptedException e) { } } private char nextchar() { int index = random.nextInt(chars.length()); return chars.charAt(index); } } ●Mainクラス public class Main { public static void main(String[] args) { Data data = new Data(10); new ReaderThread(data).start(); new ReaderThread(data).start(); new ReaderThread(data).start(); new ReaderThread(data).start(); new ReaderThread(data).start(); new ReaderThread(data).start(); new WriterThread(data, "ABCDEFGHIJKLMNOPQTSTUVWXYZ").start(); new WriterThread(data, "abcdefghijklmnopqrstuvwxyz").start(); } }
(参考)
パフォーマンスは落ちますが、Read-Write Lockを使わずsynchronizedブロックによる排他制御も利用できます。
public class Data { private final char[] buffer; public Data(int size) { this.buffer = new char[size]; } public synchronized String read() throws InterruptedException { return new String(doRead()); } public synchronized void write(String line) throws InterruptedException { int index = 0; ・・・略・・・ } ・・・略・・・ }

(2)ReadWriteLockクラス

java.util.concurrent.locksパッケージが使えない場合は、以下のようにReadWriteLockクラスを作成します。

●ReadWriteLockクラス public final class ReadWriteLock { private int readingReaders = 0; // 実際に読んでいるスレッドの数 private int waitingWriters = 0; // 書くのを待っているスレッドの数 private int writingWriters = 0; // 実際に書いているスレッドの数 private boolean preferWriter = true; // true:書き込み優先 public synchronized void readLock() throws InterruptedException { while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) { wait(); } // 読んでいるスレッドの数を1増やす readingReaders++; } public synchronized void readUnlock() { // 読んでいるスレッドの数を1減らす readingReaders--; // 書き込み優先にする preferWriter = true; notifyAll(); } public synchronized void writeLock() throws InterruptedException { // 書くのを待っているスレッドの数を1増やす waitingWriters++; try { while (readingReaders > 0 || writingWriters > 0) { wait(); } } finally { // 書くのを待っているスレッドの数を1減らす waitingWriters--; } // 書いているスレッドの数を1増やす writingWriters++; } public synchronized void writeUnlock() { // 書いているスレッドの数を1減らす writingWriters--; // 読み込み優先にする preferWriter = false; notifyAll(); } } ●Read-Write Lockを利用するクラス public class Data { private final char[] buffer; private final ReadWriteLock lock = new ReadWriteLock(); public Data(int size) { this.buffer = new char[size]; } public String read() throws InterruptedException { lock.readLock(); try { return new String(doRead()); } finally { lock.readUnlock(); } } public void write(String line) throws InterruptedException { lock.writeLock(); try { int index = 0; for (int i=0; i<buffer.length;i++) { if (index >= line.length()) { index = 0; } doWrite(line.charAt(index), i); index++; } } finally { lock.writeUnlock(); } } private char[] doRead() throws InterruptedException { char[] newbuf = new char[buffer.length]; for (int i=0; i<buffer.length; i++) { newbuf[i] = buffer[i]; } Thread.sleep(50); return newbuf; } private void doWrite(char c, int index) throws InterruptedException { buffer[index] = c; Thread.sleep(50); } }

(3)HashMapに適用

HashMapクラスはスレッドセーフではありませんが、java.util.concurrent.locksパッケージを使って
Read-Write Lockの機能を適用できます。

●Mapクラス import java.util.HashMap; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class DataMap<K,V> { private final Map<K,V> map = new HashMap<K,V>(); private final ReadWriteLock lock = new ReentrantReadWriteLock(true); private final Lock readLock = lock.readLock(); private final Lock writeLock = lock.writeLock(); // クリア public void clear() { writeLock.lock(); try { map.clear(); } finally { writeLock.unlock(); } } // keyにvalueを割り当てる public void assign(K key, V value) { writeLock.lock(); try { map.put(key, value); } finally { writeLock.unlock(); } } // keyに割り当てた値を取得する public V retrieve(K key) { readLock.lock(); try { return map.get(key); } finally { readLock.unlock(); } } } ●Assignスレッドクラス public class AssignThread extends Thread { private final DataMap<String, String> dataMap; private final String key; private final String value; public AssignThread(DataMap<String, String> dataMap, String key, String value) { this.dataMap = dataMap; this.key = key; this.value = value; } public void run() { while (true) { System.out.println(Thread.currentThread().getName() + ":assign(" + key + "," + value + ")"); dataMap.assign(key, value); try { Thread.sleep(500); } catch (InterruptedException e) { } } } } ●Retrieveスレッドクラス import java.util.concurrent.atomic.AtomicInteger; public class RetrieveThread extends Thread { private final DataMap<String, String> dataMap; private final String key; private static final AtomicInteger atomicCounter = new AtomicInteger(0); public RetrieveThread(DataMap<String, String> dataMap, String key) { this.dataMap = dataMap; this.key = key; } public void run() { while(true) { int counter = atomicCounter.incrementAndGet(); String value = dataMap.retrieve(key); System.out.println(counter + ":" + key + " => " + value); try { Thread.sleep(50); } catch (InterruptedException e) { } } } } ●Mainクラス public class Main { public static void main(String[] args) { DataMap<String, String> dataMap = new DataMap<String, String>(); new AssignThread(dataMap, "Alice", "Alaska").start(); new AssignThread(dataMap, "Alice", "Australia").start(); new AssignThread(dataMap, "Bobby", "Brazil").start(); new AssignThread(dataMap, "Bobby", "Bulgaria").start(); for (int i=0; i<100; i++) { new RetrieveThread(dataMap, "Alice").start(); new RetrieveThread(dataMap, "Bobby").start(); } try { Thread.sleep(10000); } catch (InterruptedException e) { } System.exit(0); } }
(参考)
パフォーマンスは落ちますが、synchronizedを使った一般的な排他制御も可能です。

import java.util.HashMap; import java.util.Map; public class DataMap<K,V> { private final Map<K,V> map = new HashMap<K,V>(); // クリア public synchronized void clear() { map.clear(); } // keyにvalueを割り当てる public synchronized void assign(K key, V value) { map.put(key, value); } // keyに割り当てた値を取得する public synchronized V retrieve(K key) { return map.get(key); } }

(2008/6 作成)