Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ static void MqttWriteStop(MqttClient* client, MqttMsgStat* stat)
}
}

static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
WOLFMQTT_LOCAL int MqttReadStart(MqttClient* client, MqttMsgStat* stat)
{
int rc = MQTT_CODE_SUCCESS;

Expand Down Expand Up @@ -356,7 +356,7 @@ static int MqttReadStart(MqttClient* client, MqttMsgStat* stat)

return rc;
}
static void MqttReadStop(MqttClient* client, MqttMsgStat* stat)
WOLFMQTT_LOCAL void MqttReadStop(MqttClient* client, MqttMsgStat* stat)
{
#ifdef WOLFMQTT_DEBUG_CLIENT
if (!stat->isReadActive) {
Expand Down Expand Up @@ -1040,7 +1040,7 @@ static inline int MqttIsPubRespPacket(int packet_type)
* MQTT_CODE_ERROR_NOT_FOUND: Not found
* Any other response is from the the packet_ret
*/
static int MqttClient_CheckPendResp(MqttClient *client, byte wait_type,
WOLFMQTT_LOCAL int MqttClient_CheckPendResp(MqttClient *client, byte wait_type,
word16 wait_packet_id)
{
int rc;
Expand Down
128 changes: 69 additions & 59 deletions src/mqtt_sn_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ static void MqttSNClient_PacketReset(SN_MsgType packet_type, void* packet_obj)
static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
byte wait_type, word16 wait_packet_id, int timeout_ms)
{
int rc;
int rc = MQTT_CODE_SUCCESS;
word16 packet_id;
SN_MsgType packet_type;
#ifdef WOLFMQTT_MULTITHREAD
Expand Down Expand Up @@ -593,9 +593,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
if (client->lastRc != MQTT_CODE_CONTINUE)
#endif
{
PRINTF("SN_Client_WaitType: Type %s (%d), ID %d",
PRINTF("SN_Client_WaitType: Type %s (%d), ID %d, State %d-%d",
SN_Packet_TypeDesc((SN_MsgType)wait_type),
wait_type, wait_packet_id);
wait_type, wait_packet_id, mms_stat->read, mms_stat->write);
}
#endif

Expand All @@ -604,59 +604,44 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
case MQTT_MSG_BEGIN:
{
#ifdef WOLFMQTT_MULTITHREAD
/* Lock recv socket mutex */
rc = wm_SemLock(&client->lockRecv);
if (rc != 0) {
PRINTF("SN_Client_WaitType recv lock error");
/* Check to see if packet type and id have already completed */
rc = MqttClient_CheckPendResp(client, wait_type, wait_packet_id);
if (rc != MQTT_CODE_ERROR_NOT_FOUND && rc != MQTT_CODE_CONTINUE) {
return rc;
}
mms_stat->isReadActive = 1;
MQTT_TRACE_MSG("SN lockRecv");
#endif

/* reset the packet state used by SN_Packet_Read */
client->packet.stat = MQTT_PK_BEGIN;
if ((rc = MqttReadStart(client, mms_stat)) != 0) {
return rc;
}

mms_stat->read = MQTT_MSG_WAIT;
}
FALL_THROUGH;

case MQTT_MSG_WAIT:
case MQTT_MSG_HEADER:
{
#ifdef WOLFMQTT_MULTITHREAD
/* Check to see if packet type and id have already completed */
pendResp = NULL;
rc = wm_SemLock(&client->lockClient);
if (rc == 0) {
if (MqttClient_RespList_Find(client, (MqttPacketType)wait_type,
wait_packet_id, &pendResp)) {
if (pendResp->packetDone) {
/* pending response is already done, so return */
rc = pendResp->packet_ret;
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("PendResp already Done %p: Rc %d", pendResp, rc);
#endif
MqttClient_RespList_Remove(client, pendResp);
wm_SemUnlock(&client->lockClient);
MQTT_TRACE_MSG("SN unlockRecv");
wm_SemUnlock(&client->lockRecv);
return rc;
}
}
wm_SemUnlock(&client->lockClient);
}
else {
break; /* error */
}
#endif /* WOLFMQTT_MULTITHREAD */

mms_stat->read = MQTT_MSG_WAIT;

/* Wait for packet */
rc = SN_Packet_Read(client, client->rx_buf, client->rx_buf_len,
timeout_ms);
/* handle failure */
if (rc <= 0) {
#ifdef WOLFMQTT_NONBLOCK
if (rc == MQTT_CODE_CONTINUE &&
(client->packet.stat > MQTT_PK_BEGIN ||
client->read.total > 0)
) {
/* advance state, since we received some data */
mms_stat->read = MQTT_MSG_HEADER;
}
#endif
break;
}

/* advance state, since we received some data */
mms_stat->read = MQTT_MSG_HEADER;

client->packet.buf_len = rc;

/* Decode header */
Expand All @@ -674,11 +659,10 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
client->packet.buf_len, packet_type, packet_id);
#endif

mms_stat->read = MQTT_MSG_HEADER;
mms_stat->read = MQTT_MSG_PAYLOAD;
}
FALL_THROUGH;

case MQTT_MSG_HEADER:
case MQTT_MSG_PAYLOAD:
case MQTT_MSG_PAYLOAD2:
{
Expand All @@ -689,6 +673,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
(wait_packet_id == 0 || wait_packet_id == packet_id))
{
use_packet_obj = packet_obj;
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("Using INCOMING packet_obj %p", use_packet_obj);
#endif
waitMatchFound = 1;
}
else {
Expand Down Expand Up @@ -748,6 +735,9 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
}
}
#endif /* WOLFMQTT_MULTITHREAD */

/* done reading */
MqttReadStop(client, mms_stat);
break;
}

Expand All @@ -756,35 +746,40 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,
default:
{
#ifdef WOLFMQTT_DEBUG_CLIENT
PRINTF("SN_Client_WaitType: Invalid state %d!", mms_stat->read);
PRINTF("SN_Client_WaitType: Invalid read state %d!",
mms_stat->read);
#endif
rc = MQTT_TRACE_ERROR(MQTT_CODE_ERROR_STAT);
break;
}
} /* switch (msg->stat) */

#ifdef WOLFMQTT_DEBUG_CLIENT
if (rc != MQTT_CODE_CONTINUE) {
PRINTF("SN_Client_WaitType: rc %d, state %d", rc, mms_stat->read);
}
#endif
} /* switch (mms_stat->read) */

if (mms_stat->read == MQTT_MSG_WAIT || rc != MQTT_CODE_CONTINUE) {
/* reset state */
/* no data read, then reset state */
if (mms_stat->read == MQTT_MSG_WAIT) {
mms_stat->read = MQTT_MSG_BEGIN;
}

#ifdef WOLFMQTT_MULTITHREAD
if (mms_stat->isReadActive) {
mms_stat->isReadActive = 0;
wm_SemUnlock(&client->lockRecv);
}
#endif
#ifdef WOLFMQTT_NONBLOCK
/* if nonblocking and some data has been read, do not release read lock */
if (rc == MQTT_CODE_CONTINUE && mms_stat->read > MQTT_MSG_WAIT) {
return rc;
}
#endif

MqttReadStop(client, mms_stat);

#ifdef WOLFMQTT_NONBLOCK
#ifdef WOLFMQTT_DEBUG_CLIENT
client->lastRc = rc;
#ifdef WOLFMQTT_MULTITHREAD
if (wm_SemLock(&client->lockClient) == 0)
#endif
{
client->lastRc = rc;
#ifdef WOLFMQTT_MULTITHREAD
wm_SemUnlock(&client->lockClient);
#endif
}
#endif /* WOLFMQTT_DEBUG_CLIENT */
if (rc == MQTT_CODE_CONTINUE) {
return rc;
}
Expand All @@ -802,8 +797,23 @@ static int SN_Client_WaitType(MqttClient *client, void* packet_obj,

if (!waitMatchFound) {
/* if we get here, then the we are still waiting for a packet */
mms_stat->read = MQTT_MSG_BEGIN;
#ifdef WOLFMQTT_NONBLOCK
/* for non-blocking return with code continue instead of waiting again
* if called with packet type and id of 'any' */
if (wait_type == SN_MSG_TYPE_ANY && wait_packet_id == 0) {
return MQTT_CODE_CONTINUE;
}
#endif
MQTT_TRACE_MSG("Wait Again");
goto wait_again;
}
#ifdef WOLFMQTT_DEBUG_CLIENT
if (rc != MQTT_CODE_CONTINUE) {
PRINTF("SN_Client_WaitType: rc %d, state %d-%d",
rc, mms_stat->read, mms_stat->write);
}
#endif

return rc;
}
Expand Down
4 changes: 4 additions & 0 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ WOLFMQTT_API const char* MqttClient_ReturnCodeToString(
#endif /* WOLFMQTT_NO_ERROR_STRINGS */

/* Internal functions */
WOLFMQTT_LOCAL int MqttReadStart(MqttClient* client, MqttMsgStat* stat);
WOLFMQTT_LOCAL void MqttReadStop(MqttClient* client, MqttMsgStat* stat);
#ifdef WOLFMQTT_MULTITHREAD
WOLFMQTT_LOCAL int MqttClient_RespList_Find(MqttClient *client,
MqttPacketType packet_type, word16 packet_id, MqttPendResp **retResp);
Expand All @@ -593,6 +595,8 @@ WOLFMQTT_LOCAL void MqttClient_RespList_Remove(MqttClient *client,
WOLFMQTT_LOCAL int MqttClient_RespList_Add(MqttClient *client,
MqttPacketType packet_type, word16 packet_id, MqttPendResp *newResp,
void *packet_obj);
WOLFMQTT_LOCAL int MqttClient_CheckPendResp(MqttClient *client, byte wait_type,
word16 wait_packet_id);
#endif
WOLFMQTT_LOCAL int MqttPacket_HandleNetError(MqttClient *client, int rc);

Expand Down
Loading