2008-03-21
线程同步模型, 生产者/消费者, 读写同步,线程池,concurrent map.
关键字: 线程 thread
之前写了一篇线程同步的原理分析的文章.
线程同步
http://www.javaeye.com/topic/164905
看到大家的反馈,感到深受鼓励.
看来这种举例说明本质原理的浅显易懂的文章, 还是比较受欢迎的.
关于线程, 我以前也写过一些文章.只是写得不那么清楚易懂.
一是因为自己的理解也有限, 二是很难找到灵感,很难写出一个非常贴切的例子.
应一些坛友的意见. 我这里继续发一些线程相关的内容.
内容的条理性和连贯性可能有些欠缺.
-------------------------------------------------------------------------
生产者/消费者模型
有了信号量这个利器,我们就可以处理比较复杂的线程同步模型了。
首先,我们来看一个比较简单的生产者/消费者模型。还是以Java代码为例。
public static final Object signal = new Object();
public static final char[] buf = new char[1024]; // 需要同步访问的共享资源
// 生产者代码
… produce() {
for(… ) { // 循环执行
synchronized(signal){
// 产生一些东西,放到 buf 共享资源中
signal.notify(); //然后通知消费者
signal.wait(); // 然后自己进入signal待召队列
}
}
}
// 消费者代码
… consume() {
for(… ) { // 循环执行
synchronized(signal){
signal.wait(); // 进入signal待召队列,等待生产者的通知
// 读取buf 共享资源里面的东西
signal.notify(); // 然后通知生产者
}
}
}
上述的生产者/消费者模型的实现非常简单,只用了一个信号量signal。这只是一段示意代码。
实际上的生产者/消费者模型的实现可能非常复杂。可以引入buf已满或者已空的判断,可以引入更多的信号量,也可以引入一个环状的buf链。但那些都是性能优化方面的工作,基本的信号量工作方式还是不变的。
生产者/消费者模型是典型的Coroutine。而且,当消费者或者生产者线程进入待召队列的时候,当前的运行栈状态就暂时保存在系统当中,这种状况又是典型的Continuation。
因此,我们完全可以用信号量机制自己实现Coroutine和Continuation。其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。
线程同步
http://www.javaeye.com/topic/164905
看到大家的反馈,感到深受鼓励.
看来这种举例说明本质原理的浅显易懂的文章, 还是比较受欢迎的.
关于线程, 我以前也写过一些文章.只是写得不那么清楚易懂.
一是因为自己的理解也有限, 二是很难找到灵感,很难写出一个非常贴切的例子.
应一些坛友的意见. 我这里继续发一些线程相关的内容.
内容的条理性和连贯性可能有些欠缺.
-------------------------------------------------------------------------
生产者/消费者模型
有了信号量这个利器,我们就可以处理比较复杂的线程同步模型了。
首先,我们来看一个比较简单的生产者/消费者模型。还是以Java代码为例。
public static final Object signal = new Object();
public static final char[] buf = new char[1024]; // 需要同步访问的共享资源
// 生产者代码
… produce() {
for(… ) { // 循环执行
synchronized(signal){
// 产生一些东西,放到 buf 共享资源中
signal.notify(); //然后通知消费者
signal.wait(); // 然后自己进入signal待召队列
}
}
}
// 消费者代码
… consume() {
for(… ) { // 循环执行
synchronized(signal){
signal.wait(); // 进入signal待召队列,等待生产者的通知
// 读取buf 共享资源里面的东西
signal.notify(); // 然后通知生产者
}
}
}
上述的生产者/消费者模型的实现非常简单,只用了一个信号量signal。这只是一段示意代码。
实际上的生产者/消费者模型的实现可能非常复杂。可以引入buf已满或者已空的判断,可以引入更多的信号量,也可以引入一个环状的buf链。但那些都是性能优化方面的工作,基本的信号量工作方式还是不变的。
生产者/消费者模型是典型的Coroutine。而且,当消费者或者生产者线程进入待召队列的时候,当前的运行栈状态就暂时保存在系统当中,这种状况又是典型的Continuation。
因此,我们完全可以用信号量机制自己实现Coroutine和Continuation。其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。
评论
lzmhehe
2008-05-13
队列结构:
public class SmsQueue {
private LinkedList<ShortMessage>[] queuePool;
@SuppressWarnings("unchecked")
public SmsQueue(int poolSize){
if(poolSize<=0||poolSize>20)
throw new ServiceException("初始化队列出错 应该 在0-20 而实际是"+poolSize);
queuePool = new LinkedList[poolSize];
for(int i=0;i<poolSize;i++){
queuePool[i] = new LinkedList<ShortMessage>();
}
}
/**
* 把短信放到队列尾
* @param shortMessage 短信 {@link #addFirst(ShortMessage)}
*/
public void push(ShortMessage shortMessage){
if(shortMessage!=null&&shortMessage.isFull()){
int index = shortMessage.getPriority();
synchronized (queuePool) {
if (index <= 0 || index >= queuePool.length) {
queuePool[0].addLast(shortMessage);
} else {
queuePool[index].addLast(shortMessage);
}
}
}
}
/**
* 从队列头拿出短信,如果整个队列为空,返回null
* @return
*/
public ShortMessage pop(){
synchronized (queuePool) {
for (int i = queuePool.length; i > 0; i--) {
if (!queuePool[i - 1].isEmpty()) {
return queuePool[i - 1].removeFirst();
}
}
}
return null;
}
/**
* 获得队列共有多少优先级
* @return
*/
public int getQueueWidth(){
synchronized(queuePool){
return queuePool.length;
}
}
/**
* 获得当前队列状态 没有返回空 list
* @return
*/
public List<QueueStatus> getStatus(){
List<QueueStatus> retList = new ArrayList<QueueStatus>();
int i=0;
synchronized (queuePool) {
for (LinkedList<ShortMessage> queue : queuePool) {
QueueStatus status = new QueueStatus();
status.setPrority(++i);
status.setSize(queue.size());
retList.add(status);
}
}
return retList;
}
/**
* 获得所有没有发送出去的对象,发回的list 中对象引用不能修改
* 由于对象已经在内存中 所以不考虑分页
* @return
*/
public List<ShortMessage> getAllShortMessage(){
List<ShortMessage> retList = new ArrayList<ShortMessage>();
synchronized (queuePool) {
for (LinkedList<ShortMessage> queue : queuePool) {
for (ShortMessage shortMessage : queue) {
retList.add(shortMessage);
}
}
}
return Collections.unmodifiableList(retList);
}
/**
* @return id ==null 或者没有找到 返回 false
* @param id
* @return
*/
public boolean removeSMById(String id){
if(id==null)
return false;
synchronized (queuePool) {
for (LinkedList<ShortMessage> queue : queuePool) {
for (ShortMessage shortMessage : queue) {
if(shortMessage.getId().equals(id)){
queue.remove(shortMessage);
return true;
}
}
}
return false;
}
}
/**
* 对列中共有多少短信
* @return
*/
public int totalShortMessageNum(){
int size=0;
synchronized(queuePool){
for(LinkedList list:queuePool){
if(!list.isEmpty()){
size+=list.size();
}
}
return size;
}
}
/**
* 队列是否为空
* @return
*/
public boolean isEmpty() {
synchronized (queuePool) {
for (LinkedList list : queuePool) {
if (!list.isEmpty()) {
return false;
}
}
}
return true;
}
/**
* 清空队列
*
*/
public void clear(){
synchronized (queuePool) {
for (LinkedList list : queuePool) {
list.clear();
}
}
}
/**
* 把短信放到队列头 (发送短信失败的时候使用)
* @param sm
*/
public void addFirst(ShortMessage sm) {
synchronized (queuePool) {
queuePool[queuePool.length - 1].addFirst(sm);
}
}
}
lzmhehe
2008-05-13
您好
我写的生产消费出现了问题,您看看是出在哪里了,谢谢
整个算法大致是:
多个线程同时生产短信,放到队列中(存在优先级,且先进先出),
一个消费线程从队列中拿出短信发送出去
目前出现了一个现象
生产者生产了短信,2个小时后消费者才将短信发送出去
而这2个小时的时间里面,又生产的其他短信都比较正常,都能马上发送出去,就这个停在队列里面,像是放假了不起床
并且程序运行了快半年了都没有出现这个毛病
生产者方面
MessageThread run 方法
消费者
我写的生产消费出现了问题,您看看是出在哪里了,谢谢
整个算法大致是:
多个线程同时生产短信,放到队列中(存在优先级,且先进先出),
一个消费线程从队列中拿出短信发送出去
目前出现了一个现象
生产者生产了短信,2个小时后消费者才将短信发送出去
而这2个小时的时间里面,又生产的其他短信都比较正常,都能马上发送出去,就这个停在队列里面,像是放假了不起床
并且程序运行了快半年了都没有出现这个毛病
生产者方面
public void sendSms(String xml) throws RemoteException {
log.info("Rmi接口接收到了xml"+xml);
MessageThread messageThread = Factory.getMessageThread();
messageThread.setXml(xml);
VerifyResult verifyResult = messageThread.verifyXml();
if (verifyResult.isValid()||verifyResult.equals(VerifyResult.XMLBEAN_HAS_DUPLE_PHONE)) {
log.info("过滤成功,开始放入发送队列");
new Thread(messageThread).start();
} else {
log.error("短信过滤掉了,原因:"+verifyResult.getMessage());
}
}
MessageThread run 方法
public void run() {
if (verified) {
ShortMessage shortMessage = xmlBean.convert2SM();
shortMessage.setServiceNo(shortMsgServiceNo.getServiceNo(xmlBean));
if (log4j.isDebugEnabled()) {
log4j.debug("shortMessage: 开始放入队列:" + shortMessage);
}
synchronized (queue) {
queue.push(shortMessage);
queue.notifyAll();
}
} else {
throw new ServiceException("请先检验在执行,当前没有检验");
}
}
消费者
public void run() {
stop=false;
if(log.isDebugEnabled()){
log.debug("SmsSendManager start run");
}
while (true){
ShortMessage sm =null;
synchronized (queue) {
while (queue.isEmpty()&&!stop) {
try {
queue.wait();
} catch (InterruptedException e) {
e.printStackTrace();
log.error(e);
System.exit(0);
}
}
sm = queue.pop();
}
if(stop){
return;
}
if (sm == null || !sm.isFull()) {
log.info("ShortMessage is null or is not valid");
continue;
}
log.info("will sender ShortMessage:"+sm);
sendShortMessage(sm);
}
}
yapi
2008-03-31
c++的pv问题偶写过,用Java的synchronized来搞还没尝试过
BaSaRa
2008-03-27
LZ的线程文章写得都不错,对理解线程还是很好的,支持
buaawhl
2008-03-26
shellkk 写道
多谢楼主,不过你的读写模型好像不是读并发的,是否应该改成这样:
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
}
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
写也可以做类似的修改,不过那是等价的
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
}
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
写也可以做类似的修改,不过那是等价的
thanks.
多谢你指出我的错误.
前面是写错了. 你的写法是正确的.
我之前的写法, 还是相当于互斥锁.
已改正.
shellkk
2008-03-26
多谢楼主,不过你的读写模型好像不是读并发的,是否应该改成这样:
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
}
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
写也可以做类似的修改,不过那是等价的
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
}
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
写也可以做类似的修改,不过那是等价的
buaawhl
2008-03-22
dsb 写道
同步互斥影响性能不能忽视,写N多多进程多线程的程序, 还是设计成不写同一块数据区域的进程或者线程比较好, 有时候会借助下操作系统的原子调用的一些特性,浪费点内存也忍了。大多数情况完全可以设计成不需要有共享写数据区的形式
我也是这么想的.
以前,曾经和坛友发生过争论.
我的一件事, 程序应该尽量写成 Stateless,如非必要,尽量避免 Statefull.
(Stateless 可以对应于 Purely Functional Programming 的思想).
有坛友说, Stateless 还叫做对象吗 ? 对象没有状态, 还叫做面向对象吗?
其实, 对象里面的可以更改的数据 才叫做状态. 只读数据不是状态.
现在的IoC框架, 可以组装出来大量的 Service 对象, 里面有复杂的对象结构, 只是那些结构一旦生成,就是一个固定的关系, 而不会改变. 这也叫作 Stateless.
-------------------------------
不过,要是完全按照 Stateless, 或者说追求 Purely Functional 的那种目标, 对于编程来说, 是比较麻烦的.
很多地方, Stateful 可以给我们带来很多方便. 或者说, 共享数据可以带来很多方便.
按照需要详细切分数据和程序, 需要在编程上花费更多的心力.
buaawhl
2008-03-22
Trustno1 写道
引用
其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。
??
这是我的一种猜想.
Iterator vs Visitor,PullvsPush
http://www.javaeye.com/topic/21293
引用
Coroutine本来是一个通用的概念。表示几个协同工作的程序。
比如,消费者/生产者,你走几步,我走几步;下棋对弈,你一步我一步。
由于协同工作的程序通常只有2个,而且这两个程序交换的数据通常只有一个。于是人们就很容易想到用Coroutine来实现Iterator。
这里面Iterator就是Coroutine里面的生产者Producer角色,数据提供者。所以,也叫做Generator。
每次Iterator程序就是等在那里,一旦用户(消费者Consumer角色)调用了iterator.next, Iterator就继续向下执行一步,然后把当前遇到的内部数据的Node放到一个消费者用户能够看到的公用的缓冲区(比如,直接放到消费者线程栈里面的局部变量)里面,然后自己就停下来(wait)。然后消费者用户就从缓冲区里面获得了那个Node。
这样Iterator就可以自顾自地进行递归运算,不需要自己管理一个栈,而是迫使计算机帮助它分配和管理运行栈。于是就实现了幸福得像花儿一样,简单得像Visitor一样的梦想。
比如,这样一段代码,遍历一棵二叉树。
public class TreeWalker : Coroutine {
private TreeNode _tree;
public TreeWalker(TreeNode tree) { _tree = tree; }
protected override Execute() {
Walk(_tree);
}
private void Walk(TreeNode tree) {
if (tree != null) {
Walk(tree.Left);
Yield(tree);
Walk(tree.Right);
}
}
}
其中的Yield指令是关键。意思是,首先把当前Node甩到用户的数据空间,然后自己暂停运行(类似于java的thread yield方法或者object.wait方法),把自己当前运行的线程挂起来,这样虚拟机就为自己保存了当前的运行栈(context)。
有人说,这不就是continuation吗?
对。只是Coroutine这里多了一个产生并传递数据的动作。
实现原理如果用Java Thread表示大概就是这样。当然下面的代码只是一个示意。网上有具体的Java Coroutine实现,具体代码我也没有看过,想来实现原理大致如此。
public class TreeIterator implements Iterator{
TreeWalker walker;
// start the walker thread ..
Object next(){
walker.notify();
// wait for a while so that walker can continue run
return walker.currentNode;
}
}
class TreeWalker implements Runnable{
TreeNode currentNode;
TreeWarker(root){
currentNode = root;
}
void run(){
walk(curentNode);
}
private void Walk(TreeNode tree) {
if (tree != null) {
Walk(tree.Left);
currentNode = tree;
this.wait();
Walk(tree.Right);
}
}
}
我们看到,Iterator本身是一个Thread,用户也是一个Thread。Iterator Thread把数据传递给User Thread。
说实话,我宁可自己维护一个Stack,也不愿意引入Coroutine这类Thread Control的方式来实现Iterator。
dsb
2008-03-21
同步互斥影响性能不能忽视,写N多多进程多线程的程序, 还是设计成不写同一块数据区域的进程或者线程比较好, 有时候会借助下操作系统的原子调用的一些特性,浪费点内存也忍了。大多数情况完全可以设计成不需要有共享写数据区的形式
Trustno1
2008-03-21
引用
其实,那些在语法层面上支持Coroutine和Continuation的语言,内部实现原理也是采用类似的信号量同步机制。
??
buaawhl
2008-03-21
线程池
线程是一种比较昂贵的资源。有些系统为了重用线程,引入了线程池的机制。
线程池的工作原理如下:
首先,系统会启动一定数量的线程。这些线程就构成了一个线程池。
当有任务要做的时候,系统就从线程池里面选一个空闲的线程。然后把这个线程标记为“正在运行”。然后把任务传给这个线程执行。线程执行任务完成之后,就把自己标记为“空闲”。
这个过程并不难以理解。难以理解的是,一般来说,线程执行完成之后,运行栈等系统资源就会释放,线程对象就被回收了。一个已经完成的线程,又如何能回到线程池的空闲线程队列中呢?
秘诀就在于,线程池里面的线程永远不会执行完成。线程池里面的线程,都是一个无穷循环。具体代码如下:
Thread pooledThread {
… theTask …. // theTask成员变量,表示要执行的任务
… run() {
while( true ) { // 永不停止的循环
signal.wait(); // 等待系统的通知
theTask.run(); // 执行任务
}
}
}
系统只需要调用 signal.notify() 就可以启动一个空闲线程。
线程是一种比较昂贵的资源。有些系统为了重用线程,引入了线程池的机制。
线程池的工作原理如下:
首先,系统会启动一定数量的线程。这些线程就构成了一个线程池。
当有任务要做的时候,系统就从线程池里面选一个空闲的线程。然后把这个线程标记为“正在运行”。然后把任务传给这个线程执行。线程执行任务完成之后,就把自己标记为“空闲”。
这个过程并不难以理解。难以理解的是,一般来说,线程执行完成之后,运行栈等系统资源就会释放,线程对象就被回收了。一个已经完成的线程,又如何能回到线程池的空闲线程队列中呢?
秘诀就在于,线程池里面的线程永远不会执行完成。线程池里面的线程,都是一个无穷循环。具体代码如下:
Thread pooledThread {
… theTask …. // theTask成员变量,表示要执行的任务
… run() {
while( true ) { // 永不停止的循环
signal.wait(); // 等待系统的通知
theTask.run(); // 执行任务
}
}
}
系统只需要调用 signal.notify() 就可以启动一个空闲线程。
buaawhl
2008-03-21
Concurrent Map
Concurrent Map的最简单的实现方法是直接用java.util.HashTable类,或者用Collections.synchronizedMap() 修饰某个Map。
这样获得的Map能够保证读写同步,但是,并发读的时候,也必须同步,串行读取,效率很低。这个思路显然不适合。
Doug Lea提供了一个Concurrent工具包,
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
包括Lock, ReadWriteLock, CurrentHashMap, CurrentReaderHashMap等类。JDK1.5引入了这些类,作为java.util.concurrent Package。
我设想了一下,CurrentHashMap应该是采用ReadWriteLock实现读写同步。代码看起来应该像这个样子。
但看了CurrentHashMap 的代码,发现不是这样。其中的实现比较复杂,把Table分成段进行分别管理。那个内部类 Segment extends ReentrantLock。
里面的 readValueUnderLock 方法里面用了lock。
我们再来看CurrentReaderHashMap, “A version of Hashtable that supports mostly-concurrent reading, but exclusive writing.”
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/ConcurrentReaderHashMap.java
但是它的 read ( get, contains, size …) 方法里面用到了synchronized。还是要获得系统锁。
Concurrent Map的最简单的实现方法是直接用java.util.HashTable类,或者用Collections.synchronizedMap() 修饰某个Map。
这样获得的Map能够保证读写同步,但是,并发读的时候,也必须同步,串行读取,效率很低。这个思路显然不适合。
Doug Lea提供了一个Concurrent工具包,
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
包括Lock, ReadWriteLock, CurrentHashMap, CurrentReaderHashMap等类。JDK1.5引入了这些类,作为java.util.concurrent Package。
我设想了一下,CurrentHashMap应该是采用ReadWriteLock实现读写同步。代码看起来应该像这个样子。
// my guess
class CocurrentHashMap
{
Private Map map = null;
final ReadWriteLock rwLock = new …. ;
final Lock readLock = rwLock.readLock();
final Lock writeLock = rwLock.writeLock();
// decorate the map as concurrent
public CocurrentHashMap(Map m){
map = m;
}
// all write method, like put, putAll, remove, clear
public void putAll(Map m){
writeLock.lock();
try{
map.putAll(m);
}finally{
writeLock.unlock();
}
}
// all read method. like get, containsKey, containsValue, entrySet()
public Object get(Object key){
readLock.lock();
try{
return map.get(key);
}finally{
readLock.unlock();
}
};
// as we can see, in such places it is convenient to use AOP here. :-)
但看了CurrentHashMap 的代码,发现不是这样。其中的实现比较复杂,把Table分成段进行分别管理。那个内部类 Segment extends ReentrantLock。
里面的 readValueUnderLock 方法里面用了lock。
/**
* Read value field of an entry under lock. Called if value
* field ever appears to be null. This is possible only if a
* compiler happens to reorder a HashEntry initialization with
* its table assignment, which is legal under memory model
* but is not known to ever occur.
*/
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
我们再来看CurrentReaderHashMap, “A version of Hashtable that supports mostly-concurrent reading, but exclusive writing.”
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/ConcurrentReaderHashMap.java
但是它的 read ( get, contains, size …) 方法里面用到了synchronized。还是要获得系统锁。
buaawhl
2008-03-21
读写模型
读写模型是一个稍微复杂一些的模型。
一份共享资源允许多个读者同时读取。但是只要有一个写者在写这份共享资源,任何其他的读者和写者都不能访问这份共享资源。
读写模型实现起来,不仅需要信号量机制,还需要额外的读者计数和写者计数。
public static final Object signal = new Object();
public static int readers = 0;
public static int writers = 0;
// 读者代码
… read() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
} // 这里出了synchronized范围,释放同步锁.以便其他线程读取.
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
// 写者代码
… write() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 || readers > 0)
signal.wait();// 如果有人在写或读,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写,也没有人在读
writers ++ ; // 增加一个写者计数,表示本线程在写
// 进行一些写操作
writers --; // 读取完成,减少一个读者计数,表示本线程不在写
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
上述代码只是一段示意代码。实际应用中,人们通常抽取出来一个专门的读写同步锁。
interface ReadWriteLock {
… getReadLock();
… releaseReadLock();
… getWriteLock();
… releaseWriteLock();
}
具体的实现原理也是类似的信号量同步机制。
class RWLock {
… readers, writers;
… synchronized … getReadLock() { // 相当于synchronized(this)
…
while( writers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
readers++;
}
…synchronized … releaseReadLock(){ //相当于synchronized(this)
readers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
…synchronized … getWriteLock(){// 相当于synchronized(this)
while( writers > 0 || readers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
writers++;
}
…synchronized … releaseWriteLock(){// 相当于synchronized(this)
writers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
}
具体用法是
public static final RWLock lock = new RWLock();
… read() {
lock.getReadLock();
// 读取
lock.releaseReadLock();
}
… write() {
lock.getWriteLock();
// 读取
lock.releaseWriteLock();
}
这种用法要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。
这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。
很巧的是,正在我惴惴不安的时候,我的一个朋友提供了一个信息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。
以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
里面提供了一个ReadWriteLock类,标准用法如下。
Standard usage of ReadWriteLock:
class X {
ReadWriteLock rw;
// ...
public void read() throws InterruptedException {
rw.readLock().acquire();
try {
// ... do the read
}
finally {
rw.readlock().release()
}
}
public void write() throws InterruptedException {
rw.writeLock().acquire();
try {
// ... do the write
}
finally {
rw.writelock().release()
}
}
}
我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。
我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。
读写模型是一个稍微复杂一些的模型。
一份共享资源允许多个读者同时读取。但是只要有一个写者在写这份共享资源,任何其他的读者和写者都不能访问这份共享资源。
读写模型实现起来,不仅需要信号量机制,还需要额外的读者计数和写者计数。
public static final Object signal = new Object();
public static int readers = 0;
public static int writers = 0;
// 读者代码
… read() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 )
signal.wait(); // 如果有人在写,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写
readers ++ ; // 增加一个读者计数,表示本线程在读取
} // 这里出了synchronized范围,释放同步锁.以便其他线程读取.
// 进行一些读取操作
synchronized(signal){
readers --; // 读取完成,减少一个读者计数,表示本线程不在读取
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
// 写者代码
… write() {
for(… ) { // 循环执行
synchronized(signal){
while( writers > 0 || readers > 0)
signal.wait();// 如果有人在写或读,那么就放弃执行,进入待召队列
// 能够到达这里,说明没有人在写,也没有人在读
writers ++ ; // 增加一个写者计数,表示本线程在写
// 进行一些写操作
writers --; // 读取完成,减少一个读者计数,表示本线程不在写
signal.notifyAll(); // 通知待召队列里面的所有其他线程
}
}
}
上述代码只是一段示意代码。实际应用中,人们通常抽取出来一个专门的读写同步锁。
interface ReadWriteLock {
… getReadLock();
… releaseReadLock();
… getWriteLock();
… releaseWriteLock();
}
具体的实现原理也是类似的信号量同步机制。
class RWLock {
… readers, writers;
… synchronized … getReadLock() { // 相当于synchronized(this)
…
while( writers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
readers++;
}
…synchronized … releaseReadLock(){ //相当于synchronized(this)
readers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
…synchronized … getWriteLock(){// 相当于synchronized(this)
while( writers > 0 || readers > 0 )
this.wait(); // 这里我们把RWLock对象本身作为信号量
writers++;
}
…synchronized … releaseWriteLock(){// 相当于synchronized(this)
writers--;
this.notifyAll(); // // 这里我们把RWLock对象本身作为信号量
}
}
具体用法是
public static final RWLock lock = new RWLock();
… read() {
lock.getReadLock();
// 读取
lock.releaseReadLock();
}
… write() {
lock.getWriteLock();
// 读取
lock.releaseWriteLock();
}
这种用法要求在执行一些处理之前,一定要执行某项特殊操作,处理之后一定也要执行某项特殊操作。这种人为的顺序性,无疑增加了代码的耦合度,降低了代码的独立性。很有可能会成为线程死锁和资源操作冲突的根源。
这点一直让我不安,可是没有找到方法避免。毕竟,死锁或者资源操作冲突,是线程的固有问题。
很巧的是,正在我惴惴不安的时候,我的一个朋友提供了一个信息。Sun公司根据JCR,决定在jdk1.5中引入关于concurrency(并发)的部分。
以下这个网址是concurrency部分的util.concurrent一个实现。非常好的信息。对于处理多线程并发问题,很有帮助。
http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html
里面提供了一个ReadWriteLock类,标准用法如下。
Standard usage of ReadWriteLock:
class X {
ReadWriteLock rw;
// ...
public void read() throws InterruptedException {
rw.readLock().acquire();
try {
// ... do the read
}
finally {
rw.readlock().release()
}
}
public void write() throws InterruptedException {
rw.writeLock().acquire();
try {
// ... do the write
}
finally {
rw.writelock().release()
}
}
}
我们可以看到,ReadWriteLock同样要求调用的顺序——aquire()和release()。我对自己的例子增强了一点信心。
我又查看了WriterPreferenceReadWriteLock类,看到里面成对的方法,startRead(),endRead();startWrite(),endWrite()。我的心情完全放松了下来。我的思路虽然粗糙,但大体的方向是正确的。
发表评论
提醒: 该博客已发表在公共论坛,博客所有留言会成为论坛回贴,留言请注意遵守论坛发贴规则
- 浏览: 585649 次
- 性别:

- 来自: china

- 详细资料
搜索本博客
最近加入圈子
最新评论
-
网上银行的安全操作设计探 ...
有道理,不知道建设银行的UKey有啥用?
-- by sjz209 -
网上银行的安全操作设计探 ...
有道理,不知道建设银行的UKey有啥用?
-- by sjz209 -
网上银行的安全操作设计探 ...
1、据了解,动态口令采用的就是楼主说的第2种机制,所以动态口令发生器会有一个容错 ...
-- by jxb8901 -
谁说搞技术的没有幽默感?
yyliuliang 写道部门老大宣布放一年长假,大伙欢呼雀跃,连作三个俯卧撑表 ...
-- by hongfei3 -
谁说搞技术的没有幽默感?
幸存者 写道为什么我觉得不好笑?是因为我没有幽默感么? bingoo
-- by roger






评论排行榜