1917 lines
75 KiB
Python
1917 lines
75 KiB
Python
from pandac.PandaModules import *
|
|
from otp.otpbase import OTPGlobals
|
|
from .AIMsgTypes import *
|
|
from direct.showbase.PythonUtil import Functor
|
|
from direct.directnotify.DirectNotifyGlobal import directNotify
|
|
from direct.fsm import ClassicFSM
|
|
from direct.fsm import State
|
|
from direct.task import Task
|
|
from direct.distributed.AsyncRequest import cleanupAsyncRequests
|
|
from direct.distributed import ParentMgr
|
|
from direct.distributed.ConnectionRepository import ConnectionRepository
|
|
import sys
|
|
import os
|
|
import copy
|
|
import types
|
|
from direct.distributed.PyDatagram import PyDatagram
|
|
from direct.distributed.PyDatagramIterator import PyDatagramIterator
|
|
from direct.distributed.NetMessenger import NetMessenger
|
|
from direct.showbase.ContainerLeakDetector import ContainerLeakDetector
|
|
from direct.showbase import MessengerLeakDetector
|
|
from direct.showbase import LeakDetectors
|
|
from direct.showbase.GarbageReportScheduler import GarbageReportScheduler
|
|
from otp.avatar.DistributedPlayerAI import DistributedPlayerAI
|
|
from otp.distributed import OtpDoGlobals
|
|
from otp.ai.GarbageLeakServerEventAggregatorAI import GarbageLeakServerEventAggregatorAI
|
|
import time
|
|
import gc
|
|
|
|
class AIRepository(ConnectionRepository):
|
|
"""
|
|
The new AIRepository base class
|
|
|
|
It does not have:
|
|
- district or shard code (see AIDistrict.py)
|
|
- friends or secret friends code
|
|
- a collision traverser
|
|
|
|
of course, a derived class may add those.
|
|
|
|
It does have:
|
|
+ object creation code
|
|
+ channel listening code
|
|
+ context allocator/manager
|
|
"""
|
|
notify = directNotify.newCategory("AIRepository")
|
|
|
|
InitialContext = 100000
|
|
|
|
def __init__(
|
|
self, mdip, mdport, esip, esport, dcFileNames,
|
|
serverId,
|
|
minChannel, maxChannel, dcSuffix = 'AI'):
|
|
assert self.notify.debugStateCall(self)
|
|
self._channels={}
|
|
self.AIRunningNetYield = simbase.config.GetBool('ai-running-net-yield', 0)
|
|
|
|
self._msgBundleNames = []
|
|
|
|
# doId->requestDeleted object
|
|
self._requestDeletedDOs = {}
|
|
|
|
ConnectionRepository.__init__(
|
|
self, ConnectionRepository.CM_NATIVE, simbase.config)
|
|
self.dcSuffix = dcSuffix
|
|
|
|
simbase.setupCpuAffinities(minChannel)
|
|
|
|
self.distributedObjectRequests=set()
|
|
|
|
self.context=self.InitialContext
|
|
self.contextToClassName={}
|
|
self.setClientDatagram(0)
|
|
|
|
self.readDCFile(dcFileNames = dcFileNames)
|
|
|
|
# Save the state server id
|
|
self.serverId = serverId
|
|
|
|
# Save the connect info
|
|
self.mdurl = URLSpec(mdip, 1)
|
|
self.esurl = URLSpec(esip, 1)
|
|
|
|
if not self.mdurl.hasPort():
|
|
self.mdurl.setPort(mdport)
|
|
|
|
if not self.esurl.empty() and not self.esurl.hasPort():
|
|
self.esurl.setPort(esport)
|
|
|
|
self.notify.info("event server at %s." % (repr(self.esurl)))
|
|
|
|
# UDP socket for sending events to the event server.
|
|
self.udpSock = None
|
|
|
|
if not self.esurl.empty():
|
|
udpEventServer = SocketAddress()
|
|
if not udpEventServer.setHost(self.esurl.getServer(), self.esurl.getPort()):
|
|
self.notify.warning("Invalid host for event server: %s" % (self.esurl))
|
|
|
|
self.udpSock = SocketUDPOutgoing()
|
|
self.udpSock.InitToAddress(udpEventServer)
|
|
|
|
# Save the ranges of channels that the AI controls
|
|
self.minChannel = minChannel
|
|
self.maxChannel = maxChannel
|
|
self.notify.info("dynamic doIds in range [%s, %s], total %s" % (
|
|
minChannel, maxChannel, maxChannel - minChannel + 1))
|
|
assert maxChannel >= minChannel
|
|
|
|
# initialize the channel allocation
|
|
self.channelAllocator = UniqueIdAllocator(minChannel, maxChannel)
|
|
|
|
# Define the ranges of zones.
|
|
self.minZone = self.getMinDynamicZone()
|
|
self.maxZone = self.getMaxDynamicZone()
|
|
self.notify.info("dynamic zoneIds in range [%s, %s], total %s" % (
|
|
self.minZone, self.maxZone, self.maxZone - self.minZone + 1))
|
|
assert self.maxZone >= self.minZone
|
|
self.zoneAllocator = UniqueIdAllocator(self.minZone, self.maxZone)
|
|
|
|
if config.GetBool('detect-leaks', 0) or config.GetBool('ai-detect-leaks', 0):
|
|
self.startLeakDetector()
|
|
|
|
if config.GetBool('detect-messenger-leaks', 0) or config.GetBool('ai-detect-messenger-leaks', 0):
|
|
self.messengerLeakDetector = MessengerLeakDetector.MessengerLeakDetector(
|
|
'AI messenger leak detector')
|
|
if config.GetBool('leak-messages', 0):
|
|
MessengerLeakDetector._leakMessengerObject()
|
|
|
|
if config.GetBool('run-garbage-reports', 0) or config.GetBool('ai-run-garbage-reports', 0):
|
|
noneValue = -1.
|
|
reportWait = config.GetFloat('garbage-report-wait', noneValue)
|
|
reportWaitScale = config.GetFloat('garbage-report-wait-scale', noneValue)
|
|
if reportWait == noneValue:
|
|
reportWait = None
|
|
if reportWaitScale == noneValue:
|
|
reportWaitScale = None
|
|
self.garbageReportScheduler = GarbageReportScheduler(waitBetween=reportWait,
|
|
waitScale=reportWaitScale)
|
|
|
|
self._proactiveLeakChecks = (config.GetBool('proactive-leak-checks', 1) and
|
|
config.GetBool('ai-proactive-leak-checks', 1))
|
|
self._crashOnProactiveLeakDetect = config.GetBool('crash-on-proactive-leak-detect', 1)
|
|
|
|
# Give ourselves the first channel in the range
|
|
self.ourChannel = self.allocateChannel()
|
|
|
|
# These are used to query database objects directly; currently
|
|
# used only for offline utilities.
|
|
self.dbObjContext = 0
|
|
self.dbObjMap = {}
|
|
|
|
# The UtilityAIRepository sets this to 0 to indicate we should
|
|
# not do things like issue new catalogs to toons that we load
|
|
# in. However, in the normal AI repository, we should do
|
|
# these things.
|
|
self.doLiveUpdates = 1
|
|
|
|
#for generating unqiue names for non-dos, manly used for tasks
|
|
self.keyCounter = 0
|
|
|
|
self.MaxEpockSpeed = self.config.GetFloat('ai-net-yield-epoch', 1.0/30.0)
|
|
|
|
if self.AIRunningNetYield :
|
|
taskMgr.doYield =self.taskManagerDoYieldNetwork
|
|
|
|
# use this for time yields without sleeps
|
|
#taskMgr.doYield = self.taskManagerDoYield
|
|
|
|
# Used for moderation of report-a-player feature
|
|
self.centralLogger = self.generateGlobalObject(
|
|
OtpDoGlobals.OTP_DO_ID_CENTRAL_LOGGER,
|
|
"CentralLogger")
|
|
|
|
self.garbageLeakLogger = GarbageLeakServerEventAggregatorAI(self)
|
|
|
|
taskMgr.add(self._checkBundledMsgs, 'checkBundledMsgs', priority=-100)
|
|
|
|
# skip a bit so we miss the startup sequence (it has very long frames as things are set up)
|
|
taskMgr.doMethodLater(2 * 60., self._startPerformanceLogging,
|
|
'startPerformanceLogging')
|
|
|
|
self.connectionName = None
|
|
self.connectionURL = None
|
|
|
|
def _startPerformanceLogging(self, task=None):
|
|
period = self.config.GetFloat(
|
|
'ai-performance-log-period',
|
|
self.config.GetFloat('server-performance-log-period',
|
|
choice(__dev__, 60. * 10., 60.)
|
|
)
|
|
)
|
|
self._sampledMaxFrameDuration = 0.
|
|
self._sampleMaxFrameDuration()
|
|
self._numPyObjs = None
|
|
self._getNumObjCounterLimit = int(max(1, (60 * 60.) / period))
|
|
self._getNumObjCounter = 0
|
|
taskMgr.doMethodLater(period, self._logPerformanceData, 'logPerformanceData')
|
|
return Task.done
|
|
|
|
def _sampleMaxFrameDuration(self, task=None):
|
|
self._sampledMaxFrameDuration = max(self._sampledMaxFrameDuration,
|
|
globalClock.getMaxFrameDuration())
|
|
# call this at a higher frequency than the frequency at which the global
|
|
# clock completely replaces its frame duration samples. that should ensure that
|
|
# we don't miss a slow frame in the performance logging
|
|
taskMgr.doMethodLater(globalClock.getAverageFrameRateInterval() * .75,
|
|
self._sampleMaxFrameDuration, 'sampleMaxFrameDuration')
|
|
return Task.done
|
|
|
|
def _logPerformanceData(self, task=None):
|
|
avgFrameDur = 1. / globalClock.getAverageFrameRate()
|
|
maxFrameDur = max(self._sampledMaxFrameDuration,
|
|
globalClock.getMaxFrameDuration())
|
|
self._sampledMaxFrameDuration = 0.
|
|
# gc.get_objects can be slow. only sample object count every once in a while
|
|
self._getNumObjCounter -= 1
|
|
if self._getNumObjCounter <= 0:
|
|
self._numPyObjs = len(gc.get_objects())
|
|
self._getNumObjCounter = self._getNumObjCounterLimit
|
|
self.notify.info(
|
|
'avg frame duration=%fs, max frame duration=%fs, num Python objects=%s' % (
|
|
avgFrameDur, maxFrameDur, self._numPyObjs))
|
|
return Task.again
|
|
|
|
def startLeakDetector(self):
|
|
if hasattr(self, 'leakDetector'):
|
|
return False
|
|
firstCheckDelay = config.GetFloat('leak-detector-first-check-delay', -1)
|
|
if firstCheckDelay == -1:
|
|
firstCheckDelay = None
|
|
self.leakDetector = ContainerLeakDetector(
|
|
'AI container leak detector', firstCheckDelay = firstCheckDelay)
|
|
self.accept(self.leakDetector.getLeakEvent(), self._handleLeak)
|
|
self.objectTypesLeakDetector = LeakDetectors.ObjectTypesLeakDetector()
|
|
self.garbageLeakDetector = LeakDetectors.GarbageLeakDetector()
|
|
self.cppMemoryUsageLeakDetector = LeakDetectors.CppMemoryUsage()
|
|
self.taskLeakDetector = LeakDetectors.TaskLeakDetector()
|
|
self.messageListenerTypesLeakDetector = LeakDetectors.MessageListenerTypesLeakDetector()
|
|
# this isn't necessary with the current messenger implementation
|
|
#self.messageTypesLeakDetector = LeakDetectors.MessageTypesLeakDetector()
|
|
return True
|
|
|
|
def _getMsgName(self, msgId):
|
|
# we might get a list of message names, use the first one
|
|
return makeList(AIMsgId2Names.get(msgId, 'UNKNOWN MESSAGE: %s' % msgId))[0]
|
|
|
|
def _handleLeak(self, container, containerName):
|
|
# TODO: send email with warning
|
|
self.notify.info('sending memory leak server event')
|
|
self.writeServerEvent('memoryLeak', self.ourChannel, '%s|%s|%s|%s' % (
|
|
containerName, len(container), itype(container),
|
|
fastRepr(container, maxLen=1, strFactor=50)))
|
|
|
|
def getPlayerAvatars(self):
|
|
return [i for i in self.doId2do.values()
|
|
if isinstance(i, DistributedPlayerAI)]
|
|
|
|
def uniqueName(self, desc):
|
|
return desc+"-"+str(self.serverId)
|
|
|
|
def trueUniqueName(self, desc):
|
|
self.keyCounter += 1
|
|
return desc+"-"+str(self.serverId)+"-"+str(self.keyCounter)
|
|
|
|
def allocateContext(self):
|
|
self.context+=1
|
|
if self.context >= (1<<32):
|
|
self.context=self.InitialContext
|
|
return self.context
|
|
|
|
#### Init ####
|
|
|
|
def writeServerEvent(self, eventType, who, description, serverId=None):
|
|
"""
|
|
Sends the indicated event data to the event server via UDP for
|
|
recording and/or statistics gathering. All related events
|
|
should be given the same eventType (some arbitrary string).
|
|
The who field should be the numeric or alphanumeric
|
|
representation of who generated this activity; generally this
|
|
will be an avatarId or the doId of the AI or something. The
|
|
description is a one-line description of the event, possibly
|
|
with embedded vertical bars as separators.
|
|
"""
|
|
if not self.udpSock:
|
|
self.notify.debug("Unable to log server event: no udpSock.")
|
|
return
|
|
|
|
# Make who be a string (it might be passed in as an integer)
|
|
who = str(who)
|
|
|
|
# log the access string for avatar-related messages
|
|
doId = None
|
|
try:
|
|
doId = int(who)
|
|
except:
|
|
pass
|
|
if doId is not None and doId in self.doId2do:
|
|
av = self.getDo(doId)
|
|
if hasattr(av, 'getAccess'):
|
|
description = '%s|%s' % (description, av.getAccess())
|
|
|
|
# break it up into chunks that will fit in a UDP packet (max 1024 bytes)
|
|
# leave some room for the header/eventType/who
|
|
maxLen = 900
|
|
breakCount = 0
|
|
# always run this loop at least once
|
|
while True:
|
|
if breakCount > 0:
|
|
eventType = '%s-continued%s' % (eventType, breakCount)
|
|
|
|
# Count up the number of bytes in the packet
|
|
length = 2 + 2 + 4 + 2 + len(eventType) + 2 + len(who) + 2 + len(description)
|
|
dg = PyDatagram()
|
|
dg.addUint16(length)
|
|
dg.addUint16(1) # message type 1: server event
|
|
dg.addUint16(6) # server type 6: AI server
|
|
dg.addUint32(self.ourChannel)
|
|
dg.addString(eventType)
|
|
dg.addString(who)
|
|
dg.addString(description[:maxLen])
|
|
description = description[maxLen:]
|
|
|
|
self.notify.debug('%s|AIevent:%s|%s|%s' % (eventType, self.serverId, who, description))
|
|
|
|
if not self.udpSock.Send(dg.getMessage()):
|
|
self.notify.warning("Unable to log server event: %s" % (self.udpSock.GetLastError()))
|
|
|
|
if len(description) == 0:
|
|
break
|
|
breakCount += 1
|
|
|
|
|
|
def writeServerStatus(self, who, avatar_count, object_count):
|
|
"""
|
|
Sends the Status Packet to the event server via UDP for
|
|
recording. Used to monito the health of a AI Server.
|
|
"""
|
|
if not self.udpSock:
|
|
self.notify.debug("Unable to log server Status: no udpSock.")
|
|
return
|
|
|
|
# Make who be a string (it might be passed in as an integer)
|
|
#who = str(who)
|
|
who="";
|
|
|
|
# Count up the number of bytes in the packet
|
|
length = 2 + 2 + 4 + (len(who)+2) + 4 +4
|
|
dg = PyDatagram()
|
|
dg.addUint16(length)
|
|
dg.addUint16(2) # message type 2: server event
|
|
dg.addUint16(6) # server type 6: AI server
|
|
dg.addUint32(self.ourChannel)
|
|
dg.addString(who)
|
|
dg.addUint32(avatar_count)
|
|
dg.addUint32(object_count)
|
|
|
|
if not self.udpSock.Send(dg.getMessage()):
|
|
self.notify.warning("Unable to log server status: %s" % (self.udpSock.GetLastError()))
|
|
|
|
def writeServerStatus2(self, who, avatar_count, object_count):
|
|
"""
|
|
Sends the Status Packet to the event server via UDP for
|
|
recording. Used to monito the health of a AI Server.
|
|
ServerStatus2 has an additional channel code added for a ping message
|
|
We can consolidate the two writeServerStatus messages when toontown can
|
|
adopt the new otp_server
|
|
"""
|
|
if not self.udpSock:
|
|
self.notify.debug("Unable to log server Status: no udpSock.")
|
|
return
|
|
|
|
# Make who be a string (it might be passed in as an integer)
|
|
#who = str(who)
|
|
who="";
|
|
|
|
# Count up the number of bytes in the packet
|
|
length = 2 + 2 + 4 + (len(who)+2) + 8 + 4 + 4
|
|
dg = PyDatagram()
|
|
dg.addUint16(length)
|
|
dg.addUint16(3) # message type 2: server event
|
|
dg.addUint16(6) # server type 6: AI server
|
|
dg.addUint32(self.ourChannel)
|
|
dg.addString(who)
|
|
dg.addUint64(self.ourChannel)
|
|
dg.addUint32(avatar_count)
|
|
dg.addUint32(object_count)
|
|
|
|
if not self.udpSock.Send(dg.getMessage()):
|
|
self.notify.warning("Unable to log server status: %s" % (self.udpSock.GetLastError()))
|
|
|
|
##### Off #####
|
|
def enterOff(self):
|
|
self.handler = None
|
|
|
|
def exitOff(self):
|
|
self.handler = None
|
|
|
|
#### connect #####
|
|
def enterConnect(self):
|
|
self.handler = self.handleConnect
|
|
self.lastMessageTime = 0
|
|
|
|
self.connect([self.mdurl],
|
|
successCallback = self._connected,
|
|
failureCallback = self._failedToConnect)
|
|
|
|
def _failedToConnect(self, statusCode, statusString):
|
|
self.fsm.request("noConnection")
|
|
|
|
def _connected(self):
|
|
# Register our channel
|
|
self.registerForChannel(self.ourChannel)
|
|
self.netMessenger = NetMessenger(self, (OTP_NET_MSGR_CHANNEL_ID_ALL_AI,))
|
|
## self.netMessenger.accept("transferDo", self, self.handleTransferDo)
|
|
|
|
def _handleIgnorableObjectDelete(self, msgType, di):
|
|
doId = di.getUint32()
|
|
self.notify.debug("Ignoring request to delete doId: " +
|
|
str(doId))
|
|
|
|
def exitConnect(self):
|
|
self.handler = None
|
|
# Clean up the create district tasks
|
|
taskMgr.remove(self.uniqueName("newDistrictWait"))
|
|
del self.lastMessageTime
|
|
|
|
##### PlayGame #####
|
|
|
|
def enterPlayGame(self):
|
|
self.handler = self.handlePlayGame
|
|
self.createObjects()
|
|
|
|
def handleConnect(self, msgType, di):
|
|
self.lastMessageTime = globalClock.getRealTime()
|
|
|
|
if msgType == STATESERVER_DISTRICT_DOWN:
|
|
self._handleValidDistrictDown(msgType, di)
|
|
elif msgType == STATESERVER_DISTRICT_UP:
|
|
self._handleValidDistrictUp(msgType, di)
|
|
elif msgType == STATESERVER_OBJECT_DELETE_RAM:
|
|
self._handleIgnorableObjectDelete(msgType, di)
|
|
else:
|
|
self.handleMessageType(msgType, di)
|
|
|
|
def handlePlayGame(self, msgType, di):
|
|
# NOTE: Inheritors may override this to check their
|
|
# own message types before calling this handler
|
|
# See ToontownAIRepository.py for example.
|
|
if msgType == STATESERVER_OBJECT_UPDATE_FIELD:
|
|
self._handleUpdateField(di)
|
|
elif msgType == STATESERVER_OBJECT_CHANGE_ZONE:
|
|
self._handleObjectChangeZone(di)
|
|
elif msgType == STATESERVER_OBJECT_DELETE_RAM:
|
|
self._handleDeleteObject(di)
|
|
elif msgType == DBSERVER_MAKE_FRIENDS_RESP:
|
|
self._handleMakeFriendsReply(di)
|
|
elif msgType == DBSERVER_REQUEST_SECRET_RESP:
|
|
self._handleRequestSecretReply(di)
|
|
elif msgType == DBSERVER_SUBMIT_SECRET_RESP:
|
|
self._handleSubmitSecretReply(di)
|
|
else:
|
|
self.handleMessageType(msgType, di)
|
|
|
|
def handleAvatarUsage(self, di):
|
|
"""
|
|
Should only be handled by the UD process containing the AvatarManagerUD
|
|
"""
|
|
pass
|
|
|
|
def handleAccountUsage(self, di):
|
|
"""
|
|
Should only be handled by the UD process containing the AvatarManagerUD
|
|
"""
|
|
pass
|
|
|
|
def handleObjectDeleteDisk(self, di):
|
|
pass
|
|
|
|
def handleObjectQueryField(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
|
|
doId = di.getUint32()
|
|
fieldId = di.getUint16()
|
|
context = di.getUint32()
|
|
success = di.getUint8()
|
|
if success and context:
|
|
className = self.contextToClassName.pop(context, None)
|
|
# This prevents a crash that occurs when an AI sends a query,
|
|
# crashes, comes back up, then receives the response
|
|
if className:
|
|
dclass = self.dclassesByName.get(className)
|
|
interface = dclass.getFieldByIndex(fieldId)
|
|
packer = DCPacker()
|
|
packer.setUnpackData(di.getRemainingBytes())
|
|
packer.beginUnpack(interface)
|
|
value = packer.unpackObject()
|
|
messenger.send(
|
|
"doFieldResponse-%s"%(context,), [context, value])
|
|
|
|
def handleObjectQueryFields(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
|
|
doId = di.getUint32()
|
|
context = di.getUint32()
|
|
success = di.getUint8()
|
|
if success and context:
|
|
className = self.contextToClassName.pop(context)
|
|
# This prevents a crash that occurs when an AI sends a query,
|
|
# crashes, comes back up, then receives the response
|
|
if className:
|
|
dclass = self.dclassesByName.get(className)
|
|
packer = DCPacker()
|
|
objData = {}
|
|
|
|
while di.getRemainingSize() > 0:
|
|
fieldId = di.getUint16()
|
|
interface = dclass.getFieldByIndex(fieldId)
|
|
packer.setUnpackData(di.getRemainingBytes())
|
|
packer.beginUnpack(interface)
|
|
value = packer.unpackObject()
|
|
packer.endUnpack()
|
|
objData[interface.getName()] = value
|
|
di.skipBytes(packer.getNumUnpackedBytes())
|
|
|
|
messenger.send("doFieldResponse-%s"%(context,),[context,objData])
|
|
else:
|
|
self.notify.warning("STATESERVER_OBJECT_QUERY_FIELDS_RESP received with invalid context: %s" % context)
|
|
messenger.send("doFieldQueryFailed-%s"%(context),[context])
|
|
else:
|
|
messenger.send("doFieldQueryFailed-%s"%(context),[context])
|
|
|
|
def handleServerPing(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
# Deconstruct ping message from stateserver
|
|
sec = di.getUint32()
|
|
usec = di.getUint32()
|
|
url = di.getString()
|
|
channel = di.getUint32()
|
|
|
|
# Send ping back to state server
|
|
datagram = PyDatagram()
|
|
sender=self.getMsgSender()
|
|
datagram.addServerHeader(
|
|
sender, self.ourChannel, SERVER_PING)
|
|
# A context that can be used to index the response if needed
|
|
datagram.addUint32(sec)
|
|
datagram.addUint32(usec)
|
|
datagram.addString(url)
|
|
datagram.addUint32(channel)
|
|
self.send(datagram)
|
|
|
|
|
|
|
|
def handleMessageType(self, msgType, di):
|
|
if msgType == CLIENT_GET_STATE_RESP:
|
|
# This one comes back after we sendSetAvatarIdMsg()
|
|
pass
|
|
elif msgType == DBSERVER_GET_STORED_VALUES_RESP:
|
|
self._handleDatabaseGetStoredValuesResp(di)
|
|
elif msgType == DBSERVER_CREATE_STORED_OBJECT_RESP:
|
|
self._handleDatabaseCreateStoredObjectResp(di)
|
|
elif msgType == STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT_RESP:
|
|
self._handleDatabaseGenerateResponse(di)
|
|
elif msgType == STATESERVER_OBJECT_SET_ZONE:
|
|
# import pdb;pdb.set_trace()
|
|
self.handleSetLocation(di)
|
|
elif msgType == STATESERVER_OBJECT_LEAVING_AI_INTEREST:
|
|
# import pdb;pdb.set_trace()
|
|
self.handleDistObjExit(di)
|
|
elif msgType == STATESERVER_QUERY_OBJECT_ALL_RESP:
|
|
self.handleDistObjRequestResponse(di)
|
|
elif msgType == STATESERVER_OBJECT_ENTER_AI_RECV:
|
|
self.handleDistObjEnter(di)
|
|
elif msgType == STATESERVER_OBJECT_ENTERZONE_WITH_REQUIRED_OTHER:
|
|
self.handleDistObjEnterZone(di)
|
|
elif msgType == STATESERVER_QUERY_OBJECT_CHILDREN_RESP:
|
|
# This acts much like a generate with required and other
|
|
self.handleDistObjEnter(di)
|
|
elif msgType == STATESERVER_QUERY_OBJECT_CHILDREN_LOCAL_DONE:
|
|
# This acts much like a generate with required and other
|
|
self.handleQueryObjectChildrenLocalDone(di)
|
|
elif msgType == STATESERVER_OBJECT_ENTER_OWNER_RECV:
|
|
# This message is sent at some stage during avatar creation.
|
|
self.handleDistObjEnterOwner(di)
|
|
elif msgType == STATESERVER_OBJECT_CHANGE_OWNER_RECV:
|
|
self.handleDistObjChangeOwner(di)
|
|
elif msgType == ACCOUNT_AVATAR_USAGE:
|
|
self.handleAvatarUsage(di)
|
|
elif msgType == ACCOUNT_ACCOUNT_USAGE:
|
|
self.handleAccountUsage(di)
|
|
elif msgType == STATESERVER_OBJECT_DELETE_DISK:
|
|
self.handleObjectDeleteDisk(di)
|
|
elif msgType == STATESERVER_OBJECT_QUERY_FIELD_RESP:
|
|
self.handleObjectQueryField(di)
|
|
elif msgType == STATESERVER_OBJECT_QUERY_FIELDS_RESP:
|
|
self.handleObjectQueryFields(di)
|
|
elif msgType == STATESERVER_OBJECT_QUERY_MANAGING_AI:
|
|
pass
|
|
elif msgType == SERVER_PING:
|
|
self.handleServerPing(di)
|
|
else:
|
|
AIRepository.notify.warning(
|
|
"Ignoring unexpected message type: %s in state: %s" %
|
|
(msgType, self.fsm.getCurrentState().getName()))
|
|
if __dev__:
|
|
import pdb
|
|
pdb.set_trace()
|
|
|
|
def exitPlayGame(self):
|
|
self.handler = None
|
|
self.stopReaderPollTask()
|
|
|
|
self.deleteDistributedObjects()
|
|
cleanupAsyncRequests()
|
|
|
|
# Make sure there are no leftover tasks that shouldn't be here.
|
|
for task in taskMgr.getTasks():
|
|
if (task.name in ("igLoop",
|
|
"aiSleep",
|
|
"doLaterProcessor",
|
|
"eventManager",
|
|
"tkLoop",
|
|
)):
|
|
# These tasks are ok
|
|
continue
|
|
else:
|
|
print(taskMgr)
|
|
self.notify.error("You can't leave otp until you clean up your tasks.")
|
|
|
|
# Make sure there are no event hooks still hanging.
|
|
if not messenger.isEmpty():
|
|
print(messenger)
|
|
self.notify.error("Messenger should not have any events in it.")
|
|
|
|
##### NoConnection #####
|
|
|
|
def enterNoConnection(self):
|
|
self.handler = self.handleMessageType
|
|
|
|
AIRepository.notify.warning(
|
|
"Failed to connect to message director at %s." % (repr(self.mdurl)))
|
|
# Wait five seconds, then try to reconnect
|
|
taskMgr.doMethodLater(5, self.reconnect, self.uniqueName("waitToReconnect"))
|
|
|
|
def reconnect(self, task):
|
|
self.fsm.request("connect")
|
|
return Task.done
|
|
|
|
def exitNoConnection(self):
|
|
self.handler = None
|
|
taskMgr.remove(self.uniqueName("waitToReconnect"))
|
|
|
|
##### General Purpose functions #####
|
|
|
|
def createObjects(self):
|
|
# This is meant to be a pure virtual function that gets
|
|
# overridden by inheritors. This is where you should create
|
|
# DistributedObjectAI's (and generate them), create the objects
|
|
# that manage them, as well as spawn
|
|
# any tasks that will create DistributedObjectAI's in the future.
|
|
pass
|
|
|
|
def getHandleClassNames(self):
|
|
# This is meant to be a pure virtual function that gets
|
|
# overridden by inheritors.
|
|
|
|
# This function should return a tuple or list of string names
|
|
# that represent distributed object classes for which we want
|
|
# to make a special 'handle' class available. For instance,
|
|
# to make the DistributedToonHandleAI class available, this
|
|
# function should return ('DistributedToon',).
|
|
return ()
|
|
|
|
def handleDistObjRequestResponse(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
context = di.getUint32()
|
|
parentId = di.getUint32()
|
|
zoneId = di.getUint32()
|
|
classId = di.getUint16()
|
|
doId = di.getUint32()
|
|
# if there's no context, there's no point in doing anything else
|
|
if context:
|
|
# Look up the dclass
|
|
dclass = self.dclassesByNumber[classId]
|
|
# Create a new distributed object
|
|
distObj = self._generateFromDatagram(
|
|
parentId, zoneId, dclass, doId, di, addToTables=False)
|
|
#distObj.isQueryAllResponse = True
|
|
|
|
self.writeServerEvent('doRequestResponse', doId, '%s'%(context,))
|
|
messenger.send("doRequestResponse-%s"%(context,), [context, distObj])
|
|
|
|
def postGenerate(self, context, distObj):
|
|
parentId = distObj.parentId
|
|
zoneId = distObj.zoneId
|
|
doId = distObj.doId
|
|
self.distributedObjectRequests.discard(doId)
|
|
distObj.setLocation(parentId, zoneId)
|
|
self.writeServerEvent('distObjEnter', doId, '')
|
|
|
|
def handleDistObjEnter(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
context = di.getUint32()
|
|
parentId = di.getUint32()
|
|
zoneId = di.getUint32()
|
|
classId = di.getUint16()
|
|
doId = di.getUint32()
|
|
# Look up the dclass
|
|
dclass = self.dclassesByNumber[classId]
|
|
# Is it in our dictionary?
|
|
if doId in self.doId2do:
|
|
self.notify.warning("Object Entered " + str(doId) +
|
|
" re-entered without exiting")
|
|
# Create a new distributed object
|
|
distObj = self._generateFromDatagram(
|
|
parentId, zoneId, dclass, doId, di)
|
|
self.postGenerate(context, distObj)
|
|
|
|
# Put it in the dictionary - Is it already in our dictionary? Not
|
|
# sure why or how this happens, but it does come up from time to
|
|
# time in production
|
|
# Asad Todo take out this security check for now don't publish to toontown!!!
|
|
## if doId in self.doId2do:
|
|
## self.writeServerEvent('suspicious', doId, 'Avatar re-entered without exiting')
|
|
## # Note: until we figure out what is causing this bug, it will be an error
|
|
## # This is listed as bug 50608 in the production remarks db
|
|
## self.notify.error("Avatar %s re-entered without exiting" % doId)
|
|
|
|
|
|
def handleDistObjEnterZone(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
parentId = di.getUint32()
|
|
zoneId = di.getUint32()
|
|
classId = di.getUint16()
|
|
doId = di.getUint32()
|
|
# Look up the dclass
|
|
dclass = self.dclassesByNumber[classId]
|
|
# Is it in our dictionary?
|
|
if doId in self.doId2do:
|
|
self.notify.warning("Object Entered " + str(doId) +
|
|
" re-entered without exiting")
|
|
# Create a new distributed object
|
|
distObj = self._generateFromDatagram(
|
|
parentId, zoneId, dclass, doId, di)
|
|
# self.postGenerate(context, distObj)
|
|
|
|
# Put it in the dictionary - Is it already in our dictionary? Not
|
|
# sure why or how this happens, but it does come up from time to
|
|
# time in production
|
|
# Asad Todo take out this security check for now don't publish to toontown!!!
|
|
## if doId in self.doId2do:
|
|
## self.writeServerEvent('suspicious', doId, 'Avatar re-entered without exiting')
|
|
## # Note: until we figure out what is causing this bug, it will be an error
|
|
## # This is listed as bug 50608 in the production remarks db
|
|
## self.notify.error("Avatar %s re-entered without exiting" % doId)
|
|
|
|
|
|
def handleDistObjEnterOwner(self, di):
|
|
# TEMP
|
|
return self.handleDistObjEnter(di)
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
# The context is bogus
|
|
context = di.getUint32()
|
|
# The zone the avatar is in
|
|
parentId = di.getUint32()
|
|
zoneId = di.getUint32()
|
|
# Get the class Id
|
|
classId = di.getUint16()
|
|
# Get the DO Id
|
|
doId = di.getUint32()
|
|
# Look up the dclass
|
|
dclass = self.dclassesByNumber[classId]
|
|
self.notify.info(
|
|
'ignoring owner create, context=%s, parentId=%s, '
|
|
'zoneId=%s, doId=%s, dclass=%s' % (
|
|
context, parentId, zoneId, doId, dclass.getName()))
|
|
"""
|
|
|
|
def handleDistObjChangeOwner(self, di):
|
|
assert self.notify.debugStateCall(self)
|
|
doId = di.getUint32()
|
|
newOwnerId = di.getUint32()
|
|
oldOwnerId = di.getUint32()
|
|
self.notify.info(
|
|
'ignoring owner change, '
|
|
'doId=%s, newOwnerId=%s, oldOwnerID=%s' % (
|
|
doId, newOwnerId, oldOwnerId))
|
|
|
|
def getDeleteDoIdEvent(self, doId):
|
|
# this event is sent after the object is deleted,
|
|
# and is sent even if the object was not in the tables.
|
|
return 'AIRDeleteDoId-%s' % doId
|
|
|
|
def handleDistObjExit(self, di):
|
|
# Get the distributed object id
|
|
doId = di.getUint32()
|
|
self.notify.debug("handleDistObjExit %s" % doId)
|
|
|
|
# If it is in the dictionary, remove it.
|
|
obj = self.doId2do.get(doId)
|
|
if obj:
|
|
self.deleteDistObject(obj)
|
|
else:
|
|
self.notify.warning("DistObj " + str(doId) +
|
|
" exited, but never entered.")
|
|
|
|
# announce delete event for this doId, even if obj doesn't exist
|
|
# send it after we delete any existing object
|
|
messenger.send(self.getDeleteDoIdEvent(doId))
|
|
|
|
# TODO: remove this in favor of getDeleteDoIdEvent
|
|
# Throw an event telling everyone that the distObj is gone
|
|
self._announceDistObjExit(doId)
|
|
|
|
def _announceDistObjExit(self, doId):
|
|
pass
|
|
|
|
def _generateFromDatagram(self, parentId, zoneId, dclass, doId, di, addToTables=True):
|
|
if (doId in self.doId2do):
|
|
# added to prevent objects already generated from being generated again (was
|
|
# happening with some traded inventory objects, quests specfically)
|
|
return self.doId2do[doId]
|
|
# We got a datagram telling us to create a new DO instance
|
|
classDef = dclass.getClassDef()
|
|
try:
|
|
distObj = classDef(self)
|
|
except TypeError as e:
|
|
self.notify.error('%s (class %s, parentId %d, zoneId %d, doId %d)' % \
|
|
(e, dclass.getName(), parentId, zoneId, doId))
|
|
distObj.dclass = dclass
|
|
# Assign it an Id
|
|
distObj.doId = doId
|
|
# Since the distObj has been created explicitly from the
|
|
# server, we do not own its doId, and hence we shouldn't try
|
|
# to deallocate it.
|
|
distObj.doNotDeallocateChannel = 1
|
|
|
|
# init the parentId and zoneId
|
|
if addToTables:
|
|
# add the new DO to the tables
|
|
# let addDoToTables set the parentId and zoneId, it ignores
|
|
# the location if it matches what's already on the object
|
|
distObj.parentId = None
|
|
distObj.zoneId = None
|
|
self.addDOToTables(distObj, location = (parentId,zoneId))
|
|
else:
|
|
distObj.parentId = parentId
|
|
distObj.zoneId = zoneId
|
|
|
|
# Generate this Object
|
|
distObj.generate()
|
|
|
|
# Update the required fields
|
|
distObj.updateAllRequiredOtherFields(dclass, di)
|
|
|
|
return distObj
|
|
|
|
def _handleUpdateField(self, di):
|
|
# Get the Do Id
|
|
doId = di.getUint32()
|
|
# Find the do
|
|
do = self.doId2do[doId]
|
|
# Let the dclass finish the job
|
|
do.dclass.receiveUpdate(do, di)
|
|
|
|
def _handleObjectChangeZone(self, di):
|
|
# Get the Do Id
|
|
doId = di.getUint32()
|
|
newParentId = di.getUint32()
|
|
newZoneId = di.getUint32()
|
|
oldParentId = di.getUint32()
|
|
oldZoneId = di.getUint32()
|
|
# Find the do
|
|
do = self.doId2do.get(doId)
|
|
if do is None:
|
|
self.notify.warning(
|
|
'handleObjectChangeZone: (NOT PRESENT) %s:(%s, %s)->(%s, %s)' %(
|
|
doId, oldParentId, oldZoneId, newParentId, newZoneId))
|
|
else:
|
|
self.notify.debug('handleObjectChangeZone: %s:(%s, %s)->(%s, %s)' %(
|
|
doId, oldParentId, oldZoneId, newParentId, newZoneId))
|
|
# TODO: pass in the old parent and old zone
|
|
do.setLocation(newParentId, newZoneId) #, oldParentId, oldZoneId)
|
|
|
|
def _handleDeleteObject(self, di):
|
|
# Get the DO Id
|
|
doId = di.getUint32()
|
|
obj = self.doId2do.get(doId)
|
|
if obj:
|
|
# If it is in the dictionary, remove it.
|
|
self.deleteDistObject(obj)
|
|
else:
|
|
# Otherwise ignore it
|
|
AIRepository.notify.warning(
|
|
"Asked to delete non-existent DistObjAI " + str(doId))
|
|
|
|
# announce delete event for this doId, even if obj doesn't exist
|
|
messenger.send(self.getDeleteDoIdEvent(doId))
|
|
|
|
def sendUpdate(self, do, fieldName, args):
|
|
#print("---------------sendUpdate--")
|
|
#print(do)
|
|
#print(do.doId)
|
|
#print(fieldName)
|
|
#print(args)
|
|
dg = do.dclass.aiFormatUpdate(
|
|
fieldName, do.doId, do.doId, self.ourChannel, args)
|
|
self.sendDatagram(dg)
|
|
|
|
def sendUpdateToDoId(self, dclassName, fieldName, doId, args,
|
|
channelId=None):
|
|
"""
|
|
channelId can be used as a recipient if you want to bypass the normal
|
|
airecv, ownrecv, broadcast, etc. If you don't include a channelId
|
|
or if channelId == doId, then the normal broadcast options will
|
|
be used.
|
|
|
|
See Also: def queryObjectField
|
|
"""
|
|
dclass=self.dclassesByName.get(dclassName+self.dcSuffix)
|
|
assert dclass is not None
|
|
if channelId is None:
|
|
channelId=doId
|
|
if dclass is not None:
|
|
dg = dclass.aiFormatUpdate(
|
|
fieldName, doId, channelId, self.ourChannel, args)
|
|
self.send(dg)
|
|
|
|
def createDgUpdateToDoId(self, dclassName, fieldName, doId, args,
|
|
channelId=None):
|
|
"""
|
|
channelId can be used as a recipient if you want to bypass the normal
|
|
airecv, ownrecv, broadcast, etc. If you don't include a channelId
|
|
or if channelId == doId, then the normal broadcast options will
|
|
be used.
|
|
|
|
This is just like sendUpdateToDoId, but just returns
|
|
the datagram instead of immediately sending it.
|
|
"""
|
|
result = None
|
|
dclass=self.dclassesByName.get(dclassName+self.dcSuffix)
|
|
assert dclass is not None
|
|
if channelId is None:
|
|
channelId=doId
|
|
if dclass is not None:
|
|
dg = dclass.aiFormatUpdate(
|
|
fieldName, doId, channelId, self.ourChannel, args)
|
|
result = dg
|
|
return result
|
|
|
|
def sendUpdateToGlobalDoId(self, dclassName, fieldName, doId, args):
|
|
"""
|
|
Used for sending messages from an AI directly to an
|
|
uber object.
|
|
"""
|
|
dclass = self.dclassesByName.get(dclassName)
|
|
assert dclass, 'dclass %s not found in DC files' % dclassName
|
|
dg = dclass.aiFormatUpdate(
|
|
fieldName, doId, doId, self.ourChannel, args)
|
|
self.send(dg)
|
|
|
|
def sendUpdateToChannel(self, do, channelId, fieldName, args):
|
|
dg = do.dclass.aiFormatUpdate(
|
|
fieldName, do.doId, channelId, self.ourChannel, args)
|
|
self.sendDatagram(dg)
|
|
|
|
def sendUpdateToChannelFrom(self, do, channelId, fieldName, fromid, args):
|
|
dg = do.dclass.aiFormatUpdate(fieldName, do.doId, channelId,
|
|
fromid, args)
|
|
self.send(dg)
|
|
|
|
def startMessageBundle(self, name):
|
|
# start bundling messages together. Use this for instance if you want to
|
|
# make sure that location and position of an object are processed atomically
|
|
# on the state server, to prevent clients from getting the new location and an
|
|
# old position (relative to old location) on client generate of the object
|
|
self._msgBundleNames.append(name)
|
|
ConnectionRepository.startMessageBundle(self)
|
|
def sendMessageBundle(self, senderChannel):
|
|
# stop bundling messages and send the bundle
|
|
# senderChannel is typically the doId of the object affected by the messages
|
|
ConnectionRepository.sendMessageBundle(self, self.districtId, senderChannel)
|
|
self._msgBundleNames.pop()
|
|
|
|
def _checkBundledMsgs(self, task=None):
|
|
# message bundles should not last across frames
|
|
num = len(self._msgBundleNames)
|
|
while len(self._msgBundleNames):
|
|
self.notify.warning('abandoning message bundle: %s' % self._msgBundleNames.pop())
|
|
if num > 0:
|
|
self.abandonMessageBundles()
|
|
self.notify.error('message bundling leak, see warnings above (most recent first)')
|
|
return task.cont
|
|
|
|
def registerForChannel(self, channelNumber):
|
|
if self._channels.get(channelNumber):
|
|
# We are already registered for this channel.
|
|
return
|
|
self._channels[channelNumber]=1
|
|
# Time to send a register for channel message to the msgDirector
|
|
datagram = PyDatagram()
|
|
# datagram.addServerControlHeader(CONTROL_SET_CHANNEL)
|
|
datagram.addInt8(1)
|
|
datagram.addChannel(CONTROL_MESSAGE)
|
|
datagram.addUint16(CONTROL_SET_CHANNEL)
|
|
|
|
datagram.addChannel(channelNumber)
|
|
self.send(datagram)
|
|
|
|
def addPostSocketClose(self, themessage):
|
|
# Time to send a register for channel message to the msgDirector
|
|
datagram = PyDatagram()
|
|
# datagram.addServerControlHeader(CONTROL_ADD_POST_REMOVE)
|
|
datagram.addInt8(1)
|
|
datagram.addChannel(CONTROL_MESSAGE)
|
|
datagram.addUint16(CONTROL_ADD_POST_REMOVE)
|
|
|
|
datagram.addBlob(themessage.getMessage())
|
|
self.send(datagram)
|
|
|
|
def addPostSocketCloseUD(self, dclassName, fieldName, doId, args):
|
|
dclass = self.dclassesByName.get(dclassName)
|
|
assert dclass, 'dclass %s not found in DC files' % dclassName
|
|
dg = dclass.aiFormatUpdate(
|
|
fieldName, doId, doId, self.ourChannel, args)
|
|
self.addPostSocketClose(dg)
|
|
|
|
def unregisterForChannel(self, channelNumber):
|
|
if self._channels.get(channelNumber) is None:
|
|
# We are already unregistered for this channel.
|
|
return
|
|
del self._channels[channelNumber]
|
|
# Time to send a unregister for channel message to the msgDirector
|
|
datagram = PyDatagram()
|
|
# datagram.addServerControlHeader(CONTROL_REMOVE_CHANNEL)
|
|
datagram.addInt8(1)
|
|
datagram.addChannel(CONTROL_MESSAGE)
|
|
datagram.addUint16(CONTROL_REMOVE_CHANNEL)
|
|
|
|
datagram.addChannel(channelNumber)
|
|
self.send(datagram)
|
|
|
|
#----------------------------------
|
|
|
|
def addAvatarToChannels(self, avatarId, listOfChannels):
|
|
"""
|
|
avatarId is a 32 bit doId
|
|
"""
|
|
assert self.notify.debugCall()
|
|
self.addConnectionToChannels((1<<32)+avatarId, listOfChannels)
|
|
|
|
def removeAvatarFromChannels(self, avatarId, listOfChannels):
|
|
"""
|
|
avatarId is a 32 bit doId
|
|
"""
|
|
assert self.notify.debugCall()
|
|
self.removeConnectionToChannels((1<<32)+avatarId, listOfChannels)
|
|
|
|
def addConnectionToChannels(self, targetConnection, listOfChannels):
|
|
"""
|
|
avatarId is a 32 bit doId
|
|
"""
|
|
assert self.notify.debugCall()
|
|
dg = PyDatagram()
|
|
dg.addServerHeader(
|
|
targetConnection, self.serverId, CLIENT_AGENT_OPEN_CHANNEL)
|
|
for i in listOfChannels:
|
|
dg.addUint64(i)
|
|
self.send(dg)
|
|
|
|
def removeConnectionToChannels(self, targetConnection, listOfChannels):
|
|
"""
|
|
avatarId is a 32 bit doId
|
|
"""
|
|
assert self.notify.debugCall()
|
|
dg = PyDatagram()
|
|
dg.addServerHeader(
|
|
targetConnection, self.serverId, CLIENT_AGENT_CLOSE_CHANNEL)
|
|
for i in listOfChannels:
|
|
dg.addUint64(i)
|
|
self.send(dg)
|
|
|
|
def addInterestToConnection(self, targetConnection, interestId,
|
|
contextId, parentDoId, zoneIdList):
|
|
"""
|
|
Allows the AIRepository to initiate interest on the client.
|
|
See otp.ai.AIInterestHandles for a list of interestId's to use.
|
|
"""
|
|
assert self.notify.debugCall()
|
|
dg = PyDatagram()
|
|
dg.addServerHeader(
|
|
(1<<32)+targetConnection, self.serverId,
|
|
CLIENT_AGENT_SET_INTEREST)
|
|
|
|
# Set the high bit to indicate that the interest is being governed by
|
|
# the AI and not the client
|
|
dg.addUint16((1<<15)+interestId)
|
|
dg.addUint32(contextId)
|
|
dg.addUint32(parentDoId)
|
|
dg.addUint32(contextId)
|
|
if isinstance(zoneIdList, types.ListType):
|
|
# sort and remove repeated entries
|
|
zIdSet = set(zoneIdList)
|
|
for zoneId in zIdSet:
|
|
dg.addUint32(zoneId)
|
|
else:
|
|
dg.addUint32(zoneIdList)
|
|
self.send(dg)
|
|
|
|
def removeInterestFromConnection(self, targetConnection, interestId,
|
|
contextId=0):
|
|
"""
|
|
Allows the AIRepository to remove interest on the client.
|
|
See otp.ai.AIInterestHandles for a list of interestId's to use.
|
|
"""
|
|
assert self.notify.debugCall()
|
|
dg = PyDatagram()
|
|
dg.addServerHeader(
|
|
(1<<32)+targetConnection, self.serverId,
|
|
CLIENT_AGENT_REMOVE_INTEREST)
|
|
# set the high bit to indicate that the interest is being governed by
|
|
# the AI and not the client
|
|
dg.addUint16((1<<15)+interestId)
|
|
dg.addUint32(contextId)
|
|
self.send(dg)
|
|
|
|
def setAllowClientSend(self, avatarId,
|
|
dObject, fieldNameList = []):
|
|
"""
|
|
Allow an AI to temporarily give a client 'clsend' privileges
|
|
on a particular fields on a particular object. This should
|
|
be used on fields that are 'ownsend' by default. When you want
|
|
to revoke these privileges, use clearAllowClientSend() to end
|
|
these privileges.
|
|
"""
|
|
assert self.notify.debugCall()
|
|
dg = PyDatagram()
|
|
dg.addServerHeader(
|
|
(1<<32)+avatarId, self.serverId,
|
|
CLIENT_SET_FIELD_SENDABLE)
|
|
|
|
# Set the high bit to indicate that the interest is being governed by
|
|
# the AI and not the client
|
|
dg.addUint32(dObject.doId)
|
|
assert isinstance(fieldNameList, types.ListType)
|
|
|
|
dclass = dObject.dclass
|
|
# sort and remove repeated entries as we discover the field
|
|
# ids for the specified names
|
|
fieldIdSet = set(dclass.getFieldByName(name).getNumber() \
|
|
for name in fieldNameList)
|
|
|
|
# insert the fieldIds into the datagram
|
|
for fieldId in sorted(fieldIdSet):
|
|
dg.addUint16(fieldId)
|
|
self.send(dg)
|
|
|
|
def clearAllowClientSend(self, avatarId, dObject):
|
|
self.setAllowClientSend(avatarId, dObject)
|
|
|
|
# ----------------------------------
|
|
|
|
def setConnectionName(self, name):
|
|
self.connectionName = name
|
|
# Time to send a register for channel message to the msgDirector
|
|
datagram = PyDatagram()
|
|
# datagram.addServerControlHeader(CONTROL_SET_CON_NAME)
|
|
datagram.addInt8(1)
|
|
datagram.addChannel(CONTROL_MESSAGE)
|
|
datagram.addUint16(CONTROL_SET_CON_NAME)
|
|
|
|
datagram.addString(name)
|
|
self.send(datagram)
|
|
|
|
def setConnectionURL(self, url):
|
|
self.connectionURL = url
|
|
# Time to send a register for channel message to the msgDirector
|
|
datagram = PyDatagram()
|
|
# datagram.addServerControlHeader(CONTROL_SET_CON_NAME)
|
|
datagram.addInt8(1)
|
|
datagram.addChannel(CONTROL_MESSAGE)
|
|
datagram.addUint16(CONTROL_SET_CON_URL)
|
|
|
|
datagram.addString(url)
|
|
self.send(datagram)
|
|
|
|
def deleteObjects(self):
|
|
# This is meant to be a pure virtual function that gets
|
|
# overridden by inheritors.
|
|
# This function is where objects that manage DistributedObjectAI's
|
|
# should be cleaned up. Since this is only called during a district
|
|
# shutdown of some kind, it is not useful to delete the existing
|
|
# DistributedObjectAI's, but rather to just make sure that they
|
|
# are no longer referenced, and no new ones are created.
|
|
pass
|
|
|
|
def allocateChannel(self):
|
|
channel=self.channelAllocator.allocate()
|
|
if channel==-1:
|
|
raise RuntimeError("channelAllocator.allocate() is out of channels")
|
|
if self.channelAllocator.fractionUsed()>0.75:
|
|
# There is some debate about how bad it is to run out of
|
|
# channels. Being ignorant about what exactly will happen
|
|
# if a channel is reused too quickly, we decided to bail
|
|
# out if we got low on channels. By the way, being low on
|
|
# channels is not the real problem. The problem is reusing
|
|
# a channel too soon after it's freed. There is an assumption
|
|
# here that if there are a lot of free channels and the
|
|
# channels are reused in the same order that they are freed,
|
|
# then there is a good chance that any allocated channel has
|
|
# "aged" properly.
|
|
# Schuyler has daydreamed about a system that will track the age
|
|
# of freed ids and sleep or flag an error as apropriate. See him
|
|
# for details (esp. if you want to write a cross-platform version
|
|
# of said feature).
|
|
raise RuntimeError("Dangerously low on channels.")
|
|
# Sanity check
|
|
assert (channel >= self.minChannel) and (channel <= self.maxChannel)
|
|
|
|
if 0: ## used to debug the channel code
|
|
import traceback
|
|
if not hasattr(self, "debug_dictionary"):
|
|
self.debug_dictionary = {}
|
|
__builtins__["debug_dictionary"] = self.debug_dictionary
|
|
|
|
for id in self.debug_dictionary.keys():
|
|
if id not in self.doId2do:
|
|
print("--------------------- Not In DOID table")
|
|
print(id)
|
|
#traceback.print_list(self.debug_dictionary[id])
|
|
print(self.debug_dictionary[id])
|
|
del self.debug_dictionary[id] # never report it again ..
|
|
|
|
self.debug_dictionary[channel] = traceback.extract_stack(None,7)
|
|
|
|
return channel
|
|
|
|
def deallocateChannel(self, channel):
|
|
if 0: ## used to debug the channel code .. see above
|
|
del self.debug_dictionary[channel]
|
|
|
|
self.channelAllocator.free(channel)
|
|
|
|
def getMinDynamicZone(self):
|
|
# Override this to return the minimum allowable value for a
|
|
# dynamically-allocated zone id.
|
|
return 0
|
|
|
|
def getMaxDynamicZone(self):
|
|
# Override this to return the maximum allowable value for a
|
|
# dynamically-allocated zone id.
|
|
return 0
|
|
|
|
def allocateZone(self):
|
|
zoneId=self.zoneAllocator.allocate()
|
|
if zoneId==-1:
|
|
raise RuntimeError("zoneAllocator.allocate() is out of zoneIds")
|
|
# Sanity check
|
|
assert (zoneId >= self.minZone) and (zoneId <= self.maxZone)
|
|
return zoneId
|
|
|
|
def deallocateZone(self, zoneId):
|
|
self.zoneAllocator.free(zoneId)
|
|
|
|
# NOTE: the public API for this is in DistributedObjectAI.b_setLocation()
|
|
# @report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def sendSetLocation(self, distobj, parentId, zoneId, owner=None):
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
distobj.doId, self.ourChannel, STATESERVER_OBJECT_SET_ZONE)
|
|
datagram.addUint32(parentId)
|
|
datagram.addUint32(zoneId)
|
|
self.send(datagram)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def sendSetLocationDoId(self, doId, parentId, zoneId, owner=None):
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_SET_ZONE)
|
|
datagram.addUint32(parentId)
|
|
datagram.addUint32(zoneId)
|
|
self.send(datagram)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def sendSetOwnerDoId(self, doId, ownerId):
|
|
# allowed channels are (from http://aspen.online.disney.com/mediawiki/index.php/P2P%2C_OWNERSHIP_STUFF):
|
|
# account<<32 + avatar
|
|
# 1<<32 + avatar
|
|
# 1<<32 + account
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_SET_OWNER_RECV)
|
|
datagram.addChannel((1<<32) + ownerId)
|
|
self.send(datagram)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def sendClearOwnerDoId(self, doId):
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_SET_OWNER_RECV)
|
|
datagram.addChannel(0)
|
|
self.send(datagram)
|
|
|
|
def sendSetZone(self, distobj, zoneId):
|
|
self.notify.error("non-district types should not call sendSetZone")
|
|
|
|
def startTrackRequestDeletedDO(self, obj):
|
|
if obj.doId in self._requestDeletedDOs:
|
|
self.notify.warning('duplicate requestDelete for %s %s' % (obj.__class__.__name__, obj.doId))
|
|
# store object and time of requestDelete
|
|
self._requestDeletedDOs[obj.doId] = (obj, globalClock.getRealTime())
|
|
|
|
def stopTrackRequestDeletedDO(self, obj):
|
|
# sometimes objects are deleted without having requested a delete
|
|
#assert obj.doId in self._requestDeletedDOs
|
|
if (hasattr(obj,'doId')) and (obj.doId in self._requestDeletedDOs):
|
|
del self._requestDeletedDOs[obj.doId]
|
|
|
|
def getRequestDeletedDOs(self):
|
|
# returns list of (obj, age of delete request), sorted by descending age
|
|
response = []
|
|
now = globalClock.getRealTime()
|
|
for obj, requestTime in self._requestDeletedDOs.values():
|
|
# calculate how long it has been since delete was requested
|
|
age = now - requestTime
|
|
index = 0
|
|
while index < len(response):
|
|
if age > response[index][1]:
|
|
break
|
|
index += 1
|
|
response.insert(index, (obj, age))
|
|
return response
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def requestDelete(self, distobj):
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
distobj.doId, self.ourChannel, STATESERVER_OBJECT_DELETE_RAM)
|
|
# The Id of the object in question
|
|
datagram.addUint32(distobj.doId)
|
|
self.send(datagram)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def requestDeleteDoId(self, doId):
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_DELETE_RAM)
|
|
# The Id of the object in question
|
|
datagram.addUint32(doId)
|
|
self.send(datagram)
|
|
|
|
def requestDeleteDoIdFromDisk(self, doId):
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_DELETE_DISK)
|
|
# The Id of the object in question
|
|
datagram.addUint32(doId)
|
|
self.send(datagram)
|
|
|
|
def getDatabaseGenerateResponseEvent(self, context):
|
|
# handler must accept (doId)
|
|
return 'DBGenResponse-%s' % context
|
|
|
|
def _handleDatabaseGenerateResponse(self, di):
|
|
assert self.notify.debugCall(self)
|
|
context = di.getUint32()
|
|
doId = di.getUint32()
|
|
self.notify.debug(
|
|
'_handleDatabaseGenerateResponse, context=%s, doId=%s' %
|
|
(context, doId))
|
|
messenger.send(
|
|
self.getDatabaseGenerateResponseEvent(context), [doId])
|
|
|
|
def getDatabaseIdForClassName(self, className):
|
|
assert 0
|
|
# You probably want to override this to return something better.
|
|
return 0
|
|
|
|
def requestDatabaseGenerate(
|
|
self, classId, context,
|
|
parentId=0, zoneId=0,
|
|
ownerChannel=0, ownerAvId=None,
|
|
databaseId=None, values = None):
|
|
"""
|
|
context is any 32 bit integer to be used a reference
|
|
for this message (and its reply).
|
|
parentId may be 0 or a valid distributed object ID.
|
|
zoneId may be 0 or a valid zone ID (int32).
|
|
ownerChannel may be 0 or a valid owner channel (int64)
|
|
OR
|
|
ownerAvId may be None or a player doId
|
|
databaseId is a distributed object ID for the database
|
|
(maybe a constant).
|
|
values is a dictionary of other member values (or None)
|
|
|
|
To get doId of new object, listen for this event:
|
|
air.getDatabaseGenerateResponseEvent(context)
|
|
If object location was provided, and the location is on this
|
|
district, the object will be generated shortly after the
|
|
above message is sent.
|
|
"""
|
|
AIRepository.notify.debugCall()
|
|
#self.notify.info('requestDatabaseGenerate, class=%s, context=%s' %
|
|
# (classId, context))
|
|
if ownerChannel == 0 and ownerAvId is not None:
|
|
ownerChannel = (1<<32) + ownerAvId
|
|
if classId in self.dclassesByNumber:
|
|
dclass = self.dclassesByNumber[classId]
|
|
else:
|
|
if classId in self.dclassesByName:
|
|
dclass = self.dclassesByName[classId]
|
|
elif classId+self.dcSuffix in self.dclassesByName:
|
|
dclass = self.dclassesByName[classId+self.dcSuffix]
|
|
elif classId+'AI' in self.dclassesByName:
|
|
dclass = self.dclassesByName[classId+'AI']
|
|
else:
|
|
self.notify.warning("dclass not found %s"%(classId,))
|
|
if __dev__:
|
|
import pdb; pdb.set_trace()
|
|
if databaseId is None:
|
|
databaseId=self.getDatabaseIdForClassName(dclass.getName())
|
|
if values is None:
|
|
if simbase.newDBRequestGen:
|
|
dg = dclass.aiDatabaseGenerateContext(
|
|
context, parentId, zoneId, ownerChannel,
|
|
databaseId, self.ourChannel)
|
|
else:
|
|
dg = dclass.aiDatabaseGenerateContextOld(
|
|
context, parentId, zoneId,
|
|
databaseId, self.ourChannel)
|
|
self.send(dg)
|
|
else:
|
|
packer = DCPacker()
|
|
packer.rawPackUint8(1)
|
|
packer.rawPackUint64(databaseId)
|
|
packer.rawPackUint64(self.ourChannel)
|
|
packer.rawPackUint16(
|
|
STATESERVER_OBJECT_CREATE_WITH_REQUIRED_CONTEXT)
|
|
## packer.rawPackUint16(
|
|
## STATESERVER_OBJECT_CREATE_WITH_REQUIR_OTHER_CONTEXT)
|
|
packer.rawPackUint32(parentId)
|
|
packer.rawPackUint32(zoneId)
|
|
if simbase.newDBRequestGen:
|
|
packer.rawPackUint64(ownerChannel)
|
|
packer.rawPackUint16(dclass.getNumber())
|
|
packer.rawPackUint32(context)
|
|
|
|
optionalFields = []
|
|
|
|
for i in range(dclass.getNumInheritedFields()):
|
|
field = dclass.getInheritedField(i)
|
|
if field.asMolecularField() == None:
|
|
if field.isRequired():
|
|
# Packs Required Fields
|
|
value = values.get(field.getName(), None)
|
|
packer.beginPack(field)
|
|
|
|
if value == None:
|
|
packer.packDefaultValue()
|
|
else:
|
|
if not field.packArgs(packer, value):
|
|
raise StandardError
|
|
packer.endPack()
|
|
else:
|
|
value = values.get(field.getName(), None)
|
|
|
|
if value != None:
|
|
fieldDec = {}
|
|
fieldDec['field'] = field
|
|
fieldDec['value'] = value
|
|
optionalFields.append(fieldDec)
|
|
|
|
packer.rawPackUint16(len(optionalFields))
|
|
|
|
# Packs Optional Fields
|
|
for i in optionalFields:
|
|
field = i['field']
|
|
value = i['value']
|
|
|
|
packer.rawPackUint16(field.getNumber())
|
|
packer.beginPack(field)
|
|
field.packArgs(packer, value)
|
|
packer.endPack()
|
|
|
|
if packer.hadError():
|
|
raise StandardError
|
|
|
|
dg = Datagram(packer.getString())
|
|
self.send(dg)
|
|
|
|
def lostConnection(self):
|
|
ConnectionRepository.lostConnection(self)
|
|
sys.exit()
|
|
|
|
def handleDatagram(self, di):
|
|
if self.notify.getDebug():
|
|
print("AIRepository received datagram:")
|
|
di.getDatagram().dumpHex(Notify.out())
|
|
|
|
channel=self.getMsgChannel()
|
|
if channel in self.netMessenger.channels:
|
|
self.netMessenger.handle(di.getString())
|
|
else:
|
|
self.handler(self.getMsgType(), di)
|
|
|
|
def deleteDistObject(self, do):
|
|
assert self.notify.debugCall()
|
|
|
|
# if not hasattr(do, "isQueryAllResponse") or not do.isQueryAllResponse:
|
|
do.sendDeleteEvent()
|
|
# Remove it from the dictionary
|
|
self.removeDOFromTables(do)
|
|
# Delete the object itself
|
|
do.delete()
|
|
if self._proactiveLeakChecks:
|
|
# make sure we're not leaking
|
|
do.detectLeaks()
|
|
|
|
def sendAnotherGenerate(self, distObj, toChannel):
|
|
assert self.notify.debugCall()
|
|
dg = distObj.dclass.aiFormatGenerate(
|
|
distObj, distObj.doId, distObj.parentId, distObj.zoneId,
|
|
toChannel, self.ourChannel, [])
|
|
self.send(dg)
|
|
|
|
def generateWithRequired(self, distObj, parentId, zoneId, optionalFields=[]):
|
|
assert self.notify.debugStateCall(self)
|
|
# Assign it an id
|
|
distObj.doId = self.allocateChannel()
|
|
# Put the new DO in the dictionaries
|
|
self.addDOToTables(distObj, location = (parentId,zoneId))
|
|
# Send a generate message
|
|
distObj.sendGenerateWithRequired(self, parentId, zoneId, optionalFields)
|
|
|
|
|
|
# this is a special generate used for estates, or anything else that
|
|
# needs to have a hard coded doId as assigned by the server
|
|
def generateWithRequiredAndId(
|
|
self, distObj, doId, parentId, zoneId, optionalFields=[]):
|
|
assert self.notify.debugStateCall(self)
|
|
# Assign it an id
|
|
distObj.doId = doId
|
|
# Since the distObj has been created explicitly from the
|
|
# server, we do not own its doId, and hence we shouldn't try
|
|
# to deallocate it.
|
|
distObj.doNotDeallocateChannel = 1
|
|
# Put the new DO in the dictionaries
|
|
self.addDOToTables(distObj, location = (parentId,zoneId))
|
|
# Send a generate message
|
|
distObj.sendGenerateWithRequired(self, parentId, zoneId, optionalFields)
|
|
|
|
def queryObjectAll(self, doId, context=0):
|
|
"""
|
|
Get a one-time snapshot look at the object.
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_QUERY_OBJECT_ALL)
|
|
# A context that can be used to index the response if needed
|
|
datagram.addUint32(context)
|
|
self.send(datagram)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
def queryObjectZoneIds(self, stateServerId, obj2ZoneDict):
|
|
# obj2ZoneDict should be a dict looking like this:
|
|
# { objId : [zoneId, zoneId, ...],
|
|
# objId : [zoneId, zoneId, ...],
|
|
# }
|
|
assert self.notify.debugStateCall(self)
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
stateServerId, self.ourChannel, STATESERVER_QUERY_ZONE_OBJECT_ALL)
|
|
numObjs = len(obj2ZoneDict.keys())
|
|
datagram.addUint16(numObjs)
|
|
for objId, zoneIds in obj2ZoneDict.values():
|
|
datagram.addUint32(objId)
|
|
datagram.addUint16(len(zoneIds))
|
|
for zoneId in zoneIds:
|
|
datagram.addUint32(zoneId)
|
|
# This is for a forwarding system that I did not hook up.
|
|
# Ask Roger for details.
|
|
datagram.addUint16(0)
|
|
self.send(datagram)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
def queryObjectChildrenLocal(self, parentId, context=0):
|
|
assert self.notify.debugStateCall(self)
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
self.serverId, self.ourChannel, STATESERVER_QUERY_OBJECT_CHILDREN_LOCAL)
|
|
datagram.addUint32(parentId)
|
|
datagram.addUint32(context)
|
|
self.send(datagram)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
def handleQueryObjectChildrenLocalDone(self, di):
|
|
# We asked to get generates for all objects that are children of
|
|
# us. This is the callback that says the server is done sending all
|
|
# those generates.
|
|
assert self.notify.debugCall()
|
|
parentId = di.getUint16()
|
|
context = di.getUint32()
|
|
parent = self.doId2do.get(parentId)
|
|
if parent:
|
|
parent.handleQueryObjectChildrenLocalDone(context)
|
|
else:
|
|
self.notify.warning('handleQueryObjectChildrenLocalDone: parentId %s not found' %
|
|
parentId)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def queryObjectFieldId(self, doId, fieldId, context=0):
|
|
"""
|
|
Get a one-time snapshot look at the object.
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_QUERY_FIELD)
|
|
datagram.addUint32(doId)
|
|
datagram.addUint16(fieldId)
|
|
# A context that can be used to index the response if needed
|
|
datagram.addUint32(context)
|
|
self.send(datagram)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def queryObjectFieldIds(self, doId, fieldIds, context=0):
|
|
"""
|
|
Get a one-time snapshot look at the object.
|
|
Query multiple field IDs from the same object.
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
doId, self.ourChannel, STATESERVER_OBJECT_QUERY_FIELDS)
|
|
datagram.addUint32(doId)
|
|
datagram.addUint32(context)
|
|
for x in fieldIds:
|
|
datagram.addUint16(x)
|
|
self.send(datagram)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def queryObjectStringFieldIds(self, dbId, objString, fieldIds, context=0):
|
|
"""
|
|
Get a one-time snapshot look at the object.
|
|
Query multiple field IDs from the same object, by object string.
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
# Create a message
|
|
dg = PyDatagram()
|
|
dg.addServerHeader(
|
|
dbId, self.ourChannel, STATESERVER_OBJECT_QUERY_FIELDS_STRING)
|
|
dg.addString(objString)
|
|
dg.addUint32(context)
|
|
for x in fieldIds:
|
|
dg.addUint16(x)
|
|
self.send(dg)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def queryObjectStringFields(
|
|
self, dbId, dclassName, objString, fieldNames, context=0):
|
|
"""
|
|
Get a one-time snapshot look at the object.
|
|
Query multiple field names from the same object, by object string.
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
assert len(dclassName) > 0
|
|
for fn in fieldNames:
|
|
assert len(fn) > 0
|
|
dclass = self.dclassesByName.get(dclassName)
|
|
assert dclass is not None
|
|
if not dclass:
|
|
self.notify.error(
|
|
"queryObjectStringFields invalid dclassName %s"%(dclassName))
|
|
return
|
|
if dclass is not None:
|
|
fieldIds = []
|
|
for fn in fieldNames:
|
|
id = dclass.getFieldByName(fn).getNumber()
|
|
assert id
|
|
if not id:
|
|
self.notify.error(
|
|
"queryObjectStrongFields invalid field %s, %s"%(doId,fn))
|
|
return
|
|
fieldIds.append(id)
|
|
self.queryObjectStringFieldIds(dbId,objString,fieldIds,context)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def queryObjectField(self, dclassName, fieldName, doId, context=0):
|
|
"""
|
|
See Also: def sendUpdateToDoId
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
assert len(dclassName) > 0
|
|
assert len(fieldName) > 0
|
|
assert doId > 0
|
|
dclass = self.dclassesByName.get(dclassName)
|
|
assert dclass is not None
|
|
if not dclass:
|
|
self.notify.error(
|
|
"queryObjectField invalid dclassName %s, %s"%(doId, fieldName))
|
|
return
|
|
if dclass is not None:
|
|
fieldId = dclass.getFieldByName(fieldName).getNumber()
|
|
assert fieldId # is 0 a valid value?
|
|
if not fieldId:
|
|
self.notify.error(
|
|
"queryObjectField invalid field %s, %s"%(doId, fieldName))
|
|
return
|
|
self.queryObjectFieldId(doId, fieldId, context)
|
|
|
|
@report(types = ['args'], dConfigParam = 'avatarmgr')
|
|
def queryObjectFields(self, dclassName, fieldNames, doId, context=0):
|
|
"""
|
|
See Also: def sendUpdateToDoId
|
|
"""
|
|
assert self.notify.debugStateCall(self)
|
|
assert len(dclassName) > 0
|
|
assert len(fieldNames) > 0
|
|
for fieldName in fieldNames:
|
|
assert len(fieldName) > 0
|
|
assert doId > 0
|
|
dclass = self.dclassesByName.get(dclassName)
|
|
assert dclass is not None
|
|
if not dclass:
|
|
self.notify.error(
|
|
"queryObjectField invalid dclassName %s, %s"%(doId, fieldName))
|
|
return
|
|
if dclass is not None:
|
|
fieldIds = [dclass.getFieldByName(fieldName).getNumber() \
|
|
for fieldName in fieldNames]
|
|
# is 0 a valid value?
|
|
assert 0 not in fieldIds
|
|
if 0 not in fieldIds:
|
|
self.queryObjectFieldIds(doId, fieldIds, context)
|
|
else:
|
|
assert self.notify.error(
|
|
"queryObjectFields invalid field in %s, %s"%(doId, repr(fieldNames)))
|
|
|
|
|
|
def requestDistributedObject(self, doId):
|
|
"""
|
|
Ask for the object to be added to the private
|
|
distributed object cache.
|
|
|
|
Normally a query object all does not actually enter the object
|
|
returned into the doId2do table. This function will change a query
|
|
request to a distributed object that is part of the normal set of
|
|
objects on this server.
|
|
"""
|
|
assert self.notify.debugCall()
|
|
distObj = self.doId2do.get(doId)
|
|
if distObj is not None:
|
|
# Already have it
|
|
return
|
|
if doId in self.distributedObjectRequests:
|
|
# Already requested it
|
|
return
|
|
#todo: add timeout for to remove request
|
|
self.distributedObjectRequests.add(doId)
|
|
context=self.allocateContext()
|
|
self.acceptOnce(
|
|
"doRequestResponse-%s"%(context,),
|
|
self.postGenerate, [])
|
|
self.registerForChannel(doId)
|
|
self.queryObjectAll(doId, context)
|
|
|
|
# If you want to set the airecv you should set the location
|
|
def setAIReceiver(self, objectId, aiChannel=None):
|
|
# Create a message
|
|
datagram = PyDatagram()
|
|
datagram.addServerHeader(
|
|
self.ourChannel, self.ourChannel, STATESERVER_ADD_AI_RECV)
|
|
# The Id of the object in question
|
|
datagram.addUint32(objectId)
|
|
if aiChannel is None:
|
|
aiChannel = self.ourChannel
|
|
datagram.addUint32(aiChannel)
|
|
# Send the message
|
|
self.send(datagram)
|
|
# Make sure the message gets there.
|
|
self.flush()
|
|
|
|
def replaceMethod(self, oldMethod, newFunction):
|
|
return 0
|
|
|
|
def sendUpdate(self, distObj, fieldName, args):
|
|
dg = distObj.dclass.aiFormatUpdate(
|
|
fieldName, distObj.doId, distObj.doId, self.ourChannel, args)
|
|
self.sendDatagram(dg)
|
|
|
|
def _handleDatabaseGetStoredValuesResp(self, di):
|
|
context = di.getUint32()
|
|
dbObj = self.dbObjMap.get(context)
|
|
if dbObj:
|
|
del self.dbObjMap[context]
|
|
dbObj.getFieldsResponse(di)
|
|
else:
|
|
AIRepository.notify.warning(
|
|
"Ignoring unexpected context %d for DBSERVER_GET_STORED_VALUES" %
|
|
context)
|
|
|
|
def _handleDatabaseCreateStoredObjectResp(self, di):
|
|
context = di.getUint32()
|
|
dbObj = self.dbObjMap.get(context)
|
|
if dbObj:
|
|
del self.dbObjMap[context]
|
|
dbObj.handleCreateObjectResponse(di)
|
|
else:
|
|
AIRepository.notify.warning(
|
|
"Ignoring unexpected context %d for DBSERVER_CREATE_STORED_OBJECT" %
|
|
context)
|
|
|
|
# LEADERBOARD
|
|
def setLeaderboardValue(self, category, whoId, whoName, value, senderId=None):
|
|
self.writeServerEvent('setLeaderboardValue', whoId,
|
|
'%s|%s|%s' % (whoName, category, value))
|
|
dcfile = self.getDcFile()
|
|
dclass = dcfile.getClassByName('LeaderBoard')
|
|
if senderId is None:
|
|
senderId = self.districtId
|
|
dg = dclass.aiFormatUpdate('setValue',
|
|
OtpDoGlobals.OTP_DO_ID_LEADERBOARD_MANAGER,
|
|
OtpDoGlobals.OTP_DO_ID_LEADERBOARD_MANAGER,
|
|
senderId,
|
|
[[category], whoId, whoName, value])
|
|
self.send(dg)
|
|
|
|
##################################################################
|
|
# msgsend is set by the C code on the repository.. These Functions
|
|
# will let you parse the special encoded sender id if you want to
|
|
# know the AvatarID or The AccountID of the Sender..
|
|
|
|
def getAccountIdFromSender(self):
|
|
"""
|
|
only works on the dc updates from the client agent
|
|
"""
|
|
return self.getMsgSender() >> 32
|
|
|
|
def getAvatarIdFromSender(self):
|
|
"""
|
|
only works on the dc updates from the client agent
|
|
"""
|
|
return self.getMsgSender() & 0xffffffff
|
|
|
|
def getSenderReturnChannel(self):
|
|
return self.getMsgSender()
|
|
|
|
|
|
########################################
|
|
# Network reading and time device.. for ai's
|
|
def taskManagerDoYieldNetwork(self , frameStartTime, nextScheuledTaksTime):
|
|
minFinTime = frameStartTime + self.MaxEpockSpeed
|
|
if nextScheuledTaksTime > 0 and nextScheuledTaksTime < minFinTime:
|
|
minFinTime = nextScheuledTaksTime
|
|
|
|
self.networkBasedReaderAndYielder(self.handleDatagram,globalClock,minFinTime)
|
|
|
|
if not self.isConnected():
|
|
self.stopReaderPollTask()
|
|
self.lostConnection()
|
|
|
|
###############################################################
|
|
# Optimized version of old behavior..
|
|
def readerPollUntilEmpty(self, task):
|
|
self.checkDatagramAi(self.handleDatagram)
|
|
if not self.isConnected():
|
|
self.stopReaderPollTask()
|
|
self.lostConnection()
|
|
|
|
return Task.cont
|
|
|
|
|
|
###############################################################
|
|
# This can be used to do time based yielding instead of the sleep task.
|
|
def taskManagerDoYield(self , frameStartTime, nextScheuledTaksTime):
|
|
minFinTime = frameStartTime + self.MaxEpockSpeed
|
|
if nextScheuledTaksTime > 0 and nextScheuledTaksTime < minFinTime:
|
|
minFinTime = nextScheuledTaksTime
|
|
|
|
delta = minFinTime - globalClock.getRealTime()
|
|
while(delta > 0.002):
|
|
time.sleep(delta)
|
|
delta = minFinTime - globalClock.getRealTime()
|
|
|
|
|
|
###############################################################
|
|
# This can be used to do time based yielding instead of the sleep task.
|
|
def startReaderPollTask(self):
|
|
if not self.AIRunningNetYield:
|
|
ConnectionRepository.startReaderPollTask(self)
|
|
else:
|
|
print('########## startReaderPollTask New ')
|
|
self.stopReaderPollTask()
|
|
self.accept(CConnectionRepository.getOverflowEventName(),self.handleReaderOverflow)
|
|
|
|
def getTrackClsends(self):
|
|
return False
|