/******************************************************************************* * Copyright (c) 2012, 2020 IBM Corp., and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0/ * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial contribution * Guilherme Maciel Ferreira - add keep alive option * Ian Craggs - add full capability *******************************************************************************/ #include "MQTTAsync.h" #include "pubsub_opts.h" #include #include #include #include #if defined(_WIN32) #include #define sleep Sleep #else #include #include #include #endif #if defined(_WRS_KERNEL) #include #endif volatile int toStop = 0; struct pubsub_opts opts = { 1, 0, 0, 0, "\n", 100, /* debug/app options */ NULL, NULL, 1, 0, 0, /* message options */ MQTTVERSION_DEFAULT, NULL, "paho-c-pub", 0, 0, NULL, NULL, "localhost", "1883", NULL, 10, /* MQTT options */ NULL, NULL, 0, 0, /* will options */ 0, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, /* TLS options */ 0, {NULL, NULL}, /* MQTT V5 options */ NULL, NULL, /* HTTP and HTTPS proxies */ }; MQTTAsync_responseOptions pub_opts = MQTTAsync_responseOptions_initializer; MQTTProperty property; MQTTProperties props = MQTTProperties_initializer; void mysleep(int ms) { #if defined(_WIN32) Sleep(ms); #else usleep(ms * 1000); #endif } void cfinish(int sig) { signal(SIGINT, NULL); toStop = 1; } int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m) { /* not expecting any messages */ return 1; } static int disconnected = 0; void onDisconnect5(void* context, MQTTAsync_successData5* response) { disconnected = 1; } void onDisconnect(void* context, MQTTAsync_successData* response) { disconnected = 1; } static int connected = 0; void myconnect(MQTTAsync client); int mypublish(MQTTAsync client, int datalen, char* data); void onConnectFailure5(void* context, MQTTAsync_failureData5* response) { fprintf(stderr, "Connect failed, rc %s reason code %s\n", MQTTAsync_strerror(response->code), MQTTReasonCode_toString(response->reasonCode)); connected = -1; MQTTAsync client = (MQTTAsync)context; } void onConnectFailure(void* context, MQTTAsync_failureData* response) { fprintf(stderr, "Connect failed, rc %s\n", response ? MQTTAsync_strerror(response->code) : "none"); connected = -1; MQTTAsync client = (MQTTAsync)context; } void onConnect5(void* context, MQTTAsync_successData5* response) { MQTTAsync client = (MQTTAsync)context; int rc = 0; if (opts.verbose) printf("Connected\n"); if (opts.null_message == 1) rc = mypublish(client, 0, ""); else if (opts.message) rc = mypublish(client, (int)strlen(opts.message), opts.message); else if (opts.filename) { int data_len = 0; char* buffer = readfile(&data_len, &opts); if (buffer == NULL) toStop = 1; else { rc = mypublish(client, data_len, buffer); free(buffer); } } connected = 1; } void onConnect(void* context, MQTTAsync_successData* response) { MQTTAsync client = (MQTTAsync)context; int rc = 0; if (opts.verbose) printf("Connected\n"); if (opts.null_message == 1) rc = mypublish(client, 0, ""); else if (opts.message) rc = mypublish(client, (int)strlen(opts.message), opts.message); else if (opts.filename) { int data_len = 0; char* buffer = readfile(&data_len, &opts); if (buffer == NULL) toStop = 1; else { rc = mypublish(client, data_len, buffer); free(buffer); } } connected = 1; } static int published = 0; void onPublishFailure5(void* context, MQTTAsync_failureData5* response) { if (opts.verbose) fprintf(stderr, "Publish failed, rc %s reason code %s\n", MQTTAsync_strerror(response->code), MQTTReasonCode_toString(response->reasonCode)); published = -1; } void onPublishFailure(void* context, MQTTAsync_failureData* response) { if (opts.verbose) fprintf(stderr, "Publish failed, rc %s\n", MQTTAsync_strerror(response->code)); published = -1; } void onPublish5(void* context, MQTTAsync_successData5* response) { if (opts.verbose) printf("Publish succeeded, reason code %s\n", MQTTReasonCode_toString(response->reasonCode)); if (opts.null_message || opts.message || opts.filename) toStop = 1; published = 1; } void onPublish(void* context, MQTTAsync_successData* response) { if (opts.verbose) printf("Publish succeeded\n"); if (opts.null_message || opts.message || opts.filename) toStop = 1; published = 1; } static int onSSLError(const char *str, size_t len, void *context) { MQTTAsync client = (MQTTAsync)context; return fprintf(stderr, "SSL error: %s\n", str); } static unsigned int onPSKAuth(const char* hint, char* identity, unsigned int max_identity_len, unsigned char* psk, unsigned int max_psk_len, void* context) { int psk_len; int k, n; int rc = 0; struct pubsub_opts* opts = context; /* printf("Trying TLS-PSK auth with hint: %s\n", hint);*/ if (opts->psk == NULL || opts->psk_identity == NULL) { /* printf("No PSK entered\n"); */ goto exit; } /* psk should be array of bytes. This is a quick and dirty way to * convert hex to bytes without input validation */ psk_len = (int)strlen(opts->psk) / 2; if (psk_len > max_psk_len) { fprintf(stderr, "PSK too long\n"); goto exit; } for (k=0, n=0; k < psk_len; k++, n += 2) { sscanf(&opts->psk[n], "%2hhx", &psk[k]); } /* identity should be NULL terminated string */ strncpy(identity, opts->psk_identity, max_identity_len); if (identity[max_identity_len - 1] != '\0') { fprintf(stderr, "Identity too long\n"); goto exit; } /* Function should return length of psk on success. */ rc = psk_len; exit: return rc; } void myconnect(MQTTAsync client) { MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_SSLOptions ssl_opts = MQTTAsync_SSLOptions_initializer; MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer; int rc = 0; if (opts.verbose) printf("Connecting\n"); if (opts.MQTTVersion == MQTTVERSION_5) { MQTTAsync_connectOptions conn_opts5 = MQTTAsync_connectOptions_initializer5; conn_opts = conn_opts5; conn_opts.onSuccess5 = onConnect5; conn_opts.onFailure5 = onConnectFailure5; conn_opts.cleanstart = 1; } else { conn_opts.onSuccess = onConnect; conn_opts.onFailure = onConnectFailure; conn_opts.cleansession = 1; } conn_opts.keepAliveInterval = opts.keepalive; conn_opts.username = opts.username; conn_opts.password = opts.password; conn_opts.MQTTVersion = opts.MQTTVersion; conn_opts.context = client; conn_opts.automaticReconnect = 1; conn_opts.httpProxy = opts.http_proxy; conn_opts.httpsProxy = opts.https_proxy; if (opts.will_topic) /* will options */ { will_opts.message = opts.will_payload; will_opts.topicName = opts.will_topic; will_opts.qos = opts.will_qos; will_opts.retained = opts.will_retain; conn_opts.will = &will_opts; } if (opts.connection && (strncmp(opts.connection, "ssl://", 6) == 0 || strncmp(opts.connection, "wss://", 6) == 0)) { if (opts.insecure) ssl_opts.verify = 0; else ssl_opts.verify = 1; ssl_opts.CApath = opts.capath; ssl_opts.keyStore = opts.cert; ssl_opts.trustStore = opts.cafile; ssl_opts.privateKey = opts.key; ssl_opts.privateKeyPassword = opts.keypass; ssl_opts.enabledCipherSuites = opts.ciphers; ssl_opts.ssl_error_cb = onSSLError; ssl_opts.ssl_error_context = client; ssl_opts.ssl_psk_cb = onPSKAuth; ssl_opts.ssl_psk_context = &opts; conn_opts.ssl = &ssl_opts; } connected = 0; if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { fprintf(stderr, "Failed to start connect, return code %s\n", MQTTAsync_strerror(rc)); exit(EXIT_FAILURE); } } int mypublish(MQTTAsync client, int datalen, char* data) { int rc; if (opts.verbose) printf("Publishing data of length %d\n", datalen); rc = MQTTAsync_send(client, opts.topic, datalen, data, opts.qos, opts.retained, &pub_opts); if (opts.verbose && rc != MQTTASYNC_SUCCESS && !opts.quiet) fprintf(stderr, "Error from MQTTAsync_send: %s\n", MQTTAsync_strerror(rc)); return rc; } void trace_callback(enum MQTTASYNC_TRACE_LEVELS level, char* message) { fprintf(stderr, "Trace : %d, %s\n", level, message); } int main(int argc, char** argv) { MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer; MQTTAsync_createOptions create_opts = MQTTAsync_createOptions_initializer; MQTTAsync client; char* buffer = NULL; char* url = NULL; int url_allocated = 0; int rc = 0; const char* version = NULL; const char* program_name = "paho_c_pub"; MQTTAsync_nameValue* infos = MQTTAsync_getVersionInfo(); #if !defined(_WIN32) struct sigaction sa; #endif if (argc < 2) usage(&opts, (pubsub_opts_nameValue*)infos, program_name); if (getopts(argc, argv, &opts) != 0) usage(&opts, (pubsub_opts_nameValue*)infos, program_name); if (opts.connection) url = opts.connection; else { url = malloc(100); url_allocated = 1; sprintf(url, "%s:%s", opts.host, opts.port); } if (opts.verbose) printf("URL is %s\n", url); if (opts.tracelevel > 0) { MQTTAsync_setTraceCallback(trace_callback); MQTTAsync_setTraceLevel(opts.tracelevel); } create_opts.sendWhileDisconnected = 1; if (opts.MQTTVersion >= MQTTVERSION_5) create_opts.MQTTVersion = MQTTVERSION_5; rc = MQTTAsync_createWithOptions(&client, url, opts.clientid, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts); if (rc != MQTTASYNC_SUCCESS) { if (!opts.quiet) fprintf(stderr, "Failed to create client, return code: %s\n", MQTTAsync_strerror(rc)); exit(EXIT_FAILURE); } #if defined(_WIN32) signal(SIGINT, cfinish); signal(SIGTERM, cfinish); #else memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = cfinish; sa.sa_flags = 0; sigaction(SIGINT, &sa, NULL); sigaction(SIGTERM, &sa, NULL); #endif rc = MQTTAsync_setCallbacks(client, client, NULL, messageArrived, NULL); if (rc != MQTTASYNC_SUCCESS) { if (!opts.quiet) fprintf(stderr, "Failed to set callbacks, return code: %s\n", MQTTAsync_strerror(rc)); exit(EXIT_FAILURE); } if (opts.MQTTVersion >= MQTTVERSION_5) { pub_opts.onSuccess5 = onPublish5; pub_opts.onFailure5 = onPublishFailure5; if (opts.message_expiry > 0) { property.identifier = MQTTPROPERTY_CODE_MESSAGE_EXPIRY_INTERVAL; property.value.integer4 = opts.message_expiry; MQTTProperties_add(&props, &property); } if (opts.user_property.name) { property.identifier = MQTTPROPERTY_CODE_USER_PROPERTY; property.value.data.data = opts.user_property.name; property.value.data.len = (int)strlen(opts.user_property.name); property.value.value.data = opts.user_property.value; property.value.value.len = (int)strlen(opts.user_property.value); MQTTProperties_add(&props, &property); } pub_opts.properties = props; } else { pub_opts.onSuccess = onPublish; pub_opts.onFailure = onPublishFailure; } myconnect(client); while (!toStop) { int data_len = 0; int delim_len = 0; if (opts.stdin_lines) { buffer = malloc(opts.maxdatalen); delim_len = (int)strlen(opts.delimiter); do { buffer[data_len++] = getchar(); if (data_len > delim_len) { if (strncmp(opts.delimiter, &buffer[data_len - delim_len], delim_len) == 0) break; } } while (data_len < opts.maxdatalen); rc = mypublish(client, data_len, buffer); } else mysleep(100); } if (opts.message == 0 && opts.null_message == 0 && opts.filename == 0) free(buffer); if (opts.MQTTVersion >= MQTTVERSION_5) disc_opts.onSuccess5 = onDisconnect5; else disc_opts.onSuccess = onDisconnect; if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS) { if (!opts.quiet) fprintf(stderr, "Failed to start disconnect, return code: %s\n", MQTTAsync_strerror(rc)); exit(EXIT_FAILURE); } while (!disconnected) mysleep(100); MQTTAsync_destroy(&client); if (url_allocated) free(url); return EXIT_SUCCESS; }