first commit

This commit is contained in:
pandacraft 2025-03-21 16:04:17 +01:00
commit a5a0434432
1126 changed files with 439481 additions and 0 deletions

View file

@ -0,0 +1,31 @@
Link to the Grove 433Mhz Simple RF Link Kit product : https://www.seeedstudio.com/Grove-433MHz-Simple-RF-link-kit-p-1062.html
Link to the Grove 433Mhz Simple RF Link Kit datasheet :
* http://wiki.seeedstudio.com/images/9/95/ADI%3BACTR433A.pdf
* http://wiki.seeedstudio.com/images/1/1a/1110010P1.pdf
Library written for **Python 3**!
---
You'll need 2 Raspberry's in order to test the given example programs.
When using it with the GrovePi board, please pay attention that the transmitter must pe connected with jumpers to the RPi board, because TX / RX pins were switched (a design flaw).
On the other hand, the receiver's pins are aligned corectly.
---
Available functions for the Grove 433Mhz Simple RF Link Kit (`RFLinker` class):
* `RFLinker(port = '/dev/ttyS0', chunk_size = 32, max_bad_readings = 32, retries = 20)` : class constructor
* *port* is the UART port to which the RF module is connected
* *chunk_size* specifies the maximum length of a message. If the length is > *chunk_size*, then the message is fragmented in multiple transmissions - **If you lower *chunk_size*, then be sure to increase *retries* variable when reading**
* *max_bad_readings* specifies the maximum number of bad bytes read from the RF receiver before the operation is aborded
* `writeMessage(message)` : member function for sending messages
* you can send up to 256 bytes per transmission
* `readMessage(message)` : member function reading messages
* returns a string with the received message : if it fails, it returns an empty string
* `setDisplayVerbose(choice = True)` : enable/disable feedback printing - by default it's deactivated
* `setChunkSize(chunk_size)` : set the chunk size in bytes
* `setMaxRetries(retries)` : set the number of times it starts reading a transmission before giving up - higher level stuff
* `setMaxBadReadings(max_bad_readings)` : set the number of times it's allowed to read a bad byte from the stream before quitting - lower level stuff
Attention:
* `chunk_size` is closely related to `retries`. The bigger the `chunk_size` the lower `retries` it has to be in order to detect a transmission. It's also a valid statement vice-versa.

View file

@ -0,0 +1,75 @@
#!/usr/bin/env python3
#
# The GrovePi connects the Raspberry Pi and Grove sensors. You can learn more about GrovePi here: http://www.dexterindustries.com/GrovePi
#
# Have a question about this example? Ask on the forums here: http://forum.dexterindustries.com/c/grovepi
#
'''
## License
The MIT License (MIT)
GrovePi for the Raspberry Pi: an open source platform for connecting Grove Sensors to the Raspberry Pi.
Copyright (C) 2017 Dexter Industries
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
'''
import grove_rflink433mhz
import sys
# Don't forget to run it with Python 3 !!
# Don't forget to run it with Python 3 !!
# Don't forget to run it with Python 3 !!
def Main():
# instantiate a RFLinker object
# default arguments are
# port = '/dev/ttyS0' -> you've got only one on each raspberry
# chunk_size = 32 -> the max number of data bytes you can send per fragment - you can ignore it
# max_bad_readings = 32 -> the number of bad characters read before giving up on a read operation
# keep in mind that there is environment pollution, so the RF module will get many fake 'transmissions'
receiver = grove_rflink433mhz.RFLinker()
message_received = ""
# do this indefinitely
while True:
# receive the message
# readMessage takes a default argument
# called retries = 20
# it specifies how many times it tries to read consistent data before giving up
# you should not modify it unless you know what you're doing and provided you also
# modify the chunk_size for the transmitter
message_received = receiver.readMessage()
if len(message_received) > 0:
# if the string has something then print it
print('[message received][{}]'.format(message_received))
else:
print("[message_received][none or couldn't parse it]")
if __name__ == "__main__":
try:
# it's the above function we call
Main()
# in case CTRL-C / CTRL-D keys are pressed (or anything else that might interrupt)
except KeyboardInterrupt:
print('[Keyboard interrupted]')
sys.exit(0)

View file

@ -0,0 +1,229 @@
# Released under the MIT license (http://choosealicense.com/licenses/mit/).
# For more information see https://github.com/DexterInd/GrovePi/blob/master/LICENSE
import serial
import binascii
import struct
# Library written for Python 3!
class RFLinker:
# port = '/dev/ttyS0' - it's the default and only UART port on the Raspberry
#
# chunk_size = 32 represents the number of data bytes a transmission can have at most
# if the number of bytes exceed this threshold, then the data that's needed to be sent
# is fragmented
#
# max_bad_readings = 32 represents how many times we wait for a valid byte data before giving up
#
# retries = 20 number of times it starts the process of reading a message before giving up
def __init__(self, port = '/dev/ttyS0', chunk_size = 32, max_bad_readings = 32, retries = 20):
self.serial = serial.Serial(port, baudrate = 1200)
self.chunk_size = chunk_size
self.max_bad_readings = max_bad_readings
self.retries = retries
self.display_verbose = False # whether we want or not feedback on the screen
self.delimiter = chr(2) # new transmission delimiter -> the 1st thing we search when reading
self.start_condition = chr(1) + chr(27) # new transmission start condition
self.crc_offset = 256 # how much we offset crc32's bytes by adding this value so that we can send data over the air
self.end_condition = '\r\n' # CR + LF for ending a transmssion
# private function for displaying information
def __print(self, *strings):
if not self.display_verbose:
return
message_string = ""
for string in strings:
message_string += "[" + str(string) + "]"
print(message_string)
# private function for determining how many transmissions we need for a writeMessage call
def __getListOfLengths(self, message):
length_list = []
chunks = int(len(message) / self.chunk_size)
last_chunk = int(len(message) % self.chunk_size)
length_list = chunks * [self.chunk_size] + [last_chunk]
length_list = [size for size in length_list if size != 0]
return length_list
# private function for sending a fragment of the whole data we need to send
def __writeFragment(self, message, count, no_transmissions):
self.__print("message to packetize", message)
outgoing_message = self.delimiter
outgoing_message += self.start_condition
outgoing_message += count
outgoing_message += no_transmissions
outgoing_message += chr(len(message)) # this is the length of the fragment message
outgoing_message += message # we add the actual message
# compute the CRC32
crc32_checker = binascii.crc32(outgoing_message.encode('utf-8'))
crc32_checker &= 0xffffffff
crc_to_bytes = struct.pack('!I', crc32_checker)
crc_to_str = ""
for each_byte in crc_to_bytes:
crc_to_str += chr(each_byte + self.crc_offset)
# and add the UTF-8-converted CRC32 to the message
outgoing_message += crc_to_str
# and finalize by adding CR + LF
outgoing_message += self.end_condition
# encode it
outgoing_message = outgoing_message.encode('utf-8')
self.__print('final message', outgoing_message)
# and broadcast it
self.serial.write(outgoing_message)
# function for enabling / disabling feedback
def setDisplayVerbose(self, choice = True):
self.display_verbose = choice
# function for setting the chunk_size
def setChunkSize(self, chunk_size):
if chunk_size > 0:
self.chunk_size = chunk_size
# function for setting retries variable
# it specifies how many times it tries to
# start a transmission before giving up
def setMaxRetries(self, max_retries):
if max_retries > 0:
self.retries = max_retries
def setMaxBadReadings(self, max_bad_readings):
if max_bad_readings > 0:
self.max_bad_readings = max_bad_readings
# function we call from the user-program to send messages
def writeMessage(self, message):
# determine how many fragments/transmssions are needed
chunked_message_lengths = self.__getListOfLengths(message)
# if it's only one transmission then it's simple
if len(chunked_message_lengths) == 1:
count = 1
self.__writeFragment(message, chr(count), chr(count))
elif len(chunked_message_lengths) > 0:
no_transmissions = len(chunked_message_lengths)
count = no_transmissions
# otherwise send the data repeatedly
# until everything is sent
while len(chunked_message_lengths) > 0:
self.__writeFragment(message[0:chunked_message_lengths[0]], chr(count), chr(no_transmissions))
message = message[chunked_message_lengths.pop(0):]
count -= 1
# private function for reading data
# it's a recursive function and works in tandem with readMessage function
def __readFraments(self, first_time, met_first_trans, last_counter = 1):
current_bad_readings = 0
# read until threshold of bad read characters is reached
# don't consider UnicodeDecodeError exceptions
while current_bad_readings < self.max_bad_readings:
try:
if self.serial.read().decode('utf-8') == self.delimiter:
break
current_bad_readings += 1
except UnicodeDecodeError:
continue
# if threshold is reached raise exception and let readMessage deal with it
if current_bad_readings == self.max_bad_readings:
raise serial.SerialTimeoutException("[rflink : too much time]")
# read the start_condition which is made out of 2 bytes
start_condition = self.serial.read(2).decode('utf-8')
if start_condition != self.start_condition:
raise IOError('[transmission error - bad start condition]')
# read the next portions of the header
current_count = self.serial.read(1).decode('utf-8')
no_transmissions = self.serial.read(1).decode('utf-8')
message_length = self.serial.read(1).decode('utf-8')
# read the actual message
message = self.serial.read(ord(message_length)).decode('utf-8')
incoming_message = self.delimiter + self.start_condition + current_count + no_transmissions + message_length + message
# next, read the CRC32
crc32 = self.serial.read(8).decode('utf-8')
# compute the CRC32
crc_to_bytes = []
for each_char in crc32:
crc_to_bytes.append(ord(each_char) - self.crc_offset)
try:
unpacked_crc = struct.unpack('!I', bytes(crc_to_bytes))[0]
except ValueError:
raise IOError('[transmission error - cannot unpack CRC32]')
# and compare it with what we got
if unpacked_crc != binascii.crc32(incoming_message.encode('utf-8')):
raise IOError('[transmission error - CRC32 does not match]')
current_count = ord(current_count)
no_transmissions = ord(no_transmissions)
#print(message, first_time, current_count, no_transmissions, unpacked_crc, binascii.crc32(incoming_message.encode('utf-8')))
# the next following lines check whether the data fragments are out of order
# and whether we still need to read fragments. If that's the case, then we append the
# so far read string and re-call this function to read the remaining text
if first_time:
if current_count == no_transmissions:
met_first_trans = True
if current_count == 1:
return message
else:
return message + self.__readFraments(False, met_first_trans, current_count)
else:
raise IOError('[chunks out of order - abording reading operation]')
else:
if current_count + 1 == last_counter:
if current_count == 1:
return message
else:
return message + self.__readFraments(False, met_first_trans, current_count)
else:
raise IOError('[chunks out of order - abording reading operation]')
# function for reading incoming messages
def readMessage(self):
current_count = 0
message = ""
# we need to flush the input
# There's so much pollution around us
# that the program would get busy analysing the whole
# input buffer, and that'd be crazy - we'd wait lots of time
# before we get something the transmitter sent
self.serial.flushInput()
# do some unicorn magic here
while True:
try:
message = self.__readFraments(True, True)
return message
except serial.SerialTimeoutException:
return message
except IOError:
if current_count == self.retries:
return message
except UnicodeDecodeError:
continue
finally:
current_count += 1
return message

View file

@ -0,0 +1,76 @@
#!/usr/bin/env python3
#
# GrovePi Example for using the Grove - LCD RGB Backlight without erasing the screen(http://www.seeedstudio.com/wiki/Grove_-_LCD_RGB_Backlight)
#
# The GrovePi connects the Raspberry Pi and Grove sensors. You can learn more about GrovePi here: http://www.dexterindustries.com/GrovePi
#
# Have a question about this example? Ask on the forums here: http://forum.dexterindustries.com/c/grovepi
#
'''
## License
The MIT License (MIT)
GrovePi for the Raspberry Pi: an open source platform for connecting Grove Sensors to the Raspberry Pi.
Copyright (C) 2017 Dexter Industries
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
'''
import grove_rflink433mhz
from time import sleep
import sys
# Don't forget to run it with Python 3 !!
# Don't forget to run it with Python 3 !!
# Don't forget to run it with Python 3 !!
# in order to get a feedback of what this example does
# please use another setup consisted of a raspberry + receiver
# to print out the transmitted data -> use the other example program for it
def Main():
# instantiate a RFLinker object
# default arguments are
# port = '/dev/ttyS0' -> you've got only one on each raspberry
# chunk_size = 32 -> the max number of data bytes you can send per fragment - you can ignore it
# max_bad_readings = 32 -> the number of bad characters read before giving up on a read operation
# keep in mind that there is environment pollution, so the RF module will get many fake 'transmissions'
transmitter = grove_rflink433mhz.RFLinker()
# the message we want to broadcast
message_to_broadcast = "This is a RFLink test"
# and broadcast it indefinitely
while True:
transmitter.writeMessage(message_to_broadcast)
print('[message sent][{}]'.format(message_to_broadcast))
# the delay is not necessary for the transmission of data
# but for not overflowing the terminal
sleep(0.02)
if __name__ == "__main__":
try:
# it's the above function we call
Main()
# in case CTRL-C / CTRL-D keys are pressed (or anything else that might interrupt)
except KeyboardInterrupt:
print('[Keyboard interrupted]')
sys.exit(0)