Java - Asynchronous handling of messages - which concurrency primitives to use?
I am building a simple implementation of the PostgreSQL wire protocol, and I want the client to send messages to a service and process them asynchronously in the background. I am having a little trouble understanding when to use an ExecutorService versus using raw threads. I am using two BlockingQueues: one to put messages on and have them sent to the server, and one to receive messages into. My code so far is below.
What I want to know is: does it make sense to use an ExecutorService here, or should I create and start ReceiveThread and SendThread as standalone threads (i.e. new Thread(new ReceiveThread()).start();)?
import java.io.*;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class Connection {
    private MessageBuilder builder;
    private MessageReader reader;
    private BlockingQueue<byte[]> sendQueue;
    private BlockingQueue<byte[]> receiveQueue;
    private ExecutorService exec = Executors.newFixedThreadPool(2);
    private Socket socket;

    public Connection(String hostname, int port, String username) throws IOException {
        this(new MessageBuilder(), new MessageReader(), hostname, port, username);
    }

    public Connection(MessageBuilder builder, MessageReader reader, String hostname, int port, String username) throws IOException {
        this.builder = builder;
        this.reader = reader;
        this.sendQueue = new LinkedBlockingDeque<byte[]>();
        this.receiveQueue = new LinkedBlockingDeque<byte[]>();
        socket = new Socket(hostname, port);
        List<Param> params = new ArrayList<Param>();
        params.add(new Param("user", username));
        sendQueue.add(builder.buildStartupMessage(3, 0, params));
        exec.submit(new SendThread(sendQueue, new DataOutputStream(socket.getOutputStream())));
        exec.submit(new ReceiveThread(receiveQueue, new DataInputStream(socket.getInputStream()), new MessageReader()));
    }

    public void sendMessage(byte[] bytes) {
        sendQueue.add(bytes);
    }

    public void closeConnection() throws IOException {
        socket.close();
    }
}

class ReceiveThread implements Callable<Boolean> {
    private BlockingQueue<byte[]> receiveQueue;
    private DataInputStream dis;
    private MessageReader reader;

    public ReceiveThread(BlockingQueue<byte[]> queue, DataInputStream dis, MessageReader reader) {
        this.receiveQueue = queue;
        this.dis = dis;
        this.reader = reader;
    }

    public Boolean call() throws Exception {
        byte msgByte = dis.readByte();
        System.out.println("response type is: " + (char) msgByte);
        int length = dis.readInt();
        byte[] message = new byte[length + 1];
        message[0] = msgByte;
        byte[] bytes = ByteBuffer.allocate(4).putInt(length).array();
        System.arraycopy(bytes, 0, message, 1, bytes.length);
        int readLength = dis.read(message, 5, length - 5);
        System.out.println("readLength : " + readLength + " should be length: " + (length - 5));
        receiveQueue.put(message);
        return true;
    }
}

class SendThread implements Callable<Boolean> {
    private BlockingQueue<byte[]> sendQueue;
    private DataOutputStream dos;

    public SendThread(BlockingQueue<byte[]> queue, DataOutputStream dos) {
        this.sendQueue = queue;
        this.dos = dos;
    }

    public Boolean call() throws Exception {
        byte[] message = sendQueue.take();
        dos.write(message);
        return true;
    }
}
The main benefit of using an ExecutorService is that it provides thread pooling. If you have many short-lived threads, the overhead caused by creating these threads may be significant. An ExecutorService can take care of managing a pool of worker threads and assigning those threads to tasks. It also takes care of scheduling. Say you have 15000 jobs to do, and each takes 5 ms to complete. Creating 15000 threads does not make sense. An ExecutorService will schedule the jobs on, say, 4 threads. You can use different types of ExecutorService depending on your needs.
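To illustrate the pooling point, here is a minimal sketch that runs 15000 trivial jobs on a fixed pool of 4 threads and counts how many distinct threads actually executed them. The class and method names (PoolDemo, distinctWorkerThreads) are made up for this example, and the jobs do no real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolDemo {
    // Hypothetical helper: runs jobCount trivial jobs on a fixed-size pool and
    // returns the number of distinct threads that executed at least one job.
    static int distinctWorkerThreads(int jobCount, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Callable<Integer>> jobs = new ArrayList<>();
        for (int i = 0; i < jobCount; i++) {
            final int id = i;
            jobs.add(() -> {
                // Record which pool thread picked up this job.
                threadNames.add(Thread.currentThread().getName());
                return id;
            });
        }
        try {
            // invokeAll blocks until every job has completed.
            List<Future<Integer>> results = pool.invokeAll(jobs);
            for (Future<Integer> result : results) {
                result.get(); // rethrows any exception raised inside a job
            }
        } finally {
            pool.shutdown(); // let the worker threads exit
        }
        return threadNames.size();
    }

    public static void main(String[] args) throws Exception {
        int used = distinctWorkerThreads(15000, 4);
        System.out.println("15000 jobs ran on " + used + " thread(s)");
    }
}
```

The thread count is bounded by the pool size no matter how many jobs are submitted, which is the saving being described.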
I don't understand why you are not using loops in your send and receive threads. As written, the code will send and receive only one message, which I don't think is your intention. I would get rid of the ExecutorService and have two long-lived threads, one for sending and one for receiving (as you have). Inside these, however, you should write a while loop that periodically checks the content of the queues.
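A rough sketch of what the looping version might look like, under some assumptions. The names LoopingWorkers, STOP, frame and roundTrip are invented for this example, and in-memory streams stand in for the socket so it can run on its own. It assumes the regular PostgreSQL message framing (one type byte, then a 4-byte length that counts itself but not the type byte); the startup message has no type byte and is not handled here. Note that BlockingQueue.take() blocks until a message arrives, so the loop does not need to poll.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class LoopingWorkers {
    // Hypothetical sentinel ("poison pill") that tells the sender loop to stop.
    static final byte[] STOP = new byte[0];

    // Sender loop: take() blocks until a message is queued, then writes it out.
    static Runnable sender(BlockingQueue<byte[]> sendQueue, DataOutputStream out) {
        return () -> {
            try {
                while (true) {
                    byte[] message = sendQueue.take();
                    if (message == STOP) break;
                    out.write(message);
                    out.flush();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and exit
            } catch (IOException e) {
                // stream closed or broken: exit the loop
            }
        };
    }

    // Receiver loop: reads one framed message per iteration until end of stream.
    static Runnable receiver(BlockingQueue<byte[]> receiveQueue, DataInputStream in) {
        return () -> {
            try {
                while (true) {
                    byte type = in.readByte();
                    int length = in.readInt();        // includes its own 4 bytes
                    byte[] message = new byte[1 + length - 4];
                    message[0] = type;
                    in.readFully(message, 1, length - 4); // unlike read(), never returns short
                    receiveQueue.put(message);
                }
            } catch (EOFException e) {
                // peer closed the stream: normal end of the loop
            } catch (IOException e) {
                // stream closed or broken: exit the loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Demo helper: builds a type byte + length + body frame.
    static byte[] frame(char type, String text) throws IOException {
        byte[] body = text.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeByte(type);
        out.writeInt(body.length + 4);
        out.write(body);
        return buf.toByteArray();
    }

    // Demo helper: sends frames through the sender, then decodes them with the receiver.
    static List<String> roundTrip(String... texts) throws Exception {
        BlockingQueue<byte[]> sendQueue = new LinkedBlockingQueue<>();
        BlockingQueue<byte[]> receiveQueue = new LinkedBlockingQueue<>();
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        Thread sendThread = new Thread(sender(sendQueue, new DataOutputStream(wire)));
        sendThread.start();
        for (String text : texts) sendQueue.put(frame('Q', text));
        sendQueue.put(STOP);
        sendThread.join();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        Thread receiveThread = new Thread(receiver(receiveQueue, in));
        receiveThread.start();
        receiveThread.join();

        List<String> bodies = new ArrayList<>();
        for (byte[] m : receiveQueue) {
            bodies.add(new String(m, 1, m.length - 1, StandardCharsets.UTF_8));
        }
        return bodies;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("SELECT 1", "SELECT 2"));
    }
}
```

With a real socket the two threads would run at the same time, and closing the socket would end the receiver loop through the IOException path.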