diff --git a/src/mqtt_client.c b/src/mqtt_client.c index 2cd9fa95..0e3aa50f 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -299,7 +299,7 @@ static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat) } } -static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) +WOLFMQTT_LOCAL int MqttReadStart(MqttClient* client, MqttMsgStat* stat) { int rc = MQTT_CODE_SUCCESS; @@ -356,7 +356,7 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat) return rc; } -static void MqttReadStop(MqttClient* client, MqttMsgStat* stat) +WOLFMQTT_LOCAL void MqttReadStop(MqttClient* client, MqttMsgStat* stat) { #ifdef WOLFMQTT_DEBUG_CLIENT if (!stat->isReadActive) { @@ -1040,7 +1040,7 @@ static inline int MqttIsPubRespPacket(int packet_type) * MQTT_CODE_ERROR_NOT_FOUND: Not found * Any other response is from the the packet_ret */ -static int MqttClient_CheckPendResp(MqttClient *client, byte wait_type, +WOLFMQTT_LOCAL int MqttClient_CheckPendResp(MqttClient *client, byte wait_type, word16 wait_packet_id) { int rc; diff --git a/src/mqtt_sn_client.c b/src/mqtt_sn_client.c index 44715d9f..8e4e1ce3 100644 --- a/src/mqtt_sn_client.c +++ b/src/mqtt_sn_client.c @@ -561,7 +561,7 @@ static void MqttSNClient_PacketReset(SN_MsgType packet_type, void* packet_obj) static int SN_Client_WaitType(MqttClient *client, void* packet_obj, byte wait_type, word16 wait_packet_id, int timeout_ms) { - int rc; + int rc = MQTT_CODE_SUCCESS; word16 packet_id; SN_MsgType packet_type; #ifdef WOLFMQTT_MULTITHREAD @@ -593,9 +593,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, if (client->lastRc != MQTT_CODE_CONTINUE) #endif { - PRINTF("SN_Client_WaitType: Type %s (%d), ID %d", + PRINTF("SN_Client_WaitType: Type %s (%d), ID %d, State %d-%d", SN_Packet_TypeDesc((SN_MsgType)wait_type), - wait_type, wait_packet_id); + wait_type, wait_packet_id, mms_stat->read, mms_stat->write); } #endif @@ -604,59 +604,44 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, case MQTT_MSG_BEGIN: { #ifdef WOLFMQTT_MULTITHREAD - /* Lock recv socket mutex */ - rc = wm_SemLock(&client->lockRecv); - if (rc != 0) { - PRINTF("SN_Client_WaitType recv lock error"); + /* Check to see if packet type and id have already completed */ + rc = MqttClient_CheckPendResp(client, wait_type, wait_packet_id); + if (rc != MQTT_CODE_ERROR_NOT_FOUND && rc != MQTT_CODE_CONTINUE) { return rc; } - mms_stat->isReadActive = 1; - MQTT_TRACE_MSG("SN lockRecv"); #endif - /* reset the packet state used by SN_Packet_Read */ - client->packet.stat = MQTT_PK_BEGIN; + if ((rc = MqttReadStart(client, mms_stat)) != 0) { + return rc; + } + + mms_stat->read = MQTT_MSG_WAIT; } FALL_THROUGH; case MQTT_MSG_WAIT: + case MQTT_MSG_HEADER: { - #ifdef WOLFMQTT_MULTITHREAD - /* Check to see if packet type and id have already completed */ - pendResp = NULL; - rc = wm_SemLock(&client->lockClient); - if (rc == 0) { - if (MqttClient_RespList_Find(client, (MqttPacketType)wait_type, - wait_packet_id, &pendResp)) { - if (pendResp->packetDone) { - /* pending response is already done, so return */ - rc = pendResp->packet_ret; - #ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("PendResp already Done %p: Rc %d", pendResp, rc); - #endif - MqttClient_RespList_Remove(client, pendResp); - wm_SemUnlock(&client->lockClient); - MQTT_TRACE_MSG("SN unlockRecv"); - wm_SemUnlock(&client->lockRecv); - return rc; - } - } - wm_SemUnlock(&client->lockClient); - } - else { - break; /* error */ - } - #endif /* WOLFMQTT_MULTITHREAD */ - - mms_stat->read = MQTT_MSG_WAIT; - /* Wait for packet */ rc = SN_Packet_Read(client, client->rx_buf, client->rx_buf_len, timeout_ms); + /* handle failure */ if (rc <= 0) { + #ifdef WOLFMQTT_NONBLOCK + if (rc == MQTT_CODE_CONTINUE && + (client->packet.stat > MQTT_PK_BEGIN || + client->read.total > 0) + ) { + /* advance state, since we received some data */ + mms_stat->read = MQTT_MSG_HEADER; + } + #endif break; } + /* advance state, since we received some data */ + mms_stat->read = MQTT_MSG_HEADER; + client->packet.buf_len = rc; /* Decode header */ @@ -674,11 +659,10 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, client->packet.buf_len, packet_type, packet_id); #endif - mms_stat->read = MQTT_MSG_HEADER; + mms_stat->read = MQTT_MSG_PAYLOAD; } FALL_THROUGH; - case MQTT_MSG_HEADER: case MQTT_MSG_PAYLOAD: case MQTT_MSG_PAYLOAD2: { @@ -689,6 +673,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, (wait_packet_id == 0 || wait_packet_id == packet_id)) { use_packet_obj = packet_obj; + #ifdef WOLFMQTT_DEBUG_CLIENT + PRINTF("Using INCOMING packet_obj %p", use_packet_obj); + #endif waitMatchFound = 1; } else { @@ -748,6 +735,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, } } #endif /* WOLFMQTT_MULTITHREAD */ + + /* done reading */ + MqttReadStop(client, mms_stat); break; } @@ -756,35 +746,40 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, default: { #ifdef WOLFMQTT_DEBUG_CLIENT - PRINTF("SN_Client_WaitType: Invalid state %d!", mms_stat->read); + PRINTF("SN_Client_WaitType: Invalid read state %d!", + mms_stat->read); #endif rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT); break; } - } /* switch (msg->stat) */ - -#ifdef WOLFMQTT_DEBUG_CLIENT - if (rc != MQTT_CODE_CONTINUE) { - PRINTF("SN_Client_WaitType: rc %d, state %d", rc, mms_stat->read); - } -#endif + } /* switch (mms_stat->read) */ - if (mms_stat->read == MQTT_MSG_WAIT || rc != MQTT_CODE_CONTINUE) { - /* reset state */ + /* no data read, then reset state */ + if (mms_stat->read == MQTT_MSG_WAIT) { mms_stat->read = MQTT_MSG_BEGIN; + } - #ifdef WOLFMQTT_MULTITHREAD - if (mms_stat->isReadActive) { - mms_stat->isReadActive = 0; - wm_SemUnlock(&client->lockRecv); - } - #endif +#ifdef WOLFMQTT_NONBLOCK + /* if nonblocking and some data has been read, do not release read lock */ + if (rc == MQTT_CODE_CONTINUE && mms_stat->read > MQTT_MSG_WAIT) { + return rc; } +#endif + + MqttReadStop(client, mms_stat); #ifdef WOLFMQTT_NONBLOCK #ifdef WOLFMQTT_DEBUG_CLIENT - client->lastRc = rc; + #ifdef WOLFMQTT_MULTITHREAD + if (wm_SemLock(&client->lockClient) == 0) + #endif + { + client->lastRc = rc; + #ifdef WOLFMQTT_MULTITHREAD + wm_SemUnlock(&client->lockClient); #endif + } + #endif /* WOLFMQTT_DEBUG_CLIENT */ if (rc == MQTT_CODE_CONTINUE) { return rc; } @@ -802,8 +797,23 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj, if (!waitMatchFound) { /* if we get here, then the we are still waiting for a packet */ + mms_stat->read = MQTT_MSG_BEGIN; + #ifdef WOLFMQTT_NONBLOCK + /* for non-blocking return with code continue instead of waiting again + * if called with packet type and id of 'any' */ + if (wait_type == SN_MSG_TYPE_ANY && wait_packet_id == 0) { + return MQTT_CODE_CONTINUE; + } + #endif + MQTT_TRACE_MSG("Wait Again"); goto wait_again; } +#ifdef WOLFMQTT_DEBUG_CLIENT + if (rc != MQTT_CODE_CONTINUE) { + PRINTF("SN_Client_WaitType: rc %d, state %d-%d", + rc, mms_stat->read, mms_stat->write); + } +#endif return rc; } diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index 2b7c1bde..ffd9fbdd 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -585,6 +585,8 @@ WOLFMQTT_API const char* MqttClient_ReturnCodeToString( #endif /* WOLFMQTT_NO_ERROR_STRINGS */ /* Internal functions */ +WOLFMQTT_LOCAL int MqttReadStart(MqttClient* client, MqttMsgStat* stat); +WOLFMQTT_LOCAL void MqttReadStop(MqttClient* client, MqttMsgStat* stat); #ifdef WOLFMQTT_MULTITHREAD WOLFMQTT_LOCAL int MqttClient_RespList_Find(MqttClient *client, MqttPacketType packet_type, word16 packet_id, MqttPendResp **retResp); @@ -593,6 +595,8 @@ WOLFMQTT_LOCAL void MqttClient_RespList_Remove(MqttClient *client, WOLFMQTT_LOCAL int MqttClient_RespList_Add(MqttClient *client, MqttPacketType packet_type, word16 packet_id, MqttPendResp *newResp, void *packet_obj); +WOLFMQTT_LOCAL int MqttClient_CheckPendResp(MqttClient *client, byte wait_type, + word16 wait_packet_id); #endif WOLFMQTT_LOCAL int MqttPacket_HandleNetError(MqttClient *client, int rc);