Saturday, November 4, 2017

6--Thread_2 + Semaphore in Java + Futur(Callable) + Synchronized variable

------------------------------------------------------------------------------------------------------------------------





Semaphore in Java

3.8

A semaphore controls access to a shared resource through the use of a counter. If the counter is greater than zero, then access is allowed. If it is zero, then access is denied. What the counter is counting are permits that allow access to the shared resource. Thus, to access the resource, a thread must be granted a permit from the semaphore.
Working of semaphore
In general, to use a semaphore, the thread that wants access to the shared resource tries to acquire a permit.
  • If the semaphore’s count is greater than zero, then the thread acquires a permit, which causes the semaphore’s count to be decremented.
  • Otherwise, the thread will be blocked until a permit can be acquired.
  • When the thread no longer needs an access to the shared resource, it releases the permit, which causes the semaphore’s count to be incremented.
  • If there is another thread waiting for a permit, then that thread will acquire a permit at that time.
Java provide Semaphore class in java.util.concurrent package that implements this mechanism, so you don’t have to implement your own semaphores.
Flow Diagram :
d
Constructors in Semaphore class : There are two constructors in Semaphore class.
Semaphore(int num)
Semaphore(int num, boolean how)
Here, num specifies the initial permit count. Thus, it specifies the number of threads that can access a shared resource at any one time. If it is one, then only one thread can access the resource at any one time. By default, all waiting threads are granted a permit in an undefined order. By setting how to true, you can ensure that waiting threads are granted a permit in the order in which they requested access.
Using Semaphores as Locks(preventing race condition)
We can use a semaphore to lock access to a resource, each thread that wants to use that resource must first call acquire( ) before accessing the resource to acquire the lock. When the thread is done with the resource, it must call release( ) to release lock. Here is an example that demonstrate this:
// java program to demonstrate
// use of semaphores Locks
import java.util.concurrent.*;
//A shared resource/class.
class Shared
{
    static int count = 0;
}
class MyThread extends Thread
{
    Semaphore sem;
    String threadName;
    public MyThread(Semaphore sem, String threadName)
    {
        super(threadName);
        this.sem = sem;
        this.threadName = threadName;
    }
    @Override
    public void run() {
         
        // run by thread A
        if(this.getName().equals("A"))
        {
            System.out.println("Starting " + threadName);
            try
            {
                // First, get a permit.
                System.out.println(threadName + " is waiting for a permit.");
             
                // acquiring the lock
                sem.acquire();
             
                System.out.println(threadName + " gets a permit.");
         
                // Now, accessing the shared resource.
                // other waiting threads will wait, until this
                // thread release the lock
                for(int i=0; i < 5; i++)
                {
                    Shared.count++;
                    System.out.println(threadName + ": " + Shared.count);
         
                    // Now, allowing a context switch -- if possible.
                    // for thread B to execute
                    Thread.sleep(10);
                }
            } catch (InterruptedException exc) {
                    System.out.println(exc);
                }
         
                // Release the permit.
                System.out.println(threadName + " releases the permit.");
                sem.release();
        }
         
        // run by thread B
        else
        {
            System.out.println("Starting " + threadName);
            try
            {
                // First, get a permit.
                System.out.println(threadName + " is waiting for a permit.");
             
                // acquiring the lock
                sem.acquire();
             
                System.out.println(threadName + " gets a permit.");
         
                // Now, accessing the shared resource.
                // other waiting threads will wait, until this
                // thread release the lock
                for(int i=0; i < 5; i++)
                {
                    Shared.count--;
                    System.out.println(threadName + ": " + Shared.count);
         
                    // Now, allowing a context switch -- if possible.
                    // for thread A to execute
                    Thread.sleep(10);
                }
            } catch (InterruptedException exc) {
                    System.out.println(exc);
                }
                // Release the permit.
                System.out.println(threadName + " releases the permit.");
                sem.release();
        }
    }
}
// Driver class
public class SemaphoreDemo
{
    public static void main(String args[]) throws InterruptedException
    {
        // creating a Semaphore object
        // with number of permits 1
        Semaphore sem = new Semaphore(1);
         
        // creating two threads with name A and B
        // Note that thread A will increment the count
        // and thread B will decrement the count
        MyThread mt1 = new MyThread(sem, "A");
        MyThread mt2 = new MyThread(sem, "B");
         
        // stating threads A and B
        mt1.start();
        mt2.start();
         
        // waiting for threads A and B
        mt1.join();
        mt2.join();
         
        // count will always remain 0 after
        // both threads will complete their execution
        System.out.println("count: " + Shared.count);
    }
}
Output:
Starting A
Starting B
B is waiting for a permit.
B gets a permit.
A is waiting for a permit.
B: -1
B: -2
B: -3
B: -4
B: -5
B releases the permit.
A gets a permit.
A: -4
A: -3
A: -2
A: -1
A: 0
A releases the permit.
count: 0
Note : The output can be different in different executions of above program, but final value of count variable will always remain 0.
Explanation of above program :
  • The program uses a semaphore to control access to the count variable, which is a static variable within the Shared class. Shared.count is incremented five times by thread A and decremented five times by thread B.To prevent these two threads from accessing Shared.count at the same time, access is allowed only after a permit is acquired from the controlling semaphore. After access is complete, the permit is released. In this way, only one thread at a time will access Shared.count, as the output shows.
  • Notice the call to sleep( ) within run( ) method inside MyThread class. It is used to “prove” that accesses to Shared.count are synchronized by the semaphore. In run( ), the call to sleep( ) causes the invoking thread to pause between each access to Shared.count. This would normally enable the second thread to run. However, because of the semaphore, the second thread must wait until the first has released the permit, which happens only after all accesses by the first thread are complete. Thus, Shared.count is first incremented five times by thread A and then decremented five times by thread B. The increments and decrements are not intermixed at assembly code.
  • Without the use of the semaphore, accesses to Shared.count by both threads would have occurred simultaneously, and the increments and decrements would be intermixed.To confirm this, try commenting out the calls to acquire( ) and release( ). When you run the program, you will see that access to Shared.count is no longer synchronized, thus you will not always get count value 0.
==============================================
MUTEX--:





=====================================================================


Java multi threads example to show you how to use Semaphore and Mutex to limit the number of threads to access resources.
  1. Semaphores – Restrict the number of threads that can access a resource. Example, limit max 10 connections to access a file simultaneously.
  2. Mutex – Only one thread to access a resource at once. Example, when a client is accessing a file, no one else should have access the same file at the same time.

1. Semaphore

Consider an ATM cubicle with 4 ATMs, Semaphore can make sure only 4 people can access simultaneously.
SemaphoreTest.java
package com.mkyong;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {

 // max 4 people
 static Semaphore semaphore = new Semaphore(4);

 static class MyATMThread extends Thread {

  String name = "";

  MyATMThread(String name) {
   this.name = name;
  }

  public void run() {

   try {

    
    System.out.println(name + " : acquiring lock...");
    System.out.println(name + " : available Semaphore permits now: " 
        + semaphore.availablePermits());
    
    semaphore.acquire();
    System.out.println(name + " : got the permit!");

    try {

     for (int i = 1; i <= 5; i++) {

      System.out.println(name + " : is performing operation " + i 
        + ", available Semaphore permits : "
        + semaphore.availablePermits());

      // sleep 1 second
      Thread.sleep(1000);

     }

    } finally {

     // calling release() after a successful acquire()
     System.out.println(name + " : releasing lock...");
     semaphore.release();
     System.out.println(name + " : available Semaphore permits now: " 
        + semaphore.availablePermits());

    }

   } catch (InterruptedException e) {

    e.printStackTrace();

   }

  }

 }

 public static void main(String[] args) {

  System.out.println("Total available Semaphore permits : " 
    + semaphore.availablePermits());
 
  MyATMThread t1 = new MyATMThread("A");
  t1.start();

  MyATMThread t2 = new MyATMThread("B");
  t2.start();

  MyATMThread t3 = new MyATMThread("C");
  t3.start();

  MyATMThread t4 = new MyATMThread("D");
  t4.start();

  MyATMThread t5 = new MyATMThread("E");
  t5.start();

  MyATMThread t6 = new MyATMThread("F");
  t6.start();

 }
}

Output may vary, but the flow of locking and releasing should be more or less same.
Total available Semaphore permits : 4
A : acquiring lock...
D : acquiring lock...
C : acquiring lock...
B : acquiring lock...
B : available Semaphore permits now: 4
C : available Semaphore permits now: 4
E : acquiring lock...
F : acquiring lock...
F : available Semaphore permits now: 2
F : got the permit!
F : is performing operation 1, available Semaphore permits : 1
D : available Semaphore permits now: 4
A : available Semaphore permits now: 4
D : got the permit!
D : is performing operation 1, available Semaphore permits : 0
E : available Semaphore permits now: 2
C : got the permit!
B : got the permit!
C : is performing operation 1, available Semaphore permits : 0
B : is performing operation 1, available Semaphore permits : 0
F : is performing operation 2, available Semaphore permits : 0
D : is performing operation 2, available Semaphore permits : 0
C : is performing operation 2, available Semaphore permits : 0
B : is performing operation 2, available Semaphore permits : 0
F : is performing operation 3, available Semaphore permits : 0
D : is performing operation 3, available Semaphore permits : 0
C : is performing operation 3, available Semaphore permits : 0
B : is performing operation 3, available Semaphore permits : 0
F : is performing operation 4, available Semaphore permits : 0
D : is performing operation 4, available Semaphore permits : 0
C : is performing operation 4, available Semaphore permits : 0
B : is performing operation 4, available Semaphore permits : 0
D : is performing operation 5, available Semaphore permits : 0
F : is performing operation 5, available Semaphore permits : 0
B : is performing operation 5, available Semaphore permits : 0
C : is performing operation 5, available Semaphore permits : 0


D : releasing lock...
F : releasing lock...
D : available Semaphore permits now: 1
A : got the permit!
A : is performing operation 1, available Semaphore permits : 0
F : available Semaphore permits now: 1
E : got the permit!


E : is performing operation 1, available Semaphore permits : 0
B : releasing lock...
B : available Semaphore permits now: 1
C : releasing lock...
C : available Semaphore permits now: 2
A : is performing operation 2, available Semaphore permits : 2
E : is performing operation 2, available Semaphore permits : 2
A : is performing operation 3, available Semaphore permits : 2
E : is performing operation 3, available Semaphore permits : 2
A : is performing operation 4, available Semaphore permits : 2
E : is performing operation 4, available Semaphore permits : 2
A : is performing operation 5, available Semaphore permits : 2
E : is performing operation 5, available Semaphore permits : 2
A : releasing lock...
A : available Semaphore permits now: 3
E : releasing lock...
E : available Semaphore permits now: 4

Observe the above output carefully, you will see that there are maximum 4 people (C, B, F, D) to perform an operation at a time, the people A and E are waiting. As soon as one of them release the lock (D and F), A and E will acquire it and resumes immediately.

2. Mutex

Mutex is the Semaphore with an access count of 1. Consider a situation of using lockers in the bank. Usually the rule is that only one person is allowed to enter the locker room.
MutexTest.java
package com.mkyong;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {

 // max 1 people
 static Semaphore semaphore = new Semaphore(1);

 static class MyLockerThread extends Thread {

  String name = "";

  MyLockerThread(String name) {
   this.name = name;
  }

  public void run() {

   try {

    System.out.println(name + " : acquiring lock...");
    System.out.println(name + " : available Semaphore permits now: " 
        + semaphore.availablePermits());

    semaphore.acquire();
    System.out.println(name + " : got the permit!");

    try {

     for (int i = 1; i <= 5; i++) {

      System.out.println(name + " : is performing operation " + i 
        + ", available Semaphore permits : "
        + semaphore.availablePermits());

      // sleep 1 second
      Thread.sleep(1000);

     }

    } finally {

     // calling release() after a successful acquire()
     System.out.println(name + " : releasing lock...");
     semaphore.release();
     System.out.println(name + " : available Semaphore permits now: " 
        + semaphore.availablePermits());

    }

   } catch (InterruptedException e) {

    e.printStackTrace();

   }

  }

 }

 public static void main(String[] args) {

  System.out.println("Total available Semaphore permits : " 
    + semaphore.availablePermits());

  MyLockerThread t1 = new MyLockerThread("A");
  t1.start();

  MyLockerThread t2 = new MyLockerThread("B");
  t2.start();

  MyLockerThread t3 = new MyLockerThread("C");
  t3.start();

  MyLockerThread t4 = new MyLockerThread("D");
  t4.start();

  MyLockerThread t5 = new MyLockerThread("E");
  t5.start();

  MyLockerThread t6 = new MyLockerThread("F");
  t6.start();

 }
}

Output may vary, but the flow of locking and releasing should be same.
Total available Semaphore permits : 1
A : acquiring lock...
B : acquiring lock...
A : available Semaphore permits now: 1
C : acquiring lock...
B : available Semaphore permits now: 1
C : available Semaphore permits now: 0
A : got the permit!
D : acquiring lock...
E : acquiring lock...
A : is performing operation 1, available Semaphore permits : 0
E : available Semaphore permits now: 0
D : available Semaphore permits now: 0
F : acquiring lock...
F : available Semaphore permits now: 0
A : is performing operation 2, available Semaphore permits : 0
A : is performing operation 3, available Semaphore permits : 0
A : is performing operation 4, available Semaphore permits : 0
A : is performing operation 5, available Semaphore permits : 0
A : releasing lock...
A : available Semaphore permits now: 1
B : got the permit!
B : is performing operation 1, available Semaphore permits : 0
B : is performing operation 2, available Semaphore permits : 0
B : is performing operation 3, available Semaphore permits : 0
B : is performing operation 4, available Semaphore permits : 0
B : is performing operation 5, available Semaphore permits : 0
B : releasing lock...
B : available Semaphore permits now: 1
C : got the permit!
C : is performing operation 1, available Semaphore permits : 0
C : is performing operation 2, available Semaphore permits : 0
C : is performing operation 3, available Semaphore permits : 0
C : is performing operation 4, available Semaphore permits : 0
C : is performing operation 5, available Semaphore permits : 0
C : releasing lock...
C : available Semaphore permits now: 1
E : got the permit!
E : is performing operation 1, available Semaphore permits : 0
E : is performing operation 2, available Semaphore permits : 0
E : is performing operation 3, available Semaphore permits : 0
E : is performing operation 4, available Semaphore permits : 0
E : is performing operation 5, available Semaphore permits : 0
E : releasing lock...
E : available Semaphore permits now: 1
D : got the permit!
D : is performing operation 1, available Semaphore permits : 0
D : is performing operation 2, available Semaphore permits : 0
D : is performing operation 3, available Semaphore permits : 0
D : is performing operation 4, available Semaphore permits : 0
D : is performing operation 5, available Semaphore permits : 0
D : releasing lock...
D : available Semaphore permits now: 1
F : got the permit!
F : is performing operation 1, available Semaphore permits : 0
F : is performing operation 2, available Semaphore permits : 0
F : is performing operation 3, available Semaphore permits : 0
F : is performing operation 4, available Semaphore permits : 0
F : is performing operation 5, available Semaphore permits : 0
F : releasing lock...
F : available Semaphore permits now: 1





==============================================

Simple Semaphore

Here is a simple Semaphore implementation:
public class Semaphore {
  private boolean signal = false;

  public synchronized void take() {
    this.signal = true;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(!this.signal) wait();
    this.signal = false;
  }

}
The take() method sends a signal which is stored internally in the Semaphore. The release() method waits for a signal. When received the signal flag is cleared again, and the release() method exited.
Using a semaphore like this you can avoid missed signals. You will call take() instead of notify() and release() instead of wait(). If the call to take() happens before the call to release() the thread calling release() will still know that take() was called, because the signal is stored internally in the signal variable. This is not the case with wait() and notify().
The names take() and release() may seem a bit odd when using a semaphore for signaling. The names origin from the use of semaphores as locks, as explained later in this text. In that case the names make more sense.

Using Semaphores for Signaling

Here is a simplified example of two threads signaling each other using a Semaphore:
Semaphore semaphore = new Semaphore();

SendingThread sender = new SendingThread(semaphore);

ReceivingThread receiver = new ReceivingThread(semaphore);

receiver.start();
sender.start();
public class SendingThread {
  Semaphore semaphore = null;

  public SendingThread(Semaphore semaphore){
    this.semaphore = semaphore;
  }

  public void run(){
    while(true){
      //do something, then signal
      this.semaphore.take();

    }
  }
}
public class RecevingThread {
  Semaphore semaphore = null;

  public ReceivingThread(Semaphore semaphore){
    this.semaphore = semaphore;
  }

  public void run(){
    while(true){
      this.semaphore.release();
      //receive signal, then do something...
    }
  }
}


Counting Semaphore

The Semaphore implementation in the previous section does not count the number of signals sent to it by take() method calls. We can change the Semaphore to do so. This is called a counting semaphore. Here is a simple implementation of a counting semaphore:
public class CountingSemaphore {
  private int signals = 0;

  public synchronized void take() {
    this.signals++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(this.signals == 0) wait();
    this.signals--;
  }

}

Bounded Semaphore

The CoutingSemaphore has no upper bound on how many signals it can store. We can change the semaphore implementation to have an upper bound, like this:
public class BoundedSemaphore {
  private int signals = 0;
  private int bound   = 0;

  public BoundedSemaphore(int upperBound){
    this.bound = upperBound;
  }

  public synchronized void take() throws InterruptedException{
    while(this.signals == bound) wait();
    this.signal++;
    this.notify();
  }

  public synchronized void release() throws InterruptedException{
    while(this.signal == 0) wait();
    this.signal--;
  }
}
Notice how the take() method now blocks if the number of signals is equal to the upper bound. Not until a thread has called receive will the thread calling take() be allowed to deliver its signal, if the BoundedSemaphore has reached its upper signal limit.

Using Semaphores as Locks

It is possible to use a bounded semaphore as a lock. To do so, set the upper bound to 1, and have the call to take() and release() guard the critical section. Here is an example:
BoundedSemaphore semaphore = new BoundedSemaphore(1);

...

semaphore.take();

try{
  //critical section
} finally {
  semaphore.release();
}
In contrast to the signaling use case the methods take() and release() are now called by the same thread. Since only one thread is allowed to take the semaphore, all other threads calling take() will be blocked until release() is called. The call to release() will never block since there has always been a call to take() first.
You can also use a bounded semaphore to limit the number of threads allowed into a section of code. For instance, in the example above, what would happen if you set the limit of the BoundedSemaphore to 5? 5 threads would be allowed to enter the critical section at a time. You would have to make sure though, that the thread operations do not conflict for these 5 threads, or you application will fail.
The relase() method is called from inside a finally-block to make sure it is called even if an exception is thrown from the critical section.

------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------

FUTURE:-

package com.Thread.Futur;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadFutur {

public static void main(String[] args) {

ExecutorService executreService = Executors.newCachedThreadPool();

Future<Integer> future = executreService.submit(new Callable<Integer>() {

@Override
public Integer call() throws Exception {
Random random = new Random();
int duration = random.nextInt(4000);

if(duration>2000){
throw new IOException("Waiting too long");
}
System.out.println("Starting .....");

Thread.sleep(duration);

System.out.println("Finish......");
return duration;
}
});

executreService.shutdown();
try {
System.out.println("Future :" + future.get());
} catch (InterruptedException | ExecutionException e) {
//e.printStackTrace();
System.out.println(e.getMessage());
}
}

}
OUTPUT :-
Starting .....
Finish......

Future :278



---------------------------------------------------------------------------------------------------------

package com.Thread.Futur;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutureTest {

public static void main(String[] args) {

ExecutorService executreService = Executors.newCachedThreadPool();

Future<String> future = executreService.submit(new Callable<String>() {

public String call() {
return "Hello";
}
});
//executreService.shutdown();
executreService.shutdownNow();
//if (future.isDone()) {

try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
//}
}
}
}
OUTPUT :-
Hello








----------------------------------------------------------------------------
Synchronized






-------------------------------------------------------------------------------
Synchronized Example


package com.Thread.AtomicVariable;


public class Synchronized_1 {


static int var = 0;


public synchronized static int increment(){
return var++;
}

public static void main(String[] args) {


Thread thread_1 = new Thread(new Runnable() {


@Override

public void run() {
for (int i = 0; i < 100000; i++) {
increment();
}

}


});


Thread thread_2 = new Thread(new Runnable() {


@Override

public void run() {
for (int i = 0; i < 100000; i++) {
increment();
}

}


});

thread_1.start();
thread_2.start();


try {
thread_1.join();
thread_2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.println(var);
}

}
OUTPUT:-

200000

No comments:

Post a Comment