paho-mqtt-cpp
MQTT C++ Client for POSIX and Windows
client.h
Go to the documentation of this file.
1 
8 /*******************************************************************************
9  * Copyright (c) 2013-2023 Frank Pagliughi <fpagliughi@mindspring.com>
10  *
11  * All rights reserved. This program and the accompanying materials
12  * are made available under the terms of the Eclipse Public License v2.0
13  * and Eclipse Distribution License v1.0 which accompany this distribution.
14  *
15  * The Eclipse Public License is available at
16  * http://www.eclipse.org/legal/epl-v20.html
17  * and the Eclipse Distribution License is available at
18  * http://www.eclipse.org/org/documents/edl-v10.php.
19  *
20  * Contributors:
21  * Frank Pagliughi - initial implementation and documentation
22  *******************************************************************************/
23 
24 #ifndef __mqtt_client_h
25 #define __mqtt_client_h
26 
27 #include "mqtt/async_client.h"
28 #include <future>
29 
30 namespace mqtt {
31 
33 
38 class client : private callback
39 {
41  PAHO_MQTTPP_EXPORT static const std::chrono::seconds DFLT_TIMEOUT;
43  PAHO_MQTTPP_EXPORT static const int DFLT_QOS; // =1;
44 
46  async_client cli_;
48  std::chrono::milliseconds timeout_;
50  callback* userCallback_;
51 
/**
 * Wraps a reference in a non-owning shared pointer.
 * The pointer is created with a do-nothing deleter, so the referenced
 * object is never destroyed through it; constness is cast away.
 * @param val The object to point at.
 * @return A shared pointer to @em val that does not own it.
 */
template <typename T>
std::shared_ptr<T> ptr(const T& val) {
    T* const raw = const_cast<T*>(&val);
    auto noDelete = [](T*) {};
    return std::shared_ptr<T>(raw, noDelete);
}
66 
67  // User callbacks
68  // Most are launched in a separate thread, for convenience, except
69  // message_arrived, for performance.
70  void connected(const string& cause) override {
71  std::async(std::launch::async, &callback::connected, userCallback_, cause).wait();
72  }
73  void connection_lost(const string& cause) override {
74  std::async(std::launch::async,
75  &callback::connection_lost, userCallback_, cause).wait();
76  }
77  void message_arrived(const_message_ptr msg) override {
78  userCallback_->message_arrived(msg);
79  }
80  void delivery_complete(delivery_token_ptr tok) override {
81  std::async(std::launch::async, &callback::delivery_complete, userCallback_, tok).wait();
82  }
83 
85  client() =delete;
86  client(const async_client&) =delete;
87  client& operator=(const async_client&) =delete;
88 
89 public:
91  using ptr_t = std::shared_ptr<client>;
94 
97 
109  client(const string& serverURI, const string& clientId,
110  iclient_persistence* persistence=nullptr);
121  client(const string& serverURI, const string& clientId,
122  const string& persistDir);
137  client(const string& serverURI, const string& clientId,
138  int maxBufferedMessages,
139  iclient_persistence* persistence=nullptr);
152  client(const string& serverURI, const string& clientId,
153  int maxBufferedMessages, const string& persistDir);
167  client(const string& serverURI, const string& clientId,
168  const create_options& opts,
169  iclient_persistence* persistence=nullptr);
173  virtual ~client() {}
177  virtual connect_response connect();
187  virtual connect_response reconnect();
191  virtual void disconnect();
198  virtual void disconnect(int timeoutMS);
205  template <class Rep, class Period>
206  void disconnect(const std::chrono::duration<Rep, Period>& to) {
208  }
213  virtual string get_client_id() const { return cli_.get_client_id(); }
218  virtual string get_server_uri() const { return cli_.get_server_uri(); }
223  virtual std::chrono::milliseconds get_timeout() const { return timeout_; }
232  virtual topic get_topic(const string& top, int qos=message::DFLT_QOS,
233  bool retained=message::DFLT_RETAINED) {
234  return topic(cli_, top, qos, retained);
235  }
241  virtual bool is_connected() const { return cli_.is_connected(); }
249  }
250 
260  virtual void publish(string_ref top, const void* payload, size_t n,
261  int qos, bool retained) {
262  if (!cli_.publish(std::move(top), payload, n, qos, retained)->wait_for(timeout_))
263  throw timeout_error();
264  }
272  virtual void publish(string_ref top, const void* payload, size_t n) {
273  if (!cli_.publish(std::move(top), payload, n)->wait_for(timeout_))
274  throw timeout_error();
275  }
280  virtual void publish(const_message_ptr msg) {
281  if (!cli_.publish(msg)->wait_for(timeout_))
282  throw timeout_error();
283  }
291  virtual void publish(const message& msg) {
292  cli_.publish(ptr(msg))->wait();
293  }
299  virtual void set_callback(callback& cb);
304  virtual void set_timeout(int timeoutMS) {
305  timeout_ = std::chrono::milliseconds(timeoutMS);
306  }
311  template <class Rep, class Period>
312  void set_timeout(const std::chrono::duration<Rep, Period>& to) {
313  timeout_ = to_milliseconds(to);
314  }
322  virtual subscribe_response subscribe(const string& topicFilter,
323  const subscribe_options& opts=subscribe_options(),
324  const properties& props=properties());
333  virtual subscribe_response subscribe(const string& topicFilter, int qos,
334  const subscribe_options& opts=subscribe_options(),
335  const properties& props=properties());
344  virtual subscribe_response subscribe(const string_collection& topicFilters,
345  const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
346  const properties& props=properties());
355  virtual subscribe_response subscribe(const string_collection& topicFilters,
356  const qos_collection& qos,
357  const std::vector<subscribe_options>& opts=std::vector<subscribe_options>(),
358  const properties& props=properties());
365  virtual unsubscribe_response unsubscribe(const string& topicFilter,
366  const properties& props=properties());
373  virtual unsubscribe_response unsubscribe(const string_collection& topicFilters,
374  const properties& props=properties());
380  virtual void start_consuming() { cli_.start_consuming(); }
386  virtual void stop_consuming() { cli_.stop_consuming(); }
392  virtual const_message_ptr consume_message() { return cli_.consume_message(); }
400  return cli_.try_consume_message(msg);
401  }
409  template <typename Rep, class Period>
411  const std::chrono::duration<Rep, Period>& relTime) {
412  return cli_.try_consume_message_for(msg, relTime);
413  }
421  template <class Clock, class Duration>
423  const std::chrono::time_point<Clock,Duration>& absTime) {
424  return cli_.try_consume_message_until(msg, absTime);
425  }
426 };
427 
430 
432 // end namespace mqtt
433 }
434 
435 #endif // __mqtt_client_h
436 
message::const_ptr_t const_message_ptr
Definition: message.h:368
std::shared_ptr< callback > ptr_t
Definition: callback.h:45
Definition: async_client.h:107
Definition: iclient_persistence.h:72
string get_server_uri() const override
Definition: async_client.h:498
client::ptr_t client_ptr
Definition: client.h:429
virtual string get_client_id() const
Definition: client.h:213
virtual const_message_ptr consume_message()
Definition: client.h:392
virtual std::chrono::milliseconds get_timeout() const
Definition: client.h:223
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition: client.h:410
virtual void connection_lost(const string &)
Definition: callback.h:64
virtual ~client()
Definition: client.h:173
Definition: exception.h:193
Definition: create_options.h:37
bool is_connected() const override
Definition: async_client.h:513
virtual void message_arrived(const_message_ptr)
Definition: callback.h:68
virtual subscribe_response subscribe(const string &topicFilter, const subscribe_options &opts=subscribe_options(), const properties &props=properties())
const_message_ptr consume_message() override
Definition: async_client.h:744
virtual bool is_connected() const
Definition: client.h:241
virtual connect_response connect()
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: client.h:422
Definition: connect_options.h:48
Definition: server_response.h:74
virtual void connected(const string &)
Definition: callback.h:60
Definition: topic.h:43
virtual void set_callback(callback &cb)
virtual void publish(string_ref top, const void *payload, size_t n, int qos, bool retained)
Definition: client.h:260
virtual void delivery_complete(delivery_token_ptr)
Definition: callback.h:73
virtual void publish(const message &msg)
Definition: client.h:291
long to_milliseconds_count(const std::chrono::duration< Rep, Period > &dur)
Definition: types.h:149
Definition: string_collection.h:42
void stop_consuming() override
virtual void publish(string_ref top, const void *payload, size_t n)
Definition: client.h:272
delivery_token_ptr publish(string_ref topic, const void *payload, size_t n, int qos, bool retained) override
void set_update_connection_handler(update_connection_handler cb)
Definition: properties.h:255
virtual void start_consuming()
Definition: client.h:380
void disconnect(const std::chrono::duration< Rep, Period > &to)
Definition: client.h:206
async_client::update_connection_handler update_connection_handler
Definition: client.h:96
virtual void publish(const_message_ptr msg)
Definition: client.h:280
virtual void disconnect()
bool try_consume_message_for(const_message_ptr *msg, const std::chrono::duration< Rep, Period > &relTime)
Definition: async_client.h:762
async_client::qos_collection qos_collection
Definition: client.h:93
Definition: message.h:55
void set_timeout(const std::chrono::duration< Rep, Period > &to)
Definition: client.h:312
std::shared_ptr< client > ptr_t
Definition: client.h:91
delivery_token::ptr_t delivery_token_ptr
Definition: delivery_token.h:125
Definition: client.h:38
static PAHO_MQTTPP_EXPORT const int DFLT_QOS
Definition: message.h:59
Definition: subscribe_options.h:41
Definition: server_response.h:176
#define PAHO_MQTTPP_EXPORT
Definition: export.h:40
virtual void set_timeout(int timeoutMS)
Definition: client.h:304
Definition: callback.h:41
virtual connect_response reconnect()
void start_consuming() override
bool try_consume_message_until(const_message_ptr *msg, const std::chrono::time_point< Clock, Duration > &absTime)
Definition: async_client.h:786
std::function< bool(connect_data &)> update_connection_handler
Definition: async_client.h:122
string get_client_id() const override
Definition: async_client.h:493
std::chrono::milliseconds to_milliseconds(const std::chrono::duration< Rep, Period > &dur)
Definition: types.h:138
virtual unsubscribe_response unsubscribe(const string &topicFilter, const properties &props=properties())
virtual void stop_consuming()
Definition: client.h:386
virtual bool try_consume_message(const_message_ptr *msg)
Definition: client.h:399
void set_update_connection_handler(update_connection_handler cb)
Definition: client.h:247
virtual topic get_topic(const string &top, int qos=message::DFLT_QOS, bool retained=message::DFLT_RETAINED)
Definition: client.h:232
Definition: async_client.h:49
Definition: server_response.h:122
static PAHO_MQTTPP_EXPORT const bool DFLT_RETAINED
Definition: message.h:61
virtual string get_server_uri() const
Definition: client.h:218
std::vector< int > qos_collection
Definition: iasync_client.h:65
bool try_consume_message(const_message_ptr *msg) override
Definition: async_client.h:751