2025-04-10 17:31:33 +08:00

1726 lines
64 KiB
C

/****************************************************************************
*
* Copy right: 2017-, Copyrigths of EigenComm Ltd.
* File name: ctw_mqtt.c
* Description: EC618 CTWING http access source file
* History: Rev1.0 2022-2-15
*
****************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "osasys.h"
#include "networkmgr.h"
#include "ps_event_callback.h"
#include "ps_lib_api.h"
#include "slpman.h"
#include DEBUG_LOG_HEADER_FILE
#include "at_util.h"
#include "MQTTClient.h"
#ifdef FEATURE_MQTT_TLS_ENABLE
#include "mqtttls.h"
#endif
//#include "MQTTConnect.h"
#include "ctw_mqtt.h"
#include "at_ctwing_task.h"
#define MQTT_REG_TASK_STACK_SIZE (1024*6)
#define MQTT_REG_RECV_TASK_STACK_SIZE (1024*6)
#define MQTT_REG_SEND_BUF_SIZE (1024)
#define MQTT_REG_READ_BUF_SIZE (1024)
#define MQTT_REG_AUTH_BUF_SIZE (256)
#define MQTT_REG_AUTH_CLIENT_SIZE (64)
#define MQTT_REG_AUTH_USERNAME_SIZE (256)
#define MQTT_REG_AUTH_PWD_SIZE (72)
#define MQTT_REG_KEEPALIVE_RETRY_MAX 3
#define MQTT_CONTTYPE "application/json"
#define CTW_MQTT_FOTA_TOPIC "/ota/upgrade"
#define CTW_MQTT_FOTA_PROGRESS_TOPIC "/ota/progress"
#define CTW_MQTT_FOTA_ACK_TOPIC "/ota/inform"
#define CTW_MQTT_FOTA_URL_MAX_LEN (256)
#define CTW_MQTT_FOTA_ACK_MAX_LEN (300)
#define CTW_MQTT_MACHINESTATE(S) \
((S) == MQTT_INIT_STATE ? "MQTT_INIT_STATE" : \
((S) == MQTT_IPREADY_STATE ? "MQTT_IPREADY_STATE" : \
((S) == MQTT_REG_STATE ? "MQTT_REG_STATE" : \
((S) == MQTT_KEEP_ONLINE_STATE ? "MQTT_KEEP_ONLINE_STATE" : \
"Unknown"))))
static MWNvmCfgCtwMqttParam* pCtwMqttParam = NULL;
static UINT8 ctwMqttSlpHandler = 0xff;
static ctwMqttStateMachine_t ctwMqttStaMachine = MQTT_INIT_STATE;
UINT32 ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_NOT_CREATE;
UINT32 ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_NOT_CREATE;
UINT32 ctwMqttReconnectStatusFlag = 0;
char* pCtwMqttFotaUrl = NULL;
static ctwMqttRegContext_t gCtwMqttContext = {0};
int gCtwMqttKeepaliveRetryTime = 0;
osMessageQueueId_t ctwMqttMsgQueue = NULL;
extern MWNvmCfgCtwMqttParam *ctwGetMqttParam;
#ifdef FEATURE_HTTPC_ENABLE
#include "at_http_task.h"
extern httpAtContext_t gHttpAtCxt;
extern uint16_t ctwMqttReqHandle;
extern void httpClearAtContext(httpAtContext_t* atContext);
extern void httpSleepVote(HTTPSleep_e sleep);
extern CmsRetId httpSendReq();
#endif
void ctwMqttContextConfig(ctwMqttRegContext_t *ctwMqttContext)
{
MQTTPacket_connectData mqttConnectData = MQTTPacket_connectData_initializer;
ctwMqttContext->mqttClient = mallocEc(sizeof(MQTTClient));
memset(ctwMqttContext->mqttClient, 0, sizeof(MQTTClient));
#ifdef FEATURE_MQTT_TLS_ENABLE
ctwMqttContext->mqttsClient = mallocEc(sizeof(mqttsClientContext));
memset(ctwMqttContext->mqttsClient, 0, sizeof(mqttsClientContext));
ctwMqttContext->mqttsClient->timeout_s = 5;
ctwMqttContext->mqttsClient->timeout_r = 20;
#endif
ctwMqttContext->mqttNetwork = mallocEc(sizeof(Network));
memset(ctwMqttContext->mqttNetwork, 0, sizeof(Network));
ctwMqttContext->mqttSendBuf = mallocEc(MQTT_REG_SEND_BUF_SIZE);
memset(ctwMqttContext->mqttSendBuf, 0, MQTT_REG_SEND_BUF_SIZE);
ctwMqttContext->mqttReadBuf = mallocEc(MQTT_REG_READ_BUF_SIZE);
memset(ctwMqttContext->mqttReadBuf, 0, MQTT_REG_READ_BUF_SIZE);
ctwMqttContext->mqttUri = mallocEc(MQTT_REG_URI_MAX_LEN);
memset(ctwMqttContext->mqttUri, 0, MQTT_REG_URI_MAX_LEN);
memcpy(&ctwMqttContext->mqttConnectData, &mqttConnectData, sizeof(mqttConnectData));
//get pCtwMqttParamn for keepAlive
ctwMqttContext->mqttConnectData.keepAliveInterval = pCtwMqttParam->keepAlive;
ctwMqttContext->mqttClient->command_timeout_ms = 3000;
ctwMqttContext->mqttRegParam = mallocEc(sizeof(ctwMqttRegParam_t));
memset(ctwMqttContext->mqttRegParam, 0, sizeof(ctwMqttRegParam_t));
}
void ctwMqttContextDelete(ctwMqttRegContext_t *ctwMqttContext)
{
if(ctwMqttContext->mqttRegParam != NULL)
{
freeEc(ctwMqttContext->mqttRegParam);
ctwMqttContext->mqttRegParam = NULL;
}
if(ctwMqttContext->mqttUri != NULL)
{
freeEc(ctwMqttContext->mqttUri);
ctwMqttContext->mqttUri = NULL;
}
if(ctwMqttContext->mqttSendBuf != NULL)
{
freeEc(ctwMqttContext->mqttSendBuf);
ctwMqttContext->mqttSendBuf = NULL;
}
if(ctwMqttContext->mqttReadBuf != NULL)
{
freeEc(ctwMqttContext->mqttReadBuf);
ctwMqttContext->mqttReadBuf = NULL;
}
if(ctwMqttContext->mqttClient != NULL)
{
freeEc(ctwMqttContext->mqttClient);
ctwMqttContext->mqttClient = NULL;
}
#ifdef FEATURE_MQTT_TLS_ENABLE
if(ctwMqttContext->mqttsClient != NULL)
{
if(ctwMqttContext->mqttsClient->ssl != NULL)
{
freeEc(ctwMqttContext->mqttsClient->ssl);
ctwMqttContext->mqttsClient->ssl = NULL;
}
if(ctwMqttContext->mqttsClient->caCert != NULL)
{
freeEc(ctwMqttContext->mqttsClient->caCert);
ctwMqttContext->mqttsClient->caCert = NULL;
}
if(ctwMqttContext->mqttsClient->clientCert != NULL)
{
freeEc(ctwMqttContext->mqttsClient->clientCert);
ctwMqttContext->mqttsClient->clientCert = NULL;
}
if(ctwMqttContext->mqttsClient->clientPk != NULL)
{
freeEc(ctwMqttContext->mqttsClient->clientPk);
ctwMqttContext->mqttsClient->clientPk = NULL;
}
if(ctwMqttContext->mqttsClient->psk_key != NULL)
{
freeEc(ctwMqttContext->mqttsClient->psk_key);
ctwMqttContext->mqttsClient->psk_key = NULL;
}
if(ctwMqttContext->mqttsClient->psk_identity != NULL)
{
freeEc(ctwMqttContext->mqttsClient->psk_identity);
ctwMqttContext->mqttsClient->psk_identity = NULL;
}
freeEc(ctwMqttContext->mqttsClient);
ctwMqttContext->mqttsClient = NULL;
}
#endif
if(ctwMqttContext->mqttNetwork != NULL)
{
freeEc(ctwMqttContext->mqttNetwork);
ctwMqttContext->mqttNetwork = NULL;
}
if(ctwMqttContext->mqttConnectData.clientID.cstring != NULL)
{
freeEc(ctwMqttContext->mqttConnectData.clientID.cstring);
ctwMqttContext->mqttConnectData.clientID.cstring = NULL;
}
if(ctwMqttContext->mqttConnectData.username.cstring != NULL)
{
freeEc(ctwMqttContext->mqttConnectData.username.cstring);
ctwMqttContext->mqttConnectData.username.cstring = NULL;
}
if(ctwMqttContext->mqttConnectData.password.cstring != NULL)
{
freeEc(ctwMqttContext->mqttConnectData.password.cstring);
ctwMqttContext->mqttConnectData.password.cstring = NULL;
}
ctwMqttContext->port = 0;
}
int ctwMqttClose(ctwMqttRegContext_t* mqttContext)
{
ctwMqttRegContext_t* mqttNewContext = mqttContext;
if(mqttContext->mqttsFlag == 0)
{
mqttNewContext->mqttNetwork->disconnect(mqttNewContext->mqttNetwork);
}
else
{
#ifdef FEATURE_MQTT_TLS_ENABLE
mqttSslClose(mqttContext->mqttsClient);
#endif
}
return SUCCESS;
}
int ctwMqttDisconnect(ctwMqttRegContext_t *mqttContext)
{
int rc = SUCCESS;
Timer timer; // we might wait for incomplete incoming publishes to complete
int len = 0;
TimerInit(&timer);
TimerCountdownMS(&timer, mqttContext->mqttClient->command_timeout_ms);
len = MQTTSerialize_disconnect(mqttContext->mqttClient->buf, mqttContext->mqttClient->buf_size);
if (len > 0)
rc = ctwMqttSendPacket(mqttContext, len, &timer, 0, false); // send the disconnect packet
//mqttCloseSession(c);
return rc;
}
int ctwMqttKeepalive(ctwMqttRegContext_t *mqttContext)
{
int rc = SUCCESS;
if (mqttContext->mqttClient->keepAliveInterval == 0)
goto exit;
if (TimerIsExpired(&mqttContext->mqttClient->last_sent) || TimerIsExpired(&mqttContext->mqttClient->last_received))
{
if (mqttContext->mqttClient->ping_outstanding)
rc = FAILURE; /* PINGRESP not received in keepalive interval */
else
{
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, 1000);
int len = MQTTSerialize_pingreq(mqttContext->mqttClient->buf, mqttContext->mqttClient->buf_size);
if (len > 0 && (rc = ctwMqttSendPacket(mqttContext, len, &timer, 0, false)) == SUCCESS) // send the ping packet
mqttContext->mqttClient->ping_outstanding = 1;
}
}
exit:
return rc;
}
static void ctwMqttNewMessageData(MessageData* md, MQTTString* aTopicName, MQTTMessage* aMessage) {
md->topicName = aTopicName;
md->message = aMessage;
}
static char ctwMqttIsTopicMatched(char* topicFilter, MQTTString* topicName)
{
char* curf = topicFilter;
char* curn = topicName->lenstring.data;
char* curn_end = curn + topicName->lenstring.len;
while (*curf && curn < curn_end)
{
if (*curn == '/' && *curf != '/')
break;
if (*curf != '+' && *curf != '#' && *curf != *curn)
break;
if (*curf == '+')
{ // skip until we meet the next separator, or end of string
char* nextpos = curn + 1;
while (nextpos < curn_end && *nextpos != '/')
nextpos = ++curn + 1;
}
else if (*curf == '#')
curn = curn_end - 1; // skip until end of string
curf++;
curn++;
};
return (curn == curn_end) && (*curf == '\0');
}
int ctwMqttDeliverMessage(MQTTClient* c, MQTTString* topicName, MQTTMessage* message)
{
int i;
int rc = FAILURE;
// we have to find the right message handler - indexed by topic
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
if (c->messageHandlers[i].topicFilter != 0 && (MQTTPacket_equals(topicName, (char*)c->messageHandlers[i].topicFilter) ||
ctwMqttIsTopicMatched((char*)c->messageHandlers[i].topicFilter, topicName)))
{
if (c->messageHandlers[i].fp != NULL)
{
MessageData md;
ctwMqttNewMessageData(&md, topicName, message);
c->messageHandlers[i].fp(&md);
rc = SUCCESS;
}
}
}
if (rc == FAILURE && c->defaultMessageHandler != NULL)
{
MessageData md;
ctwMqttNewMessageData(&md, topicName, message);
c->defaultMessageHandler(&md);
rc = SUCCESS;
}
return rc;
}
void ctwMqttDefaultMessageArrived(MessageData* data)
{
int primSize;
//ctwMqttRegContext_t *mqttContext = &gCtwMqttContext;
ctwMqttMessage mqtt_msg;
int mqtt_topic_len = 0;
int mqtt_payload_len = 0;
char *payloadPtrStartTemp = NULL;
char *payloadPtrEndTemp = NULL;
int payloadLenTemp = 0;
//MWNvmCfgCtwMqttParam ctwMqttParam = {0};
ECPLAT_PRINTF(UNILOG_CTWING, mqtt_task_0, P_SIG, ".........MQTT_messageArrived is:%s", data->message->payload);
memset(&mqtt_msg, 0, sizeof(mqtt_msg));
mqtt_msg.msg_id = data->message->id;
if(data->topicName->lenstring.data != NULL)
{
if(memcmp(data->topicName->lenstring.data, CTW_MQTT_FOTA_TOPIC, strlen(CTW_MQTT_FOTA_TOPIC)) == 0)
{
payloadPtrStartTemp = strstr((char*)data->message->payload, "taskId\":");
if(payloadPtrStartTemp != NULL)
{
gCtwMqttContext.fotaFlag = 1;
}
}
mqtt_topic_len = data->topicName->lenstring.len+1;
mqtt_msg.mqtt_topic = mallocEc(mqtt_topic_len);
memset(mqtt_msg.mqtt_topic, 0, mqtt_topic_len);
memcpy(mqtt_msg.mqtt_topic, data->topicName->lenstring.data, (mqtt_topic_len-1));
mqtt_msg.mqtt_topic_len = data->topicName->lenstring.len;
memset(data->topicName->lenstring.data, 0, (mqtt_topic_len-1));
}
primSize = sizeof(mqtt_msg);
if((gCtwMqttContext.fotaFlag == 1)&&(data->message->payload != NULL))
{
mqtt_msg.fota_cmd = MQTT_FOTA_STAT_START;
mqtt_msg.fota_percent = 1;
mqtt_msg.fota_ret = 1;
//mwNvmCfgGetCtwMqttParam(&ctwMqttParam);
//memcpy(pCtwMqttParam, &ctwMqttParam, sizeof(ctwMqttParam));
//pCtwMqttParam->fotaFlag = 1;
memset(pCtwMqttParam->fotaTaskId, 0, MID_WARE_CTW_MQTT_FOTA_TASK_ID);
memset(pCtwMqttParam->fotaVersion, 0, MID_WARE_CTW_MQTT_FOTA_VER_LEN);
memset(pCtwMqttParam->fotaToken, 0, MID_WARE_CTW_MQTT_FOTA_TOKEN_LEN);
memset(pCtwMqttParam->fotaModule, 0, MID_WARE_CTW_MQTT_FOTA_MODULE_LEN);
payloadPtrStartTemp = strstr((char*)data->message->payload, "taskId\":");
if(payloadPtrStartTemp != NULL)
{
payloadPtrStartTemp = payloadPtrStartTemp + strlen("taskId\":");
payloadPtrEndTemp = strstr(payloadPtrStartTemp, ",\"");
payloadLenTemp = payloadPtrEndTemp - payloadPtrStartTemp;
memcpy(pCtwMqttParam->fotaTaskId, payloadPtrStartTemp, payloadLenTemp);
payloadPtrStartTemp = strstr((char*)data->message->payload, "version\":\"");
payloadPtrStartTemp = payloadPtrStartTemp + strlen("version\":\"");
payloadPtrEndTemp = strstr(payloadPtrStartTemp, "\",");
payloadLenTemp = payloadPtrEndTemp - payloadPtrStartTemp;
memcpy(pCtwMqttParam->fotaVersion, payloadPtrStartTemp, payloadLenTemp);
payloadPtrStartTemp = strstr((char*)data->message->payload, "token\":\"");
payloadPtrStartTemp = payloadPtrStartTemp + strlen("token\":\"");
payloadPtrEndTemp = strstr(payloadPtrStartTemp, "\",");
payloadLenTemp = payloadPtrEndTemp - payloadPtrStartTemp;
memcpy(pCtwMqttParam->fotaToken, payloadPtrStartTemp, payloadLenTemp);
payloadPtrStartTemp = strstr((char*)data->message->payload, "module\":\"");
payloadPtrStartTemp = payloadPtrStartTemp + strlen("module\":\"");
payloadPtrEndTemp = strstr(payloadPtrStartTemp, "\",");
payloadLenTemp = payloadPtrEndTemp - payloadPtrStartTemp;
memcpy(pCtwMqttParam->fotaModule, payloadPtrStartTemp, payloadLenTemp);
pCtwMqttFotaUrl = mallocEc(CTW_MQTT_FOTA_URL_MAX_LEN);
memset(pCtwMqttFotaUrl, 0, CTW_MQTT_FOTA_URL_MAX_LEN);
payloadPtrStartTemp = strstr((char*)data->message->payload, "url\":\"");
payloadPtrStartTemp = payloadPtrStartTemp + strlen("url\":\"");
payloadPtrEndTemp = strstr(payloadPtrStartTemp, "\",");
payloadLenTemp = payloadPtrEndTemp - payloadPtrStartTemp;
memcpy(pCtwMqttFotaUrl, payloadPtrStartTemp, payloadLenTemp);
mwNvmCfgSetAndSaveCtwMqttParam(pCtwMqttParam);
#ifdef FEATURE_HTTPC_ENABLE
httpClearAtContext(&gHttpAtCxt);
gHttpAtCxt.reqhandle = ctwMqttReqHandle;
gHttpAtCxt.isFota = TRUE;
gHttpAtCxt.dlUrcRag = 100;
gHttpAtCxt.method = 0;
httpCreateClientContext(&gHttpAtCxt);
if(gHttpAtCxt.url != NULL)
{
freeEc(gHttpAtCxt.url);
gHttpAtCxt.url = NULL;
}
gHttpAtCxt.url = mallocEc(CTW_MQTT_FOTA_URL_MAX_LEN);
memset(gHttpAtCxt.url, 0, CTW_MQTT_FOTA_URL_MAX_LEN);
snprintf(gHttpAtCxt.url, CTW_MQTT_FOTA_URL_MAX_LEN, "%s&deviceId=%s&accessToken=%s", pCtwMqttFotaUrl, pCtwMqttParam->clientId, pCtwMqttParam->fotaToken);
httpSleepVote(HTTP_DIS_SLEEP);
gHttpAtCxt.reqStat = HTTP_REQ_HANDLE;
httpSendReq();//send fota download request
#endif
}
if(data->message->payload != NULL)
{
mqtt_payload_len = data->message->payloadlen+1;
mqtt_msg.mqtt_payload = mallocEc(mqtt_payload_len);
memset(mqtt_msg.mqtt_payload, 0, mqtt_payload_len);
memcpy(mqtt_msg.mqtt_payload, data->message->payload, (mqtt_payload_len-1));
mqtt_msg.mqtt_payload_len = data->message->payloadlen;
memset(data->message->payload, 0, (mqtt_payload_len-1));
}
applSendCmsInd(BROADCAST_IND_HANDLER, APPL_CTW, APPL_CTW_MQTT_FOTA_IND, primSize, (void *)&mqtt_msg);
}
else
{
applSendCmsInd(BROADCAST_IND_HANDLER, APPL_CTW, APPL_CTW_MQTT_DL_IND, primSize, (void *)&mqtt_msg);
}
}
void ctwMqttClientInit(ctwMqttRegContext_t *mqttContext, Network* network, unsigned char* sendbuf, size_t sendbuf_size, unsigned char* readbuf, size_t readbuf_size)
{
int i;
mqttContext->mqttClient->ipstack = network;
for (i = 0; i < MAX_MESSAGE_HANDLERS; ++i)
{
mqttContext->mqttClient->messageHandlers[i].topicFilter = 0;
}
mqttContext->mqttClient->buf = sendbuf;
mqttContext->mqttClient->buf_size = sendbuf_size;
mqttContext->mqttClient->readbuf = readbuf;
mqttContext->mqttClient->readbuf_size = readbuf_size;
mqttContext->mqttClient->isconnected = 0;
mqttContext->mqttClient->cleansession = 0;
mqttContext->mqttClient->ping_outstanding = 0;
mqttContext->mqttClient->defaultMessageHandler = ctwMqttDefaultMessageArrived;
mqttContext->mqttClient->next_packetid = 1;
mqttContext->mqttClient->keepAliveInterval = pCtwMqttParam->keepAlive;
TimerInit(&mqttContext->mqttClient->last_sent);
TimerInit(&mqttContext->mqttClient->last_received);
#ifdef FEATURE_MQTT_TLS_ENABLE
mqttContext->mqttsClient->sendBuf = sendbuf;
mqttContext->mqttsClient->sendBufSize = sendbuf_size;
mqttContext->mqttsClient->readBuf = readbuf;
mqttContext->mqttsClient->readBufSize = readbuf_size;
#endif
}
static INT32 ctwMqttPSUrcCallback(PsEventID eventID, void *param, UINT32 paramLen)
{
NmAtiNetInfoInd *netif = NULL;
CmiSimImsiStr *imsi = NULL;
switch(eventID)
{
case PS_URC_ID_SIM_READY:
{
imsi = (CmiSimImsiStr *)param;
memcpy(gCtwMqttContext.mqttRegParam->imsi, imsi->contents, imsi->length);
break;
}
case PS_URC_ID_PS_NETINFO:
{
netif = (NmAtiNetInfoInd *)param;
if ((netif->netifInfo.netStatus == NM_NETIF_ACTIVATED)&&((netif->netifInfo.ipType == NM_NET_TYPE_IPV4)||(netif->netifInfo.ipType == NM_NET_TYPE_IPV6)||(netif->netifInfo.ipType == NM_NET_TYPE_IPV4V6)))
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttPSUrcCallback_2_1, P_INFO, "PSIF network active change ctwMqttStaMachine to MQTT_IPREADY_STATE");
ctwMqttStaMachine = MQTT_IPREADY_STATE;
}
else if (netif->netifInfo.netStatus == NM_NETIF_OOS)
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttPSUrcCallback_2_2, P_INFO, "PSIF network OOS");
}
else if (netif->netifInfo.netStatus == NM_NO_NETIF_OR_DEACTIVATED ||
netif->netifInfo.netStatus == NM_NO_NETIF_NOT_DIAL)
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttPSUrcCallback_2_3, P_INFO, "PSIF network deactive");
}
break;
}
default:
break;
}
return 0;
}
void ctwMqttCheckIfNeedExit(UINT8 forceExit)
{
UINT8 cfun = 0xff;
UINT8 i = 0;
if(forceExit == 1)
{
cfun = 0;
}
else
{
appGetCFUN(&cfun);
}
if(cfun == 0)
{
if(ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_CREATE)
{
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_NOT_CREATE;
}
for(i=0; i<20; i++)
{
if((ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_DELETE) || (forceExit == 1))
{
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_NOT_CREATE;
if((ctwMqttStatusFlag == MQTT_CTW_SEND_TASK_CREATE)||(forceExit == 1))
{
ctwMqttContextDelete(&gCtwMqttContext);
deregisterPSEventCallback((psEventCallback_t)ctwMqttPSUrcCallback);
slpManPlatVoteEnableSleep(ctwMqttSlpHandler, SLP_SLP2_STATE);
ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_NOT_CREATE;
if(pCtwMqttParam != NULL)
{
freeEc(pCtwMqttParam);
pCtwMqttParam = NULL;
ctwGetMqttParam = NULL;
}
if(ctwMqttMsgQueue != NULL)
{
osMessageQueueDelete(ctwMqttMsgQueue);
ctwMqttMsgQueue = NULL;
}
osThreadExit();
}
}
osDelay(500);
}
}
}
static void ctwMqttGetInvariPara()
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttGetInvariPara_1, P_INFO, "get imei and iccid");
OsaGetImeiNumSync(gCtwMqttContext.mqttRegParam->imei);
appGetIccidNumSync(gCtwMqttContext.mqttRegParam->iccid);
}
static void ctwMqttGetVariPara()
{
UeExtStatusInfo statusInfo;
CmsRetId ret = CMS_RET_SUCC;
ret = appGetUeExtStatusInfoSync(UE_EXT_STATUS_PHY, &statusInfo);
if(ret == CMS_RET_SUCC)
{
gCtwMqttContext.mqttRegParam->phyCellId = statusInfo.phyStatus.phyCellId;
gCtwMqttContext.mqttRegParam->rsrp = statusInfo.phyStatus.sRsrp/100;
gCtwMqttContext.mqttRegParam->snr = statusInfo.phyStatus.snr;
gCtwMqttContext.mqttRegParam->txPower = statusInfo.phyStatus.txPower;
}
//ctwMqttCalcSign(gCtwMqttContext.mqttRegParam->signature, &(gCtwMqttContext.mqttRegParam->timestamp), gCtwMqttContext.mqttRegParam->masterKey);
}
void ctwMqttCalcSign(char* outSign, uint32_t* outTimestamp, char* masterKey)
{
utc_timer_value_t* timeUtc = NULL;
char input[55] = {0};
uint8_t output[32] = {0};
timeUtc = OsaSystemTimeReadUtc();
if(timeUtc != NULL){
*outTimestamp = timeUtc->UTCsecs;
}
snprintf(input,55,"eigencomm%s%d000", masterKey, *outTimestamp);
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttCalcSign_1, P_INFO, "input str=%s", input);
atSha256((unsigned char *)input, 54, output);
cmsHexToLowHexStr(outSign, 65, output, 32);
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttCalcSign_2, P_INFO, "signature=%s", outSign);
}
int ctwMqttDecodePacket(ctwMqttRegContext_t *mqttContext, int* value, int timeout)
{
unsigned char i;
int multiplier = 1;
int len = 0;
const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
*value = 0;
do
{
int rc = MQTTPACKET_READ_ERROR;
if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
{
rc = MQTTPACKET_READ_ERROR; /* bad data */
goto exit;
}
#ifdef FEATURE_MQTT_TLS_ENABLE
if(mqttContext->mqttsFlag == 0)
{
rc = mqttContext->mqttClient->ipstack->mqttread(mqttContext->mqttClient->ipstack, &i, 1, timeout);
}
else
{
rc = mqttSslRead(mqttContext->mqttsClient, &i, 1, timeout);
}
#else
rc = mqttContext->mqttClient->ipstack->mqttread(mqttContext->mqttClient->ipstack, &i, 1, timeout);
#endif
if (rc != 1)
goto exit;
*value += (i & 127) * multiplier;
multiplier *= 128;
} while ((i & 128) != 0);
exit:
return len;
}
int ctwMqttReadPacket(ctwMqttRegContext_t *mqttContext, Timer* timer)
{
MQTTHeader header = {0};
int len = 0;
int rem_len = 0;
int rc = 0;
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_read, P_INFO, 0, "...mqttReadPacket..");
if(mqttContext->mqttClient == NULL)
{
rc = -2;
goto exit;
}
#ifdef FEATURE_MQTT_TLS_ENABLE
if(mqttContext->mqttsFlag != 0)
{
if(mqttContext->mqttsClient == NULL)
{
rc = -2;
goto exit;
}
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls0, P_INFO, 0, "...mqttReadPacket..0.");
/* 1. read the header byte. This has the packet type in it */
rc = mqttSslRead(mqttContext->mqttsClient, mqttContext->mqttsClient->readBuf, 1, TimerLeftMS(timer));
if (rc != 1)
{
goto exit;
}
len = 1;
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls1, P_INFO, 0, "...mqttReadPacket..1.");
/* 2. read the remaining length. This is variable in itself */
ctwMqttDecodePacket(mqttContext, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(mqttContext->mqttsClient->readBuf + 1, rem_len); /* put the original remaining length back into the buffer */
if (rem_len > (mqttContext->mqttsClient->readBufSize - len))
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls202, P_INFO, 0, "...mqttReadPacket..202.");
rc = BUFFER_OVERFLOW;
goto exit;
}
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls2, P_INFO, 0, "...mqttReadPacket..2.");
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (mqttSslRead(mqttContext->mqttsClient, mqttContext->mqttsClient->readBuf + len, rem_len, TimerLeftMS(timer)) != rem_len))
{
rc = 0;
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls200, P_INFO, 0, "...mqttReadPacket..200.");
goto exit;
}
header.byte = mqttContext->mqttsClient->readBuf[0];
rc = header.bits.type;
if (mqttContext->mqttClient->keepAliveInterval > 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls201, P_INFO, 0, "...mqttReadPacket..201.");
TimerCountdown(&mqttContext->mqttClient->last_received, mqttContext->mqttClient->keepAliveInterval); // record the fact that we have successfully received a packet
}
}
else
#endif
{
/* 1. read the header byte. This has the packet type in it */
rc = mqttContext->mqttClient->ipstack->mqttread(mqttContext->mqttClient->ipstack, mqttContext->mqttClient->readbuf, 1, TimerLeftMS(timer));
if (rc != 1)
goto exit;
len = 1;
/* 2. read the remaining length. This is variable in itself */
ctwMqttDecodePacket(mqttContext, &rem_len, TimerLeftMS(timer));
len += MQTTPacket_encode(mqttContext->mqttClient->readbuf + 1, rem_len); /* put the original remaining length back into the buffer */
if (rem_len > (mqttContext->mqttClient->readbuf_size - len))
{
rc = BUFFER_OVERFLOW;
goto exit;
}
/* 3. read the rest of the buffer using a callback to supply the rest of the data */
if (rem_len > 0 && (mqttContext->mqttClient->ipstack->mqttread(mqttContext->mqttClient->ipstack, mqttContext->mqttClient->readbuf + len, rem_len, TimerLeftMS(timer)) != rem_len)) {
rc = 0;
goto exit;
}
header.byte = mqttContext->mqttClient->readbuf[0];
rc = header.bits.type;
if (mqttContext->mqttClient->keepAliveInterval > 0)
{
TimerCountdown(&mqttContext->mqttClient->last_received, mqttContext->mqttClient->keepAliveInterval); // record the fact that we have successfully received a packet
}
}
exit:
return rc;
}
int ctwMqttSendPacket(ctwMqttRegContext_t *mqttContext, int length, Timer* timer, int rai, bool exceptdata)
{
int rc = FAILURE,
sent = 0;
ECOMM_TRACE(UNILOG_CTWING, ctwMqttSendPacket0, P_INFO, 0, "...mqttSendPacket..len=%d.", length);
#ifdef FEATURE_MQTT_TLS_ENABLE
if(mqttContext->mqttsFlag != 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls3, P_INFO, 0, "...mqttSendPacket..0.");
//mqttContext->mqttsClient->ssl->sslContext.rai = rai;
rc = mqttSslSend(mqttContext->mqttsClient, &mqttContext->mqttsClient->sendBuf[sent], length);
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_tls4, P_INFO, 1, "...mqttSendPacket..=%d.", rc);
TimerCountdown(&mqttContext->mqttClient->last_sent, (mqttContext->mqttClient->keepAliveInterval-2)); // record the fact that we have successfully sent the packet
return rc;
}
else
#endif
{
while (sent < length && !TimerIsExpired(timer))
{
#ifdef MQTT_RAI_OPTIMIZE
rc = mqttContext->mqttClient->ipstack->mqttwrite(mqttContext->mqttClient->ipstack, &mqttContext->mqttClient->buf[sent], length, TimerLeftMS(timer), rai, exceptdata);
#else
rc = mqttContext->mqttClient->ipstack->mqttwrite(mqttContext->mqttClient->ipstack, &mqttContext->mqttClient->buf[sent], length, TimerLeftMS(timer));
#endif
if (rc < 0) // there was an error writing the data
break;
sent += rc;
}
if (sent == length)
{
TimerCountdown(&mqttContext->mqttClient->last_sent, (mqttContext->mqttClient->keepAliveInterval-2)); // record the fact that we have successfully sent the packet
rc = SUCCESS;
}
else
{
rc = FAILURE;
}
ECOMM_TRACE(UNILOG_CTWING, ctwMqttSendPacket1, P_INFO, 0, "...mqttSendPacket..rc=%d.", rc);
return rc;
}
}
int ctwMqttPublish(ctwMqttRegContext_t *mqttContext, int msgID, const char* topicName, MQTTMessage* message, int rai, bool exceptdata)
{
int rc = FAILURE;
Timer timer;
MQTTString topic = MQTTString_initializer;
topic.cstring = (char *)topicName;
int len = 0;
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_88, P_INFO, 1, "..mqttPublish.len.%d",message->payloadlen);
TimerInit(&timer);
TimerCountdownMS(&timer, mqttContext->mqttClient->command_timeout_ms);
if (message->qos == QOS1 || message->qos == QOS2)
//message->id = mqttGetNextPacketId(c);
message->id = msgID;
len = MQTTSerialize_publish(mqttContext->mqttClient->buf, mqttContext->mqttClient->buf_size, 0, message->qos, message->retained, message->id,
topic, (unsigned char*)message->payload, message->payloadlen);
if (len <= 0)
goto exit;
if ((rc = ctwMqttSendPacket(mqttContext, len, &timer, rai, exceptdata)) != SUCCESS) // send the subscribe packet
goto exit; // there was a problem
exit:
if (rc == FAILURE)
;//MQTTCloseSession(c);
return rc;
}
int ctwMqttConnectWithResults(ctwMqttRegContext_t *mqttContext, MQTTPacket_connectData* options, MQTTConnackData* data)
{
Timer connect_timer;
int rc = FAILURE;
MQTTPacket_connectData default_options = MQTTPacket_connectData_initializer;
int len = 0;
TimerInit(&connect_timer);
TimerCountdownMS(&connect_timer, mqttContext->mqttClient->command_timeout_ms);
if (options == 0)
options = &default_options; /* set default options if none were supplied */
mqttContext->mqttClient->keepAliveInterval = options->keepAliveInterval;
mqttContext->mqttClient->cleansession = options->cleansession;
TimerCountdown(&mqttContext->mqttClient->last_received, mqttContext->mqttClient->keepAliveInterval);
if ((len = MQTTSerialize_connect(mqttContext->mqttClient->buf, mqttContext->mqttClient->buf_size, options)) <= 0)
goto exit;
if ((rc = ctwMqttSendPacket(mqttContext, len, &connect_timer, 0, false)) != SUCCESS) // send the connect packet
goto exit; // there was a problem
exit:
if (rc == SUCCESS)
{
mqttContext->mqttClient->isconnected = 1;
mqttContext->mqttClient->ping_outstanding = 0;
}
return rc;
}
int ctwMqttConnect(ctwMqttRegContext_t *mqttContext, MQTTPacket_connectData* options)
{
MQTTConnackData data;
return ctwMqttConnectWithResults(mqttContext, options, &data);
}
int ctwMqttReconnectTcp(void *context)
{
int rc = -1;
ctwMqttRegContext_t *mqttContext = (ctwMqttRegContext_t *)context;
int socket_stat = -1;
int socket_err = -1;
socket_stat = sock_get_errno(mqttContext->mqttClient->ipstack->my_socket);
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_89, P_INFO, 1, "...get socket stat..socket_stat=%d..", socket_stat);
{
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_90, P_INFO, 1, "...start tcp disconnect ..");
rc = mqttContext->mqttClient->ipstack->disconnect(mqttContext->mqttClient->ipstack);
if(rc != 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_91, P_INFO, 1, "...tcp disconnect fail!!!.rc=%d..", rc);
}
}
{
socket_err = socket_error_is_fatal(socket_stat);
//if(socket_err == 1)
{
//ec_socket_delete_flag = 0xff;
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_92, P_INFO, 1, "...tcp disconnect ok!!!....%d.",socket_err);
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_93, P_INFO, 1, "...start tcp connect ...");
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_95, P_INFO, 1, "...tcp connect ok...");
mqttContext->mqttClient->isconnected = 0;
if(mqttContext->mqttsFlag == 0)
{
if((rc = NetworkConnectTimeout(mqttContext->mqttClient->ipstack, mqttContext->mqttUri, mqttContext->port, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) == 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_96, P_INFO, 1, "...start mqtt connect ..");
}
else
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_97, P_INFO, 1, "...tcp reconnect fail!!!...\r\n");
}
}
else
{
#ifdef FEATURE_MQTT_TLS_ENABLE
mqttContext->mqttsClient->isMqtts = 1;
mqttContext->mqttsClient->ciphersuite[0] = 0xFFFF;
if((rc = mqttSslConn_new(mqttContext->mqttsClient, mqttContext->mqttUri)) == 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_98, P_INFO, 1, "...start mqtt connect ..");
}
#endif
}
}
}
}
}
return rc;
}
int ctwMqttReconnect(void *context)
{
int rc = -1;
ctwMqttRegContext_t *mqttContext = (ctwMqttRegContext_t *)context;
int socket_stat = -1;
socket_stat = sock_get_errno(mqttContext->mqttClient->ipstack->my_socket);
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_1, P_INFO, 1, "...get socket stat..socket_stat=%d..", socket_stat);
{
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_2, P_INFO, 1, "...start tcp disconnect ..");
rc = mqttContext->mqttClient->ipstack->disconnect(mqttContext->mqttClient->ipstack);
if(rc != 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_3, P_INFO, 1, "...tcp disconnect fail!!!.rc=%d..", rc);
}
}
//vTaskDelay(10000);
{
//socket_stat = sock_get_errno(c->ipstack->my_socket);
//ECOMM_TRACE(UNILOG_CTWING, mqtt_task_10, P_INFO, 1, "...get socket stat..socket_stat=%d..", socket_stat);
//socket_err = socket_error_is_fatal(socket_stat);
//if(socket_err == 1)
{
//ec_socket_delete_flag = 0xff;
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_11, P_INFO, 1, "...tcp disconnect ok!!!...");
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_12, P_INFO, 1, "...start tcp connect ...");
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_14, P_INFO, 1, "...tcp connect ok...");
mqttContext->mqttClient->isconnected = 0;
if((rc = NetworkConnectTimeout(mqttContext->mqttClient->ipstack, mqttContext->mqttUri, mqttContext->port, MQTT_SEND_TIMEOUT, MQTT_RECV_TIMEOUT)) < 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_15, P_INFO, 1, "...tcp reconnect fail!!!...\r\n");
}
else
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_16, P_INFO, 1, "...start mqtt connect ..");
if ((rc = ctwMqttConnect(mqttContext, &mqttContext->mqttConnectData)) != 0)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_17, P_INFO, 1, "...mqtt reconnect fial!!!...");
}
else
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_18, P_INFO, 1, "...mqtt reconnect ok!!!...");
}
}
}
}
}
}
return rc;
}
int ctwMqttConnectClient(ctwMqttRegContext_t *mqttContext)
{
int ret = 0;
ctwMqttRegContext_t *mqttNewContext = mqttContext;
if((ret = ctwMqttConnect(mqttNewContext, &mqttNewContext->mqttConnectData)) != 0)
{
ctwMqttReconnectTcp(mqttNewContext);
if((ret = ctwMqttConnect(mqttNewContext, &mqttNewContext->mqttConnectData)) != 0)
{
mqttNewContext->mqttClient->ping_outstanding = 1;
mqttNewContext->mqttNetwork->disconnect(mqttNewContext->mqttNetwork);
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_28, P_INFO, 1, "...mqtt connect fail!!!...\r\n");
return 1;
}
else
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_29hh, P_INFO, 1, "...mqtt connect ok!!!...\r\n");
ret = 0;
}
}
else
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_task_29, P_INFO, 1, "...mqtt connect ok!!!...\r\n");
ret = 0;
}
return ret;
}
int ctwMqttKeepaliveCheck(ctwMqttRegContext_t* context)
{
int rc = 0;
//check if curr ticks > last_sent.xTimeOut.xTimeOnEntering + last_sent.xTicksToWait
//check if curr ticks > last_received.xTimeOut.xTimeOnEntering + last_received.xTicksToWait
if (TimerIsExpired(&context->mqttClient->last_sent) || TimerIsExpired(&context->mqttClient->last_received))
{
if (context->mqttClient->ping_outstanding)
{
gCtwMqttKeepaliveRetryTime++;
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_32, P_ERROR, 1, "...mqtt keep alive ping resp timeout...");
rc = FAILURE; /* PINGRESP not received in keepalive interval */
}
else
{
rc = 1;
}
}
return rc;
}
int ctwMqttCycle(ctwMqttRegContext_t* context, Timer* timer)
{
int rc = 0;
MQTTSubackData data;
MQTTConnackData connData;
unsigned short mypacketid;
unsigned char dup, type;
int count = 0;
int socket_stat = -1;
int socket_err = -1;
int mqttQos = 0;
ctwCmdMsg_t ctwMsg;
int packet_type = ctwMqttReadPacket(context, timer); /* read the socket, see what work is due */
data.grantedQoS = QOS0;
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_33, P_INFO, 1, "...mqtt readPacket packet_type=%d...",packet_type);
switch (packet_type)
{
case -2:
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_NOT_CREATE;
goto exit;
case -1:
case 0: /* timed out reading packet */
default:
{
/* no more data to read, unrecoverable. Or read packet fails due to unexpected network error */
socket_stat = sock_get_errno(context->mqttClient->ipstack->my_socket);
socket_err = socket_error_is_fatal(socket_stat);
if(socket_err == 1)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_34, P_INFO, 1, ".....now, need reconnect mqtt server, when read packet...err=%d....",socket_stat);
{
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_RECONNECT_COMMAND;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_RECONNECT;
}
rc = 0;
}
}
break;
case CONNACK:
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_101, P_INFO, 0, ".....now, recv connack.......");
if (MQTTDeserialize_connack(&connData.sessionPresent, &connData.rc, context->mqttClient->readbuf, context->mqttClient->readbuf_size) != 1)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_102hh, P_INFO, 0, ".....send mqtt conn packet fail.......");
rc = FAILURE;
}
/* send result to at task */
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_102, P_INFO, 0, ".....send mqtt conn packet ok.......");
}
break;
case PUBACK:
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_35, P_INFO, 0, ".....now, recv puback.......");
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, context->mqttClient->readbuf, context->mqttClient->readbuf_size) != 1)
{
rc = FAILURE;
}
break;
case SUBACK:
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_37, P_INFO, 0, ".....now, recv suback.......");
mqttQos = (int)data.grantedQoS;
if (MQTTDeserialize_suback(&mypacketid, 1, &count, (int*)&mqttQos, context->mqttClient->readbuf, context->mqttClient->readbuf_size) == 1)
{
if (data.grantedQoS != 0x80)
{
//rc = MQTTSetMessageHandler(context->mqttClient, context->sub_topic, context->mqttClient->defaultMessageHandler);
}
}
else
{
rc = FAILURE;
}
break;
case UNSUBACK:
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_39, P_INFO, 0, ".....now, recv unsuback.......");
// should be the same as the packetid above
if (MQTTDeserialize_unsuback(&mypacketid, context->mqttClient->readbuf, context->mqttClient->readbuf_size) == 1)
{
/* remove the subscription message handler associated with this topic, if there is one */
//MQTTSetMessageHandler(context->mqttClient, context->unsub_topic, NULL);
}
else
{
rc = FAILURE;
}
break;
case PUBLISH:
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_40, P_INFO, 0, ".....now, recv publish.......");
MQTTString topicName;
MQTTMessage msg;
int intQoS;
msg.payloadlen = 0; /* this is a size_t, but deserialize publish sets this as int */
memset(&topicName, 0, sizeof(topicName));
if (MQTTDeserialize_publish(&msg.dup, &intQoS, &msg.retained, &msg.id, &topicName,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, context->mqttClient->readbuf, context->mqttClient->readbuf_size) != 1)
goto exit;
msg.qos = (enum QoS)intQoS;
if (msg.qos == QOS1)
{
/* send pub msg to mqtt send task*/
ctwMqttDeliverMessage(context->mqttClient, &topicName, &msg);
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_PUBLISH_ACK;
ctwMsg.ackMode = PUBACK;
ctwMsg.Qos = QOS1;
ctwMsg.msgId = msg.id;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
rc = 0;
}
else if (msg.qos == QOS2)
{
/* send pub msg to mqtt send task*/
ctwMqttDeliverMessage(context->mqttClient, &topicName, &msg);
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_PUBLISH_REC;
ctwMsg.ackMode = PUBREC;
ctwMsg.Qos = QOS2;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
rc = 0;
}
else
{
ctwMqttDeliverMessage(context->mqttClient, &topicName, &msg);
rc = 0;
}
break;
}
case PUBREC:
case PUBREL:
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_41, P_INFO, 0, ".....now, recv pubrec or pubrel.......");
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, context->mqttClient->readbuf, context->mqttClient->readbuf_size) != 1)
{
rc = FAILURE;
}
else
{
/* send msg to mqtt send task*/
if(PUBREC == packet_type)
{
}
else if(PUBREL == packet_type)
{
}
rc = 0;
}
if (rc == FAILURE)
{
goto exit; // there was a problem
}
break;
}
case PUBCOMP:
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_42, P_INFO, 0, ".....now, recv pubcomp.......");
if (MQTTDeserialize_ack(&type, &dup, &mypacketid, context->mqttClient->readbuf, context->mqttClient->readbuf_size) != 1)
{
rc = FAILURE;
}
break;
case PINGRESP:
{
context->mqttClient->ping_outstanding = 0;
gCtwMqttKeepaliveRetryTime = 0;
/* delete keepalive timer */
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_44, P_INFO, 1, ".....mqtt recv keeplive ack ok.......");
break;
}
}
if(socket_err == 1)
{
rc = -1;
socket_err = 0;
}
else
{
rc = ctwMqttKeepaliveCheck(context);
}
if(rc == 0)
{
/* not need send keepalive packet */
}
else if(rc == 1)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_45, P_INFO, 0, ".....now, need keepalive.......");
/* need send keepalive packet, send keepalive msg to send task */
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_KEEPALIVE_COMMAND;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
rc = 0;
}
else if(rc == FAILURE)
{
/* keepalive fail */
int socket_stat = 0;
socket_stat = sock_get_errno(context->mqttClient->ipstack->my_socket);
if((socket_stat == MQTT_ERR_ABRT)||(socket_stat == MQTT_ERR_RST)||(socket_stat == MQTT_ERR_CLSD)||(socket_stat == MQTT_ERR_BADE))
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_46, P_INFO, 0, ".....err, now, need reconnect.......");
/* send reconnect msg to send task */
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_RECONNECT_COMMAND;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_RECONNECT;
rc = 0;
}
else
{
if(gCtwMqttKeepaliveRetryTime > MQTT_REG_KEEPALIVE_RETRY_MAX)
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_47, P_INFO, 0, ".....now, need reconnect.......");
/* send reconnect msg to send task */
gCtwMqttKeepaliveRetryTime = 0;
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_RECONNECT_COMMAND;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_RECONNECT;
rc = 0;
}
else
{
ECOMM_TRACE(UNILOG_CTWING, mqtt_msg_48, P_INFO, 0, ".....return err, now, need keepalive.......");
/* send keepalive msg to send task */
MQTTSerialize_pingreq(context->mqttClient->buf, context->mqttClient->buf_size);
memset(&ctwMsg, 0, sizeof(ctwCmdMsg_t));
ctwMsg.cmd_type = CTW_MQTT_KEEPALIVE_COMMAND;
osMessageQueuePut(ctwMqttMsgQueue, &ctwMsg, 0, 1000);
rc = 0;
}
}
}
exit:
if (rc == 0)
{
rc = packet_type;
}
return rc;
}
int ctwMqttYield(ctwMqttRegContext_t* context, int timeout_ms)
{
int rc = 0;
Timer timer;
TimerInit(&timer);
TimerCountdownMS(&timer, timeout_ms);
do
{
if (ctwMqttCycle(context, &timer) < 0)
{
rc = FAILURE;
break;
}
} while (!TimerIsExpired(&timer));
return rc;
}
uint8_t ctwMqttAuthReg(ctwMqttRegContext_t* pMqttClient, MWNvmCfgCtwMqttParam* pCtwMqttParam, ctwMqttRegParam_t* pRegParam)
{
UINT32 result = MQTT_CTW_OK;
CHAR rsrp[32] = {0};
CHAR snr[32] = {0};
CHAR txPower[32] = {0};
CHAR phyCellId[32] = {0};
MWNvmCfgCtwMqttParam ctwMqttParam = {0};
mwNvmCfgGetCtwMqttParam(&ctwMqttParam);
MWNvmCfgCtwParamCfg ctwParamCfg = {0};
mwNvmCfgGetCtwParamConfig(&ctwParamCfg);
memset(gCtwMqttContext.mqttSendBuf, 0, MQTT_REG_SEND_BUF_SIZE);
if(gCtwMqttContext.mqttConnectData.clientID.cstring == NULL)
{
gCtwMqttContext.mqttConnectData.clientID.cstring = mallocEc(MQTT_REG_AUTH_CLIENT_SIZE);
}
if(gCtwMqttContext.mqttConnectData.username.cstring == NULL)
{
gCtwMqttContext.mqttConnectData.username.cstring = mallocEc(MQTT_REG_AUTH_USERNAME_SIZE);
}
if(gCtwMqttContext.mqttConnectData.password.cstring == NULL)
{
gCtwMqttContext.mqttConnectData.password.cstring = mallocEc(MQTT_REG_AUTH_PWD_SIZE);
}
memset(gCtwMqttContext.mqttConnectData.clientID.cstring, 0, MQTT_REG_AUTH_CLIENT_SIZE);
memset(gCtwMqttContext.mqttConnectData.username.cstring, 0, MQTT_REG_AUTH_USERNAME_SIZE);
memset(gCtwMqttContext.mqttConnectData.password.cstring, 0, MQTT_REG_AUTH_PWD_SIZE);
memcpy(gCtwMqttContext.mqttConnectData.clientID.cstring, ctwMqttParam.clientId, strlen(ctwMqttParam.clientId));
memcpy(gCtwMqttContext.mqttConnectData.password.cstring, ctwMqttParam.password, strlen(ctwMqttParam.password));
atDataToDecString((UINT8 *)rsrp, 32, (INT64)(pRegParam->rsrp));
atDataToDecString((UINT8 *)snr, 32, (INT64)(pRegParam->snr));
atDataToDecString((UINT8 *)txPower, 32, (INT64)(pRegParam->txPower));
atDataToDecString((UINT8 *)phyCellId, 32, (INT64)(pRegParam->phyCellId));
snprintf(gCtwMqttContext.mqttConnectData.username.cstring, MQTT_REG_AUTH_USERNAME_SIZE,
"%02d%s%02d%s%02d%s%02d%s%02d%s%02d%s%02d%s%02d%s%02d%s%02d%s",
strlen(ctwParamCfg.module),ctwParamCfg.module, strlen(ctwParamCfg.chipType),ctwParamCfg.chipType, strlen(ctwParamCfg.softVersion),ctwParamCfg.softVersion,
strlen(pRegParam->imei),pRegParam->imei, strlen(pRegParam->iccid),pRegParam->iccid, strlen(pRegParam->imsi),pRegParam->imsi, strlen(rsrp),rsrp,
strlen(snr),snr, strlen(txPower),txPower, strlen(phyCellId),phyCellId);
ctwMqttConnectClient(&gCtwMqttContext);
return result;
}
#define MQTT_REG_SEND_RECVTASK_START
void ctwMqttRegisterRecvTask(void *argument)
{
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_CREATE;
while(1)
{
ctwMqttYield(&gCtwMqttContext, 2000);
if(ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_NOT_CREATE)
{
break;
}
if(ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_RECONNECT)
{
ctwMqttReconnectStatusFlag = 0x1;
break;
}
osDelay(100);
}
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_DELETE;
osThreadExit();
}
void ctwMqttStartRegisterRecvTask(void)
{
osThreadAttr_t task_attr;
memset(&task_attr, 0, sizeof(task_attr));
task_attr.name = "mtCtwRecv";
task_attr.stack_size = MQTT_REG_RECV_TASK_STACK_SIZE;
task_attr.priority = osPriorityNormal;
#ifdef TYPE_EC718M
task_attr.reserved = osThreadDynamicStackAlloc;
#endif
osThreadNew(ctwMqttRegisterRecvTask, NULL,&task_attr);
}
static void ctwMqttRegisterSendTask(void *arg)
{
UINT32 result = 0xff;
UINT32 ret = 0xff;
UINT32 retry = 0;
ctwCmdMsg_t msg;
//ctwCnfCmdMsg_t cnfMsg;
//int primSize = sizeof(cnfMsg);
char logbuf[64] = {0};
int len = 0;
int i = 0;
Timer timer;
NmAtiNetifInfo netInfo;
ctwMqttContextConfig(&gCtwMqttContext);
gCtwMqttContext.mqttsFlag = pCtwMqttParam->mqttsFlag;
if(ctwMqttMsgQueue == NULL)
{
ctwMqttMsgQueue = osMessageQueueNew(16, sizeof(ctwCmdMsg_t), NULL);
}
ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_CREATE;
ctwMqttStaMachine = MQTT_INIT_STATE;
while (ctwMqttStatusFlag)
{
snprintf(logbuf,64,"%s",CTW_MQTT_MACHINESTATE(ctwMqttStaMachine));
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttRegisterTask_0, P_INFO, "handle ctwMqttStaMachine:%s", (uint8_t *)logbuf);
switch(ctwMqttStaMachine)
{
case MQTT_INIT_STATE:
osDelay(500/portTICK_PERIOD_MS);
if(CTW_MQTT_RECONNECT_COMMAND != msg.cmd_type)
{
osMessageQueueReset(ctwMqttMsgQueue);
}
ctwMqttCheckIfNeedExit(0);
appGetNetInfoSync(0, &netInfo);
if((netInfo.netStatus == NM_NETIF_ACTIVATED)&&((netInfo.ipType == NM_NET_TYPE_IPV4)||(netInfo.ipType == NM_NET_TYPE_IPV6)||(netInfo.ipType == NM_NET_TYPE_IPV4V6)))
{
ctwMqttStaMachine = MQTT_IPREADY_STATE;
}
break;
case MQTT_IPREADY_STATE:
ctwMqttGetInvariPara();
ctwMqttCheckIfNeedExit(0);
ctwMqttStaMachine = MQTT_REG_STATE;
break;
case MQTT_REG_STATE: //to regisater
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttRegisterTask_1, P_INFO, "read variable parameters and send register mqtt request");
ctwMqttGetVariPara();
NetworkInit(gCtwMqttContext.mqttNetwork);
ctwMqttClientInit(&gCtwMqttContext, gCtwMqttContext.mqttNetwork, (unsigned char *)gCtwMqttContext.mqttSendBuf, MQTT_REG_SEND_BUF_SIZE, (unsigned char *)gCtwMqttContext.mqttReadBuf, MQTT_REG_READ_BUF_SIZE);
memcpy(gCtwMqttContext.mqttRegParam->host, gCtwMqttContext.mqttUri, MQTT_REG_URI_MAX_LEN);
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_1_1, P_INFO, "host:%s", gCtwMqttContext.mqttUri);
if(pCtwMqttParam->mqttsFlag == 0)
{
strcpy(gCtwMqttContext.mqttUri, pCtwMqttParam->uri);
gCtwMqttContext.port = pCtwMqttParam->port;
if((ret = NetworkConnectTimeout(gCtwMqttContext.mqttNetwork, gCtwMqttContext.mqttUri, gCtwMqttContext.port, 3000, 3000)) == 0)
{
result = MQTT_CTW_OK;
}
else
{
}
}
else
{
#ifdef FEATURE_MQTT_TLS_ENABLE
strcpy(gCtwMqttContext.mqttUri, pCtwMqttParam->uri);
gCtwMqttContext.mqttsClient->isMqtts = 1;
gCtwMqttContext.mqttsClient->ciphersuite[0] = 0xFFFF;
gCtwMqttContext.mqttsClient->port = pCtwMqttParam->port;
if((ret = mqttSslConn_new(gCtwMqttContext.mqttsClient, gCtwMqttContext.mqttUri)) == 0)
{
result = MQTT_CTW_OK;
}
#endif
}
if (result == MQTT_CTW_OK)
{
ctwMqttStartRegisterRecvTask();
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_1_3, P_INFO, "mqttConnect ok");
result = ctwMqttAuthReg(&gCtwMqttContext, pCtwMqttParam, gCtwMqttContext.mqttRegParam);
if(result == MQTT_CTW_OK)
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_2, P_INFO, "auth reg success goto KEEP ONLINE state");
pCtwMqttParam->autoRegStatus = 1; //register ok
mwNvmCfgSetAndSaveCtwMqttParam(pCtwMqttParam);
ctwMqttStaMachine = MQTT_KEEP_ONLINE_STATE;
//osDelay(590000/portTICK_PERIOD_MS);
}
}
else
{
if(retry < 3)
{
retry++;
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_3, P_INFO, "connect ctwing failed try again after 1 min");
osDelay(60000/portTICK_PERIOD_MS);
}
else
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_3_1, P_INFO, "connect fail exit task");
pCtwMqttParam->autoRegStatus = 0; //register fail
mwNvmCfgSetAndSaveCtwMqttParam(pCtwMqttParam);
ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_NOT_CREATE;
}
}
break;
}
case MQTT_KEEP_ONLINE_STATE:
memset(&msg, 0, sizeof(msg));
osMessageQueueGet(ctwMqttMsgQueue, &msg, 0, osWaitForever);
MQTTMessage message = {0};
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_5, P_INFO, "cmd state %d", msg.cmd_type);
switch(msg.cmd_type)
{
case CTW_MQTT_SEND_COMMAND:
memset(gCtwMqttContext.mqttSendBuf, 0, MQTT_REG_SEND_BUF_SIZE);
message.payload = msg.data;
message.payloadlen = msg.datalen;
message.qos = msg.Qos;
if(msg.fotaFlag == 1)
{
//to add fota param;
gCtwMqttContext.fotaFlag = 1;
}
ret = ctwMqttPublish(&gCtwMqttContext, 0, msg.topic, &message, 1, false);
if(msg.topic != NULL)
{
freeEc(msg.topic);
msg.topic = NULL;
}
if(msg.data != NULL)
{
freeEc(msg.data);
msg.data = NULL;
}
break;
case CTW_MQTT_FOTA_UPDATA_COMMAND:
memset(gCtwMqttContext.mqttSendBuf, 0, MQTT_REG_SEND_BUF_SIZE);
message.payload = mallocEc(CTW_MQTT_FOTA_ACK_MAX_LEN);
memset(message.payload, 0, CTW_MQTT_FOTA_ACK_MAX_LEN);
snprintf(message.payload, CTW_MQTT_FOTA_ACK_MAX_LEN, "{\"taskId\":%s,\"params\":{\"step\":100,\"desc\":\"ok\",\"module\":\"%s\"}}", pCtwMqttParam->fotaTaskId, pCtwMqttParam->fotaModule);
message.payloadlen = strlen(message.payload);
message.qos = 1;
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_fota, P_INFO, "payload.. %s", message.payload);
ret = ctwMqttPublish(&gCtwMqttContext, 0, CTW_MQTT_FOTA_PROGRESS_TOPIC, &message, 1, false);
memset(gCtwMqttContext.mqttSendBuf, 0, MQTT_REG_SEND_BUF_SIZE);
message.payload = mallocEc(CTW_MQTT_FOTA_ACK_MAX_LEN);
memset(message.payload, 0, CTW_MQTT_FOTA_ACK_MAX_LEN);
snprintf(message.payload, CTW_MQTT_FOTA_ACK_MAX_LEN, "{\"taskId\":%s,\"params\":{\"version\":\"%s\",\"module\":\"%s\"}}", pCtwMqttParam->fotaTaskId, pCtwMqttParam->fotaVersion, pCtwMqttParam->fotaModule);
message.payloadlen = strlen(message.payload);
message.qos = 1;
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_fota1, P_INFO, "payload.. %s", message.payload);
ret = ctwMqttPublish(&gCtwMqttContext, 0, CTW_MQTT_FOTA_ACK_TOPIC, &message, 1, false);
break;
case CTW_MQTT_DEREG_COMMAND:
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_dereg, P_INFO, "start dereg.. recvStatus=%d..sendStatus=%d",ctwMqttRecvStatusFlag,ctwMqttStatusFlag);
if(ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_CREATE)
{
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_NOT_CREATE;
}
if(ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_RECONNECT)
{
//osMessageQueueReset(ctwMqttMsgQueue);
ctwMqttRecvStatusFlag = MQTT_CTW_RECV_TASK_NOT_CREATE;
}
for(i=0; i<20; i++)
{
if(ctwMqttRecvStatusFlag == MQTT_CTW_RECV_TASK_DELETE)
{
ret = ctwMqttDisconnect(&gCtwMqttContext);
//if(ret == SUCCESS)
{
ret = ctwMqttClose(&gCtwMqttContext);
if(ret == SUCCESS)
{
ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_NOT_CREATE;
break;
}
}
}
osDelay(500);
}
break;
case CTW_MQTT_KEEPALIVE_COMMAND:
ret = ctwMqttKeepalive(&gCtwMqttContext);
break;
case CTW_MQTT_RECONNECT_COMMAND:
for(i=0; i<10; i++)
{
if(ctwMqttReconnectStatusFlag == 1)
{
ctwMqttReconnectStatusFlag = 0;
break;
}
else
{
osDelay(500);
}
}
ret = ctwMqttDisconnect(&gCtwMqttContext);
//if(ret == SUCCESS)
{
ret = ctwMqttClose(&gCtwMqttContext);
}
ctwMqttStaMachine = MQTT_INIT_STATE;
break;
case CTW_MQTT_PUBLISH_ACK:
len = MQTTSerialize_ack(gCtwMqttContext.mqttClient->buf, gCtwMqttContext.mqttClient->buf_size, msg.ackMode, 0, msg.msgId);
TimerInit(&timer);
TimerCountdownMS(&timer, 2000);
ret = ctwMqttSendPacket(&gCtwMqttContext, len, &timer, 0, false);
break;
default:
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_6, P_INFO, "unknown state %d", msg.cmd_type);
break;
}
}
break;
default:
{
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStatusTask_7, P_INFO, "unknown state exit status task");
ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_NOT_CREATE;
break;
}
}
}
ctwMqttCheckIfNeedExit(1);
}
static void ctwMqttStartRegisterTask()
{
osThreadAttr_t task_attr;
memset(&task_attr,0,sizeof(task_attr));
task_attr.name = "mtCtw";
task_attr.stack_size = MQTT_REG_TASK_STACK_SIZE;
task_attr.priority = osPriorityNormal1;
#ifdef TYPE_EC718M
task_attr.reserved = osThreadDynamicStackAlloc;
#endif
ctwMqttStatusFlag = MQTT_CTW_SEND_TASK_CREATE;
ECPLAT_PRINTF(UNILOG_CTWING, ctwMqttStartStatusTask_1, P_INFO, "start mqtt status task");
osThreadNew(ctwMqttRegisterSendTask, NULL, &task_attr);
}
#define MQTT_REG_SEND_RECVTASK_END
void ctwMqttRegisterInit(MWNvmCfgCtwMqttParam* ctwMqttParam)
{
slpManApplyPlatVoteHandle("CTW_MQTT",&ctwMqttSlpHandler);
slpManPlatVoteDisableSleep(ctwMqttSlpHandler, SLP_SLP2_STATE);
//ctwMqttCheckIfNeedExit();
registerPSEventCallback(PS_GROUP_ALL_MASK, ctwMqttPSUrcCallback);
pCtwMqttParam = ctwMqttParam;
configASSERT(pCtwMqttParam != NULL);
ctwMqttStartRegisterTask();
}