From b88f253f24b49aa0fd1bcde592029d798ada9759 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Mon, 12 Feb 2018 13:58:54 +0800 Subject: [PATCH 1/7] fix the bug that message is not sent to the right queue --- .../main/java/com/geneea/celery/Celery.java | 83 ++++++++++++++++++- .../celery/brokers/rabbit/RabbitBroker.java | 29 ++++++- .../java/com/geneea/celery/spi/Broker.java | 13 +++ 3 files changed, 120 insertions(+), 5 deletions(-) diff --git a/celery-java/src/main/java/com/geneea/celery/Celery.java b/celery-java/src/main/java/com/geneea/celery/Celery.java index 10e2580..f2e0d95 100644 --- a/celery-java/src/main/java/com/geneea/celery/Celery.java +++ b/celery-java/src/main/java/com/geneea/celery/Celery.java @@ -55,7 +55,9 @@ public class Celery { private Celery(final String brokerUri, @Nullable final String queue, @Nullable final String backendUri, - @Nullable final ExecutorService executor) { + @Nullable final ExecutorService executor, + @Nullable final boolean isPriQueue, + @Nullable final int maxPriority) { this.queue = queue == null ? "celery" : queue; ExecutorService executorService = executor != null ? executor : Executors.newCachedThreadPool(); @@ -63,7 +65,13 @@ private Celery(final String brokerUri, broker = Suppliers.memoize(() -> { Broker b = CeleryBrokers.createBroker(brokerUri, executorService); try { - b.declareQueue(Celery.this.queue); + if(isPriQueue && maxPriority != 0){ + b.declarePriQueue(Celery.this.queue, maxPriority); + } + else { + b.declareQueue(Celery.this.queue); + } + } catch (IOException e) { throw new RuntimeException(e); } @@ -110,6 +118,22 @@ public AsyncResult submit(Class taskClass, String method, Object[] args) t return submit(taskClass.getName() + "#" + method, args); } + /** + * Submit a Java task for processing with priority. You'll probably not need to call this method. rather use @{@link CeleryTask} + * annotation. + * + * @param taskClass task implementing class + * @param method method in {@code taskClass} that does the work + * @param priority the priority of the task + * @param args positional arguments for the method (need to be JSON serializable) + * @return asynchronous result + * + * @throws IOException if the message couldn't be sent + */ + public AsyncResult submitWithPri(Class taskClass, String method, int priority, Object[] args) throws IOException { + return submitWithPri(taskClass.getName() + "#" + method, priority, args); + } + /** * Submit a task by name. A low level method for submitting arbitrary tasks that don't have their proxies * generated by @{@link CeleryTask} annotation. @@ -155,6 +179,61 @@ public AsyncResult submit(String name, Object[] args) throws IOException { headers.setReplyTo(clientId); } + message.send(queue); + + Future result; + if (rp.isPresent()) { + result = rp.get().getResult(taskId); + } else { + result = CompletableFuture.completedFuture(null); + } + return new AsyncResultImpl<>(result); + } + + /** + * Submit a task by name with priority. + * + * @param name task name as understood by the worker + * @param priority the priority of the message + * @param args positional arguments for the method (need to be JSON serializable) + * @return asynchronous result + * @throws IOException + */ + public AsyncResult submitWithPri(String name, int priority, Object[] args) throws IOException { + // Get the provider early to increase the chance to find out there is a connection problem before actually + // sending the message. + // + // This will help for example in the case when the connection can't be established at all. The connection may + // still drop after sending the message but there isn't much we can do about it. + Optional rp = resultsProvider.get(); + String taskId = UUID.randomUUID().toString(); + + ArrayNode payload = jsonMapper.createArrayNode(); + ArrayNode argsArr = payload.addArray(); + for (Object arg : args) { + argsArr.addPOJO(arg); + } + payload.addObject(); + payload.addObject() + .putNull("callbacks") + .putNull("chain") + .putNull("chord") + .putNull("errbacks"); + + Message message = broker.get().newMessageWithPriority(priority); + message.setBody(jsonMapper.writeValueAsBytes(payload)); + message.setContentEncoding("utf-8"); + message.setContentType("application/json"); + + Message.Headers headers = message.getHeaders(); + headers.setId(taskId); + headers.setTaskName(name); + headers.setArgsRepr("(" + Joiner.on(", ").join(args) + ")"); + headers.setOrigin(clientName); + if (rp.isPresent()) { + headers.setReplyTo(clientId); + } + System.out.println(queue); message.send(queue); diff --git a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java index 69f5613..f0b7284 100644 --- a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java +++ b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java @@ -22,18 +22,41 @@ public void declareQueue(String name) throws IOException { channel.queueDeclare(name, true, false, false, null); } + @Override + public void declarePriQueue(String name, int maxPriority) throws IOException { + Map props = new HashMap<>(); + props.put("x-max-priority", maxPriority); + channel.queueDeclare(name, true, false, false, props); + } + @Override public Message newMessage() { return new RabbitMessage(); } + @Override + public Message newMessageWithPriority(int priority) { + return new RabbitMessage(priority); + } + class RabbitMessage implements Message { private byte[] body; - private final AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder() - .deliveryMode(2) - .priority(0); + private final AMQP.BasicProperties.Builder props; + private final RabbitMessageHeaders headers = new RabbitMessageHeaders(); + public RabbitMessage(){ + props = new AMQP.BasicProperties.Builder() + .deliveryMode(2) + .priority(0); + } + + public RabbitMessage(int priority){ + props = new AMQP.BasicProperties.Builder() + .deliveryMode(2) + .priority(priority); + } + @Override public void setBody(byte[] body) { this.body = body; diff --git a/celery-java/src/main/java/com/geneea/celery/spi/Broker.java b/celery-java/src/main/java/com/geneea/celery/spi/Broker.java index cedd66f..bdd4da4 100644 --- a/celery-java/src/main/java/com/geneea/celery/spi/Broker.java +++ b/celery-java/src/main/java/com/geneea/celery/spi/Broker.java @@ -20,8 +20,21 @@ public interface Broker { */ void declareQueue(String name) throws IOException; + /** + * @param name queue name + * @param maxPriority the max priority of the queue with priority + * @throws IOException + */ + void declarePriQueue(String name, int maxPriority) throws IOException; + /** * @return message that can be constructed and later sent */ Message newMessage(); + + /** + * @param priority the priority of the message that is executed + * @return message that can be constructed and later sent + */ + Message newMessageWithPriority(int priority); } From 11978449ca253d31820dff559207f3a5420e8f99 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Wed, 7 Mar 2018 19:30:23 +0800 Subject: [PATCH 2/7] fix the bug that message is not sent to the right queue --- .../main/java/com/geneea/celery/Celery.java | 16 +-- .../celery/brokers/rabbit/RabbitBroker.java | 4 +- .../java/com/geneea/celery/spi/Broker.java | 4 +- celery-java/src/test/java/TestCases.java | 32 ++++++ examples/pom.xml | 5 + .../com/geneea/celery/examples/Sparkinfo.java | 107 ++++++++++++++++++ .../geneea/celery/examples/idata_object.java | 52 +++++++++ .../java/com/geneea/celery/examples/tt.java | 18 +++ 8 files changed, 226 insertions(+), 12 deletions(-) create mode 100644 celery-java/src/test/java/TestCases.java create mode 100644 examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java create mode 100644 examples/src/main/java/com/geneea/celery/examples/idata_object.java create mode 100644 examples/src/main/java/com/geneea/celery/examples/tt.java diff --git a/celery-java/src/main/java/com/geneea/celery/Celery.java b/celery-java/src/main/java/com/geneea/celery/Celery.java index ce9e0e5..2347e55 100644 --- a/celery-java/src/main/java/com/geneea/celery/Celery.java +++ b/celery-java/src/main/java/com/geneea/celery/Celery.java @@ -49,6 +49,7 @@ public class Celery { * * @param brokerUri connection to broker that will dispatch messages * @param backendUri connection to backend providing responses + * @param maxPriority the max priority of the queue if any, otherwise set to zero * @param queue routing tag (specifies into which Rabbit queue the messages will go) */ @Builder @@ -56,8 +57,7 @@ private Celery(final String brokerUri, @Nullable final String queue, @Nullable final String backendUri, @Nullable final ExecutorService executor, - @Nullable final boolean isPriQueue, - @Nullable final int maxPriority) { + Optional maxPriority) { this.queue = queue == null ? "celery" : queue; ExecutorService executorService = executor != null ? executor : Executors.newCachedThreadPool(); @@ -65,8 +65,8 @@ private Celery(final String brokerUri, broker = Suppliers.memoize(() -> { Broker b = CeleryBrokers.createBroker(brokerUri, executorService); try { - if(isPriQueue && maxPriority != 0){ - b.declarePriQueue(Celery.this.queue, maxPriority); + if( maxPriority.isPresent()){ + b.declareQueue(Celery.this.queue, maxPriority.get()); } else { b.declareQueue(Celery.this.queue); @@ -130,8 +130,8 @@ public AsyncResult submit(Class taskClass, String method, Object[] args) t * * @throws IOException if the message couldn't be sent */ - public AsyncResult submitWithPri(Class taskClass, String method, int priority, Object[] args) throws IOException { - return submitWithPri(taskClass.getName() + "#" + method, priority, args); + public AsyncResult submit(Class taskClass, String method, int priority, Object[] args) throws IOException { + return submit(taskClass.getName() + "#" + method, priority, args); } /** @@ -199,7 +199,7 @@ public AsyncResult submit(String name, Object[] args) throws IOException { * @return asynchronous result * @throws IOException */ - public AsyncResult submitWithPri(String name, int priority, Object[] args) throws IOException { + public AsyncResult submit(String name, int priority, Object[] args) throws IOException { // Get the provider early to increase the chance to find out there is a connection problem before actually // sending the message. // @@ -220,7 +220,7 @@ public AsyncResult submitWithPri(String name, int priority, Object[] args) th .putNull("chord") .putNull("errbacks"); - Message message = broker.get().newMessageWithPriority(priority); + Message message = broker.get().newMessage(priority); message.setBody(jsonMapper.writeValueAsBytes(payload)); message.setContentEncoding("utf-8"); message.setContentType("application/json"); diff --git a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java index f0b7284..b4eea39 100644 --- a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java +++ b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java @@ -23,7 +23,7 @@ public void declareQueue(String name) throws IOException { } @Override - public void declarePriQueue(String name, int maxPriority) throws IOException { + public void declareQueue(String name, int maxPriority) throws IOException { Map props = new HashMap<>(); props.put("x-max-priority", maxPriority); channel.queueDeclare(name, true, false, false, props); @@ -35,7 +35,7 @@ public Message newMessage() { } @Override - public Message newMessageWithPriority(int priority) { + public Message newMessage(int priority) { return new RabbitMessage(priority); } diff --git a/celery-java/src/main/java/com/geneea/celery/spi/Broker.java b/celery-java/src/main/java/com/geneea/celery/spi/Broker.java index bdd4da4..5483ceb 100644 --- a/celery-java/src/main/java/com/geneea/celery/spi/Broker.java +++ b/celery-java/src/main/java/com/geneea/celery/spi/Broker.java @@ -25,7 +25,7 @@ public interface Broker { * @param maxPriority the max priority of the queue with priority * @throws IOException */ - void declarePriQueue(String name, int maxPriority) throws IOException; + void declareQueue(String name, int maxPriority) throws IOException; /** * @return message that can be constructed and later sent @@ -36,5 +36,5 @@ public interface Broker { * @param priority the priority of the message that is executed * @return message that can be constructed and later sent */ - Message newMessageWithPriority(int priority); + Message newMessage(int priority); } diff --git a/celery-java/src/test/java/TestCases.java b/celery-java/src/test/java/TestCases.java new file mode 100644 index 0000000..e696862 --- /dev/null +++ b/celery-java/src/test/java/TestCases.java @@ -0,0 +1,32 @@ +import com.geneea.celery.Celery; +import org.junit.Test; + +import java.util.Optional; + +/** + * Created by stone on 18-3-7. + */ +public class TestCases { + + @Test + public void testPriority(){ + try{ + Optional p = Optional.of(10); + + Celery client = Celery.builder() + .queue("test_idata1") + .brokerUri("amqp://localhost/%2F") + .backendUri("rpc://localhost/%2F") + .maxPriority(p) + .build(); + + for(int i=0 ;i <100 ; i++){ + client.submit("app_tasks.add", i % 10, new Object[]{i % 10, 0}); + } + + }catch (Exception ex){ + + } + + } +} diff --git a/examples/pom.xml b/examples/pom.xml index 9eda9be..c731fb0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -61,6 +61,11 @@ 1.2.1 test + + org.springframework + spring-core + RELEASE + diff --git a/examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java b/examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java new file mode 100644 index 0000000..b11d8cf --- /dev/null +++ b/examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java @@ -0,0 +1,107 @@ +package com.geneea.celery.examples; + +import java.util.LinkedHashMap; +import java.util.LinkedList; + +/** + * Created by stone on 18-3-5. + */ +public class Sparkinfo { + private String numberExecutor; + + private String application; + + private String queue; + + private String deployMode; + + private String class_; + + private String executorCores; + + private String executorMemory; + + private LinkedList args; + + private LinkedHashMap kvargs; + + public Sparkinfo(String taskName) { + args = new LinkedList(); + kvargs = new LinkedHashMap(); + } + + public String getNumberExecutor() { + return numberExecutor; + } + + public void setNumberExecutor(String numberExecutor) { + this.numberExecutor = numberExecutor; + } + + + + public String getApplication(){ + return this.application; + } + + public void setApplication(String application) { + this.application = application; + } + + + public LinkedList getArgs() { + return args; + } + + public void setArgs(LinkedList args) { + this.args = args; + } + + public LinkedHashMap getKvargs() { + return kvargs; + } + + public void setKvargs(LinkedHashMap kvargs) { + this.kvargs = kvargs; + } + + public String getQueue() { + return queue; + } + + public void setQueue(String queue) { + this.queue = queue; + } + + public String getDeployMode() { + return deployMode; + } + + public void setDeployMode(String deployMode) { + this.deployMode = deployMode; + } + + public String getClass_() { + return class_; + } + + public void setClass_(String class_) { + this.class_ = class_; + } + + public String getExecutorCores() { + return executorCores; + } + + public void setExecutorCores(String executorCores) { + this.executorCores = executorCores; + } + + public String getExecutorMemory() { + return executorMemory; + } + + public void setExecutorMemory(String executorMemory) { + this.executorMemory = executorMemory; + } +} diff --git a/examples/src/main/java/com/geneea/celery/examples/idata_object.java b/examples/src/main/java/com/geneea/celery/examples/idata_object.java new file mode 100644 index 0000000..022c046 --- /dev/null +++ b/examples/src/main/java/com/geneea/celery/examples/idata_object.java @@ -0,0 +1,52 @@ +package com.geneea.celery.examples; + +/** + * Created by stone on 18-2-8. + */ +public class idata_object { + private String _cron = ""; + private String _principal = ""; + private int _num_executors = 1; + private String _dag_id = ""; + private String _start_date = ""; + + public String get_cron() { + return _cron; + } + + public void set_cron(String _cron) { + this._cron = _cron; + } + + public int get_num_executors() { + return _num_executors; + } + + public void set_num_executors(int _num_executors) { + this._num_executors = _num_executors; + } + + public String get_dag_id() { + return _dag_id; + } + + public void set_dag_id(String _dag_id) { + this._dag_id = _dag_id; + } + + public String get_start_date() { + return _start_date; + } + + public void set_start_date(String _start_date) { + this._start_date = _start_date; + } + + public String get_principal() { + return _principal; + } + + public void set_principal(String _principal) { + this._principal = _principal; + } +} diff --git a/examples/src/main/java/com/geneea/celery/examples/tt.java b/examples/src/main/java/com/geneea/celery/examples/tt.java new file mode 100644 index 0000000..2cd0be4 --- /dev/null +++ b/examples/src/main/java/com/geneea/celery/examples/tt.java @@ -0,0 +1,18 @@ +package com.geneea.celery.examples; + +/** + * Created by stone on 18-3-5. + */ +public class tt { + public void f(Integer i){ + System.out.println("~~~~~~~~~~~~~~~~~~~"); + } + + public static double calculateWeight(Integer i) { + double weight = 0.555; + // Calculate weight + System.out.println("~~~~~~~~~~~~~~~~~~~" + i); + return weight; + } + +} From 6f8db7446e412aa82d15b4d0498c62a6b9f0c5f8 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Wed, 7 Mar 2018 19:38:16 +0800 Subject: [PATCH 3/7] fix the bug that message is not sent to the right queue --- celery-java/pom.xml | 15 +++ .../geneea/celery/MockBrokerFactory.groovy | 14 +-- .../com/geneea/celery/examples/Sparkinfo.java | 107 ------------------ .../geneea/celery/examples/idata_object.java | 52 --------- .../java/com/geneea/celery/examples/tt.java | 18 --- 5 files changed, 16 insertions(+), 190 deletions(-) delete mode 100644 examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java delete mode 100644 examples/src/main/java/com/geneea/celery/examples/idata_object.java delete mode 100644 examples/src/main/java/com/geneea/celery/examples/tt.java diff --git a/celery-java/pom.xml b/celery-java/pom.xml index 4b186f8..f512f96 100644 --- a/celery-java/pom.xml +++ b/celery-java/pom.xml @@ -7,7 +7,9 @@ 4.0.0 + org.sedlakovi.celery celery-java + 1.3-SNAPSHOT jar Celery-Java @@ -137,4 +139,17 @@ + + + + spring-releases + https://repo.spring.io/libs-release + + + + + spring-releases + https://repo.spring.io/libs-release + + diff --git a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy index 5e08aa7..a8c98cf 100644 --- a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy +++ b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy @@ -25,18 +25,6 @@ public class MockBrokerFactory implements BrokerFactory { @Override Broker createBroker(URI uri, ExecutorService executor) throws IOException, TimeoutException { - return new Broker() { - @Override - void declareQueue(String name) throws IOException { - queuesDeclared.add(name) - } - - @Override - Message newMessage() { - def message = messages[messageNum % messages.size()] - messageNum++ - return message - } - } + return null; } } diff --git a/examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java b/examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java deleted file mode 100644 index b11d8cf..0000000 --- a/examples/src/main/java/com/geneea/celery/examples/Sparkinfo.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.geneea.celery.examples; - -import java.util.LinkedHashMap; -import java.util.LinkedList; - -/** - * Created by stone on 18-3-5. - */ -public class Sparkinfo { - private String numberExecutor; - - private String application; - - private String queue; - - private String deployMode; - - private String class_; - - private String executorCores; - - private String executorMemory; - - private LinkedList args; - - private LinkedHashMap kvargs; - - public Sparkinfo(String taskName) { - args = new LinkedList(); - kvargs = new LinkedHashMap(); - } - - public String getNumberExecutor() { - return numberExecutor; - } - - public void setNumberExecutor(String numberExecutor) { - this.numberExecutor = numberExecutor; - } - - - - public String getApplication(){ - return this.application; - } - - public void setApplication(String application) { - this.application = application; - } - - - public LinkedList getArgs() { - return args; - } - - public void setArgs(LinkedList args) { - this.args = args; - } - - public LinkedHashMap getKvargs() { - return kvargs; - } - - public void setKvargs(LinkedHashMap kvargs) { - this.kvargs = kvargs; - } - - public String getQueue() { - return queue; - } - - public void setQueue(String queue) { - this.queue = queue; - } - - public String getDeployMode() { - return deployMode; - } - - public void setDeployMode(String deployMode) { - this.deployMode = deployMode; - } - - public String getClass_() { - return class_; - } - - public void setClass_(String class_) { - this.class_ = class_; - } - - public String getExecutorCores() { - return executorCores; - } - - public void setExecutorCores(String executorCores) { - this.executorCores = executorCores; - } - - public String getExecutorMemory() { - return executorMemory; - } - - public void setExecutorMemory(String executorMemory) { - this.executorMemory = executorMemory; - } -} diff --git a/examples/src/main/java/com/geneea/celery/examples/idata_object.java b/examples/src/main/java/com/geneea/celery/examples/idata_object.java deleted file mode 100644 index 022c046..0000000 --- a/examples/src/main/java/com/geneea/celery/examples/idata_object.java +++ /dev/null @@ -1,52 +0,0 @@ -package com.geneea.celery.examples; - -/** - * Created by stone on 18-2-8. - */ -public class idata_object { - private String _cron = ""; - private String _principal = ""; - private int _num_executors = 1; - private String _dag_id = ""; - private String _start_date = ""; - - public String get_cron() { - return _cron; - } - - public void set_cron(String _cron) { - this._cron = _cron; - } - - public int get_num_executors() { - return _num_executors; - } - - public void set_num_executors(int _num_executors) { - this._num_executors = _num_executors; - } - - public String get_dag_id() { - return _dag_id; - } - - public void set_dag_id(String _dag_id) { - this._dag_id = _dag_id; - } - - public String get_start_date() { - return _start_date; - } - - public void set_start_date(String _start_date) { - this._start_date = _start_date; - } - - public String get_principal() { - return _principal; - } - - public void set_principal(String _principal) { - this._principal = _principal; - } -} diff --git a/examples/src/main/java/com/geneea/celery/examples/tt.java b/examples/src/main/java/com/geneea/celery/examples/tt.java deleted file mode 100644 index 2cd0be4..0000000 --- a/examples/src/main/java/com/geneea/celery/examples/tt.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.geneea.celery.examples; - -/** - * Created by stone on 18-3-5. - */ -public class tt { - public void f(Integer i){ - System.out.println("~~~~~~~~~~~~~~~~~~~"); - } - - public static double calculateWeight(Integer i) { - double weight = 0.555; - // Calculate weight - System.out.println("~~~~~~~~~~~~~~~~~~~" + i); - return weight; - } - -} From 1945ad62e49ade189151d678747bb73769d14bf6 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Wed, 7 Mar 2018 19:39:50 +0800 Subject: [PATCH 4/7] fix the bug that message is not sent to the right queue --- .../com/geneea/celery/MockBrokerFactory.groovy | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy index a8c98cf..5e08aa7 100644 --- a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy +++ b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy @@ -25,6 +25,18 @@ public class MockBrokerFactory implements BrokerFactory { @Override Broker createBroker(URI uri, ExecutorService executor) throws IOException, TimeoutException { - return null; + return new Broker() { + @Override + void declareQueue(String name) throws IOException { + queuesDeclared.add(name) + } + + @Override + Message newMessage() { + def message = messages[messageNum % messages.size()] + messageNum++ + return message + } + } } } From 700e140af0887ae24641fb04510eddcee4899d36 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Mon, 16 Apr 2018 14:02:17 +0800 Subject: [PATCH 5/7] Exposed task id --- .../main/java/com/geneea/celery/Celery.java | 24 +++++++++++++++---- .../geneea/celery/MockBrokerFactory.groovy | 19 +++++++++++++++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/celery-java/src/main/java/com/geneea/celery/Celery.java b/celery-java/src/main/java/com/geneea/celery/Celery.java index 2347e55..6e3f158 100644 --- a/celery-java/src/main/java/com/geneea/celery/Celery.java +++ b/celery-java/src/main/java/com/geneea/celery/Celery.java @@ -36,12 +36,12 @@ public class Celery { private final ObjectMapper jsonMapper = new ObjectMapper(); private final String queue; - // Memoized suppliers help us to deal with a connection that can't be established yet. It may fail several times + // Memorized suppliers help us to deal with a connection that can't be established yet. It may fail several times // with an exception but when it succeeds, it then always returns the same instance. // // This is tailored for the RabbitMQ connections - they fail to be created if the host can't be reached but they // can heal automatically. If other brokers/backends don't work this way, we might need to rework it. - private final Supplier> resultsProvider; + public final Supplier> resultsProvider; private final Supplier broker; /** @@ -187,7 +187,7 @@ public AsyncResult submit(String name, Object[] args) throws IOException { } else { result = CompletableFuture.completedFuture(null); } - return new AsyncResultImpl<>(result); + return new AsyncResultImpl<>(result, taskId); } /** @@ -242,23 +242,31 @@ public AsyncResult submit(String name, int priority, Object[] args) throws IO } else { result = CompletableFuture.completedFuture(null); } - return new AsyncResultImpl<>(result); + return new AsyncResultImpl<>(result, taskId); } public interface AsyncResult { boolean isDone(); T get() throws ExecutionException, InterruptedException; + + String getTaskId(); } private class AsyncResultImpl implements AsyncResult { private final Future future; + private String taskId; AsyncResultImpl(Future future) { this.future = future; } + AsyncResultImpl(Future future, String taskId) { + this.future = future; + this.taskId = taskId; + } + @Override public boolean isDone() { return future.isDone(); @@ -268,5 +276,13 @@ public boolean isDone() { public T get() throws ExecutionException, InterruptedException { return future.get(); } + + public String getTaskId(){ + if(taskId != null) { + return taskId; + }else { + return ""; + } + } } } diff --git a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy index 5e08aa7..b6b7ad6 100644 --- a/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy +++ b/celery-java/src/test/groovy/com/geneea/celery/MockBrokerFactory.groovy @@ -31,12 +31,31 @@ public class MockBrokerFactory implements BrokerFactory { queuesDeclared.add(name) } + /** + * @param name queue name + * @param maxPriority the max priority of the queue with priority + * @throws IOException + */ + @Override + void declareQueue(String name, int maxPriority) throws IOException { + + } + @Override Message newMessage() { def message = messages[messageNum % messages.size()] messageNum++ return message } + + /** + * @param priority the priority of the message that is executed + * @return message that can be constructed and later sent + */ + @Override + Message newMessage(int priority) { + return null + } } } } From 729ed17bf9476d23bbabff1c7d39976be7b24689 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Fri, 27 Jul 2018 09:06:05 +0800 Subject: [PATCH 6/7] expose broker and backend connection to rabbitmq --- .../main/java/com/geneea/celery/Celery.java | 30 ++++++++++++ .../backends/rabbit/RabbitResultConsumer.java | 2 +- .../celery/brokers/rabbit/RabbitBroker.java | 6 ++- celery-java/src/test/java/TestCases.java | 12 +++-- .../java/com/geneea/celery/examples/Main.java | 46 +++++++++++++------ 5 files changed, 78 insertions(+), 18 deletions(-) diff --git a/celery-java/src/main/java/com/geneea/celery/Celery.java b/celery-java/src/main/java/com/geneea/celery/Celery.java index 6e3f158..acbcb92 100644 --- a/celery-java/src/main/java/com/geneea/celery/Celery.java +++ b/celery-java/src/main/java/com/geneea/celery/Celery.java @@ -3,8 +3,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.geneea.celery.backends.rabbit.RabbitResultConsumer; +import com.geneea.celery.brokers.rabbit.RabbitBroker; import com.google.common.base.Joiner; import com.google.common.base.Suppliers; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; import lombok.Builder; import lombok.extern.java.Log; import com.geneea.celery.backends.CeleryBackends; @@ -103,6 +108,30 @@ private String getLocalHostName() { } } + + public Connection getBrokerConnection(){ + try{ + RabbitBroker b = (RabbitBroker)broker.get(); + Connection con = b.getChannel().getConnection(); + return con; + }catch (Exception ex){ + System.out.println(String.format("Can not get celery broker connection with ex:%s", ex.toString())); + return null; + } + } + + public Connection getBackendConnection(){ + try{ + RabbitResultConsumer df = (RabbitResultConsumer)resultsProvider.get().get() ; + + Connection conn = df.getChannel().getConnection(); + return conn; + }catch (Exception ex){ + System.out.println(String.format("Can not get celery backend connection with ex:%s", ex.toString())); + return null; + } + } + /** * Submit a Java task for processing. You'll probably not need to call this method. rather use @{@link CeleryTask} * annotation. @@ -166,6 +195,7 @@ public AsyncResult submit(String name, Object[] args) throws IOException { .putNull("errbacks"); Message message = broker.get().newMessage(); + message.setBody(jsonMapper.writeValueAsBytes(payload)); message.setContentEncoding("utf-8"); message.setContentType("application/json"); diff --git a/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java b/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java index 5853588..846dde9 100644 --- a/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java +++ b/celery-java/src/main/java/com/geneea/celery/backends/rabbit/RabbitResultConsumer.java @@ -18,7 +18,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -class RabbitResultConsumer extends DefaultConsumer implements RabbitBackend.ResultsProvider { +public class RabbitResultConsumer extends DefaultConsumer implements RabbitBackend.ResultsProvider { private final LoadingCache> tasks = CacheBuilder diff --git a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java index b4eea39..0064e31 100644 --- a/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java +++ b/celery-java/src/main/java/com/geneea/celery/brokers/rabbit/RabbitBroker.java @@ -10,7 +10,7 @@ import java.util.HashMap; import java.util.Map; -class RabbitBroker implements Broker { +public class RabbitBroker implements Broker { private final Channel channel; public RabbitBroker(Channel channel) { @@ -34,6 +34,10 @@ public Message newMessage() { return new RabbitMessage(); } + public Channel getChannel() { + return channel; + } + @Override public Message newMessage(int priority) { return new RabbitMessage(priority); diff --git a/celery-java/src/test/java/TestCases.java b/celery-java/src/test/java/TestCases.java index e696862..a98bb03 100644 --- a/celery-java/src/test/java/TestCases.java +++ b/celery-java/src/test/java/TestCases.java @@ -15,13 +15,19 @@ public void testPriority(){ Celery client = Celery.builder() .queue("test_idata1") - .brokerUri("amqp://localhost/%2F") - .backendUri("rpc://localhost/%2F") + .brokerUri("amqp://guest:123456@10.12.6.19:5672/10.12.6.19") + .backendUri("rpc://guest:123456@10.12.6.19:5672/10.12.6.19") .maxPriority(p) .build(); for(int i=0 ;i <100 ; i++){ - client.submit("app_tasks.add", i % 10, new Object[]{i % 10, 0}); + Celery.AsyncResult a = client.submit("app_tasks.add", i % 10, new Object[]{i % 10, 0}); + while(!a.isDone()){ + Thread.sleep(100); + } + + + System.out.println(a.getTaskId() + "~~~~ result is :"+ a.get()); } }catch (Exception ex){ diff --git a/examples/src/main/java/com/geneea/celery/examples/Main.java b/examples/src/main/java/com/geneea/celery/examples/Main.java index 3448f1a..2403653 100644 --- a/examples/src/main/java/com/geneea/celery/examples/Main.java +++ b/examples/src/main/java/com/geneea/celery/examples/Main.java @@ -6,6 +6,7 @@ import com.geneea.celery.Celery; import com.geneea.celery.CeleryWorker; +import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -13,32 +14,51 @@ public class Main { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); - factory.setHost("localhost"); + //factory.setHost("localhost"); ExecutorService executor = Executors.newCachedThreadPool(); - Connection connection = factory.newConnection(executor); + factory.setUri("amqp://guest:123456@10.12.6.19:5672/10.12.6.19"); + factory.setUsername("guest"); + factory.setPassword("123456"); + factory.setVirtualHost("10.12.6.19"); - CeleryWorker worker = CeleryWorker.create("celery", connection); + Connection connection = factory.newConnection(); + + connection.close(); + + //CeleryWorker worker = CeleryWorker.create("celery", connection); Celery client = Celery.builder() - .brokerUri("amqp://localhost/%2F") - .backendUri("rpc://localhost/%2F") + .queue("test_idata1") + .brokerUri("amqp://guest:123456@10.12.6.19:5672/10.12.6.19") + .backendUri("rpc://guest:123456@10.12.6.19:5672/10.12.6.19") + .maxPriority(Optional.of(10)) .build(); try { - for (int i = 0; i < 20; i++) { + /*for (int i = 0; i < 20; i++) { Stopwatch sw = Stopwatch.createStarted(); Integer result = TestTaskProxy.with(client).sum(1, i).get(); System.out.printf("CeleryTask #%d's result was: %s. The task took %s end-to-end.\n", i, result, sw); - } + }*/ + + //System.out.println("Testing result of void task: " + TestVoidTaskProxy.with(client).run(1, 2).get()); + - System.out.println("Testing result of void task: " + TestVoidTaskProxy.with(client).run(1, 2).get()); + String re="hh"; System.out.println("Testing task that should fail and throw exception:"); - client.submit(TestTask.class, "sum", new Object[]{"a", "b"}).get(); + for(int i=0;i <10000; i++){ + Celery.AsyncResult t=client.submit(TestTask.class, "sum", new Object[]{re, 'd'}); + System.out.println( String.format("current is %s with id :%s", i, t.getTaskId())); + + //boolean res= executor.isShutdown(); + } + } finally { - connection.close(); - worker.close(); - worker.join(); - executor.shutdown(); + + Connection cb= client.getBackendConnection(); + Connection cbr = client.getBrokerConnection(); + cb.close(); + cbr.close(); } // The worker threads hang waiting for the messages for some reason for quite a long time but eventually, From 3e8542a0071588ad4aaf78f1ee84e7b3f0406ed1 Mon Sep 17 00:00:00 2001 From: "jiang.qin" Date: Mon, 30 Jul 2018 18:45:52 +0800 Subject: [PATCH 7/7] change pom setting --- celery-java/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/celery-java/pom.xml b/celery-java/pom.xml index f512f96..7d3c2a1 100644 --- a/celery-java/pom.xml +++ b/celery-java/pom.xml @@ -139,6 +139,11 @@ + + 1.8 + 1.8 + +