Java ConcurrentLinkedQueue队列线程安全操作

/ Java / 没有评论 / 2224浏览

代码示例:

package async;  
  
import java.util.ArrayList;  
import java.util.List;  
import java.util.Queue;  
import java.util.concurrent.ConcurrentLinkedQueue;  
  
/** 
 *  
 * 线程安全队列Queue实现计算科学古老的线程同步问题:生产者-消费者模型 
 *  
 * @author phil 
 * 
 */  
public class Demo {  
    public static void main(String[] args) {  
        // 线程操作安全队列,装载数据  
        Queue<String> queue = new ConcurrentLinkedQueue<String>();  
  
        // 消费者线程:不断的消费队列中的数据  
        // 该线程不停的从队列中取出队列中最头部的数据  
        new Thread(new Runnable() {  
            @Override  
            public void run() {  
                while (true) {  
                    // 从队列的头部取出并删除该条数据  
                    String s = queue.poll();  
  
                    if (s != null) {  
                        System.out.println(System.currentTimeMillis() + "取出数据:" + s);  
                    }  
                }  
            }  
  
        }).start();  
  
        // 生产者线程A:不断的生产单个数据并装入队列中  
        // 该线程模拟不停的在队列中装入一个数据  
        new Thread(new Runnable() {  
            private int count = 0;  
            private int number = 0;  
  
            @Override  
            public void run() {  
                while (true) {  
                    number = count++;  
                    System.out.println("装载数据:" + number);  
                    queue.add(String.valueOf(number));  
  
                    try {  
                        Thread.sleep(1000);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
  
        }).start();  
  
        // 生产者线程B:不断的生产批量数据并装入队列中  
        // 该线程模拟不停的在队列中装入一批数据  
        new Thread(new Runnable() {  
            private List<String> lists = new ArrayList<String>();  
            private int count = 0;  
  
            @Override  
            public void run() {  
                while (true) {  
                    // 一批数据的数量,不定长  
                    count = (int) (Math.random() * 5);  
                    for (int i = 0; i < count; i++) {  
                        lists.add("批量数据-" + i + "," + Math.random());  
                    }  
  
                    queue.addAll(lists);  
                    lists.clear();  
  
                    try {  
                        Thread.sleep(1000);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            }  
  
        }).start();  
    }  
}  

代码运行结果:

装载数据:0  
1490417519703取出数据:0  
1490417519705取出数据:批量数据-0,0.9045746291285363  
装载数据:1  
1490417520703取出数据:1  
1490417520705取出数据:批量数据-0,0.9378737341890285  
1490417520705取出数据:批量数据-1,0.5609780480099475  
1490417520705取出数据:批量数据-2,0.27383078038481046  
1490417520705取出数据:批量数据-3,0.6824300990854635  
装载数据:2  
1490417521704取出数据:2  
1490417521706取出数据:批量数据-0,0.23916865770830298  
装载数据:3  
1490417522704取出数据:3  
1490417522706取出数据:批量数据-0,0.4169859285695523  
1490417522706取出数据:批量数据-1,0.6667611178989155  
1490417522706取出数据:批量数据-2,0.9026516620769446  
1490417522706取出数据:批量数据-3,0.3491566771349188  
装载数据:4  
1490417523705取出数据:4  
1490417523707取出数据:批量数据-0,0.5634243124726268  
1490417523707取出数据:批量数据-1,0.4021826644433847  
装载数据:5  
1490417524705取出数据:5  
1490417524707取出数据:批量数据-0,0.9431588628811881  
1490417524707取出数据:批量数据-1,0.8762171226841987  
1490417524707取出数据:批量数据-2,0.0470837112538508  
装载数据:6  
1490417525705取出数据:6  
1490417525707取出数据:批量数据-0,0.31099249499423265  
1490417525707取出数据:批量数据-1,0.7660770869693369  
1490417525707取出数据:批量数据-2,0.3209338524956993  
1490417525707取出数据:批量数据-3,0.2525376039263991  
装载数据:7  
1490417526706取出数据:7  
1490417526708取出数据:批量数据-0,0.6369286896471094