更新硬件SDK

This commit is contained in:
kerwincui
2023-03-04 03:44:56 +08:00
parent dcdf6e1b7c
commit e39d3d2f03
1900 changed files with 663153 additions and 0 deletions

View File

@@ -0,0 +1,21 @@
#*******************************************************************************
# Copyright (c) 2017 IBM Corp.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Ian Craggs - initial version
#*******************************************************************************/
project("paho-mqttclient" C)
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(samples)
ADD_SUBDIRECTORY(test)

View File

@@ -0,0 +1,695 @@
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
* Ian Craggs - convert to FreeRTOS
*******************************************************************************/
#include "MQTTFreeRTOS.h"
#include "debug_trace.h"
#include DEBUG_LOG_HEADER_FILE
int FreeRTOSConnectTimeout(INT32 connectFd, UINT32 timeout)
{
fd_set writeSet;
fd_set errorSet;
FD_ZERO(&writeSet);
FD_ZERO(&errorSet);
FD_SET(connectFd,&writeSet);
FD_SET(connectFd,&errorSet);
struct timeval tv;
tv.tv_sec = timeout;
tv.tv_usec = 0;
if(select(connectFd+1, NULL, &writeSet, &errorSet, &tv)<=0)
{
int mErr = sock_get_errno(connectFd);
//////ECOMM_TRACE(UNILOG_MQTT, mqttConnectTimeout_1, P_WARNING, 1, "connect select<0 get errno=%d", mErr);
if(mErr)
{
return MQTT_CONN;
}
else
{
return MQTT_TIMEOUT;
}
}
else
{
if(FD_ISSET(connectFd, &errorSet))
{
int mErr = sock_get_errno(connectFd);
//////ECOMM_TRACE(UNILOG_MQTT, mqttConnectTimeout_2, P_WARNING, 1, "select error fd set get errno=%d", mErr);
if(mErr)
{
return MQTT_CONN;
}
}
else if(FD_ISSET(connectFd, &writeSet))
{
//////ECOMM_TRACE(UNILOG_MQTT, mqttConnectTimeout_3, P_WARNING, 0, "errno=115(EINPROGRESS) connect success in time(10s)");
}
}
return MQTT_CONN_OK;
}
int ThreadStart(Thread* thread, void (*fn)(void*), void* arg)
{
int rc = 0;
uint16_t usTaskStackSize = (configMINIMAL_STACK_SIZE * 5);
UBaseType_t uxTaskPriority = uxTaskPriorityGet(NULL); /* set the priority as the same as the calling task*/
rc = xTaskCreate(fn, /* The function that implements the task. */
"MQTTTask", /* Just a text name for the task to aid debugging. */
usTaskStackSize, /* The stack size is defined in FreeRTOSIPConfig.h. */
arg, /* The task parameter, not used in this case. */
uxTaskPriority, /* The priority assigned to the task is defined in FreeRTOSConfig.h. */
&thread->task); /* The task handle is not used. */
return rc;
}
void MutexInit(Mutex* mutex)
{
mutex->sem = xSemaphoreCreateMutex();
}
int MutexLock(Mutex* mutex)
{
return xSemaphoreTake(mutex->sem, portMAX_DELAY);
}
int MutexUnlock(Mutex* mutex)
{
return xSemaphoreGive(mutex->sem);
}
void TimerCountdownMS(Timer* timer, unsigned int timeout_ms)
{
timer->xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
vTaskSetTimeOutState(&timer->xTimeOut); /* Record the time at which this function was entered. */
}
void TimerCountdown(Timer* timer, unsigned int timeout)
{
TimerCountdownMS(timer, timeout * 1000);
}
int TimerLeftMS(Timer* timer)
{
xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait); /* updates xTicksToWait to the number left */
return (int)(((int)timer->xTicksToWait < 0) ? 0 : (timer->xTicksToWait * portTICK_PERIOD_MS));
}
char TimerIsExpired(Timer* timer)
{
return xTaskCheckForTimeOut(&timer->xTimeOut, &timer->xTicksToWait) == pdTRUE;
}
void TimerInit(Timer* timer)
{
timer->xTicksToWait = 0;
memset(&timer->xTimeOut, '\0', sizeof(timer->xTimeOut));
}
int socket_connect(Network* n, char* addr){
int retVal = -1;
INT32 errCode;
INT32 flags = 0;
struct sockaddr_in address;
int rc = -1;
sa_family_t family = AF_INET;
struct addrinfo *result = NULL;
struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
if((NetworkSetConnTimeout(n, 5000, 5000)) != 0){
return 1;
}
if ((rc = FreeRTOS_gethostbyname(addr, NULL, &hints, &result)) == 0)
{
struct addrinfo* res = result;
/* prefer ip4 addresses */
while (res)
{
if (res->ai_family == AF_INET)
{
result = res;
break;
}
res = res->ai_next;
}
if (result->ai_family == AF_INET)
{
address.sin_port = htons(n->port);
address.sin_family = family = AF_INET;
address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
}
else
rc = -1;
freeaddrinfo(result);
}
if(rc == 0)
{
flags = fcntl(n->my_socket, F_GETFL, 0);
if ((retVal = FreeRTOS_connect(n->my_socket, (struct sockaddr *)&address, sizeof(address))) < 0)
{
errCode = sock_get_errno(n->my_socket);
if(errCode == EINPROGRESS)
{
// ECOMM_TRACE(UNILOG_MQTT, mqttConnectSocket_2, P_ERROR, 0, "mqttConnectSocket connect is ongoing");
retVal = FreeRTOSConnectTimeout(n->my_socket, 30); //for bearer suspend timeout is 25s
if(retVal == 0)
{
// ECOMM_TRACE(UNILOG_MQTT, mqttConnectSocket_3, P_INFO, 0, "mqttConnectSocket connect success");
}
else
{
// ECOMM_TRACE(UNILOG_MQTT, mqttConnectSocket_4, P_ERROR, 1, "mqttConnectSocket connect fail,error code %d", errCode);
if(socket_error_is_fatal(errCode))
{
retVal = 1;
}
}
}
else
{
// DBG("sock_get_errno errCode:%d\n",errCode);
// ECOMM_TRACE(UNILOG_MQTT, mqttConnectSocket_5, P_ERROR, 1, "mqttConnectSocket connect fail %d",errCode);
retVal = 1;
}
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttConnectSocket_1, P_ERROR, 0, "mqttConnectSocket connect success");
}
fcntl(n->my_socket, F_SETFL, flags&~O_NONBLOCK);
}
return retVal;
}
int socket_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
#if LWIP_SO_SNDRCVTIMEO_NONSTANDARD
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
int netSecToWait = timeout_ms / 1000; /* seconds */
#else
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
struct timeval netSecToWait;
netSecToWait.tv_sec = timeout_ms / 1000; /* seconds */
netSecToWait.tv_usec = 0;
if(netSecToWait.tv_sec == 0)
{
netSecToWait.tv_sec = 1;
}
#endif
TimeOut_t xTimeOut;
int recvLen = 0;
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do
{
int rc = 0;
FreeRTOS_setsockopt(n->my_socket, SOL_SOCKET, FREERTOS_SO_RCVTIMEO, &netSecToWait, sizeof(netSecToWait));
rc = FreeRTOS_recv(n->my_socket, buffer + recvLen, len - recvLen, 0);
if (rc > 0)
recvLen += rc;
else if (rc < 0)
{
recvLen = rc;
break;
}
else{
int mErr = sock_get_errno(n->my_socket);
if(socket_error_is_fatal(mErr)==1){
return -2;
}//maybe closed or reset by peer
}
} while (recvLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
return recvLen;
}
int socket_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
#if LWIP_SO_SNDRCVTIMEO_NONSTANDARD
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
int netSecToWait = timeout_ms / 1000; /* seconds */
#else
TickType_t xTicksToWait = timeout_ms / portTICK_PERIOD_MS; /* convert milliseconds to ticks */
struct timeval netSecToWait;
netSecToWait.tv_sec = timeout_ms / 1000; /* seconds */
netSecToWait.tv_usec = 0;
if(netSecToWait.tv_sec == 0)
{
netSecToWait.tv_sec = 1;
}
#endif
TimeOut_t xTimeOut;
int sentLen = 0;
vTaskSetTimeOutState(&xTimeOut); /* Record the time at which this function was entered. */
do
{
int rc = 0;
FreeRTOS_setsockopt(n->my_socket, SOL_SOCKET, FRERRTOS_SO_SNDTIMEO, &netSecToWait, sizeof(netSecToWait));
rc = FreeRTOS_send(n->my_socket, buffer + sentLen, len - sentLen, 0);
if (rc > 0)
sentLen += rc;
else if (rc < 0)
{
sentLen = rc;
break;
}
} while (sentLen < len && xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait) == pdFALSE);
return sentLen;
}
int socket_disconnect(Network* n)
{
int ret;
//ECPLAT_PRINTF(UNILOG_CTWING, FreeRTOS_disconnect_0, P_INFO, "FreeRTOS_disconnect my_socket %d", n->my_socket);
ret = FreeRTOS_closesocket(n->my_socket);
n->my_socket = -1;
return ret;
}
int mqttSslRandom(void *p_rng, unsigned char *output, size_t output_len){
uint32_t rnglen = output_len;
uint8_t rngoffset = 0;
while (rnglen > 0)
{
*(output + rngoffset) = (unsigned char)rand();
rngoffset++;
rnglen--;
}
return 0;
}
static void mqttSslDebug(void *ctx, int level, const char *file, int line, const char *str){
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTls_00, P_INFO, "%s(%d):%s", file, line, str);
// DBG("%s", str);
}
int socket_ssl_connect(Network* n, char* addr){
int value;
mqttsClientSsl *ssl;
const char *custom = "mqtts";
char port[10] = {0};
int authmode = MBEDTLS_SSL_VERIFY_NONE;
uint32_t flag;
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_2, P_INFO, "before ssl context malloc:%d", xBytesTaskMalloced);
n->ssl = malloc(sizeof(mqttsClientSsl));
ssl = n->ssl;
/*
* 0. Initialize the RNG and the session data
*/
// #if defined(MBEDTLS_DEBUG_C)
// mbedtls_debug_set_threshold((int)2);
// #endif
mbedtls_net_init(&ssl->netContext);
mbedtls_ssl_init(&ssl->sslContext);
mbedtls_ssl_config_init(&ssl->sslConfig);
mbedtls_x509_crt_init(&ssl->caCert);
mbedtls_x509_crt_init(&ssl->clientCert);
mbedtls_pk_init(&ssl->pkContext);
mbedtls_ctr_drbg_init(&ssl->ctrDrbgContext);
mbedtls_entropy_init(&ssl->entropyContext);
if((value = mbedtls_ctr_drbg_seed(&ssl->ctrDrbgContext,
mbedtls_entropy_func,
&ssl->entropyContext,
(const unsigned char*)custom,
strlen(custom))) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_0, P_INFO, "mbedtls_ctr_drbg_seed failed, value:-0x%x.", -value);
return MQTT_MBEDTLS_ERR;
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_3, P_INFO, "after ssl init:%d", xBytesTaskMalloced);
/*
* 0. Initialize certificates
*/
if(n->seclevel != 0){
if (NULL != n->caCert) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_1, P_INFO, "STEP 0. Loading the CA root certificate ...");
authmode = MBEDTLS_SSL_VERIFY_REQUIRED;
if (0 != (value = mbedtls_x509_crt_parse(&(ssl->caCert), (const unsigned char *)n->caCert, n->caCertLen))) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_2, P_INFO, "failed ! value:-0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
}
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_4, P_INFO, "after ca cert parse:%d", xBytesTaskMalloced);
/* Setup Client Cert/Key */
if(n->seclevel == 2){
if (n->clientCert != NULL && n->clientPk != NULL) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_3, P_INFO, "STEP 0. start prepare client cert ...");
value = mbedtls_x509_crt_parse(&(ssl->clientCert), (const unsigned char *) n->clientCert, n->clientCertLen);
if (value != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_4, P_INFO, "failed! mbedtls_x509_crt_parse returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_5, P_INFO, "n->clientPkLen=%d", n->clientPkLen);
value = mbedtls_pk_parse_key(&ssl->pkContext, (const unsigned char *) n->clientPk, n->clientPkLen, NULL, 0);
if (value != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_6, P_INFO, "failed ! mbedtls_pk_parse_key returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
}
}
if(n->seclevel == 0){
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_7, P_INFO, "user set verify none");
authmode = MBEDTLS_SSL_VERIFY_NONE;
}
//ali mqtts is psk tls
if((n->psk_key != NULL)&&(n->psk_identity != NULL))
{
mbedtls_ssl_conf_psk(&ssl->sslConfig, (const unsigned char *)n->psk_key, strlen(n->psk_key),
(const unsigned char *)n->psk_identity, strlen(n->psk_identity));
}
/*
* 1. Start the connection
*/
snprintf(port, sizeof(port), "%d", n->port);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_8_0, P_INFO, "STEP 1. host:%s", host);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_8_1, P_INFO, "STEP 1. Connecting to PORT:%d",n->port);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_8_2, P_INFO, "STEP 1. port:%s", port);
if (0 != (value = mbedtls_net_connect(&ssl->netContext, addr, port, MBEDTLS_NET_PROTO_TCP, LWIP_PS_INVALID_CID))) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_9, P_INFO, " failed ! mbedtls_net_connect returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
/*
* 2. Setup stuff
*/
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_10, P_INFO, "STEP 2. Setting up the SSL/TLS structure...");
if ((value = mbedtls_ssl_config_defaults(&(ssl->sslConfig), MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_11, P_INFO, " failed! mbedtls_ssl_config_defaults returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_6, P_INFO, "after net connect:%d", xBytesTaskMalloced);
mbedtls_ssl_conf_max_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
mbedtls_ssl_conf_min_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
memcpy(&(ssl->crtProfile), ssl->sslConfig.cert_profile, sizeof(mbedtls_x509_crt_profile));
mbedtls_ssl_conf_authmode(&(ssl->sslConfig), authmode);
#if defined(MBEDTLS_SSL_MAX_FRAGMENT_LENGTH)
if ((value = mbedtls_ssl_conf_max_frag_len(&(ssl->sslConfig), MBEDTLS_SSL_MAX_FRAG_LEN_4096)) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_12, P_INFO, " mbedtls_ssl_conf_max_frag_len returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
#endif
#if defined(MBEDTLS_X509_CRT_PARSE_C)
mbedtls_ssl_conf_cert_profile(&ssl->sslConfig, &ssl->crtProfile);
mbedtls_ssl_conf_ca_chain(&(ssl->sslConfig), &(ssl->caCert), NULL);
if(n->clientCert) {
if ((value = mbedtls_ssl_conf_own_cert(&(ssl->sslConfig), &(ssl->clientCert), &(ssl->pkContext))) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_13, P_INFO, " failed! mbedtls_ssl_conf_own_cert returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
}
#endif
if(n->ciphersuite[0] != 0xFFFF){
mbedtls_ssl_conf_ciphersuites(&(ssl->sslConfig), (const int *)(n->ciphersuite));
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_14, P_INFO, "conf ciphersuite 0x%x", n->ciphersuite[0]);
}
mbedtls_ssl_conf_rng(&(ssl->sslConfig), mqttSslRandom, &(ssl->ctrDrbgContext));
mbedtls_ssl_conf_dbg(&(ssl->sslConfig), mqttSslDebug, NULL);
#if defined(MBEDTLS_SSL_ALPN)
const char *alpn_list[] = { "http/1.1", NULL };
mbedtls_ssl_conf_alpn_protocols(&(ssl->sslConfig),alpn_list);
#endif
if(n->timeout_r > 0) {
uint32_t recvTimeout;
recvTimeout = n->timeout_r > MQTT_MAX_TIMEOUT ? MQTT_MAX_TIMEOUT * 1000 : n->timeout_r * 1000;
mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), recvTimeout);
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_7, P_INFO, "before ssl setup:%d", xBytesTaskMalloced);
if ((value = mbedtls_ssl_setup(&(ssl->sslContext), &(ssl->sslConfig))) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_15, P_INFO, " failed! mbedtls_ssl_setup returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
if(n->hostName != NULL)
{
mbedtls_ssl_set_hostname(&(ssl->sslContext), n->hostName);
}
else
{
mbedtls_ssl_set_hostname(&(ssl->sslContext), addr);
}
mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), (mbedtls_ssl_send_t*)mbedtls_net_send, (mbedtls_ssl_recv_t*)mbedtls_net_recv, (mbedtls_ssl_recv_timeout_t*)mbedtls_net_recv_timeout);
/*
* 3. Handshake
*/
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_8, P_INFO, "after ssl setup before handshake:%d", xBytesTaskMalloced);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_16, P_INFO, "STEP 3. Performing the SSL/TLS handshake...");
while ((value = mbedtls_ssl_handshake(&(ssl->sslContext))) != 0) {
if ((value != MBEDTLS_ERR_SSL_WANT_READ) && (value != MBEDTLS_ERR_SSL_WANT_WRITE)) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_17, P_INFO, "failed ! mbedtls_ssl_handshake returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_9, P_INFO, "after handshake:%d", xBytesTaskMalloced);
/*
* 4. Verify the server certificate
*/
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_18, P_INFO, "STEP 4. Verifying peer X.509 certificate..");
flag = mbedtls_ssl_get_verify_result(&(ssl->sslContext));
if (flag != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_19, P_INFO, " failed ! verify result not confirmed.");
return MQTT_MBEDTLS_ERR;
}
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_20, P_INFO, "caCert varification ok");
return MQTT_CONN_OK;
}
int socket_ssl_read(Network *n, unsigned char *buffer, int len, int timeout_ms) {
UINT32 readLen = 0;
static int net_status = 0;
INT32 ret = -1;
char err_str[33];
mqttsClientSsl *ssl = (mqttsClientSsl *)n->ssl;
mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), timeout_ms);
while (readLen < len) {
ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char *)(buffer + readLen), (len - readLen));
if (ret > 0) {
readLen += ret;
net_status = 0;
} else if (ret == 0) {
/* if ret is 0 and net_status is -2, indicate the connection is closed during last call */
return (net_status == -2) ? net_status : readLen;
} else {
if (MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY == ret) {
//mbedtls_strerror(ret, err_str, sizeof(err_str));
printf("ssl recv error: code = -0x%04X, err_str = '%s'\n", -ret, err_str);
net_status = -2; /* connection is closed */
break;
} else if ((MBEDTLS_ERR_SSL_TIMEOUT == ret)
|| (MBEDTLS_ERR_SSL_CONN_EOF == ret)
|| (MBEDTLS_ERR_SSL_SESSION_TICKET_EXPIRED == ret)
|| (MBEDTLS_ERR_SSL_NON_FATAL == ret)) {
/* read already complete */
/* if call mbedtls_ssl_read again, it will return 0 (means EOF) */
return readLen;
} else {
//mbedtls_strerror(ret, err_str, sizeof(err_str));
printf("ssl recv error: code = -0x%04X, err_str = '%s'\n", -ret, err_str);
net_status = -1;
return -1; /* Connection error */
}
}
}
return (readLen > 0) ? readLen : net_status;
}
int socket_ssl_write(Network *n, unsigned char *buffer, int len, int timeout_ms) {
INT32 waitToSend = len;
INT32 hasSend = 0;
mqttsClientSsl *ssl = (mqttsClientSsl *)n->ssl;
do
{
hasSend = mbedtls_ssl_write(&(ssl->sslContext), (unsigned char *)(buffer + len - waitToSend), waitToSend);
if(hasSend > 0)
{
waitToSend -= hasSend;
}
else if(hasSend == 0)
{
return len;
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_23, P_SIG, 0, "mqtt_client(ssl): send failed \n");
return 0;
}
}while(waitToSend>0);
return len;
}
void socket_ssl_disconnect(Network *n) {
mqttsClientSsl *ssl = (mqttsClientSsl *)n->ssl;
if(ssl == NULL)
return ;
mbedtls_ssl_close_notify(&(ssl->sslContext));
mbedtls_net_free(&(ssl->netContext));
mbedtls_x509_crt_free(&(ssl->caCert));
mbedtls_x509_crt_free(&(ssl->clientCert));
mbedtls_pk_free(&(ssl->pkContext));
mbedtls_ssl_free(&(ssl->sslContext));
mbedtls_ssl_config_free(&(ssl->sslConfig));
mbedtls_ctr_drbg_free(&(ssl->ctrDrbgContext));
mbedtls_entropy_free(&(ssl->entropyContext));
free(ssl);
n->ssl = NULL;
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_36, P_INFO, 0, "mqtt tls close ok");
return ;
}
int NetworkConnect(Network* n, char* addr, int port){
n->addr = addr;
n->port = port;
if (n->isMqtts){
if (socket_ssl_connect(n, addr)==MQTT_CONN_OK){
mqttsClientSsl *ssl = (mqttsClientSsl *)n->ssl;
n->my_socket = ssl->netContext.fd;
return 0;
}
return 1;
}
else
return socket_connect(n, addr);
}
#include "luat_wdt.h"
int FreeRTOS_read(Network* n, unsigned char* buffer, int len, int timeout_ms){
luat_wdt_feed();
if (n->isMqtts)
return socket_ssl_read(n, buffer, len, timeout_ms);
else
return socket_read(n, buffer, len, timeout_ms);
}
int FreeRTOS_write(Network* n, unsigned char* buffer, int len, int timeout_ms){
if (n->isMqtts)
return socket_ssl_write(n, buffer, len, timeout_ms);
else
return socket_write(n, buffer, len, timeout_ms);
}
int FreeRTOS_disconnect(Network* n){
if (n->isMqtts)
socket_ssl_disconnect(n);
else
socket_disconnect(n);
return 0;
}
void NetworkInit(Network* n)
{
n->my_socket = -1;
n->mqttread = FreeRTOS_read;
n->mqttwrite = FreeRTOS_write;
n->disconnect = FreeRTOS_disconnect;
}
int NetworkSetConnTimeout(Network* n, int send_timeout, int recv_timeout)
{
int ret = 0;
#if LWIP_SO_SNDRCVTIMEO_NONSTANDARD
int tx_timeout = send_timeout;
int rx_timeout = recv_timeout;
#else
struct timeval tx_timeout;
tx_timeout.tv_sec = send_timeout/1000;
tx_timeout.tv_usec = (send_timeout%1000)*1000;
struct timeval rx_timeout;
rx_timeout.tv_sec = recv_timeout/1000;
rx_timeout.tv_usec = (recv_timeout%1000)*1000;
#endif
if(n->my_socket == -1)
{
if ((n->my_socket = FreeRTOS_socket(FREERTOS_AF_INET, FREERTOS_SOCK_STREAM, FREERTOS_IPPROTO_TCP)) < 0)
{
//ECPLAT_PRINTF(UNILOG_CTWING, NetworkSetConnTimeout_0, P_INFO, "NetworkSetConnTimeout my_socket fail %d", n->my_socket);
return 1;
}
}
ret = FreeRTOS_setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, &tx_timeout, sizeof(tx_timeout));
if(ret != 0)
{
//ECPLAT_PRINTF(UNILOG_CTWING, NetworkSetConnTimeout_1, P_INFO, "NetworkSetConnTimeout send fail %d", ret);
//return 1;
}
ret = FreeRTOS_setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, &rx_timeout, sizeof(rx_timeout));
if(ret != 0)
{
//ECPLAT_PRINTF(UNILOG_CTWING, NetworkSetConnTimeout_2, P_INFO, "NetworkSetConnTimeout recv fail %d", ret);
//return 1;
}
INT32 flags = fcntl(n->my_socket, F_GETFL, 0);
if(flags < 0)
{
//ECPLAT_PRINTF(UNILOG_CTWING, NetworkSetConnTimeout_3, P_INFO, "NetworkSetConnTimeout fcntl fail %d", flags);
close(n->my_socket);
n->my_socket = -1;
return 1;
}
fcntl(n->my_socket, F_SETFL, flags|O_NONBLOCK); //set socket as nonblock for connect timeout
return 0;
}

View File

@@ -0,0 +1,174 @@
/*******************************************************************************
* Copyright (c) 2014, 2015 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(MQTTFreeRTOS_H)
#define MQTTFreeRTOS_H
#include "FreeRTOS.h"
#include "semphr.h"
#include "task.h"
#include "common_api.h"
// #ifdef FEATURE_MQTT_TLS_ENABLE
// #include "MQTTTls.h"
// #endif
#include "sockets.h"
#include "api.h"
#include "netdb.h"
#ifdef FEATURE_MQTT_TLS_ENABLE
#include "mbedtls/net.h"
#include "mbedtls/ssl.h"
#include "mbedtls/certs.h"
#include "mbedtls/entropy.h"
#include "mbedtls/ctr_drbg.h"
#endif
#include "sockets.h"
#include "api.h"
#include "netdb.h"
#define FreeRTOS_setsockopt setsockopt
#define FreeRTOS_getsockopt getsockopt
#define FreeRTOS_recv recv
#define FreeRTOS_closesocket close
#define FreeRTOS_shutdownsocket shutdown
#define FreeRTOS_socket socket
#define FreeRTOS_connect connect
#define FreeRTOS_gethostbyname getaddrinfo
#define FreeRTOS_htons htons
#define FreeRTOS_send send
#define FREERTOS_SO_RCVTIMEO SO_RCVTIMEO
#define FRERRTOS_SO_SNDTIMEO SO_SNDTIMEO
#define FRERRTOS_SO_ERROR SO_ERROR
#define FREERTOS_AF_INET AF_INET
#define FREERTOS_SOCK_STREAM SOCK_STREAM
#define FREERTOS_IPPROTO_TCP IPPROTO_TCP
#define FREERTOS_SOL_SOCKET SOL_SOCKET
#define freertos_sockaddr sockaddr_in
typedef int xSocket_t;
///MQTT client results
typedef enum {
MQTT_CONN_OK = 0, ///<Success
MQTT_PROCESSING, ///<Processing
MQTT_PARSE, ///<url Parse error
MQTT_DNS, ///<Could not resolve name
MQTT_PRTCL, ///<Protocol error
MQTT_NOTFOUND, ///<HTTP 404 Error
MQTT_REFUSED, ///<HTTP 403 Error
MQTT_ERROR, ///<HTTP xxx error
MQTT_TIMEOUT, ///<Connection timeout
MQTT_CONN, ///<Connection error
MQTT_FATAL_ERROR, //fatal error when conenct
MQTT_CLOSED, ///<Connection was closed by remote host
MQTT_MOREDATA, ///<Need get more data
MQTT_OVERFLOW, ///<Buffer overflow
MQTT_MBEDTLS_ERR,
}MQTTResult;
typedef struct Timer
{
TickType_t xTicksToWait;
TimeOut_t xTimeOut;
} Timer;
#define MQTT_MAX_TIMEOUT (10 * 60) //10 min
typedef struct mqttsClientSslTag
{
mbedtls_ssl_context sslContext;
mbedtls_net_context netContext;
mbedtls_ssl_config sslConfig;
mbedtls_entropy_context entropyContext;
mbedtls_ctr_drbg_context ctrDrbgContext;
mbedtls_x509_crt_profile crtProfile;
mbedtls_x509_crt caCert;
mbedtls_x509_crt clientCert;
mbedtls_pk_context pkContext;
}mqttsClientSsl;
typedef struct Network Network;
struct Network
{
xSocket_t my_socket;
int timeout_r;
char *addr;
uint16_t port;
bool isMqtts;
#ifdef FEATURE_MQTT_TLS_ENABLE
mqttsClientSsl * ssl;
char *caCert;
char *clientCert;
char *clientPk;
char *hostName;
char *psk_key;
char *psk_identity;
int caCertLen;
int clientCertLen;
int clientPkLen;
uint8_t seclevel;//0:no verify; 1:verify server; 2:both verify
int32_t ciphersuite[2];//just like 0x0035 TLS_RSA_WITH_AES_256_CBC_SHA,ciphersuite[1] must NULL
uint8_t pdpId;//pdp context id--cid--0 is default
uint8_t cache;//0:no session resumption; 1:session resumption
uint8_t sni;//0:no sni; 1:has sni
uint8_t ignore;//0:not ignore; 1:ignore
uint8_t saveMem;//0:disable; 1:enable
#endif
int (*mqttread) (Network*, unsigned char*, int, int);
int (*mqttwrite) (Network*, unsigned char*, int, int);
int (*disconnect) (Network*);
};
void TimerInit(Timer*);
char TimerIsExpired(Timer*);
void TimerCountdownMS(Timer*, unsigned int);
void TimerCountdown(Timer*, unsigned int);
int TimerLeftMS(Timer*);
typedef struct Mutex
{
SemaphoreHandle_t sem;
} Mutex;
void MutexInit(Mutex*);
int MutexLock(Mutex*);
int MutexUnlock(Mutex*);
typedef struct Thread
{
TaskHandle_t task;
} Thread;
int ThreadStart(Thread*, void (*fn)(void*), void* arg);
int FreeRTOS_read(Network* n, unsigned char* buffer, int len, int timeout_ms);
int FreeRTOS_write(Network* n, unsigned char* buffer, int len, int timeout_ms);
int FreeRTOS_disconnect(Network* n);
void NetworkInit(Network* n);
int NetworkConnect(Network* n, char* addr, int port);
int NetworkSetConnTimeout(Network* n, int send_timeout, int recv_timeout);
/*int NetworkConnectTLS(Network*, char*, int, SlSockSecureFiles_t*, unsigned char, unsigned int, char);*/
#endif

View File

@@ -0,0 +1,860 @@
/*******************************************************************************
* Copyright (c) 2014, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - fix for #96 - check rem_len in readPacket
* Ian Craggs - add ability to set message handler separately #6
*******************************************************************************/
#include <stdio.h>
#include <stdarg.h>
#include "commonTypedef.h"
#include "MQTTClient.h"
#include "luat_debug.h"
#include "debug_trace.h"
#include DEBUG_LOG_HEADER_FILE
// QueueHandle_t mqttSendMsgHandle = NULL;
// void mqttDefMessageArrived(MessageData* data)
// {
// char *bufTemp = NULL;
// bufTemp = malloc(data->message->payloadlen+1);
// memset(bufTemp, 0, data->message->payloadlen+1);
// memcpy(bufTemp, data->message->payload, data->message->payloadlen);
// ////ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_2, P_SIG, ".........MQTT topic is:%s", (const uint8_t *)data->topicName->lenstring.data);
// ////ECPLAT_PRINTF(UNILOG_MQTT, mqttRecvTask_1, P_SIG, ".........MQTT_messageArrived is:%s", (const uint8_t *)bufTemp);
// free(bufTemp);
// }
static void NewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
md->topicName = aTopicName;
md->message = aMessage;
}
static int getNextPacketId(MQTTClient *c) {
return c->next_packetid = (c->next_packetid == MAX_PACKET_ID) ? 1 : c->next_packetid + 1;
}
static int sendPacket(MQTTClient* c, int length, Timer* timer)
{
int rc = FAILURE,
sent = 0;
while (sent < length && !TimerIsExpired(timer))
{
rc = c->ipstack->mqttwrite(c->ipstack, &c->sendbuf[sent], length, TimerLeftMS(timer));
if (rc < 0) // there was an error writing the data
break;
sent += rc;
}
if (sent == length)
{
TimerCountdown(&c->last_sent, c->keepAliveInterval); // record the fact that we have successfully sent the packet
rc = SUCCESS;
}
else
rc = FAILURE;
return rc;
}
void MQTTClientInit(MQTTClient* c, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{
int i;
c->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = 0;
c->command_timeout_ms = command_timeout_ms;
c->sendbuf = sendbuf;
c->sendbuf_size = sendbuf_size;
c->readbuf = readbuf;
c->readbuf_size = readbuf_size;
c->mqtt_keepalive_retry_count = 0;
c->isconnected = 0;
c->cleansession = 0;
c->ping_outstanding = 0;
c->defaultMessageHandler = NULL;
c->next_packetid = 1;
TimerInit(&c->last_sent);
TimerInit(&c->last_received);
#if defined(MQTT_TASK)
MutexInit(&c->mutex);
#endif
}
static int decodePacket(MQTTClient* c, int* value, int timeout)
{
unsigned char i;
int multiplier = 1;
int len = 0;
const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
rc = c->ipstack->mqttread(c->ipstack, &i, 1, timeout);
if (rc != 1)
goto exit;
*value += (i & 127) * multiplier;
multiplier *= 128;
} while ((i & 128) != 0);
exit:
return len;
}
static int readPacket(MQTTClient* c, Timer* timer)
{
MQTTHeader header = {0};
int len = 0;
int rem_len = 0;
/* 1. read the header byte. This has the packet type in it */
int rc = c->ipstack->mqttread(c->ipstack, c->readbuf, 1, TimerLeftMS(timer));
if (rc != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
decodePacket(c, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(c->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
if (rem_len > (c->readbuf_size - len))
{
rc = BUFFER_OVERFLOW;
goto exit;
}
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (c->ipstack->mqttread(c->ipstack, c->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
rc = 0;
goto exit;
}
header.byte = c->readbuf[0];
rc = header.bits.type;
if (c->keepAliveInterval > 0)
TimerCountdown(&c->last_received, c->keepAliveInterval); // record the fact that we have successfully received a packet
exit:
return rc;
}
// assume topic filter and name is in correct format
// # can only be at end
// + and # can only be next to separator
static char isTopicMatched(char* topicFilter, MQTTString* topicName)
{
char* curf = topicFilter;
char* curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end)
{
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+')
{ // skip until we meet the next separator, or end of string
char* nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/')
nextpos = ++curn + 1;
}
else if (*curf == '#')
curn = curn_end - 1; // skip until end of string
curf++;
curn++;
};
return (curn == curn_end) && (*curf == '\0');
}
int deliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
{
int i;
int rc = FAILURE;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage1, P_SIG, 0, "....1....deliverMessage..");
// we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
isTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
{
if (c->messageHandlers[i].fp != NULL)
{
MessageData md;
NewMessageData(&md, topicName, message);
c->messageHandlers[i].fp(&md);
rc = SUCCESS;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage2, P_SIG, 0, "....2....deliverMessage..");
}
}
}
if (rc == FAILURE && c->defaultMessageHandler != NULL)
{
MessageData md;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage3, P_SIG, 0, "....3....deliverMessage..");
NewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md);
rc = SUCCESS;
//ECOMM_TRACE(UNILOG_MQTT, deliverMessage4, P_SIG, 0, "....4....deliverMessage..");
}
return rc;
}
int keepalive(MQTTClient* c)
{
int rc = SUCCESS;
if (c->keepAliveInterval == 0)
{
goto exit;
}
if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
{
if (c->ping_outstanding)
{
c->mqtt_keepalive_retry_count++;
//ECOMM_TRACE(UNILOG_MQTT, keepalive_0, P_SIG, 0, "....keepalive....ping_outstanding..=1..");
rc = FAILURE; /* PINGRESP not received in keepalive interval */
}
else
{
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
memset(c->sendbuf, 0, c->sendbuf_size);
memset(c->readbuf, 0, c->readbuf_size);
int len = MQTTSerialize_pingreq(c->sendbuf, c->sendbuf_size);
//ECOMM_TRACE(UNILOG_MQTT, keepalive_1, P_SIG, 0, "....keepalive....send packet..");
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
c->ping_outstanding = 1;
}
}
exit:
return rc;
}
int keepaliveRetry(MQTTClient* c)
{
int rc = SUCCESS;
if (c->keepAliveInterval == 0)
{
goto exit;
}
if (TimerIsExpired(&c->last_sent) || TimerIsExpired(&c->last_received))
{
{
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
memset(c->sendbuf, 0, c->sendbuf_size);
memset(c->readbuf, 0, c->readbuf_size);
int len = MQTTSerialize_pingreq(c->sendbuf, c->sendbuf_size);
//ECOMM_TRACE(UNILOG_MQTT, keepalive_1pp, P_SIG, 0, "....keepalive....send packet..");
if (len > 0 && (rc = sendPacket(c, len, &timer)) == SUCCESS) // send the ping packet
c->ping_outstanding = 1;
}
}
exit:
return rc;
}
void MQTTCleanSession(MQTTClient* c)
{
int i = 0;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
c->messageHandlers[i].topicFilter = NULL;
}
void MQTTCloseSession(MQTTClient* c)
{
c->ping_outstanding = 0;
c->isconnected = 0;
if (c->cleansession)
MQTTCleanSession(c);
}
int cycle(MQTTClient* c, Timer* timer)
{
int len = 0,
rc = SUCCESS;
int socket_stat = sock_get_errno(c->ipstack->my_socket);
int socket_err = socket_error_is_fatal(socket_stat);
if (socket_err == 1){
return -2;
}
int packet_type = readPacket(c, timer); /* read the socket, see what work is due */
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0001, P_SIG, 1, ".....mqttRecvTask..packet_type=%d....",packet_type);
////ECPLAT_PRINTF(UNILOG_DM1, cycle0, P_SIG, ".....autoReg..packet_type=%d ",packet_type);
switch (packet_type)
{
default:
/* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
rc = packet_type;
break;
case 0: /* timed out reading packet */
break;
case CONNACK:
case PUBACK:
case SUBACK:
case UNSUBACK:
if(packet_type == SUBACK)
{
//rc = packet_type;
}
break;
case PUBLISH:
{
MQTTString topicName;
MQTTMessage msg;
int intQoS;
msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->readbuf, c->readbuf_size) != 1)
goto exit;
msg.qos = (enum QoS)intQoS;
deliverMessage(c, &topicName, &msg);
if (msg.qos != QOS0)
{
if (msg.qos == QOS1)
len = MQTTSerialize_ack(c->sendbuf, c->sendbuf_size, PUBACK, 0, msg.id);
else if (msg.qos == QOS2)
len = MQTTSerialize_ack(c->sendbuf, c->sendbuf_size, PUBREC, 0, msg.id);
if (len <= 0)
rc = FAILURE;
else
rc = sendPacket(c, len, timer);
if (rc == FAILURE)
goto exit; // there was a problem
}
break;
}
case PUBREC:
case PUBREL:
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
else if ((len = MQTTSerialize_ack(c->sendbuf, c->sendbuf_size,
(packet_type == PUBREC) ? PUBREL : PUBCOMP, 0, mypacketid)) <= 0)
rc = FAILURE;
else if ((rc = sendPacket(c, len, timer)) != SUCCESS) // send the PUBREL packet
rc = FAILURE; // there was a problem
if (rc == FAILURE)
goto exit; // there was a problem
break;
}
case PUBCOMP:
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
}
if (keepalive(c) != SUCCESS) {
int socket_stat = 0;
mqttSendMsg mqttMsg;
//check only keepalive FAILURE status so that previous FAILURE status can be considered as FAULT
rc = FAILURE;
socket_stat = sock_get_errno(c->ipstack->my_socket);
if((socket_stat == MQTT_ERR_ABRT)||(socket_stat == MQTT_ERR_RST)||(socket_stat == MQTT_ERR_CLSD)||(socket_stat == MQTT_ERR_BADE))
{
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_0, P_INFO, 0, ".....now, need reconnect.......");
/* send reconnect msg to send task */
memset(&mqttMsg, 0, sizeof(mqttMsg));
mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;
// xQueueSend(mqttSendMsgHandle, &mqttMsg, MQTT_MSG_TIMEOUT);
}
else
{
if(c->mqtt_keepalive_retry_count>3)
{
c->mqtt_keepalive_retry_count = 0;
//ECOMM_TRACE(UNILOG_MQTT, mqttRecvTask_ee0, P_INFO, 0, ".....now, need reconnect.......");
/* send reconnect msg to send task */
memset(&mqttMsg, 0, sizeof(mqttMsg));
mqttMsg.cmdType = MQTT_DEMO_MSG_RECONNECT;
// xQueueSend(mqttSendMsgHandle, &mqttMsg, MQTT_MSG_TIMEOUT);
}
else
{
keepaliveRetry(c);
}
}
}
exit:
if (rc == SUCCESS)
rc = packet_type;
else if (c->isconnected)
;//MQTTCloseSession(c);
return rc;
}
int MQTTYield(MQTTClient* c, int timeout_ms)
{
int rc = SUCCESS;
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, timeout_ms);
do
{
if (cycle(c, &timer) < 0)
{
rc = FAILURE;
break;
}
} while (!TimerIsExpired(&timer));
return rc;
}
int MQTTIsConnected(MQTTClient* client)
{
return client->isconnected;
}
void MQTTRun(void* parm)
{
Timer timer;
MQTTClient* c = (MQTTClient*)parm;
// if(mqttSendMsgHandle == NULL)
// {
// mqttSendMsgHandle = xQueueCreate(16, sizeof(mqttSendMsg));
// }
TimerInit(&timer);
while (1)
{
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
TimerCountdownMS(&timer, 1500); /* Don't wait too long if no traffic is incoming */
int rc = cycle(c, &timer);
if (rc == -2){
c->isconnected = 0;
}
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
osDelay(200);
}
vTaskDelete(NULL);
}
#if defined(MQTT_TASK)
int MQTTStartTask(MQTTClient* client)
{
return ThreadStart(&client->thread, &MQTTRun, client);
}
#endif
int waitfor(MQTTClient* c, int packet_type, Timer* timer)
{
int rc = FAILURE;
do
{
if (TimerIsExpired(timer))
break; // we timed out
rc = cycle(c, timer);
}
while (rc != packet_type && rc >= 0);
return rc;
}
int MQTTConnectWithResults(MQTTClient* c, MQTTPacket_connectData* options, MQTTConnackData* data)
{
Timer connect_timer;
int rc = FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (c->isconnected) /* don't send connect packet again if we are already connected */
goto exit;
TimerInit(&connect_timer);
TimerCountdownMS(&connect_timer, c->command_timeout_ms);
if (options == 0)
options = &default_options; /* set default options if none were supplied */
c->keepAliveInterval = options->keepAliveInterval;
c->cleansession = options->cleansession;
TimerCountdown(&c->last_received, c->keepAliveInterval);
if ((len = MQTTSerialize_connect(c->sendbuf, c->sendbuf_size, options)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &connect_timer)) != SUCCESS) // send the connect packet
goto exit; // there was a problem
// this will be a blocking call, wait for the connack
if (waitfor(c, CONNACK, &connect_timer) == CONNACK)
{
data->rc = 0;
data->sessionPresent = 0;
if ( MQTTDeserialize_connack(&data->sessionPresent, &data->rc, c->readbuf, c->readbuf_size) == 1)
rc = data->rc;
else
rc = FAILURE;
}
else
rc = FAILURE;
exit:
if (rc == SUCCESS)
{
c->isconnected = 1;
c->ping_outstanding = 0;
}
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTReConnect(MQTTClient* client, MQTTPacket_connectData* connectData)
{
int ret = FAILURE;
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_13hh0, P_INFO, 0, "...start tcp disconnect ..");
client->ipstack->disconnect(client->ipstack);
client->isconnected = 0;
if((NetworkConnect(client->ipstack, client->ipstack->addr, client->ipstack->port)) < 0)
{
LUAT_DEBUG_PRINT("NetworkConnect fail");
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_17hh4, P_INFO, 0, "...tcp reconnect fail!!!...\r\n");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_18hh5, P_INFO, 0, "...start mqtt connect ..");
if ((MQTTConnect(client, connectData)) != 0)
{
LUAT_DEBUG_PRINT("MQTTConnect fail");
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_19hh6, P_INFO, 0, "...mqtt reconnect fial!!!...");
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttSendTask_20hh7, P_INFO, 0, "...mqtt reconnect ok!!!...");
ret = SUCCESS;
}
}
return ret;
}
int MQTTConnect(MQTTClient* c, MQTTPacket_connectData* options)
{
MQTTConnackData data;
return MQTTConnectWithResults(c, options, &data);
}
int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler)
{
int rc = FAILURE;
int i = -1;
/* first check for an existing matching slot */
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != NULL && strcmp(c->messageHandlers[i].topicFilter, topicFilter) == 0)
{
if (messageHandler == NULL) /* remove existing */
{
c->messageHandlers[i].topicFilter = NULL;
c->messageHandlers[i].fp = NULL;
}
rc = SUCCESS; /* return i when adding new subscription */
break;
}
}
/* if no existing, look for empty slot (unless we are removing) */
if (messageHandler != NULL) {
if (rc == FAILURE)
{
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter == NULL)
{
rc = SUCCESS;
break;
}
}
}
if (i < MAX_MESSAGE_HANDLERS)
{
c->messageHandlers[i].topicFilter = topicFilter;
c->messageHandlers[i].fp = messageHandler;
}
}
return rc;
}
int MQTTSubscribeWithResults(MQTTClient* c, const char* topicFilter, enum QoS qos,
messageHandler messageHandler, MQTTSubackData* data)
{
int rc = FAILURE;
Timer timer;
int len = 0;
int mqttQos = (int)qos;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_subscribe(c->sendbuf, c->sendbuf_size, 0, getNextPacketId(c), 1, &topic, (int*)&mqttQos);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, SUBACK, &timer) == SUBACK) // wait for suback
{
int count = 0;
unsigned short mypacketid;
//data->grantedQoS = QOS0;
mqttQos = QOS0;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, c->readbuf, c->readbuf_size) == 1)
{
if (mqttQos != 0x80){
rc = MQTTSetMessageHandler(c, topicFilter, messageHandler);
}else{
rc = FAILURE;
}
}
}
else
rc = FAILURE;
DBG("tick5:%llu ",soc_get_poweron_time_tick());
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTSubscribe(MQTTClient* c, const char* topicFilter, enum QoS qos,
messageHandler messageHandler)
{
MQTTSubackData data;
return MQTTSubscribeWithResults(c, topicFilter, qos, messageHandler, &data);
}
int MQTTUnsubscribe(MQTTClient* c, const char* topicFilter)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicFilter;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if ((len = MQTTSerialize_unsubscribe(c->sendbuf, c->sendbuf_size, 0, getNextPacketId(c), 1, &topic)) <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (waitfor(c, UNSUBACK, &timer) == UNSUBACK)
{
unsigned short mypacketid; // should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, c->readbuf, c->readbuf_size) == 1)
{
/* remove the subscription message handler associated with this topic, if there is one */
MQTTSetMessageHandler(c, topicFilter, NULL);
}
}
else
rc = FAILURE;
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTPublish(MQTTClient* c, const char* topicName, MQTTMessage* message)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicName;
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
if (!c->isconnected)
goto exit;
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
if (message->qos == QOS1 || message->qos == QOS2)
message->id = getNextPacketId(c);
len = MQTTSerialize_publish(c->sendbuf, c->sendbuf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = sendPacket(c, len, &timer)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
if (message->qos == QOS1)
{
if (waitfor(c, PUBACK, &timer) == PUBACK)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
else if (message->qos == QOS2)
{
if (waitfor(c, PUBCOMP, &timer) == PUBCOMP)
{
unsigned short mypacketid;
unsigned char dup, type;
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, c->readbuf, c->readbuf_size) != 1)
rc = FAILURE;
}
else
rc = FAILURE;
}
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int MQTTDisconnect(MQTTClient* c)
{
int rc = FAILURE;
Timer timer; // we might wait for incomplete incoming publishes to complete
int len = 0;
#if defined(MQTT_TASK)
MutexLock(&c->mutex);
#endif
TimerInit(&timer);
TimerCountdownMS(&timer, c->command_timeout_ms);
len = MQTTSerialize_disconnect(c->sendbuf, c->sendbuf_size);
if (len > 0)
rc = sendPacket(c, len, &timer); // send the disconnect packet
MQTTCloseSession(c);
#if defined(MQTT_TASK)
MutexUnlock(&c->mutex);
#endif
return rc;
}
int mqtt_connect(MQTTClient* c,Network *n,char *serverAddr, int port, MQTTPacket_connectData* connData)
{
unsigned char * mqttSendbuf = malloc(MQTT_SEND_BUFF_LEN);
unsigned char * mqttReadbuf = malloc(MQTT_RECV_BUFF_LEN);
memset(mqttSendbuf, 0, MQTT_SEND_BUFF_LEN);
memset(mqttReadbuf, 0, MQTT_RECV_BUFF_LEN);
NetworkInit(n);
MQTTClientInit(c, n, 40000, mqttSendbuf, MQTT_SEND_BUFF_LEN, mqttReadbuf, MQTT_RECV_BUFF_LEN);
if ((NetworkConnect(n, serverAddr, port)) != 0){
c->keepAliveInterval = connData->keepAliveInterval;
c->ping_outstanding = 1;
return 1;
}else{
if ((MQTTConnect(c, connData)) != 0){
c->ping_outstanding = 1;
return 1;
}else{
#if defined(MQTT_TASK)
if ((MQTTStartTask(c)) != pdPASS){
return 1;
}else{
return 0;
}
#else
return 0;
#endif
}
}
return 1;
}

View File

@@ -0,0 +1,289 @@
/*******************************************************************************
* Copyright (c) 2014, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander/Ian Craggs - initial API and implementation and/or initial documentation
* Ian Craggs - documentation and platform specific header
* Ian Craggs - add setMessageHandler function
*******************************************************************************/
#if !defined(MQTT_CLIENT_H)
#define MQTT_CLIENT_H
#if defined(__cplusplus)
extern "C" {
#endif
#if defined(WIN32_DLL) || defined(WIN64_DLL)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO)
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#else
#define DLLImport
#define DLLExport
#endif
#include "MQTTPacket.h"
#include "MQTTFreeRTOS.h"
#if defined(MQTTCLIENT_PLATFORM_HEADER)
/* The following sequence of macros converts the MQTTCLIENT_PLATFORM_HEADER value
* into a string constant suitable for use with include.
*/
#define xstr(s) str(s)
#define str(s) #s
#include xstr(MQTTCLIENT_PLATFORM_HEADER)
#endif
#define MAX_PACKET_ID 65535 /* according to the MQTT specification - do not change! */
#if !defined(MAX_MESSAGE_HANDLERS)
#define MAX_MESSAGE_HANDLERS 5 /* redefinable - how many subscriptions do you want? */
#endif
enum QoS { QOS0, QOS1, QOS2, SUBFAIL=0x80 };
/* all failure return codes must be negative */
enum returnCode { BUFFER_OVERFLOW = -2, FAILURE = -1, SUCCESS = 0 };
/* The Platform specific header must define the Network and Timer structures and functions
* which operate on them.
*
typedef struct Network
{
int (*mqttread)(Network*, unsigned char* read_buffer, int, int);
int (*mqttwrite)(Network*, unsigned char* send_buffer, int, int);
} Network;*/
/* The Timer structure must be defined in the platform specific header,
* and have the following functions to operate on it. */
extern void TimerInit(Timer*);
extern char TimerIsExpired(Timer*);
extern void TimerCountdownMS(Timer*, unsigned int);
extern void TimerCountdown(Timer*, unsigned int);
extern int TimerLeftMS(Timer*);
typedef struct MQTTMessage
{
enum QoS qos;
unsigned char retained;
unsigned char dup;
unsigned short id;
void *payload;
size_t payloadlen;
} MQTTMessage;
typedef struct MessageData
{
MQTTMessage* message;
MQTTString* topicName;
} MessageData;
typedef struct MQTTConnackData
{
unsigned char rc;
unsigned char sessionPresent;
} MQTTConnackData;
typedef struct MQTTSubackData
{
enum QoS grantedQoS;
} MQTTSubackData;
typedef void (*messageHandler)(MessageData*);
typedef struct MQTTClient
{
unsigned int next_packetid,command_timeout_ms;
size_t sendbuf_size,readbuf_size;
unsigned char *sendbuf,*readbuf;
unsigned int keepAliveInterval;
int mqtt_keepalive_retry_count;
char ping_outstanding;
int isconnected;
int cleansession;
struct MessageHandlers{
const char* topicFilter;
void (*fp) (MessageData*);
} messageHandlers[MAX_MESSAGE_HANDLERS]; /* Message handlers are indexed by subscription topic */
void (*defaultMessageHandler) (MessageData*);
Network* ipstack;
Timer last_sent, last_received;
#if defined(MQTT_TASK)
Mutex mutex;
Thread thread;
#endif
} MQTTClient;
typedef struct
{
int cmdType;
char *topic;
int topicLen;
MQTTMessage message;
}mqttSendMsg;
typedef struct
{
int cmdType;
int ret;
char *data;
int dataLen;
}mqttDataMsg;
enum MQTT_MSG_CMD_
{
MQTT_DEMO_MSG_PUBLISH = 1,
MQTT_DEMO_MSG_PUBLISH_ACK = 2,
MQTT_DEMO_MSG_SUB,
MQTT_DEMO_MSG_UNSUB,
MQTT_DEMO_MSG_RECONNECT,
};
#define DefaultClient {0, 0, 0, 0, NULL, NULL, 0, 0, 0}
#define ALI_SHA1_KEY_IOPAD_SIZE (64)
#define ALI_SHA1_DIGEST_SIZE (20)
#define ALI_SHA256_KEY_IOPAD_SIZE (64)
#define ALI_SHA256_DIGEST_SIZE (32)
#define ALI_MD5_KEY_IOPAD_SIZE (64)
#define ALI_MD5_DIGEST_SIZE (16)
#define ALI_HMAC_USED (1)
#define ALI_HMAC_NOT_USED (0)
#define MQTT_DEMO_TASK_STACK_SIZE 2048
#define MQTT_SEND_BUFF_LEN (1024)
#define MQTT_RECV_BUFF_LEN (1024)
#define MQTT_SERVER_URI "183.230.40.39"
#define MQTT_SERVER_PORT (6002)
#define MQTT_SERVER_TOPIC_0 "$dp"
#define MQTT_SERVER_TOPIC_1 "$dp"
#define MQTT_SERVER_TOPIC_2 "$dp"
#define MQTT_MSG_TIMEOUT 1000
#define MQTT_ERR_ABRT (-13)
#define MQTT_ERR_RST (-14)
#define MQTT_ERR_CLSD (-15)
#define MQTT_ERR_BADE (9)
#define MQTT_SEND_TIMEOUT 5000
#define MQTT_RECV_TIMEOUT 5000
/**
* Create an MQTT client object
* @param client
* @param network
* @param command_timeout_ms
* @param
*/
DLLExport void MQTTClientInit(MQTTClient* client, Network* network, unsigned int command_timeout_ms,
unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size);
/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this
* @param options - connect options
* @return success code
*/
DLLExport int MQTTConnectWithResults(MQTTClient* client, MQTTPacket_connectData* options,
MQTTConnackData* data);
/** MQTT Connect - send an MQTT connect packet down the network and wait for a Connack
* The nework object must be connected to the network endpoint before calling this
* @param options - connect options
* @return success code
*/
DLLExport int MQTTConnect(MQTTClient* client, MQTTPacket_connectData* options);
/** MQTT Publish - send an MQTT publish packet and wait for all acks to complete for all QoSs
* @param client - the client object to use
* @param topic - the topic to publish to
* @param message - the message to send
* @return success code
*/
DLLExport int MQTTPublish(MQTTClient* client, const char*, MQTTMessage*);
/** MQTT SetMessageHandler - set or remove a per topic message handler
* @param client - the client object to use
* @param topicFilter - the topic filter set the message handler for
* @param messageHandler - pointer to the message handler function or NULL to remove
* @return success code
*/
DLLExport int MQTTSetMessageHandler(MQTTClient* c, const char* topicFilter, messageHandler messageHandler);
/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use
* @param topicFilter - the topic filter to subscribe to
* @param message - the message to send
* @return success code
*/
DLLExport int MQTTSubscribe(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler);
/** MQTT Subscribe - send an MQTT subscribe packet and wait for suback before returning.
* @param client - the client object to use
* @param topicFilter - the topic filter to subscribe to
* @param message - the message to send
* @param data - suback granted QoS returned
* @return success code
*/
DLLExport int MQTTSubscribeWithResults(MQTTClient* client, const char* topicFilter, enum QoS, messageHandler, MQTTSubackData* data);
/** MQTT Subscribe - send an MQTT unsubscribe packet and wait for unsuback before returning.
* @param client - the client object to use
* @param topicFilter - the topic filter to unsubscribe from
* @return success code
*/
DLLExport int MQTTUnsubscribe(MQTTClient* client, const char* topicFilter);
/** MQTT Disconnect - send an MQTT disconnect packet and close the connection
* @param client - the client object to use
* @return success code
*/
DLLExport int MQTTDisconnect(MQTTClient* client);
/** MQTT Yield - MQTT background
* @param client - the client object to use
* @param time - the time, in milliseconds, to yield for
* @return success code
*/
DLLExport int MQTTYield(MQTTClient* client, int time);
/** MQTT isConnected
* @param client - the client object to use
* @return truth value indicating whether the client is connected to the server
*/
DLLExport int MQTTIsConnected(MQTTClient* client);
#if defined(MQTT_TASK)
/** MQTT start background thread for a client. After this, MQTTYield should not be called.
* @param client - the client object to use
* @return success code
*/
DLLExport int MQTTStartTask(MQTTClient* client);
DLLExport int MQTTStopTask(MQTTClient* client);
#endif
int mqtt_connect(MQTTClient* c,Network *n, char *serverAddr, int port, MQTTPacket_connectData* connData);
#if defined(__cplusplus)
}
#endif
#endif

View File

@@ -0,0 +1,196 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/
#include "MQTTCC3200.h"
unsigned long MilliTimer;
void SysTickIntHandler(void) {
MilliTimer++;
}
char expired(Timer* timer) {
long left = timer->end_time - MilliTimer;
return (left < 0);
}
void countdown_ms(Timer* timer, unsigned int timeout) {
timer->end_time = MilliTimer + timeout;
}
void countdown(Timer* timer, unsigned int timeout) {
timer->end_time = MilliTimer + (timeout * 1000);
}
int left_ms(Timer* timer) {
long left = timer->end_time - MilliTimer;
return (left < 0) ? 0 : left;
}
void InitTimer(Timer* timer) {
timer->end_time = 0;
}
int cc3200_read(Network* n, unsigned char* buffer, int len, int timeout_ms) {
SlTimeval_t timeVal;
SlFdSet_t fdset;
int rc = 0;
int recvLen = 0;
SL_FD_ZERO(&fdset);
SL_FD_SET(n->my_socket, &fdset);
timeVal.tv_sec = 0;
timeVal.tv_usec = timeout_ms * 1000;
if (sl_Select(n->my_socket + 1, &fdset, NULL, NULL, &timeVal) == 1) {
do {
rc = sl_Recv(n->my_socket, buffer + recvLen, len - recvLen, 0);
recvLen += rc;
} while(recvLen < len);
}
return recvLen;
}
int cc3200_write(Network* n, unsigned char* buffer, int len, int timeout_ms) {
SlTimeval_t timeVal;
SlFdSet_t fdset;
int rc = 0;
int readySock;
SL_FD_ZERO(&fdset);
SL_FD_SET(n->my_socket, &fdset);
timeVal.tv_sec = 0;
timeVal.tv_usec = timeout_ms * 1000;
do {
readySock = sl_Select(n->my_socket + 1, NULL, &fdset, NULL, &timeVal);
} while(readySock != 1);
rc = sl_Send(n->my_socket, buffer, len, 0);
return rc;
}
void cc3200_disconnect(Network* n) {
sl_Close(n->my_socket);
}
void NewNetwork(Network* n) {
n->my_socket = 0;
n->mqttread = cc3200_read;
n->mqttwrite = cc3200_write;
n->disconnect = cc3200_disconnect;
}
int TLSConnectNetwork(Network *n, char* addr, int port, SlSockSecureFiles_t* certificates, unsigned char sec_method, unsigned int cipher, char server_verify) {
SlSockAddrIn_t sAddr;
int addrSize;
int retVal;
unsigned long ipAddress;
retVal = sl_NetAppDnsGetHostByName(addr, strlen(addr), &ipAddress, AF_INET);
if (retVal < 0) {
return -1;
}
sAddr.sin_family = AF_INET;
sAddr.sin_port = sl_Htons((unsigned short)port);
sAddr.sin_addr.s_addr = sl_Htonl(ipAddress);
addrSize = sizeof(SlSockAddrIn_t);
n->my_socket = sl_Socket(SL_AF_INET,SL_SOCK_STREAM, SL_SEC_SOCKET);
if (n->my_socket < 0) {
return -1;
}
SlSockSecureMethod method;
method.secureMethod = sec_method;
retVal = sl_SetSockOpt(n->my_socket, SL_SOL_SOCKET, SL_SO_SECMETHOD, &method, sizeof(method));
if (retVal < 0) {
return retVal;
}
SlSockSecureMask mask;
mask.secureMask = cipher;
retVal = sl_SetSockOpt(n->my_socket, SL_SOL_SOCKET, SL_SO_SECURE_MASK, &mask, sizeof(mask));
if (retVal < 0) {
return retVal;
}
if (certificates != NULL) {
retVal = sl_SetSockOpt(n->my_socket, SL_SOL_SOCKET, SL_SO_SECURE_FILES, certificates->secureFiles, sizeof(SlSockSecureFiles_t));
if(retVal < 0)
{
return retVal;
}
}
retVal = sl_Connect(n->my_socket, ( SlSockAddr_t *)&sAddr, addrSize);
if( retVal < 0 ) {
if (server_verify || retVal != -453) {
sl_Close(n->my_socket);
return retVal;
}
}
SysTickIntRegister(SysTickIntHandler);
SysTickPeriodSet(80000);
SysTickEnable();
return retVal;
}
int ConnectNetwork(Network* n, char* addr, int port)
{
SlSockAddrIn_t sAddr;
int addrSize;
int retVal;
unsigned long ipAddress;
sl_NetAppDnsGetHostByName(addr, strlen(addr), &ipAddress, AF_INET);
sAddr.sin_family = AF_INET;
sAddr.sin_port = sl_Htons((unsigned short)port);
sAddr.sin_addr.s_addr = sl_Htonl(ipAddress);
addrSize = sizeof(SlSockAddrIn_t);
n->my_socket = sl_Socket(SL_AF_INET,SL_SOCK_STREAM, 0);
if( n->my_socket < 0 ) {
// error
return -1;
}
retVal = sl_Connect(n->my_socket, ( SlSockAddr_t *)&sAddr, addrSize);
if( retVal < 0 ) {
// error
sl_Close(n->my_socket);
return retVal;
}
SysTickIntRegister(SysTickIntHandler);
SysTickPeriodSet(80000);
SysTickEnable();
return retVal;
}

View File

@@ -0,0 +1,58 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/
#ifndef __MQTT_CC3200_
#define __MQTT_CC3200_
#include "simplelink.h"
#include "netapp.h"
#include "socket.h"
#include "hw_types.h"
#include "systick.h"
typedef struct Timer Timer;
struct Timer {
unsigned long systick_period;
unsigned long end_time;
};
typedef struct Network Network;
struct Network
{
int my_socket;
int (*mqttread) (Network*, unsigned char*, int, int);
int (*mqttwrite) (Network*, unsigned char*, int, int);
void (*disconnect) (Network*);
};
char expired(Timer*);
void countdown_ms(Timer*, unsigned int);
void countdown(Timer*, unsigned int);
int left_ms(Timer*);
void InitTimer(Timer*);
int cc3200_read(Network*, unsigned char*, int, int);
int cc3200_write(Network*, unsigned char*, int, int);
void cc3200_disconnect(Network*);
void NewNetwork(Network*);
int ConnectNetwork(Network*, char*, int);
int TLSConnectNetwork(Network*, char*, int, SlSockSecureFiles_t*, unsigned char, unsigned int, char);
#endif

View File

@@ -0,0 +1,602 @@
#ifdef FEATURE_MBEDTLS_ENABLE
#include "sha1.h"
#include "sha256.h"
#include "md5.h"
#endif
#include "MQTTTls.h"
#include "MQTTFreeRTOS.h"
#include "error.h"
int mqttSslRandom(void *p_rng, unsigned char *output, size_t output_len)
{
uint32_t rnglen = output_len;
uint8_t rngoffset = 0;
while (rnglen > 0)
{
*(output + rngoffset) = (unsigned char)rand();
rngoffset++;
rnglen--;
}
return 0;
}
static void mqttSslDebug(void *ctx, int level, const char *file, int line, const char *str)
{
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTls_00, P_INFO, "%s(%d):%s", file, line, str);
// DBG("%s", str);
}
int mqttSslNonblockRecv(void *netContext, UINT8 *buf, size_t len)
{
int ret;
int fd = ((mbedtls_net_context *)netContext)->fd;
if(fd < 0)
{
return MQTT_MBEDTLS_ERR;
}
ret = (int)recv(fd, buf, len, MSG_DONTWAIT);
if(ret<0)
{
if( errno == EPIPE || errno == ECONNRESET)
{
return (MBEDTLS_ERR_NET_CONN_RESET);
}
if( errno == EINTR )
{
return (MBEDTLS_ERR_SSL_WANT_READ);
}
if(ret == -1 && errno == EWOULDBLOCK)
{
return ret;
}
return (MBEDTLS_ERR_NET_RECV_FAILED);
}
return (ret);
}
extern void mbedtls_debug_set_threshold( int threshold );
int mqttSslConn_new(mqttsClientContext* context, char* host)
{
int value;
mqttsClientSsl *ssl;
const char *custom = "mqtts";
char port[10] = {0};
int authmode = MBEDTLS_SSL_VERIFY_NONE;
uint32_t flag;
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_2, P_INFO, "before ssl context malloc:%d", xBytesTaskMalloced);
context->ssl = malloc(sizeof(mqttsClientSsl));
ssl = context->ssl;
/*
* 0. Initialize the RNG and the session data
*/
#if defined(MBEDTLS_DEBUG_C)
mbedtls_debug_set_threshold((int)2);
#endif
mbedtls_net_init(&ssl->netContext);
mbedtls_ssl_init(&ssl->sslContext);
mbedtls_ssl_config_init(&ssl->sslConfig);
mbedtls_x509_crt_init(&ssl->caCert);
mbedtls_x509_crt_init(&ssl->clientCert);
mbedtls_pk_init(&ssl->pkContext);
mbedtls_ctr_drbg_init(&ssl->ctrDrbgContext);
mbedtls_entropy_init(&ssl->entropyContext);
if((value = mbedtls_ctr_drbg_seed(&ssl->ctrDrbgContext,
mbedtls_entropy_func,
&ssl->entropyContext,
(const unsigned char*)custom,
strlen(custom))) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_0, P_INFO, "mbedtls_ctr_drbg_seed failed, value:-0x%x.", -value);
return MQTT_MBEDTLS_ERR;
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_3, P_INFO, "after ssl init:%d", xBytesTaskMalloced);
/*
* 0. Initialize certificates
*/
if(context->seclevel != 0){
if (NULL != context->caCert) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_1, P_INFO, "STEP 0. Loading the CA root certificate ...");
authmode = MBEDTLS_SSL_VERIFY_REQUIRED;
if (0 != (value = mbedtls_x509_crt_parse(&(ssl->caCert), (const unsigned char *)context->caCert, context->caCertLen))) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_2, P_INFO, "failed ! value:-0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
}
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_4, P_INFO, "after ca cert parse:%d", xBytesTaskMalloced);
/* Setup Client Cert/Key */
if(context->seclevel == 2){
if (context->clientCert != NULL && context->clientPk != NULL) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_3, P_INFO, "STEP 0. start prepare client cert ...");
value = mbedtls_x509_crt_parse(&(ssl->clientCert), (const unsigned char *) context->clientCert, context->clientCertLen);
if (value != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_4, P_INFO, "failed! mbedtls_x509_crt_parse returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_5, P_INFO, "context->clientPkLen=%d", context->clientPkLen);
value = mbedtls_pk_parse_key(&ssl->pkContext, (const unsigned char *) context->clientPk, context->clientPkLen, NULL, 0);
if (value != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_6, P_INFO, "failed ! mbedtls_pk_parse_key returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
}
}
if(context->seclevel == 0){
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_7, P_INFO, "user set verify none");
authmode = MBEDTLS_SSL_VERIFY_NONE;
}
//ali mqtts is psk tls
if((context->psk_key != NULL)&&(context->psk_identity != NULL))
{
mbedtls_ssl_conf_psk(&ssl->sslConfig, (const unsigned char *)context->psk_key, strlen(context->psk_key),
(const unsigned char *)context->psk_identity, strlen(context->psk_identity));
}
/*
* 1. Start the connection
*/
snprintf(port, sizeof(port), "%d", context->port);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_8_0, P_INFO, "STEP 1. host:%s", host);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_8_1, P_INFO, "STEP 1. Connecting to PORT:%d",context->port);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_8_2, P_INFO, "STEP 1. port:%s", port);
if (0 != (value = mbedtls_net_connect(&ssl->netContext, host, port, MBEDTLS_NET_PROTO_TCP, LWIP_PS_INVALID_CID))) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_9, P_INFO, " failed ! mbedtls_net_connect returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
/*
* 2. Setup stuff
*/
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_10, P_INFO, "STEP 2. Setting up the SSL/TLS structure...");
if ((value = mbedtls_ssl_config_defaults(&(ssl->sslConfig), MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT)) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_11, P_INFO, " failed! mbedtls_ssl_config_defaults returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_6, P_INFO, "after net connect:%d", xBytesTaskMalloced);
mbedtls_ssl_conf_max_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
mbedtls_ssl_conf_min_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
memcpy(&(ssl->crtProfile), ssl->sslConfig.cert_profile, sizeof(mbedtls_x509_crt_profile));
mbedtls_ssl_conf_authmode(&(ssl->sslConfig), authmode);
#if defined(MBEDTLS_SSL_MAX_FRAGMENT_LENGTH)
if ((value = mbedtls_ssl_conf_max_frag_len(&(ssl->sslConfig), MBEDTLS_SSL_MAX_FRAG_LEN_4096)) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_12, P_INFO, " mbedtls_ssl_conf_max_frag_len returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
#endif
#if defined(MBEDTLS_X509_CRT_PARSE_C)
mbedtls_ssl_conf_cert_profile(&ssl->sslConfig, &ssl->crtProfile);
mbedtls_ssl_conf_ca_chain(&(ssl->sslConfig), &(ssl->caCert), NULL);
if(context->clientCert) {
if ((value = mbedtls_ssl_conf_own_cert(&(ssl->sslConfig), &(ssl->clientCert), &(ssl->pkContext))) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_13, P_INFO, " failed! mbedtls_ssl_conf_own_cert returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
}
#endif
if(context->ciphersuite[0] != 0xFFFF){
mbedtls_ssl_conf_ciphersuites(&(ssl->sslConfig), (const int *)(context->ciphersuite));
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_14, P_INFO, "conf ciphersuite 0x%x", context->ciphersuite[0]);
}
mbedtls_ssl_conf_rng(&(ssl->sslConfig), mqttSslRandom, &(ssl->ctrDrbgContext));
mbedtls_ssl_conf_dbg(&(ssl->sslConfig), mqttSslDebug, NULL);
#if defined(MBEDTLS_SSL_ALPN)
const char *alpn_list[] = { "http/1.1", NULL };
mbedtls_ssl_conf_alpn_protocols(&(ssl->sslConfig),alpn_list);
#endif
if(context->timeout_r > 0) {
uint32_t recvTimeout;
recvTimeout = context->timeout_r > MQTT_MAX_TIMEOUT ? MQTT_MAX_TIMEOUT * 1000 : context->timeout_r * 1000;
mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), recvTimeout);
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_7, P_INFO, "before ssl setup:%d", xBytesTaskMalloced);
if ((value = mbedtls_ssl_setup(&(ssl->sslContext), &(ssl->sslConfig))) != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_15, P_INFO, " failed! mbedtls_ssl_setup returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
if(context->hostName != NULL)
{
mbedtls_ssl_set_hostname(&(ssl->sslContext), context->hostName);
}
else
{
mbedtls_ssl_set_hostname(&(ssl->sslContext), host);
}
mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), (mbedtls_ssl_send_t*)mbedtls_net_send, (mbedtls_ssl_recv_t*)mbedtls_net_recv, (mbedtls_ssl_recv_timeout_t*)mbedtls_net_recv_timeout);
/*
* 3. Handshake
*/
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_8, P_INFO, "after ssl setup before handshake:%d", xBytesTaskMalloced);
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_16, P_INFO, "STEP 3. Performing the SSL/TLS handshake...");
while ((value = mbedtls_ssl_handshake(&(ssl->sslContext))) != 0) {
if ((value != MBEDTLS_ERR_SSL_WANT_READ) && (value != MBEDTLS_ERR_SSL_WANT_WRITE)) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_17, P_INFO, "failed ! mbedtls_ssl_handshake returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
}
////ECPLAT_PRINTF(UNILOG_HTTP, sslMEM_9, P_INFO, "after handshake:%d", xBytesTaskMalloced);
/*
* 4. Verify the server certificate
*/
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_18, P_INFO, "STEP 4. Verifying peer X.509 certificate..");
flag = mbedtls_ssl_get_verify_result(&(ssl->sslContext));
if (flag != 0) {
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_19, P_INFO, " failed ! verify result not confirmed.");
return MQTT_MBEDTLS_ERR;
}
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTlsConn_20, P_INFO, "caCert varification ok");
return MQTT_CONN_OK;
}
int mqttSslConn_old(mqttsClientContext* context, char* host)
{
INT32 value;
mqttsClientSsl *ssl;
const char *custom = "mqtts";
char port[10] = {0};
INT32 authmode = MBEDTLS_SSL_VERIFY_NONE;
UINT32 flag;
context->ssl = malloc(sizeof(mqttsClientSsl));
ssl = context->ssl;
/*
* 0. Initialize the RNG and the session data
*/
#if defined(MBEDTLS_DEBUG_C)
mbedtls_debug_set_threshold((int)2);
#endif
mbedtls_net_init(&ssl->netContext);
mbedtls_ssl_init(&ssl->sslContext);
mbedtls_ssl_config_init(&ssl->sslConfig);
mbedtls_x509_crt_init(&ssl->caCert);
mbedtls_x509_crt_init(&ssl->clientCert);
mbedtls_pk_init(&ssl->pkContext);
mbedtls_ctr_drbg_init(&ssl->ctrDrbgContext);
mbedtls_entropy_init(&ssl->entropyContext);
if((context->psk_key != NULL)&&(context->psk_identity != NULL))
{
mbedtls_ssl_conf_psk(&ssl->sslConfig, (const unsigned char *)context->psk_key, strlen(context->psk_key),
(const unsigned char *)context->psk_identity, strlen(context->psk_identity));
}
if((value = mbedtls_ctr_drbg_seed(&ssl->ctrDrbgContext,
mbedtls_entropy_func,
&ssl->entropyContext,
(const unsigned char*)custom,
strlen(custom))) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_0, P_SIG, 1, "mbedtls_ctr_drbg_seed failed, value:-0x%x.", -value);
return MQTT_MBEDTLS_ERR;
}
/*
* 0. Initialize certificates
*/
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_1, P_SIG, 0, "STEP 0. Loading the CA root certificate ...");
if (NULL != context->caCert)
{
//authmode = MBEDTLS_SSL_VERIFY_REQUIRED;
if (0 != (value = mbedtls_x509_crt_parse(&(ssl->caCert), (const unsigned char *)context->caCert, context->caCertLen)))
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_2, P_SIG, 1, "failed ! value:-0x%x", -value);
//return MQTT_MBEDTLS_ERR;
}
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_3, P_SIG, 1, " ok (%d skipped)", value);
}
/* Setup Client Cert/Key */
if (context->clientCert != NULL && context->clientPk != NULL)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_4, P_SIG, 0, "STEP 0. start prepare client cert ...");
value = mbedtls_x509_crt_parse(&(ssl->clientCert), (const unsigned char *) context->clientCert, context->clientCertLen);
if (value != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_5, P_SIG, 1, " failed! mbedtls_x509_crt_parse returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_6, P_SIG, 1, "STEP 0. start mbedtls_pk_parse_key[%s]", context->clientPk);
value = mbedtls_pk_parse_key(&ssl->pkContext, (const unsigned char *) context->clientPk, context->clientPkLen, NULL, 0);
if (value != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_7, P_SIG, 1, " failed\n ! mbedtls_pk_parse_key returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
}
/*
* 1. Start the connection
*/
snprintf(port, sizeof(port), "%d", context->port);
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_8, P_SIG, 2, "STEP 1. Connecting to /%s/%s...", host, port);
if (0 != (value = mbedtls_net_connect(&ssl->netContext, host, port, MBEDTLS_NET_PROTO_TCP, LWIP_PS_INVALID_CID)))
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_9, P_SIG, 1, " failed ! mbedtls_net_connect returned -0x%x", -value);
return MQTT_MBEDTLS_ERR;
}
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_10, P_SIG, 0, " ok");
/*
* 2. Setup stuff
*/
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_11, P_SIG, 0, "STEP 2. Setting up the SSL/TLS structure...");
if ((value = mbedtls_ssl_config_defaults(&(ssl->sslConfig), MBEDTLS_SSL_IS_CLIENT, MBEDTLS_SSL_TRANSPORT_STREAM,
MBEDTLS_SSL_PRESET_DEFAULT)) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_12, P_SIG, 1, " failed! mbedtls_ssl_config_defaults returned %d", value);
return MQTT_MBEDTLS_ERR;
}
mbedtls_ssl_conf_max_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
mbedtls_ssl_conf_min_version(&ssl->sslConfig, MBEDTLS_SSL_MAJOR_VERSION_3, MBEDTLS_SSL_MINOR_VERSION_3);
memcpy(&(ssl->crtProfile), ssl->sslConfig.cert_profile, sizeof(mbedtls_x509_crt_profile));
mbedtls_ssl_conf_authmode(&(ssl->sslConfig), authmode);
#if defined(MBEDTLS_SSL_MAX_FRAGMENT_LENGTH)
if ((mbedtls_ssl_conf_max_frag_len(&(ssl->sslConfig), MBEDTLS_SSL_MAX_FRAG_LEN_1024)) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_13, P_SIG, 0, "mbedtls_ssl_conf_max_frag_len returned\r\n");
return MQTT_MBEDTLS_ERR;
}
#endif
#if defined(MBEDTLS_X509_CRT_PARSE_C)
mbedtls_ssl_conf_cert_profile(&ssl->sslConfig, &ssl->crtProfile);
mbedtls_ssl_conf_ca_chain(&(ssl->sslConfig), &(ssl->caCert), NULL);
if(context->clientCert)
{
if ((value = mbedtls_ssl_conf_own_cert(&(ssl->sslConfig), &(ssl->clientCert), &(ssl->pkContext))) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_14, P_SIG, 1, " failed\n ! mbedtls_ssl_conf_own_cert returned %d\n", value);
return MQTT_MBEDTLS_ERR;
}
}
#endif
if(context->ciphersuite[0] != 0xFFFF){
mbedtls_ssl_conf_ciphersuites(&(ssl->sslConfig), (const int *)(context->ciphersuite));
//ECPLAT_PRINTF(UNILOG_MQTT, mqttTls_14_1, P_INFO, "conf ciphersuite 0x%x", context->ciphersuite[0]);
}
mbedtls_ssl_conf_rng(&(ssl->sslConfig), mqttSslRandom, &(ssl->ctrDrbgContext));
mbedtls_ssl_conf_dbg(&(ssl->sslConfig), mqttSslDebug, NULL);
if(context->timeout_r > 0)
{
UINT32 recvTimeout;
recvTimeout = context->timeout_r > MQTT_MAX_TIMEOUT ? MQTT_MAX_TIMEOUT * 1000 : context->timeout_r * 1000;
mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), recvTimeout);
}
if ((value = mbedtls_ssl_setup(&(ssl->sslContext), &(ssl->sslConfig))) != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_15, P_SIG, 1, "failed! mbedtls_ssl_setup returned %d", value);
return MQTT_MBEDTLS_ERR;
}
if(context->hostName != NULL)
{
mbedtls_ssl_set_hostname(&(ssl->sslContext), context->hostName);
//mbedtls_ssl_set_hostname(&(ssl->sslContext), "OneNET MQTTS");
}
else
{
mbedtls_ssl_set_hostname(&(ssl->sslContext), host);
}
mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), mbedtls_net_send, mbedtls_net_recv, mbedtls_net_recv_timeout);
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_16, P_SIG, 0, " ok");
/*
* 3. Handshake
*/
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_17, P_SIG, 0, "STEP 3. Performing the SSL/TLS handshake...");
while ((value = mbedtls_ssl_handshake(&(ssl->sslContext))) != 0)
{
if ((value != MBEDTLS_ERR_SSL_WANT_READ) && (value != MBEDTLS_ERR_SSL_WANT_WRITE))
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_18, P_SIG, 1, "failed ! mbedtls_ssl_handshake returned -0x%x\n", -value);
return MQTT_MBEDTLS_ERR;
}
}
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_19, P_SIG, 0, " ok");
/*
* 4. Verify the server certificate
*/
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_20, P_SIG, 0, "STEP 4. Verifying peer X.509 certificate..");
flag = mbedtls_ssl_get_verify_result(&(ssl->sslContext));
if (flag != 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_21, P_SIG, 0, " failed ! verify result not confirmed.");
return MQTT_MBEDTLS_ERR;
}
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_22, P_SIG, 0, "caCert varification ok");
return MQTT_CONN_OK;
}
//INT32 mqttSslSend(mbedtls_ssl_context* sslContext, const char* buf, UINT16 len)
int mqttSslSend(mqttsClientContext* context, unsigned char* buf, int len)
{
INT32 waitToSend = len;
INT32 hasSend = 0;
do
{
hasSend = mbedtls_ssl_write(&(context->ssl->sslContext), (unsigned char *)(buf + len - waitToSend), waitToSend);
if(hasSend > 0)
{
waitToSend -= hasSend;
}
else if(hasSend == 0)
{
return MQTT_CONN_OK;
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_23, P_SIG, 0, "mqtt_client(ssl): send failed \n");
return MQTT_CONN;
}
}while(waitToSend>0);
return MQTT_CONN_OK;
}
int mqttSslRecv(mqttsClientContext* context, unsigned char* buf, int minLen, int maxLen, int* pReadLen) //0 on success, err code on failure
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_24, P_INFO, 2, "Trying to read between %d and %d bytes", minLen, maxLen);
INT32 readLen = 0;
INT32 ret;
while (readLen < maxLen)
{
mqttsClientSsl *ssl = (mqttsClientSsl *)context->ssl;
if (readLen < minLen)
{
mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), mbedtls_net_send, mbedtls_net_recv, NULL);
ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char *)(buf+readLen), minLen-readLen);
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_30, P_INFO, 1, "mbedtls_ssl_read [blocking] return:0x%x", ret);
if(ret == 0)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_31, P_INFO, 0, "mbedtls_ssl_read [blocking] return 0 connect error");
return MQTT_CONN;
}
}
else
{
mbedtls_ssl_set_bio(&(ssl->sslContext), &(ssl->netContext), mbedtls_net_send, mqttSslNonblockRecv, NULL);
ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char*)(buf+readLen), maxLen-readLen);
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_32, P_INFO, 1, "mbedtls_ssl_read [not blocking] return:0x%x", ret);
if(ret == -1 && errno == EWOULDBLOCK)
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_33, P_INFO, 0, "mbedtls_ssl_read [not blocking] errno == EWOULDBLOCK");
break;
}
}
if(ret == MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY)
{
return MQTT_CLOSED;
}
if (ret > 0)
{
readLen += ret;
}
else if ( ret == 0 )
{
break;
}
else
{
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_34, P_INFO, 1, "Connection error (recv returned %d)", ret);
*pReadLen = readLen;
return MQTT_CONN;
}
}
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_35, P_INFO, 1, "Read %d bytes", readLen);
buf[readLen] = '\0'; // DS makes it easier to see what's new.
*pReadLen = readLen;
return MQTT_CONN_OK;
}
int mqttSslRead(mqttsClientContext* context, unsigned char *buffer, int len, int timeout_ms)
{
UINT32 readLen = 0;
static int net_status = 0;
INT32 ret = -1;
char err_str[33];
mqttsClientSsl *ssl = (mqttsClientSsl *)context->ssl;
mbedtls_ssl_conf_read_timeout(&(ssl->sslConfig), timeout_ms);
while (readLen < len) {
ret = mbedtls_ssl_read(&(ssl->sslContext), (unsigned char *)(buffer + readLen), (len - readLen));
if (ret > 0) {
readLen += ret;
net_status = 0;
} else if (ret == 0) {
/* if ret is 0 and net_status is -2, indicate the connection is closed during last call */
return (net_status == -2) ? net_status : readLen;
} else {
if (MBEDTLS_ERR_SSL_PEER_CLOSE_NOTIFY == ret) {
//mbedtls_strerror(ret, err_str, sizeof(err_str));
printf("ssl recv error: code = -0x%04X, err_str = '%s'\n", -ret, err_str);
net_status = -2; /* connection is closed */
break;
} else if ((MBEDTLS_ERR_SSL_TIMEOUT == ret)
|| (MBEDTLS_ERR_SSL_CONN_EOF == ret)
|| (MBEDTLS_ERR_SSL_SESSION_TICKET_EXPIRED == ret)
|| (MBEDTLS_ERR_SSL_NON_FATAL == ret)) {
/* read already complete */
/* if call mbedtls_ssl_read again, it will return 0 (means EOF) */
return readLen;
} else {
//mbedtls_strerror(ret, err_str, sizeof(err_str));
printf("ssl recv error: code = -0x%04X, err_str = '%s'\n", -ret, err_str);
net_status = -1;
return -1; /* Connection error */
}
}
}
return (readLen > 0) ? readLen : net_status;
}
int mqttSslClose(mqttsClientContext* context)
{
mqttsClientSsl *ssl = (mqttsClientSsl *)context->ssl;
/*context->clientCert = NULL;
context->caCert = NULL;
context->clientPk = NULL; let up level free it*/
if(ssl == NULL)
return MQTT_MBEDTLS_ERR;
mbedtls_ssl_close_notify(&(ssl->sslContext));
mbedtls_net_free(&(ssl->netContext));
mbedtls_x509_crt_free(&(ssl->caCert));
mbedtls_x509_crt_free(&(ssl->clientCert));
mbedtls_pk_free(&(ssl->pkContext));
mbedtls_ssl_free(&(ssl->sslContext));
mbedtls_ssl_config_free(&(ssl->sslConfig));
mbedtls_ctr_drbg_free(&(ssl->ctrDrbgContext));
mbedtls_entropy_free(&(ssl->entropyContext));
free(ssl);
context->ssl = NULL;
//ECOMM_TRACE(UNILOG_MQTT, mqttTls_36, P_INFO, 0, "mqtt tls close ok");
return MQTT_CONN_OK;
}

View File

@@ -0,0 +1,72 @@
#ifndef MQTT_DTLS_H
#define MQTT_DTLS_H
#include "commonTypedef.h"
#include "mbedtls/net.h"
#include "mbedtls/ssl.h"
#include "mbedtls/certs.h"
#include "mbedtls/entropy.h"
#include "mbedtls/ctr_drbg.h"
#define MQTT_MAX_TIMEOUT (10 * 60) //10 min
typedef struct mqttsClientSslTag
{
mbedtls_ssl_context sslContext;
mbedtls_net_context netContext;
mbedtls_ssl_config sslConfig;
mbedtls_entropy_context entropyContext;
mbedtls_ctr_drbg_context ctrDrbgContext;
mbedtls_x509_crt_profile crtProfile;
mbedtls_x509_crt caCert;
mbedtls_x509_crt clientCert;
mbedtls_pk_context pkContext;
}mqttsClientSsl;
typedef struct mqttsClientContextTag
{
int socket;
int timeout_s;
int timeout_r;
int isMqtts;
int method;
UINT16 port;
unsigned int keepAliveInterval;
size_t sendBufSize;
size_t readBufSize;
unsigned char *sendBuf;
unsigned char *readBuf;
mqttsClientSsl * ssl;
char *caCert;
char *clientCert;
char *clientPk;
char *hostName;
char *psk_key;
char *psk_identity;
int caCertLen;
int clientCertLen;
int clientPkLen;
uint8_t seclevel;//0:no verify; 1:verify server; 2:both verify
int32_t ciphersuite[2];//just like 0x0035 TLS_RSA_WITH_AES_256_CBC_SHA,ciphersuite[1] must NULL
uint8_t pdpId;//pdp context id--cid--0 is default
}mqttsClientContext;
int mqttSslConn_old(mqttsClientContext* context, char* host);
int mqttSslSend(mqttsClientContext* context, unsigned char* buf, int len);
int mqttSslRecv(mqttsClientContext* context, unsigned char* buf, int minLen, int maxLen, int* pReadLen);
int mqttSslRead(mqttsClientContext* context, unsigned char *buffer, int len, int timeout_ms);
int mqttSslClose(mqttsClientContext* context);
int mqttSslConn_new(mqttsClientContext* context, char* host);
#endif

View File

@@ -0,0 +1,168 @@
/*******************************************************************************
* Copyright (c) 2014, 2017 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
* Ian Craggs - return codes from linux_read
*******************************************************************************/
#include "MQTTLinux.h"
void TimerInit(Timer* timer)
{
timer->end_time = (struct timeval){0, 0};
}
char TimerIsExpired(Timer* timer)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&timer->end_time, &now, &res);
return res.tv_sec < 0 || (res.tv_sec == 0 && res.tv_usec <= 0);
}
void TimerCountdownMS(Timer* timer, unsigned int timeout)
{
struct timeval now;
gettimeofday(&now, NULL);
struct timeval interval = {timeout / 1000, (timeout % 1000) * 1000};
timeradd(&now, &interval, &timer->end_time);
}
void TimerCountdown(Timer* timer, unsigned int timeout)
{
struct timeval now;
gettimeofday(&now, NULL);
struct timeval interval = {timeout, 0};
timeradd(&now, &interval, &timer->end_time);
}
int TimerLeftMS(Timer* timer)
{
struct timeval now, res;
gettimeofday(&now, NULL);
timersub(&timer->end_time, &now, &res);
//printf("left %d ms\n", (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000);
return (res.tv_sec < 0) ? 0 : res.tv_sec * 1000 + res.tv_usec / 1000;
}
int linux_read(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
struct timeval interval = {timeout_ms / 1000, (timeout_ms % 1000) * 1000};
if (interval.tv_sec < 0 || (interval.tv_sec == 0 && interval.tv_usec <= 0))
{
interval.tv_sec = 0;
interval.tv_usec = 100;
}
setsockopt(n->my_socket, SOL_SOCKET, SO_RCVTIMEO, (char *)&interval, sizeof(struct timeval));
int bytes = 0;
while (bytes < len)
{
int rc = recv(n->my_socket, &buffer[bytes], (size_t)(len - bytes), 0);
if (rc == -1)
{
if (errno != EAGAIN && errno != EWOULDBLOCK)
bytes = -1;
break;
}
else if (rc == 0)
{
bytes = 0;
break;
}
else
bytes += rc;
}
return bytes;
}
int linux_write(Network* n, unsigned char* buffer, int len, int timeout_ms)
{
struct timeval tv;
tv.tv_sec = 0; /* 30 Secs Timeout */
tv.tv_usec = timeout_ms * 1000; // Not init'ing this can cause strange errors
setsockopt(n->my_socket, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,sizeof(struct timeval));
int rc = write(n->my_socket, buffer, len);
return rc;
}
void NetworkInit(Network* n)
{
n->my_socket = 0;
n->mqttread = linux_read;
n->mqttwrite = linux_write;
}
int NetworkConnect(Network* n, char* addr, int port)
{
int type = SOCK_STREAM;
struct sockaddr_in address;
int rc = -1;
sa_family_t family = AF_INET;
struct addrinfo *result = NULL;
struct addrinfo hints = {0, AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP, 0, NULL, NULL, NULL};
if ((rc = getaddrinfo(addr, NULL, &hints, &result)) == 0)
{
struct addrinfo* res = result;
/* prefer ip4 addresses */
while (res)
{
if (res->ai_family == AF_INET)
{
result = res;
break;
}
res = res->ai_next;
}
if (result->ai_family == AF_INET)
{
address.sin_port = htons(port);
address.sin_family = family = AF_INET;
address.sin_addr = ((struct sockaddr_in*)(result->ai_addr))->sin_addr;
}
else
rc = -1;
freeaddrinfo(result);
}
if (rc == 0)
{
n->my_socket = socket(family, type, 0);
if (n->my_socket != -1)
rc = connect(n->my_socket, (struct sockaddr*)&address, sizeof(address));
else
rc = -1;
}
return rc;
}
void NetworkDisconnect(Network* n)
{
close(n->my_socket);
}

View File

@@ -0,0 +1,74 @@
/*******************************************************************************
* Copyright (c) 2014 IBM Corp.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Allan Stockdill-Mander - initial API and implementation and/or initial documentation
*******************************************************************************/
#if !defined(__MQTT_LINUX_)
#define __MQTT_LINUX_
#if defined(WIN32_DLL) || defined(WIN64_DLL)
#define DLLImport __declspec(dllimport)
#define DLLExport __declspec(dllexport)
#elif defined(LINUX_SO)
#define DLLImport extern
#define DLLExport __attribute__ ((visibility ("default")))
#else
#define DLLImport
#define DLLExport
#endif
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
typedef struct Timer
{
struct timeval end_time;
} Timer;
void TimerInit(Timer*);
char TimerIsExpired(Timer*);
void TimerCountdownMS(Timer*, unsigned int);
void TimerCountdown(Timer*, unsigned int);
int TimerLeftMS(Timer*);
typedef struct Network
{
int my_socket;
int (*mqttread) (struct Network*, unsigned char*, int, int);
int (*mqttwrite) (struct Network*, unsigned char*, int, int);
} Network;
int linux_read(Network*, unsigned char*, int, int);
int linux_write(Network*, unsigned char*, int, int);
DLLExport void NetworkInit(Network*);
DLLExport int NetworkConnect(Network*, char*, int);
DLLExport void NetworkDisconnect(Network*);
#endif